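amazon-web-services - Kafka Connect S3 source connector configuration problem
Problem description
I have used the Kafka Connect S3 sink connector to upload some Avro messages from a topic, say my.topic, to an Amazon S3 bucket, s3-bucket.
The sink connector's configuration is as follows: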
{
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"key.converter": "org.apache.kafka.connect.converters.LongConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schemaregistry:8099",
"value.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy",
"tasks.max": "1",
"topics": "my.topic",
"s3.region": "eu-west-2",
"s3.bucket.name": "s3-bucket",
"flush.size": "5",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
"schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"schema.compatibility": "NONE",
"partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner"
}
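This works as expected. All messages are the same record type with the same schema version. I write 5 of them to the topic and see one S3 object in my bucket at the path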
/topics/my.topic/partition=0/my.topic+0+0000000000.avro
Now I want to put these stored messages onto another, empty topic. I start the S3 source connector with the following configuration:
{
"confluent.topic.bootstrap.servers": "kafka:9092",
"confluent.topic.replication.factor": 1,
"connector.class": "io.confluent.connect.s3.source.S3SourceConnector",
"s3.region": "eu-west-2",
"s3.bucket.name": "s3-bucket",
"format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
"partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": ".*",
"transforms.AddPrefix.replacement": "recovery_$0"
}
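As a sanity check on the routing in the config above, here is a minimal Python sketch that approximates what the RegexRouter transform does with transforms.AddPrefix.regex and transforms.AddPrefix.replacement. It assumes the transform only rewrites a topic when the regex matches the whole name and that it replaces once. route_topic is a hypothetical helper written for illustration, not part of Kafka Connect.

```python
import re


def route_topic(topic: str, regex: str, replacement: str) -> str:
    """Approximate RegexRouter: rewrite the topic only if the regex matches all of it."""
    if re.fullmatch(regex, topic) is None:
        # Assumed behaviour: topics that do not match pass through unchanged.
        return topic
    # Translate Java-style group references ($0, $1, ...) to Python's \g<N> form.
    py_replacement = re.sub(r"\$(\d+)", r"\\g<\1>", replacement)
    # count=1 performs a single replacement; without it ".*" would also match
    # the empty string at the end of the topic name and add the prefix twice.
    return re.sub(regex, py_replacement, topic, count=1)


if __name__ == "__main__":
    print(route_topic("my.topic", ".*", "recovery_$0"))
```

With the values from the config, my.topic maps to recovery_my.topic, which is the topic the records are expected to land on.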
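When I look at the logs produced by Kafka Connect (running inside a Docker container), it seems happy: there are no errors, it correctly identifies my bucket, and the directory path inside it is assigned to be watched: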
/topics/my.topic/partition=0/
However, it never detects the file inside, and it never writes anything to the expected topic, recovery_my.topic. It repeatedly logs
kafka-connect | [2020-07-05 15:31:46,311] INFO PartitionCheckingTask - Checking if Partitions have changed. (io.confluent.connect.cloud.storage.source.util.PartitionCheckingTask)
kafka-connect | [2020-07-05 15:31:47,963] INFO WorkerSourceTask{id=tx-s3-restore-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
kafka-connect | [2020-07-05 15:31:47,964] INFO WorkerSourceTask{id=tx-s3-restore-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
kafka-connect | [2020-07-05 15:31:50,483] INFO AvroDataConfig values:
kafka-connect | schemas.cache.config = 50
kafka-connect | enhanced.avro.schema.support = false
kafka-connect | connect.meta.data = true
kafka-connect | (io.confluent.connect.avro.AvroDataConfig)
kafka-connect | [2020-07-05 15:31:50,483] INFO AvroDataConfig values:
kafka-connect | schemas.cache.config = 50
kafka-connect | enhanced.avro.schema.support = false
kafka-connect | connect.meta.data = true
kafka-connect | (io.confluent.connect.avro.AvroDataConfig)
kafka-connect | [2020-07-05 15:31:50,537] INFO AvroDataConfig values:
kafka-connect | schemas.cache.config = 50
kafka-connect | enhanced.avro.schema.support = false
kafka-connect | connect.meta.data = true
kafka-connect | (io.confluent.connect.avro.AvroDataConfig)
kafka-connect | [2020-07-05 15:31:50,589] INFO No new files ready after scan task assigned folders (io.confluent.connect.cloud.storage.source.StorageSourceTask)
This suggests to me that it is ignoring the file for some reason. Here is the full S3 source connector config as extracted from the logs:
kafka-connect | [2020-07-05 15:10:49,427] INFO S3SourceConnectorConfig values:
kafka-connect | behavior.on.error = fail
kafka-connect | confluent.license =
kafka-connect | confluent.topic = _confluent-command
kafka-connect | confluent.topic.bootstrap.servers = [kafka:9092]
kafka-connect | confluent.topic.replication.factor = 1
kafka-connect | directory.delim = /
kafka-connect | filename.regex = (.+)\+(\d+)\+.+$
kafka-connect | folders = [topics/my.topic/partition=0/]
kafka-connect | format.bytearray.extension = .bin
kafka-connect | format.bytearray.separator =
kafka-connect | format.class = class io.confluent.connect.s3.format.avro.AvroFormat
kafka-connect | partition.field.name = []
kafka-connect | partitioner.class = class io.confluent.connect.storage.partitioner.DefaultPartitioner
kafka-connect | path.format =
kafka-connect | record.batch.max.size = 200
kafka-connect | s3.bucket.name = s3-bucket
kafka-connect | s3.credentials.provider.class = class com.amazonaws.auth.DefaultAWSCredentialsProviderChain
kafka-connect | s3.http.send.expect.continue = true
kafka-connect | s3.part.retries = 3
kafka-connect | s3.poll.interval.ms = 60000
kafka-connect | s3.proxy.password = [hidden]
kafka-connect | s3.proxy.url =
kafka-connect | s3.proxy.user = null
kafka-connect | s3.region = eu-west-2
kafka-connect | s3.retry.backoff.ms = 200
kafka-connect | s3.sse.customer.key = [hidden]
kafka-connect | s3.ssea.name =
kafka-connect | s3.wan.mode = false
kafka-connect | schema.cache.size = 50
kafka-connect | store.url = null
kafka-connect | topics.dir = topics
kafka-connect | (io.confluent.connect.s3.source.S3SourceConnectorConfig)
kafka-connect | [2020-07-05 15:10:49,428] INFO [Producer clientId=connector-producer-tx-s3-restore-0] Cluster ID: nlQYzBVYRbWozKk54-Qx_A (org.apache.kafka.clients.Metadata)
kafka-connect | [2020-07-05 15:10:49,432] INFO AvroDataConfig values:
kafka-connect | schemas.cache.config = 50
kafka-connect | enhanced.avro.schema.support = false
kafka-connect | connect.meta.data = true
kafka-connect | (io.confluent.connect.avro.AvroDataConfig)
kafka-connect | [2020-07-05 15:10:49,434] INFO Starting source connector task with assigned folders [topics/my.topic/partition=0/] using partitioner io.confluent.connect.storage.partitioner.DefaultPartitioner (io.confluent.connect.cloud.storage.source.StorageSourceTask)
If anyone has any idea why my file is being ignored, I would be grateful.
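One thing that can be ruled out locally is the file name. The sketch below applies the default filename.regex from the config dump to the object key written by the sink. It assumes the connector matches the regex against the file name only (not the full key) and that the two capture groups stand for topic and partition. parse_object_name is a hypothetical helper, not connector code.

```python
import posixpath
import re

# Default value of filename.regex as printed in the S3SourceConnectorConfig dump.
FILENAME_REGEX = r"(.+)\+(\d+)\+.+$"

# Object key written by the sink connector (leading slash dropped).
OBJECT_KEY = "topics/my.topic/partition=0/my.topic+0+0000000000.avro"


def parse_object_name(key: str):
    """Return (topic, partition) if the file name matches the regex, else None."""
    file_name = posixpath.basename(key)
    match = re.match(FILENAME_REGEX, file_name)
    if match is None:
        return None
    # Assumption: group 1 is the topic and group 2 is the Kafka partition.
    return match.group(1), int(match.group(2))


if __name__ == "__main__":
    print(parse_object_name(OBJECT_KEY))
```

The key matches and yields my.topic and partition 0, so under these assumptions the object naming is unlikely to be why the file is skipped.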
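Solution
Because the Confluent S3 source connector is not open source and requires a license, you need to add the Confluent license setting for the 30-day trial to your source connector configuration: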
"confluent.license": ""
I tried this with my own use case and it works.
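For reference, a rough Python sketch of applying that change through the Kafka Connect REST API (PUT /connectors/<name>/config). The worker address http://localhost:8083 is an assumption, the connector name tx-s3-restore is taken from the logs in the question, and the helper names are hypothetical. The request is only built here, not sent.

```python
import json
import urllib.request

# Source connector settings from the question (values written as strings).
SOURCE_CONFIG = {
    "confluent.topic.bootstrap.servers": "kafka:9092",
    "confluent.topic.replication.factor": "1",
    "connector.class": "io.confluent.connect.s3.source.S3SourceConnector",
    "s3.region": "eu-west-2",
    "s3.bucket.name": "s3-bucket",
    "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
    "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": ".*",
    "transforms.AddPrefix.replacement": "recovery_$0",
}


def with_license(config: dict, license_key: str = "") -> dict:
    """Return a copy of the config with confluent.license set (empty, as in the answer)."""
    updated = dict(config)
    updated["confluent.license"] = license_key
    return updated


def build_update_request(connect_url: str, name: str, config: dict) -> urllib.request.Request:
    """Build (but do not send) a PUT /connectors/<name>/config request."""
    return urllib.request.Request(
        url=f"{connect_url}/connectors/{name}/config",
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


if __name__ == "__main__":
    # Assumed worker address; adjust to wherever the Connect REST API listens.
    request = build_update_request(
        "http://localhost:8083", "tx-s3-restore", with_license(SOURCE_CONFIG)
    )
    print(request.get_method(), request.full_url)
    # To apply the change: urllib.request.urlopen(request)
```

Building the request separately from sending it makes it easy to inspect the JSON body before it reaches the worker.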