Kafka Sink Connect: WARN Bulk request 167 failed. Retrying request

Problem Description

I have a data pipeline in which an input topic, a Kafka Streams application, and an output topic feed a sink connector that writes to Elasticsearch.

When the pipeline starts, ingestion into Elasticsearch works fine, but once the process has been running for a while, ingestion through the connector starts to fail. I have been checking the worker logs and I see the following messages, which I suspect may be the cause:

    [2021-10-21 11:22:14,246] WARN Bulk request 168 failed. Retrying request. (io.confluent.connect.elasticsearch.ElasticsearchClient:335)
    java.net.SocketTimeoutException: 3,000 milliseconds timeout on connection http-outgoing-643 [ACTIVE]
    at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:387)
    at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92)
    at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39)
    at org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:175)
    at org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:263)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:492)
    at org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:213)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280)
    at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
    at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
    at java.base/java.lang.Thread.run(Thread.java:829)

[2021-10-21 11:27:23,858] INFO [Consumer clientId=connector-consumer-ElasticsearchSinkConnector-topic01-0, groupId=connect-ElasticsearchSinkConnector-topic01] Member connector-consumer-ElasticsearchSinkConnector-topic01-0-41b68d34-0f00-4887-b54e-79561fffb5e5 sending LeaveGroup request to coordinator kafka1:9092 (id: 2147483646 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:1042)
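
(Note: the two settings named in that log message are consumer properties, not connector properties. Since Kafka 2.3 they can be overridden per connector, provided the worker permits client overrides. A minimal sketch, with illustrative values rather than recommendations:)

# Worker properties: allow connectors to override their client configs
connector.client.config.override.policy=All

# Connector config: allow more time between polls, and fetch fewer
# records per poll so each batch is processed faster
consumer.override.max.poll.interval.ms=600000
consumer.override.max.poll.records=100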

I have tried changing the connector configuration, but I don't understand the root cause well enough to fix it.

Connector configuration:

connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
connection.password=xxxxx
topics=output_topic
value.converter.schemas.enable=false
connection.username=user-x
name=ElasticsearchSinkConnector-output_topic
connection.url=xxxxxxx
value.converter=org.apache.kafka.connect.json.JsonConverter
key.ignore=true
key.converter=org.apache.kafka.connect.storage.StringConverter
schema.ignore=true

Could the bulk-request warnings cause data loss? Any help would be appreciated.

Tags: elasticsearch, apache-kafka, apache-kafka-connect, confluent-platform

Solution

You can try adding:

"flush.timeout.ms": 30000
