amazon-s3 - Kafka 连接器记录写入器因缺少要分配的内存而卡在 S3OutputStream 中,但在几个小时内保持空闲状态并没有失败
问题描述
我的行为我不完全知道如何修改。我正在测试 s3 Kafka 同步连接器,但我的主题中的数据很少。
目前,我可以通过使用 Kafka 管理器看到主题中有数据,但我的连接器读取数据并且从不移动偏移量,也从不将其推送到 Kafka。在其他主题中这有效,但在此特定主题中则无效。我认为与超时有关,但我找不到要设置的正确配置属性,以便刷新更快一些。
这是我的配置:
curl -X PUT -s -o /dev/null -H ""Content-Type:application/json""
http://localhost:$$CONNECT_REST_PORT/connectors/s3_connector_doc_cmg/config
\
-d '{
""connector.class"": ""io.confluent.connect.s3.S3SinkConnector"",
""storage.class"": ""io.confluent.connect.s3.storage.S3Storage"",
""s3.region"": ""us-east-1"",
""s3.bucket.name"": ""confluent-pipeline"",
""topics.dir"": ""topics"",
""topics"": ""com.acp.bde.doc_cmg"",
""flush.size"": ""25"",
""rotate.interval.ms"": ""5000"",
""auto.register.schemas"": ""false"",
""tasks.max"": ""1"",
""s3.part.size"": ""5242880"",
""timezone"": ""UTC"",
""parquet.codec"": ""snappy"",
""offset.flush.interval.ms"": ""5000"",
""offset.flush.timeout.ms"": ""1000"",
""s3.credentials.provider.class"": ""com.amazonaws.auth.DefaultAWSCredentialsProviderChain"",
""format.class"": ""io.confluent.connect.s3.format.avro.AvroFormat"",
""value.converter"": ""com.insight.connect.protobuf.ProtobufConverter"",
""key.converter"": ""org.apache.kafka.connect.storage.StringConverter"",
""partitioner.class"": ""io.confluent.connect.storage.partitioner.DailyPartitioner"",
""locale"": ""de-CH"",
""timezone"": ""Europe/Zurich"",
""store.url"": ""http://minio-server-svc:9000/""
}'"
这就是我在日志中看到的:
[2020-10-23 10:35:47,594] INFO Opening record writer for: topics/com.acp.bde.doc_cmg/year=2020/month=10/day=23/com.acp.bde.doc_cmg+1+0000000000.avro (io.confluent.connect.s3.format.avro.AvroRecordWriterProvider)
[2020-10-23 10:35:48,017] INFO Opening record writer for: topics/com.acp.bde.doc_cmg/year=2020/month=10/day=23/com.acp.bde.doc_cmg+3+0000000000.avro (io.confluent.connect.s3.format.avro.AvroRecordWriterProvider)
[2020-10-23 10:35:48,075] INFO Opening record writer for: topics/com.acp.bde.doc_cmg/year=2020/month=10/day=23/com.acp.bde.doc_cmg+2+0000000000.avro (io.confluent.connect.s3.format.avro.AvroRecordWriterProvider)
[2020-10-23 11:35:37,989] INFO [Worker clientId=connect-1, groupId=kafka-connect-01] Session key updated (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
所以它们已经开放了将近 1 小时,并且没有真正发生任何事情,我想知道我的配置是否完全错误,或者我需要一些属性和配置,因此这种数据推送速度要快一些。
更新:我仍然没有正确修复这个问题,但实际上它似乎是内存不足的问题。
程序只是卡在这一行 this.buffer = ByteBuffer.allocate(this.partSize);
困扰我的部分是它根本没有抱怨,只是卡在那里。它不应该因内存不足而崩溃吗?还是不应该更快地释放内存?它几乎可以在没有任何反馈的情况下停留在该呼叫中超过 3 或 4 小时。
我仍然认为我的配置可能有问题,但我不知道应该看什么或在哪里看。
解决方案
您的分区程序是基于时间的。因此,当rotate.schedule.interval.ms参数不存在时,这可能会发生。看看以下主题https://stackoverflow.com/a/51160834/1551246
推荐阅读
- python - 使用 Dataframe 列中的 DateTime 类型更新 SQL 列 Type Text
- c# - 将继承接口的类与接口引用进行比较
- xml - 类型不匹配:预期地图但在 apoc 中是 List{map}
- javascript - 打字稿中JavaScript符号类型的接口或类型?
- javascript - 将数据从脚本发送到脚本
- hadoop - HBase 表二级索引问题
- python - 预期的浏览器二进制位置,但无法在默认位置找到二进制文件,没有使用 GeckoDriver 提供的“moz:firefoxOptions.binary”功能
- python - 如何在 Flask render_template 中为变量渲染 html
- python - pytorch Dataloader,bertModel 出现暗淡错误
- java - Java 日志记录问题。它行不通。找不到原因