message-hub - Message Hub & Confluent Kafka Connect S3
问题描述
I have requirement to consume messages from IBM MHub topic into IBM Object Storage.
I got it working with local Kafka server with Confluent Kafka Connect S3 plugin as standalone worker for sink Amazon S3 bucket and file. Both was a success.
If I configure Confluent Kafka Connect S3 as distributed worker for IBM MHub cluster I get no errors but still no messages end up to Amazon S3 bucket. I tried file sink also, no luck either.
Is it possible at all?
解决方案
来自:https ://kafka.apache.org/documentation/#connect_running
此处配置的参数旨在供 Kafka Connect 使用的生产者和消费者访问配置、偏移和状态主题。Kafka source 和 Kafka sink 任务的配置,可以使用相同的参数,但需要以consumer为前缀。和制作人。分别。从worker配置继承的唯一参数是bootstrap.servers,在大多数情况下就足够了,因为同一个集群通常用于所有目的。一个值得注意的例外是安全集群,它需要额外的参数才能允许连接。这些参数最多需要在工作器配置中设置 3 次,一次用于管理访问,一次用于 Kafka 接收器,一次用于 Kafka 源。
所以解决方案是向消费者添加重复配置。在工作人员配置中添加前缀,以便在接收器使用者上进行所需的 sasl_ssl 设置而不是默认设置。
IBM Cloud Object Storage 也可以使用。需要凭据,例如。环境变量:AWS_ACCESS_KEY_ID="查看 cos 凭证" & AWS_SECRET_ACCESS_KEY="查看 cos 凭证"
连接器配置:
{
"name": "s3-sink",
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"tasks.max": "5",
"topics": "your-topic",
"s3.region": "eu-central-1",
"store.url": "https://s3.eu-geo.objectstorage.softlayer.net",
"s3.bucket.name": "your-bucket",
"s3.part.size": "5242880",
"flush.size": "1",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.json.JsonFormat",
"partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
"schema.compatibility": "NONE",
"name": "s3-sink"
}
}
推荐阅读
- scala - 为什么它不解码为 ADT 类型?
- mysql - 将大型 CSV 文件导入 MySQL 时出错
- ios - 来自远程 URL 的音频开始播放需要将近 10 秒
- r - 在 Y 轴上的 ggplot2 中引入显式换行符(箱线图)
- html - 在输入文本之间添加填充
- javascript - 更新要在另一个变量中使用的变量
- angular - 如何使用 Ionics 离子幻灯片在 HTML5 画布之间切换?
- apache-spark - 通过 R 本地或外部访问 spark
- android - 无法使用 Android 绑定在 RecyclerView 上绑定 RatingBar
- c - 在下面的代码中分叉后发送信号有什么问题