google-cloud-dataflow - 如何在 Apache Beam 管道中记录传入消息
问题描述
我正在编写一个简单的 apache 光束流管道,从 pubsub 主题获取输入并将其存储到 bigquery 中。几个小时以来,我以为我什至无法阅读消息,因为我只是试图将输入记录到控制台:
events = p | 'Read PubSub' >> ReadFromPubSub(subscription=SUBSCRIPTION)
logging.info(events)
当我将其写入文本时,它工作正常!然而我的电话logger
从来没有发生过。
人们如何开发/调试这些流媒体管道?
我尝试添加以下行:
events | 'Log' >> logging.info(events)
使用print()
也不会在控制台中产生任何结果。
解决方案
这是因为events
是 aPCollection
所以你需要PTransform
对它应用 a 。
最简单的方法是将 aParDo
应用于events
:
events | 'Log results' >> beam.ParDo(LogResults())
定义为:
class LogResults(beam.DoFn):
"""Just log the results"""
def process(self, element):
logging.info("Pub/Sub event: %s", element)
yield element
请注意,如果您想在下游应用进一步的步骤,例如在记录元素后写入接收器,我也会生成元素。例如,请参阅此处的问题。
推荐阅读
- dockerfile - 为什么docker容器会自行退出
- php - 从 PHP 中的格式化纯文本中提取数据
- ios - 处理具有相同签名但不同参数的多个函数的最有效方法
- python - PyCharm 无法从 Docker 容器打开文件:没有这样的文件或目录
- unix - 防止在 AIX Unix 中创建空的 Tar 文件
- javascript - 当停止值之间的间隙相等时,为什么画布 gradient.addColorStop() 不居中?
- ios - 如果应用程序进入后台,如何运行计时器?
- r - 如何通过字符变量在 ddply 中指定列名?
- ios - 如何使用适用于 iOS 的 AWS 设备场将警报自动接受设置为真/假?
- javascript - 按可选字段对对象进行排序而不会丢失键