首页 > 解决方案 > 使用 Beam Python SDK 和 PortableRunner 通过 SSL 连接到 Kafka

问题描述

我有下面的代码,用于使用python beam sdk连接到 kafka 。我知道ReadFromKafka转换是在 java sdk 线束(docker 容器)中运行的,但我无法弄清楚如何在 sdk 线束的 docker 环境中进行制作ssl.truststore.location和访问。ssl.keystore.location论据job_endpoint指向java -jar beam-runners-flink-1.10-job-server-2.27.0.jar --flink-master localhost:8081

pipeline_args.extend([
    '--job_name=paul_test',
    '--runner=PortableRunner',
    '--sdk_location=container',
    '--job_endpoint=localhost:8099',
    '--streaming',
    "--environment_type=DOCKER",
    f"--sdk_harness_container_image_overrides=.*java.*,{my_beam_sdk_docker_image}:{my_beam_docker_tag}",
])

with beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline:
    kafka = pipeline | ReadFromKafka(
        consumer_config={
            "bootstrap.servers": "bootstrap-server:17032",
            "security.protocol": "SSL",
            "ssl.truststore.location": "/opt/keys/client.truststore.jks", # how do I make this available to the Java SDK harness 
            "ssl.truststore.password": "password",
            "ssl.keystore.type": "PKCS12",
            "ssl.keystore.location": "/opt/keys/client.keystore.p12", # how do I make this available to the Java SDK harness 
            "ssl.keystore.password": "password",
            "group.id": "group",
            "basic.auth.credentials.source": "USER_INFO",
            "schema.registry.basic.auth.user.info": "user:password"
        },
        topics=["topic"],
        max_num_records=2,
        # expansion_service="localhost:56938"
    )

    kafka | beam.Map(lambda x: print(x))

我尝试将图像覆盖选项指定为--sdk_harness_container_image_overrides='.*java.*,beam_java_sdk:latest'-beam_java_sdk:latest我基于apache/beam_java11_sdk:2.27.0的 docker 图像在哪里,它在其 entrypoint.sh 中提取凭据。但是 Beam 似乎没有使用它,我明白了

INFO  org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory  - Still waiting for startup of environment apache/beam_java11_sdk:2.27.0 for worker id 1-1

在日志中。很快不可避免地紧随其后

Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Failed to load SSL keystore /opt/keys/client.keystore.p12 of type PKCS12

总之,我的问题是,在 Apache Beam 中,是否可以在 python 梁 sdk 的 java sdk 工具 docker 容器中提供文件?如果是这样,它会如何完成?

非常感谢。

标签: apache-kafkaapache-flinkapache-beamapache-beam-kafkaio

解决方案


目前,没有直接的方法来实现这一点。正在进行讨论和跟踪问题以支持这种扩展服务定制(请参阅此处此处BEAM-12538BEAM-12539)。这是简短的答案。

长答案是肯定的,你可以这样做。您必须将ExpansionService.java复制 & 粘贴到您的代码库中并构建您的自定义扩展服务,您可以在此处指定默认环境 (DOCKER) 和默认环境配置(您的图像。然后,您必须手动运行此扩展服务并使用expansion_serviceReadFromKafka 的参数指定其地址。


推荐阅读