首页 > 解决方案 > 在 Google Dataflow 上使用 KafkaIO 通过 SSL 连接到 Kafka

问题描述

从服务器,我能够连接并从配置了 SSL 的远程 kafka 服务器主题中获取数据。

从 GCP,如何使用 Google Dataflow 管道通过 SSL 信任库、密钥库证书位置和 Google 服务帐户 json 连接到远程 kafka 服务器?

我正在为数据流运行器选项使用 Eclipse 插件。

如果我指向 GCS 上的证书,则当证书指向 Google 存储桶时会引发错误。


Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer

Caused by: org.apache.kafka.common.KafkaException:
 java.io.FileNotFoundException: 
gs:/bucket/folder/truststore-client.jks (No such file or directory)

紧随其后:Truststore 和 Google Cloud Dataflow

更新了将 SSL 信任库、密钥库位置指向本地机器的 /tmp 目录证书的代码,以防 KafkaIO 需要从文件路径中读取。它没有抛出 FileNotFoundError。

尝试从 GCP 帐户运行服务器 Java 客户端代码并使用 Dataflow - Beam Java 管道,我收到以下错误。


ssl.truststore.location = <LOCAL MACHINE CERTICATE FILE PATH>
    ssl.truststore.password = [hidden]
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka version : 1.0.0
org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka commitId : aaa7af6d4a11b29d
org.apache.kafka.common.network.SslTransportLayer close
WARNING: Failed to send SSL Close message 
java.io.IOException: Broken pipe

org.apache.beam.runners.direct.RootProviderRegistry.getInitialInputs(RootProviderRegistry.java:81)
    at org.apache.beam.runners.direct.ExecutorServiceParallelExecutor.start(ExecutorServiceParallelExecutor.java:153)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:205)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:66)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
    at 

org.apache.kafka.common.utils.LogContext$KafkaLogger warn
WARNING: [Consumer clientId=consumer-1, groupId=test-group] Connection to node -2 terminated during authentication. This may indicate that authentication failed due to invalid credentials.

任何建议或示例表示赞赏。

标签: javasslapache-kafkagoogle-cloud-dataflowapache-beam

解决方案


Git 从本地机器克隆或上传 Java Maven 项目到 GCP Cloud Shell 主目录。在 Cloud Shell 终端上使用 Dataflow 运行器命令编译项目。

mvn -Pdataflow-runner compile exec:java \
      -Dexec.mainClass=com.packagename.JavaClass \
      -Dexec.args="--project=PROJECT_ID \
      --stagingLocation=gs://BUCKET/PATH/ \
      --tempLocation=gs://BUCKET/temp/ \
      --output=gs://BUCKET/PATH/output \
      --runner=DataflowRunner"

确保运行器设置为 DataflowRunnner.class,并且在云上运行时您会在 Dataflow 控制台上看到该作业。DirectRunner 执行不会显示在云数据流控制台上。

将证书放在 Maven 项目的资源文件夹中,并使用 ClassLoader 读取文件。

ClassLoader classLoader = getClass().getClassLoader();
File file = new File(classLoader.getResource("keystore.jks").getFile());    
resourcePath.put("keystore.jks",file.getAbsoluteFile().getPath());

编写 ConsumerFactoryFn() 以复制 Dataflow 的“/tmp/”目录中的证书,如https://stackoverflow.com/a/53549757/4250322中所述

将 KafkaIO 与资源路径属性一起使用。

Properties props = new Properties();
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/tmp/truststore.jks");    
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/tmp/keystore.jks");
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,  PASSWORD);
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,  PASSWORD); 
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,  PASSWORD);

//other properties
...

PCollection<String> collection = p.apply(KafkaIO.<String, String>read()
                .withBootstrapServers(BOOTSTRAP_SERVERS)
                .withTopic(TOPIC)                                
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)                
                .updateConsumerProperties(props)
                .withConsumerFactoryFn(new ConsumerFactoryFn())
                .withMaxNumRecords(50)
                .withoutMetadata()
        ).apply(Values.<String>create());

// Apply Beam transformations and write to output.


推荐阅读