首页 > 解决方案 > 从 kafka (MSK) AWS 向 elasticsearch 发送数据

问题描述

您好,我目前正在尝试通过融合连接器将数据从 kafka(MSK)(AWS) 发送到 elasticsearch。我有一个来自 sql 数据库的数据流。当我运行 confluent 连接器时,它会运行一段时间将数据发送到 elasticsearch,但会在一两分钟后停止,并出现以下错误。

sending LeaveGroup request to coordinator b-2.*****.amazonaws.com:9092 due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.

有很多数据需要流式传输,并且想知道是否有更好的方法或特定设置来一次发送多个批次,或者修复此错误的方法可能会有所帮助。我的弹性搜索服务器太小了吗?

任何其他信息请告诉我谢谢。

标签: amazon-web-serviceselasticsearchapache-kafkaapache-kafka-connectaws-msk

解决方案


正如错误消息所暗示的那样,向 Elasticsearch 写入消息(我假设这是一个接收器连接器)似乎花费了太多时间,因此 - 在这种情况下是连接器的消费者客户端由于以下原因而被迫离开消费者组超时。要隔离这一点,您应该增加属性的值max.poll.interval.ms并查看问题是否仍然存在。如果是这样,这可能意味着 Elasticsearch 中的索引耗时太长,这可能表明您的 Elasticsearch 集群过于繁忙或规模过小。

  • 或者,您可以验证indices.indexing.index_time_in_millisElasticsearch 上的指标,以仔细检查索引时间是否确实花费了太长时间。

检查 Kafka Connect 集群的运行状况也很重要。如果由于某种原因不稳定,来自 Elasticsearch 连接器的任务将重新平衡,这也迫使消费者客户端离开消费者组。

这是一个消除选项的游戏,直到你找到问题的根本原因¯_(ツ)_/¯


推荐阅读