java - 在 Google Dataflow 上使用 KafkaIO 通过 SSL 连接到 Kafka
问题描述
从服务器,我能够连接并从配置了 SSL 的远程 kafka 服务器主题中获取数据。
从 GCP,如何使用 Google Dataflow 管道通过 SSL 信任库、密钥库证书位置和 Google 服务帐户 json 连接到远程 kafka 服务器?
我正在为数据流运行器选项使用 Eclipse 插件。
如果我指向 GCS 上的证书,则当证书指向 Google 存储桶时会引发错误。
Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
Caused by: org.apache.kafka.common.KafkaException:
java.io.FileNotFoundException:
gs:/bucket/folder/truststore-client.jks (No such file or directory)
紧随其后:Truststore 和 Google Cloud Dataflow
更新了将 SSL 信任库、密钥库位置指向本地机器的 /tmp 目录证书的代码,以防 KafkaIO 需要从文件路径中读取。它没有抛出 FileNotFoundError。
尝试从 GCP 帐户运行服务器 Java 客户端代码并使用 Dataflow - Beam Java 管道,我收到以下错误。
ssl.truststore.location = <LOCAL MACHINE CERTICATE FILE PATH>
ssl.truststore.password = [hidden]
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka version : 1.0.0
org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka commitId : aaa7af6d4a11b29d
org.apache.kafka.common.network.SslTransportLayer close
WARNING: Failed to send SSL Close message
java.io.IOException: Broken pipe
org.apache.beam.runners.direct.RootProviderRegistry.getInitialInputs(RootProviderRegistry.java:81)
at org.apache.beam.runners.direct.ExecutorServiceParallelExecutor.start(ExecutorServiceParallelExecutor.java:153)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:205)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:66)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
at
org.apache.kafka.common.utils.LogContext$KafkaLogger warn
WARNING: [Consumer clientId=consumer-1, groupId=test-group] Connection to node -2 terminated during authentication. This may indicate that authentication failed due to invalid credentials.
任何建议或示例表示赞赏。
解决方案
Git 从本地机器克隆或上传 Java Maven 项目到 GCP Cloud Shell 主目录。在 Cloud Shell 终端上使用 Dataflow 运行器命令编译项目。
mvn -Pdataflow-runner compile exec:java \
-Dexec.mainClass=com.packagename.JavaClass \
-Dexec.args="--project=PROJECT_ID \
--stagingLocation=gs://BUCKET/PATH/ \
--tempLocation=gs://BUCKET/temp/ \
--output=gs://BUCKET/PATH/output \
--runner=DataflowRunner"
确保运行器设置为 DataflowRunnner.class,并且在云上运行时您会在 Dataflow 控制台上看到该作业。DirectRunner 执行不会显示在云数据流控制台上。
将证书放在 Maven 项目的资源文件夹中,并使用 ClassLoader 读取文件。
ClassLoader classLoader = getClass().getClassLoader();
File file = new File(classLoader.getResource("keystore.jks").getFile());
resourcePath.put("keystore.jks",file.getAbsoluteFile().getPath());
编写 ConsumerFactoryFn() 以复制 Dataflow 的“/tmp/”目录中的证书,如https://stackoverflow.com/a/53549757/4250322中所述
将 KafkaIO 与资源路径属性一起使用。
Properties props = new Properties();
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/tmp/truststore.jks");
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/tmp/keystore.jks");
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, PASSWORD);
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, PASSWORD);
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, PASSWORD);
//other properties
...
PCollection<String> collection = p.apply(KafkaIO.<String, String>read()
.withBootstrapServers(BOOTSTRAP_SERVERS)
.withTopic(TOPIC)
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.updateConsumerProperties(props)
.withConsumerFactoryFn(new ConsumerFactoryFn())
.withMaxNumRecords(50)
.withoutMetadata()
).apply(Values.<String>create());
// Apply Beam transformations and write to output.
推荐阅读
- php - Xampp 在其服务中显示十字标志
- php - Vuejs从从php服务器收到的日期开始修复日期
- vue.js - 如何根据 API 获取的内容创建列表 + 详细信息页面
- c# - 'Sap.Data.Hana.HanaConnection' 的类型初始化程序引发了异常。---> System.IO.FileNotFoundException: 找不到 libADONETHDB.dll
- node.js - 如何使用节点 oidc 提供程序获取用户信息
- api - 使用 C# 计算 bitbns 加密交换 API 的有效负载和签名(C# 等效于 javascript 示例代码不起作用)
- docker - Docker 容器不健康但日志中没有错误
- javascript - 在chartjs中绘制数学函数
- google-apps-script - 你如何从谷歌脚本执行 .bat 文件?
- java - 在android 7及更高版本中运行我自己的应用程序时如何禁用语音通话和移动数据