Writing to Kafka with Apache Beam (GCP Dataflow)

Problem description

I am trying to send data to a Kafka topic from Python with Apache Beam's WriteToKafka, using Dataflow as the runner.

Running the following script:

    with beam.Pipeline(options=beam_options) as p:
        (p
        | beam.Impulse()
        | beam.Map(lambda input: (1, input))
        | WriteToKafka(
                    producer_config={
                        'bootstrap.servers': 'ip:9092',
                    },
                    topic='testclient',
                    key_serializer='org.apache.kafka.common.serialization.LongSerializer',
                    value_serializer='org.apache.kafka.common.serialization.ByteArraySerializer',
                )
         )

I get this error:

Traceback (most recent call last):
  File "/home/denes/data-science/try_write_to_kafka.py", line 75, in <module>
    run_pipeline(beam_options)
  File "/home/denes/data-science/try_write_to_kafka.py", line 38, in run_pipeline
    (p
  File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 582, in __exit__
    self.result = self.run()
  File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 529, in run
    return Pipeline.from_runner_api(
  File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 904, in from_runner_api
    p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
  File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
    self._id_to_obj[id] = self._obj_type.from_runner_api(
  File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
    part = context.transforms.get_by_id(transform_id)
  File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
    self._id_to_obj[id] = self._obj_type.from_runner_api(
  File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
    part = context.transforms.get_by_id(transform_id)
  File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
    self._id_to_obj[id] = self._obj_type.from_runner_api(
  File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
    part = context.transforms.get_by_id(transform_id)
  File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
    self._id_to_obj[id] = self._obj_type.from_runner_api(
  File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
    part = context.transforms.get_by_id(transform_id)
  File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
    self._id_to_obj[id] = self._obj_type.from_runner_api(
  File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1236, in from_runner_api
    transform = ptransform.PTransform.from_runner_api(proto, context)
  File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/transforms/ptransform.py", line 700, in from_runner_api
    return constructor(
  File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/transforms/core.py", line 1419, in from_runner_api_parameter
    DoFnInfo.from_runner_api(
  File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/transforms/core.py", line 1493, in from_runner_api
    raise ValueError('Unexpected DoFn type: %s' % spec.urn)
ValueError: Unexpected DoFn type: beam:dofn:javasdk:0.1

Note that I have installed the package via pip install 'apache-beam[gcp]'.

If I'm not mistaken, the problem lies with the serialization methods. I have tried various combinations found on the page.

What am I missing, and what should I do differently?

Tags: python, google-cloud-platform, apache-kafka, google-cloud-dataflow

Solution


The solution is to use explicit type conversion, with declared output types, for both the key and the value.

    | 'Convert dict to byte string' >> beam.Map(
          lambda x: (b'', json.dumps(x).encode('utf-8')))
          .with_output_types(typing.Tuple[bytes, bytes])
    | 'Write to Kafka topic' >> WriteToKafka(
          producer_config={'bootstrap.servers': consumer_servers},
          topic='testclient')
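For context, here is a minimal sketch of what that conversion step produces. The `to_kafka_record` helper below is a hypothetical illustration of the same encoding, not part of the original answer; `consumer_servers` and the pipeline labels above are the answerer's.

```python
import json
import typing

def to_kafka_record(element: dict) -> typing.Tuple[bytes, bytes]:
    """Encode one element as the (key, value) bytes pair WriteToKafka expects.

    An empty key is used here; the value is the UTF-8 encoded JSON payload.
    """
    return (b'', json.dumps(element).encode('utf-8'))

key, value = to_kafka_record({'sensor': 'a1', 'reading': 42})
print(key)    # b''
print(value)  # b'{"sensor": "a1", "reading": 42}'
```

In the pipeline, the equivalent lambda is wrapped with `.with_output_types(typing.Tuple[bytes, bytes])` so that Beam hands the cross-language Kafka transform plain byte pairs instead of pickled Python objects, which its default serializers can handle.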
