Can the Confluent Dataproc Sink Connector write directly to a Google Cloud Storage bucket?

Problem description

I am using Kafka Connect with the Confluent Dataproc Sink Connector to write data to a Google Dataproc cluster. The Dataproc cluster is configured with a Google Cloud Storage bucket. After the connector is created, it writes data through the datanodes into Dataproc's HDFS file system, so the data ends up on the persistent disks of the Dataproc nodes.

Is there any way to configure the Confluent Dataproc sink connector to write data directly to the Dataproc cluster's Cloud Storage bucket instead of the HDFS file system?
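One option that is often suggested for this kind of requirement, although it is not confirmed here, is to skip the Dataproc/HDFS layer entirely and use Confluent's GCS Sink Connector, which writes straight to a Cloud Storage bucket. Below is a minimal sketch, assuming the GCS Sink Connector plugin is installed on the Connect worker; the bucket name, credentials path and task count are placeholders, Avro output is used (Parquet support depends on the connector version), and the converter and confluent.topic.* security settings from the configuration further below would carry over unchanged:

{
    "name": "gcs-sink-connector",
    "connector.class": "io.confluent.connect.gcs.GcsSinkConnector",
    "tasks.max": "1",
    "topics": "poc-input-topic",
    "gcs.bucket.name": "*****-dataproc-bucket",
    "gcs.part.size": "5242880",
    "flush.size": "330",
    "gcs.credentials.path": "/path/to/credentials.json",
    "storage.class": "io.confluent.connect.gcs.storage.GcsStorage",
    "format.class": "io.confluent.connect.gcs.format.avro.AvroFormat",
    "partitioner.class": "io.confluent.connect.storage.partitioner.HourlyPartitioner",
    "locale": "en-US",
    "timezone": "UTC",
    "schema.compatibility": "BACKWARD"
}

With this connector the data lands directly in the bucket (by default under a topics/ prefix), partitioned hourly, without touching the Dataproc persistent disks.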

Attempted solution (did not work)

Dataproc sink connector configuration

{
    "name": "dataproc-sink-connector",
    "connector.class": "io.confluent.connect.gcp.dataproc.DataprocSinkConnector",
    "tasks.max": "1",
    "topics": "poc-input-topic",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "gcp.dataproc.projectId": "******",
    "gcp.dataproc.region": "asia-southeast1",
    "gcp.dataproc.cluster": "******-dataproc",
    "gcp.dataproc.credentials.json": "******",
    "format.class": "io.confluent.connect.gcp.dataproc.hdfs.parquet.ParquetFormat",
    "flush.size": "330",
    "storage.class": " io.confluent.connect.gcp.dataproc.hdfs.storage.HdfsStorage",
    "topics.dir": "data",
    "logs.dir": "logs",
    "store.url": "gs://*****-dataproc-bucket/",
    "partitioner.class": "io.confluent.connect.storage.partitioner.HourlyPartitioner",
    "locale": "en-US",
    "timezone": "UTC",
    "hive.integration": "true",
    "hive.metastore.uris": "thrift://*******:9083",
    "hive.database": "fnidb",
    "schema.compatibility": "BACKWARD",
    "confluent.topic.ssl.endpoint.identification.algorithm": "https",
    "confluent.topic.security.protocol": "SASL_SSL",
    "confluent.topic.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username='*****' password='*****'",
    "confluent.topic.sasl.mechanism": "PLAIN",
    "confluent.topic.request.timeout.ms": "20000",
    "confluent.topic.retry.backoff.ms": "500",
    "confluent.topic.sasl.kerberos.service.name": "kafka",
    "confluent.topic.bootstrap.servers": "******.asia-southeast1.gcp.confluent.cloud:9092",
    "value.converter.basic.auth.credentials.source": "USER_INFO",
    "value.converter.schema.registry.basic.auth.user.info": "*****:*************",
    "value.converter.schema.registry.url": "https://******.australia-southeast1.gcp.confluent.cloud",
    "key.converter.basic.auth.credentials.source": "USER_INFO",
    "key.converter.schema.registry.basic.auth.user.info": "*****:*************",
    "key.converter.schema.registry.url": "https://******.australia-southeast1.gcp.confluent.cloud",
    "hdfs.authentication.kerberos": "true",
    "connect.hdfs.principal": "hdfs/****@REALM.COM",
    "connect.hdfs.keytab": "/etc/security/keytab/hdfs.service.keytab",
    "hdfs.namenode.principal": "hdfs/****@REALM.COM",
    "kerberos.ticket.renew.period.ms": "3600000",
    "hive.conf.dir": "/etc/hive/conf"
}
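A side observation on the configuration above, not a confirmed fix: store.url already points at the gs:// bucket, yet storage.class and format.class still select the connector's HDFS storage and Parquet format classes, which is consistent with the data landing in HDFS. The storage-related keys in question are:

{
    "store.url": "gs://*****-dataproc-bucket/",
    "storage.class": "io.confluent.connect.gcp.dataproc.hdfs.storage.HdfsStorage",
    "format.class": "io.confluent.connect.gcp.dataproc.hdfs.parquet.ParquetFormat"
}

Whether the Dataproc sink connector honours a gs:// store.url with this HDFS-backed storage class is exactly what the question asks, and is not settled here.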

Tags: apache-kafka, apache-kafka-connect, google-cloud-dataproc

Solution

