apache-kafka - Can the Confluent Dataproc Sink Connector write directly to a Google Cloud Storage bucket?
Problem description
I am using Kafka Connect with the Confluent Dataproc Sink Connector to write data to a Google Dataproc cluster. The Dataproc cluster is configured with a Google Cloud Storage bucket. Once created, the Dataproc sink connector writes data into Dataproc's HDFS file system via the datanodes, so the data ends up on the persistent disks of the Dataproc nodes.
Is there any way to configure the Confluent Dataproc sink connector to write data directly to the cluster's Cloud Storage bucket instead of the HDFS file system?
Solutions attempted without success
- I tried changing the "fs.defaultFS" URL to "gs://<bucket_name>", but the connector still writes to the HDFS file system.
- I could use the Google Cloud Storage Sink Connector instead of the Dataproc sink connector, but the Cloud Storage sink connector lacks Hive integration support (a configuration sketch of this alternative follows this list).
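For comparison, here is a minimal sketch of that alternative, the Confluent Cloud Storage (GCS) Sink Connector. This is an illustration under assumptions, not a verified configuration: the bucket name and credentials path are placeholders, and the format and storage class names should be checked against the installed connector version. Note there is no hive.integration option here, which is exactly the limitation described in the second bullet above.
{
  "name": "gcs-sink-connector",
  "connector.class": "io.confluent.connect.gcs.GcsSinkConnector",
  "tasks.max": "1",
  "topics": "poc-input-topic",
  "gcs.bucket.name": "<bucket_name>",
  "gcs.credentials.path": "/path/to/gcp-credentials.json",
  "gcs.part.size": "5242880",
  "flush.size": "330",
  "storage.class": "io.confluent.connect.gcs.storage.GcsStorage",
  "format.class": "io.confluent.connect.gcs.format.parquet.ParquetFormat",
  "partitioner.class": "io.confluent.connect.storage.partitioner.HourlyPartitioner",
  "locale": "en-US",
  "timezone": "UTC"
}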
Dataproc sink connector configuration
{
"name": "dataproc-sink-connector",
"connector.class": "io.confluent.connect.gcp.dataproc.DataprocSinkConnector",
"tasks.max": "1",
"topics": "poc-input-topic",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"gcp.dataproc.projectId": "******",
"gcp.dataproc.region": "asia-southeast1",
"gcp.dataproc.cluster": "******-dataproc",
"gcp.dataproc.credentials.json": "******",
"format.class": "io.confluent.connect.gcp.dataproc.hdfs.parquet.ParquetFormat",
"flush.size": "330",
"storage.class": " io.confluent.connect.gcp.dataproc.hdfs.storage.HdfsStorage",
"topics.dir": "data",
"logs.dir": "logs",
"store.url": "gs://*****-dataproc-bucket/",
"partitioner.class": "io.confluent.connect.storage.partitioner.HourlyPartitioner",
"locale": "en-US",
"timezone": "UTC",
"hive.integration": "true",
"hive.metastore.uris": "thrift://*******:9083",
"hive.database": "fnidb",
"schema.compatibility": "BACKWARD",
"confluent.topic.ssl.endpoint.identification.algorithm": "https",
"confluent.topic.security.protocol": "SASL_SSL",
"confluent.topic.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username='*****' password='*****'",
"confluent.topic.sasl.mechanism": "PLAIN",
"confluent.topic.request.timeout.ms": "20000",
"confluent.topic.retry.backoff.ms": "500",
"confluent.topic.sasl.kerberos.service.name": "kafka",
"confluent.topic.bootstrap.servers": "******.asia-southeast1.gcp.confluent.cloud:9092",
"value.converter.basic.auth.credentials.source": "USER_INFO",
"value.converter.schema.registry.basic.auth.user.info": "*****:*************",
"value.converter.schema.registry.url": "https://******.australia-southeast1.gcp.confluent.cloud",
"key.converter.basic.auth.credentials.source": "USER_INFO",
"key.converter.schema.registry.basic.auth.user.info": "*****:*************",
"key.converter.schema.registry.url": "https://******.australia-southeast1.gcp.confluent.cloud",
"hdfs.authentication.kerberos": "true",
"connect.hdfs.principal": "hdfs/****@REALM.COM",
"connect.hdfs.keytab": "/etc/security/keytab/hdfs.service.keytab",
"hdfs.namenode.principal": "hdfs/****@REALM.COM",
"kerberos.ticket.renew.period.ms": "3600000",
"hive.conf.dir": "/etc/hive/conf"
}
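For reference, a flat property map like the one above is the shape the Kafka Connect REST API expects at PUT /connectors/dataproc-sink-connector/config. If the connector is created with POST /connectors instead, the same properties are nested under a "config" key; a minimal sketch, trimmed to a few representative properties:
{
  "name": "dataproc-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.gcp.dataproc.DataprocSinkConnector",
    "tasks.max": "1",
    "topics": "poc-input-topic"
  }
}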
Solution