elasticsearch - KAFKA SINK CONNECT: WARN Bulk request 167 failed. Retrying request
Problem description
I have a data pipeline consisting of an input topic, a Kafka Streams application, and an output topic connected to Elasticsearch through a sink connector.
At the start of the run, ingestion is fine, but once the process has been running for a while, ingestion into Elasticsearch from the connector starts to fail. I have been checking all the worker logs and I see the following messages, which I suspect may be the cause:
[2021-10-21 11:22:14,246] WARN Bulk request 168 failed. Retrying request. (io.confluent.connect.elasticsearch.ElasticsearchClient:335)
java.net.SocketTimeoutException: 3,000 milliseconds timeout on connection http-outgoing-643 [ACTIVE]
at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:387)
at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92)
at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39)
at org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:175)
at org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:263)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:492)
at org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:213)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280)
at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
at java.base/java.lang.Thread.run(Thread.java:829)
[2021-10-21 11:27:23,858] INFO [Consumer clientId=connector-consumer-ElasticsearchSinkConnector-topic01-0, groupId=connect-ElasticsearchSinkConnector-topic01] Member connector-consumer-ElasticsearchSinkConnector-topic01-0-41b68d34-0f00-4887-b54e-79561fffb5e5 sending LeaveGroup request to coordinator kafka1:9092 (id: 2147483646 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:1042)
I have tried changing the connector configuration, but I do not understand the root cause well enough to fix it.
Connector configuration:
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
connection.password=xxxxx
topics=output_topic
value.converter.schemas.enable=false
connection.username=user-x
name=ElasticsearchSinkConnector-output_topic
connection.url=xxxxxxx
value.converter=org.apache.kafka.connect.json.JsonConverter
key.ignore=true
key.converter=org.apache.kafka.connect.storage.StringConverter
schema.ignore=true
Could these bulk warnings cause data loss? Any help would be appreciated.
Solution
You can try adding
"flush.timeout.ms": 30000