python - Writing to Kafka with Apache Beam (GCP Dataflow)
Problem description
I am trying to use WriteToKafka to send data to a Kafka topic from Python through Apache Beam, with Dataflow as the runner.
Running the following script:
with beam.Pipeline(options=beam_options) as p:
    (p
     | beam.Impulse()
     | beam.Map(lambda input: (1, input))
     | WriteToKafka(
         producer_config={
             'bootstrap.servers': 'ip:9092,',
         },
         topic='testclient',
         key_serializer='org.apache.kafka.common.serialization.LongSerializer',
         value_serializer='org.apache.kafka.common.serialization.ByteArraySerializer',
     )
    )
I get this error:
Traceback (most recent call last):
  File "/home/denes/data-science/try_write_to_kafka.py", line 75, in <module>
    run_pipeline(beam_options)
  File "/home/denes/data-science/try_write_to_kafka.py", line 38, in run_pipeline
    (p
  File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 582, in __exit__
    self.result = self.run()
  File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 529, in run
    return Pipeline.from_runner_api(
  File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 904, in from_runner_api
    p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
  File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
    self._id_to_obj[id] = self._obj_type.from_runner_api(
  File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
    part = context.transforms.get_by_id(transform_id)
  File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
    self._id_to_obj[id] = self._obj_type.from_runner_api(
  File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
    part = context.transforms.get_by_id(transform_id)
  File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
    self._id_to_obj[id] = self._obj_type.from_runner_api(
  File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
    part = context.transforms.get_by_id(transform_id)
  File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
    self._id_to_obj[id] = self._obj_type.from_runner_api(
  File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
    part = context.transforms.get_by_id(transform_id)
  File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
    self._id_to_obj[id] = self._obj_type.from_runner_api(
  File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1236, in from_runner_api
    transform = ptransform.PTransform.from_runner_api(proto, context)
  File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/transforms/ptransform.py", line 700, in from_runner_api
    return constructor(
  File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/transforms/core.py", line 1419, in from_runner_api_parameter
    DoFnInfo.from_runner_api(
  File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/transforms/core.py", line 1493, in from_runner_api
    raise ValueError('Unexpected DoFn type: %s' % spec.urn)
ValueError: Unexpected DoFn type: beam:dofn:javasdk:0.1
Note that I installed Apache Beam with pip install 'apache-beam[gcp]'.
- apache-beam==2.27.0
- google-cloud-core==1.5.0
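If I am not mistaken, the problem lies in the serialization methods. I tried various combinations found on this page.
What am I missing, and what should I do differently?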
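Solution
The solution is to use explicit type conversion for both the key and the value: encode each as bytes and declare the element type with with_output_types.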
# Requires: import json, typing
    | 'Convert dict to byte string' >> beam.Map(
        lambda x: (b'', json.dumps(x).encode('utf-8'))
      ).with_output_types(typing.Tuple[bytes, bytes])
    | 'Write to Kafka topic' >> WriteToKafka(
        producer_config={'bootstrap.servers': consumer_servers},
        topic='testclient')
)
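The conversion step in the answer can be checked on its own, without Beam or a Kafka broker. The following is a minimal sketch, assuming the upstream elements are JSON-serializable dicts; the helper name to_kafka_record and the sample dict are hypothetical and do not come from the original answer.

```python
import json
from typing import Any, Dict, Tuple


def to_kafka_record(element: Dict[str, Any]) -> Tuple[bytes, bytes]:
    """Hypothetical helper: turn a dict into a (key, value) pair of bytes.

    Mirrors the lambda in the answer: the key is an empty byte string and
    the value is the UTF-8 encoded JSON form of the element.
    """
    key = b''
    value = json.dumps(element).encode('utf-8')
    return key, value


# Sample element (made up for illustration).
key, value = to_kafka_record({'id': 1, 'name': 'test'})
print(type(key).__name__, type(value).__name__)
```

In a pipeline, a named function like this could presumably replace the lambda, as in beam.Map(to_kafka_record).with_output_types(typing.Tuple[bytes, bytes]), which keeps the byte conversion easy to unit test.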