apache-kafka - 为什么GCS Connector在GCS中创建Kafka分区目录但不写入kafka主题数据
问题描述
我是 Confluent GCS 连接器的新手。我遵循了https://docs.confluent.io/4.1.2/connect/kafka-connect-gcs/gcs_connector.html中概述的文档,除了没有出现在 GCS 存储桶中的 Kafka 主题数据之外,一切都按计划进行。目录结构类似于 kafka 主题名称,但没有实际的主题记录。我正在使用 Avro 格式架构的上述文档中包含的默认属性文件,并且刷新大小为 3。我下载了凭据文件,并且可以从 VM 实例中看到。我还可以将本地文件从我的 Google VM 实例复制到存储桶,这样我就知道我拥有正确的权限。
{"name": "gcs",
"config": {
"connector.class": "io.confluent.connect.gcs.GcsSinkConnector",
"tasks.max": "1",
"topics": "gcs_topic",
"gcs.bucket.name": "#bucket-name",
"gcs.part.size": "5242880",
"flush.size": "3",
"gcs.credentials.path": "#/path/to/credentials/keys.json",
"storage.class": "io.confluent.connect.gcs.storage.GcsStorage",
"format.class": "io.confluent.connect.gcs.format.avro.AvroFormat",
"partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
"schema.compatibility": "NONE",
"confluent.topic.bootstrap.servers": "localhost:9092",
"confluent.topic.replication.factor": "1",
"name": "gcs"
},
"tasks": [],
"type": null
}
连接属性文件如下:
bootstrap.servers=localhost:9092
group.id=connect-cluster
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-statuses
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
Plugin.path=~/confluent/share/java,~/confluent/share/confluent-hub-components,~/confluent/etc,~/confluent/etc/
rest.extension.classes=io.confluent.connect.replicator.monitoring.ReplicatorMonitoringExtension
producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
解决方案
推荐阅读
- python-3.x - 如何计算指定索引以外的元素的总和?
- php - 随机化对象数组的一部分
- php - 将用户从年龄验证重定向到原始网址
- android - Android Workmanger PeriodicWorkRequest API 只能工作一次?
- react-native - 如何自定义 expo 默认启动屏幕并将其隐藏
- php - 如何在 larvel 5.4 中使 JWT 令牌无效或列入黑名单
- javascript - 多次订阅 observable 有时不起作用
- java - Eclipse - 为什么推断 Java 数组的泛型建议
- java - eclipse 无法从打开的文件系统导入项目?
- java - 识别对象类型(即类)