首页 > 解决方案 > Kafka 连接器记录写入器因缺少要分配的内存而卡在 S3OutputStream 中,但在几个小时内保持空闲状态并没有失败

问题描述

我的行为我不完全知道如何修改。我正在测试 s3 Kafka 同步连接器,但我的主题中的数据很少。

目前,我可以通过使用 Kafka 管理器看到主题中有数据,但我的连接器读取数据并且从不移动偏移量,也从不将其推送到 Kafka。在其他主题中这有效,但在此特定主题中则无效。我认为与超时有关,但我找不到要设置的正确配置属性,以便刷新更快一些。

这是我的配置:

      curl -X PUT -s -o /dev/null -H ""Content-Type:application/json""
      http://localhost:$$CONNECT_REST_PORT/connectors/s3_connector_doc_cmg/config
      \
        -d '{
          ""connector.class"": ""io.confluent.connect.s3.S3SinkConnector"",
          ""storage.class"": ""io.confluent.connect.s3.storage.S3Storage"",
          ""s3.region"": ""us-east-1"",
          ""s3.bucket.name"": ""confluent-pipeline"",
          ""topics.dir"": ""topics"",
          ""topics"": ""com.acp.bde.doc_cmg"",
          ""flush.size"": ""25"",
          ""rotate.interval.ms"": ""5000"",
          ""auto.register.schemas"": ""false"",
          ""tasks.max"": ""1"",
          ""s3.part.size"": ""5242880"",
          ""timezone"": ""UTC"",
          ""parquet.codec"": ""snappy"",
          ""offset.flush.interval.ms"": ""5000"",
          ""offset.flush.timeout.ms"": ""1000"",
          ""s3.credentials.provider.class"": ""com.amazonaws.auth.DefaultAWSCredentialsProviderChain"",
          ""format.class"": ""io.confluent.connect.s3.format.avro.AvroFormat"",
          ""value.converter"": ""com.insight.connect.protobuf.ProtobufConverter"",
          ""key.converter"": ""org.apache.kafka.connect.storage.StringConverter"",
          ""partitioner.class"": ""io.confluent.connect.storage.partitioner.DailyPartitioner"",
          ""locale"": ""de-CH"",
          ""timezone"": ""Europe/Zurich"",
          ""store.url"": ""http://minio-server-svc:9000/""
        }'"

这就是我在日志中看到的:

[2020-10-23 10:35:47,594] INFO Opening record writer for: topics/com.acp.bde.doc_cmg/year=2020/month=10/day=23/com.acp.bde.doc_cmg+1+0000000000.avro (io.confluent.connect.s3.format.avro.AvroRecordWriterProvider)
[2020-10-23 10:35:48,017] INFO Opening record writer for: topics/com.acp.bde.doc_cmg/year=2020/month=10/day=23/com.acp.bde.doc_cmg+3+0000000000.avro (io.confluent.connect.s3.format.avro.AvroRecordWriterProvider)
[2020-10-23 10:35:48,075] INFO Opening record writer for: topics/com.acp.bde.doc_cmg/year=2020/month=10/day=23/com.acp.bde.doc_cmg+2+0000000000.avro (io.confluent.connect.s3.format.avro.AvroRecordWriterProvider)
[2020-10-23 11:35:37,989] INFO [Worker clientId=connect-1, groupId=kafka-connect-01] Session key updated (org.apache.kafka.connect.runtime.distributed.DistributedHerder)

所以它们已经开放了将近 1 小时,并且没有真正发生任何事情,我想知道我的配置是否完全错误,或者我需要一些属性和配置,因此这种数据推送速度要快一些。

更新:我仍然没有正确修复这个问题,但实际上它似乎是内存不足的问题。

程序只是卡在这一行 this.buffer = ByteBuffer.allocate(this.partSize);

https://github.com/confluentinc/kafka-connect-storage-cloud/blob/master/kafka-connect-s3/src/main/java/io/confluent/connect/s3/storage/S3OutputStream.java#L85

困扰我的部分是它根本没有抱怨,只是卡在那里。它不应该因内存不足而崩溃吗?还是不应该更快地释放内存?它几乎可以在没有任何反馈的情况下停留在该呼叫中超过 3 或 4 小时。

我仍然认为我的配置可能有问题,但我不知道应该看什么或在哪里看。

标签: amazon-s3apache-kafkaapache-kafka-connectminio

解决方案


您的分区程序是基于时间的。因此,当rotate.schedule.interval.ms参数不存在时,这可能会发生。看看以下主题https://stackoverflow.com/a/51160834/1551246


推荐阅读