首页 > 解决方案 > 来自 pubsub 的 Apache Beam 读取模式

问题描述

我对传输和读取流数据非常陌生,所以我希望我的问题不是太琐碎。

在将数据写入其他文件之前,我正在使用梁 Python SDK 从 PubSub 读取数据。由于我收到的数据始终采用相同的格式,因此我尝试利用架构特性来解析我从 PubSub 收到的数据。

接收到的数据始终是一个字典name: "my_name", value: 42,所以我的管道如下所示:

import typing

import apache_beam as beam
from apache_beam.io import ReadFromPubSub


class MySchema(typing.NamedTuple):
    name: str
    value: int

with beam.Pipeline() as pipeline:
    pipeline | ReadFromPubSub(topic=<my_subscription>).with_output_types(MySchema)

但是,然后我得到错误apache_beam.typehints.decorators.TypeCheckError: Output type hint violation at ReadFromPubSub: expected <class '__main__.MassificationImage'>, got <class 'bytes'>

这是有道理的,因为 PubSub 自然会获取字节:我可以将数据解析到字典中,然后它似乎就可以工作了。

with beam.Pipeline() as pipeline:
    (pipeline 
        | ReadFromPubSub(topic=<my_subscription>)
        | beam.Map(lambda x: json.loads(x.decode("utf8"))).with_output_types(MySchema)

它似乎工作正常,但不必将数据解析成字典会破坏模式的目的吗?有没有更直接的方法来做到这一点?

标签: pythonapache-beam

解决方案


https://beam.apache.org/documentation/programming-guide/#schemas描述了 Beam 模式的用途。假设是

通常,正在处理的记录类型具有明显的结构。常见的 Beam 源产生 JSON、Avro、Protocol Buffer 或数据库行对象;所有这些类型都有明确定义的结构......

并且模式使进一步的转换更容易。

这里的困惑是 Pub/Sub 作为源不直接提供结构化数据。或者至少 Pub/Sub 的 Beam IO 读取字节。

Pub/Sub 确实支持架构:https ://cloud.google.com/pubsub/docs/schemas 。您可以创建一个新的 IO 来合并字节读取和模式解析。


推荐阅读