python - 来自 pubsub 的 Apache Beam 读取模式
问题描述
我对传输和读取流数据非常陌生,所以我希望我的问题不是太琐碎。
在将数据写入其他文件之前,我正在使用梁 Python SDK 从 PubSub 读取数据。由于我收到的数据始终采用相同的格式,因此我尝试利用架构特性来解析我从 PubSub 收到的数据。
接收到的数据始终是一个字典name: "my_name", value: 42
,所以我的管道如下所示:
import typing
import apache_beam as beam
from apache_beam.io import ReadFromPubSub
class MySchema(typing.NamedTuple):
name: str
value: int
with beam.Pipeline() as pipeline:
pipeline | ReadFromPubSub(topic=<my_subscription>).with_output_types(MySchema)
但是,然后我得到错误apache_beam.typehints.decorators.TypeCheckError: Output type hint violation at ReadFromPubSub: expected <class '__main__.MassificationImage'>, got <class 'bytes'>
这是有道理的,因为 PubSub 自然会获取字节:我可以将数据解析到字典中,然后它似乎就可以工作了。
with beam.Pipeline() as pipeline:
(pipeline
| ReadFromPubSub(topic=<my_subscription>)
| beam.Map(lambda x: json.loads(x.decode("utf8"))).with_output_types(MySchema)
它似乎工作正常,但不必将数据解析成字典会破坏模式的目的吗?有没有更直接的方法来做到这一点?
解决方案
https://beam.apache.org/documentation/programming-guide/#schemas描述了 Beam 模式的用途。假设是
通常,正在处理的记录类型具有明显的结构。常见的 Beam 源产生 JSON、Avro、Protocol Buffer 或数据库行对象;所有这些类型都有明确定义的结构......
并且模式使进一步的转换更容易。
这里的困惑是 Pub/Sub 作为源不直接提供结构化数据。或者至少 Pub/Sub 的 Beam IO 读取字节。
Pub/Sub 确实支持架构:https ://cloud.google.com/pubsub/docs/schemas 。您可以创建一个新的 IO 来合并字节读取和模式解析。
推荐阅读
- c++ - heap out of memory, noscript_shared_function_infos 持有的所有保留内存
- python - 根据文件名列表和目录列表将文件复制到新位置,但保留目录结构
- symfony4 - Symfony 4 / FosREST:API 路由返回 404 但不是 web 路由
- python - 如何计算熊猫列中带有日期时间的日期的剩余月份?
- c++ - 如何从http响应中删除垃圾数据
- .net - 如何修复应用服务的 Azure 部署槽问题?
- angular - 如何获取以前的搜索结果?
- asp.net-core - Razor Pages ASP.NET 不显眼的 ajax 发布部分更新
- php - 如何转换 1.5xxE-5
- javascript - ng-model 绑定没有正确更新