Camel aws-s3 Source Connector Error - how should I change the configuration?

Problem description

I am defining a Camel S3 Source connector on our Confluent (5.5.1) installation. After creating the connector and seeing its status as RUNNING, I upload a file to my S3 bucket. When I ls the bucket it is empty, which suggests the file was picked up and deleted. However, I do not see any message in the topic. I am basically following this example with a simple 4-line file, except that instead of standalone Kafka I am running it on a Confluent cluster.
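
For reference, this is roughly how I check both sides; the bucket name and the client properties file used for SASL_SSL are placeholders from my setup:

# check whether the uploaded object is still in the bucket
aws s3 ls s3://my-bucket/

# check whether anything actually reached the topic; client.properties holds the
# same SASL_SSL / truststore settings as the cluster
kafka-console-consumer --bootstrap-server broker1-dev:9092 \
  --consumer.config /config/client.properties \
  --topic TEST-CAMEL-S3-SOURCE-POC --from-beginning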

Here is my configuration

{
    "name": "CamelAWSS3SourceConnector",
    "connector.class": "org.apache.camel.kafkaconnector.awss3.CamelAwss3SourceConnector",
    "bootstrap.servers": "broker1-dev:9092,broker2-dev:9092,broker3-dev:9092",
    "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user\"  password=\"password\";",
    "security.protocol": "SASL_SSL",
    "ssl.truststore.location": "/config/client.truststore.jks",
    "ssl.truststore.password": "password",
    "ssl.keystore.location": "/config/client.keystore.jks",
    "ssl.keystore.password": "password",
    "ssl.key.password": "password",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true",
    "errors.tolerance": "all",
    "offset.flush.timeout.ms": "60000",
    "offset.flush.interval.ms": "10000",
    "max.request.size": "10485760",
    "flush.size": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.camel.kafkaconnector.awss3.converters.S3ObjectConverter",
    "camel.source.maxPollDuration": "10000",
    "topics": "TEST-CAMEL-S3-SOURCE-POC",
    "camel.source.path.bucketNameOrArn": "arn:aws:s3:::my-bucket",
    "camel.component.aws-s3.region": "US_EAST_1",
    "tasks.max": "1",
    "camel.source.endpoint.useIAMCredentials": "true",
    "camel.source.endpoint.autocloseBody": "true"
}
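
For completeness, this is roughly how I submit the configuration, assuming the Connect REST API on the default port 8083 and the flat property map above saved as connector.json:

curl -s -X PUT -H "Content-Type: application/json" \
  --data @connector.json \
  http://localhost:8083/connectors/CamelAWSS3SourceConnector/config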

I see these errors in the logs

[2020-12-23 09:05:01,876] ERROR WorkerSourceTask{id=CamelAWSS3SourceConnector-0} Failed to flush, timed out while waiting for producer to flush outstanding 1 messages (org.apache.kafka.connect.runtime.WorkerSourceTask:448)
[2020-12-23 09:05:01,876] ERROR WorkerSourceTask{id=CamelAWSS3SourceConnector-0} Failed to commit offsets (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:116)


[2020-12-23 09:20:58,685] DEBUG [Worker clientId=connect-1, groupId=connect-cluster] Received successful Heartbeat response (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:1045)
[2020-12-23 09:20:58,688] DEBUG WorkerSourceTask{id=CamelAWSS3SourceConnector-0} Committing offsets (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:111)
[2020-12-23 09:20:58,688] INFO WorkerSourceTask{id=CamelAWSS3SourceConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:426)
[2020-12-23 09:20:58,688] INFO WorkerSourceTask{id=CamelAWSS3SourceConnector-0} flushing 1 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:443)

If I curl the connector's status, I get this error in the status output

trace: org.apache.kafka.connect.errors.ConnectException: OffsetStorageWriter is already flushing
        at org.apache.kafka.connect.storage.OffsetStorageWriter.beginFlush(OffsetStorageWriter.java:111)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:438)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:257)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
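
The status check itself is just a call to the Connect REST status endpoint, roughly like this (the default port 8083 is an assumption of my setup):

curl -s http://localhost:8083/connectors/CamelAWSS3SourceConnector/status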

I saw the following suggestion in a few links, but it did not help either. It recommends adding the following keys to the configuration

"offset.flush.timeout.ms": "60000",
"offset.flush.interval.ms": "10000",
"max.request.size": "10485760",

Thanks

Update

I trimmed the configuration down to a minimum, but I still get the same error

{
    "connector.class": "org.apache.camel.kafkaconnector.awss3.CamelAwss3SourceConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.camel.kafkaconnector.awss3.converters.S3ObjectConverter",
    "camel.source.maxPollDuration": "10000",
    "topics": "TEST-S3-SOURCE-MINIMAL-POC",
    "camel.source.path.bucketNameOrArn": "pruvpcaws003-np-use1-push-json-poc",
    "camel.component.aws-s3.region": "US_EAST_1",
    "tasks.max": "1",
    "camel.source.endpoint.useIAMCredentials": "true",
    "camel.source.endpoint.autocloseBody": "true"
}

Still getting the same error

trace: org.apache.kafka.connect.errors.ConnectException: OffsetStorageWriter is already flushing
    at org.apache.kafka.connect.storage.OffsetStorageWriter.beginFlush(OffsetStorageWriter.java:111)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:438)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:257)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

I am not sure where else I should look for the root cause
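
The only other thing I can think of is grepping the Connect worker log for producer-side errors around the failed flush, something like this (the log path is just a guess for my install):

# look for producer errors near the flush failure in the Connect worker log
grep -iE 'ERROR|Failed to flush|producer' /var/log/confluent/connect/connect.log | tail -n 50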

Tags: apache-kafka-connect, confluent-platform, apache-camel-awss3-kafka-connector

Solution

