apache-kafka - 使用 Beam Python SDK 和 PortableRunner 通过 SSL 连接到 Kafka
问题描述
我有下面的代码,用于使用python beam sdk连接到 kafka 。我知道ReadFromKafka
转换是在 java sdk 线束(docker 容器)中运行的,但我无法弄清楚如何在 sdk 线束的 docker 环境中进行制作ssl.truststore.location
和访问。ssl.keystore.location
论据job_endpoint
指向java -jar beam-runners-flink-1.10-job-server-2.27.0.jar --flink-master localhost:8081
pipeline_args.extend([
'--job_name=paul_test',
'--runner=PortableRunner',
'--sdk_location=container',
'--job_endpoint=localhost:8099',
'--streaming',
"--environment_type=DOCKER",
f"--sdk_harness_container_image_overrides=.*java.*,{my_beam_sdk_docker_image}:{my_beam_docker_tag}",
])
with beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline:
kafka = pipeline | ReadFromKafka(
consumer_config={
"bootstrap.servers": "bootstrap-server:17032",
"security.protocol": "SSL",
"ssl.truststore.location": "/opt/keys/client.truststore.jks", # how do I make this available to the Java SDK harness
"ssl.truststore.password": "password",
"ssl.keystore.type": "PKCS12",
"ssl.keystore.location": "/opt/keys/client.keystore.p12", # how do I make this available to the Java SDK harness
"ssl.keystore.password": "password",
"group.id": "group",
"basic.auth.credentials.source": "USER_INFO",
"schema.registry.basic.auth.user.info": "user:password"
},
topics=["topic"],
max_num_records=2,
# expansion_service="localhost:56938"
)
kafka | beam.Map(lambda x: print(x))
我尝试将图像覆盖选项指定为--sdk_harness_container_image_overrides='.*java.*,beam_java_sdk:latest'
-beam_java_sdk:latest
我基于apache/beam_java11_sdk:2.27.0
的 docker 图像在哪里,它在其 entrypoint.sh 中提取凭据。但是 Beam 似乎没有使用它,我明白了
INFO org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory - Still waiting for startup of environment apache/beam_java11_sdk:2.27.0 for worker id 1-1
在日志中。很快不可避免地紧随其后
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Failed to load SSL keystore /opt/keys/client.keystore.p12 of type PKCS12
总之,我的问题是,在 Apache Beam 中,是否可以在 python 梁 sdk 的 java sdk 工具 docker 容器中提供文件?如果是这样,它会如何完成?
非常感谢。
解决方案
目前,没有直接的方法来实现这一点。正在进行讨论和跟踪问题以支持这种扩展服务定制(请参阅此处、此处、BEAM-12538和BEAM-12539)。这是简短的答案。
长答案是肯定的,你可以这样做。您必须将ExpansionService.java复制 & 粘贴到您的代码库中并构建您的自定义扩展服务,您可以在此处指定默认环境 (DOCKER) 和默认环境配置(您的图像)。然后,您必须手动运行此扩展服务并使用expansion_service
ReadFromKafka 的参数指定其地址。
推荐阅读
- java - 在eclipse中的tomcat 9上运行jersey-quickstart-webapp时得到404
- sql - 仅插入唯一行并更新已存在的行
- data-structures - Arduino:中缀到后缀
- python - 循环内的 Python 列表理解会破坏 Pycharm 调试器?
- php - 获取 CRUD 上的最后一个 ID
- python - 使用 Python 和 Raspberry 以及 nobile 网络进行实时网络摄像头流式传输而没有更大的延迟?
- node.js - 我有一个员工表和一个任务表(posgresql)。我需要将许多任务分配给一名员工
- java - 当我调用一个方法 10000 次时,它是如何抛出内存不足错误的?
- regex - 如何在我的正则表达式中删除周围的引号/代码块格式?
- azure-devops - Nuget 安装失败:包不包含任何与该框架兼容的程序集引用或内容文件