amazon-web-services - 从 kafka (MSK) AWS 向 elasticsearch 发送数据
问题描述
您好,我目前正在尝试通过融合连接器将数据从 kafka(MSK)(AWS) 发送到 elasticsearch。我有一个来自 sql 数据库的数据流。当我运行 confluent 连接器时,它会运行一段时间将数据发送到 elasticsearch,但会在一两分钟后停止,并出现以下错误。
sending LeaveGroup request to coordinator b-2.*****.amazonaws.com:9092 due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
有很多数据需要流式传输,并且想知道是否有更好的方法或特定设置来一次发送多个批次,或者修复此错误的方法可能会有所帮助。我的弹性搜索服务器太小了吗?
任何其他信息请告诉我谢谢。
解决方案
正如错误消息所暗示的那样,向 Elasticsearch 写入消息(我假设这是一个接收器连接器)似乎花费了太多时间,因此 - 在这种情况下是连接器的消费者客户端由于以下原因而被迫离开消费者组超时。要隔离这一点,您应该增加属性的值max.poll.interval.ms
并查看问题是否仍然存在。如果是这样,这可能意味着 Elasticsearch 中的索引耗时太长,这可能表明您的 Elasticsearch 集群过于繁忙或规模过小。
- 或者,您可以验证
indices.indexing.index_time_in_millis
Elasticsearch 上的指标,以仔细检查索引时间是否确实花费了太长时间。
检查 Kafka Connect 集群的运行状况也很重要。如果由于某种原因不稳定,来自 Elasticsearch 连接器的任务将重新平衡,这也迫使消费者客户端离开消费者组。
这是一个消除选项的游戏,直到你找到问题的根本原因¯_(ツ)_/¯
推荐阅读
- sql-server - 如何将数据导入 CTE 并进行更新
- python - Python从满足特定条件的列表中查找数字
- javascript - 问题 - 如何在 Puppeteer 中传递“RegExp Object ??
- actions-on-google - 如何向用户发送 Google 助理操作的推送通知?
- r - R 如果滞后条件匹配,则用 NA 填充新列
- mysql - 错误:无法添加外键约束-
- java - Android DAO 添加查询“'@Query 不适用于字段”
- flutter - 是否可以在颤振引擎中的颤振引擎删除本机闪屏之前运行初始化代码
- ios - 识别哪个 UIImageView 被点击并在数组中获取它的索引
- elasticsearch - min_score 排除分数较高的文档