python - 在 apache 梁管道 Python 中跳过步骤
问题描述
所以我正在构建一个 apache 光束管道,并且在跳过 python SDK 中的其余步骤时遇到了一些麻烦。这是一个我遇到问题的简化示例:
import apache_beam as beam
import os
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = API_KEY
def foo(message):
pass
options = {
'streaming': True
}
runner = 'DirectRunner'
opts = beam.pipeline.PipelineOptions(flags=[], **options)
with beam.Pipeline(runner, options=opts) as p:
sub_message = (p | 'sub' >> beam.io.ReadFromPubSub(subscription=my_sub))
result = (sub_message | 'foo' >> beam.Map(foo))
result | 'print' >> beam.Map(print)
job = p.run()
if runner == 'DirectRunner':
job.wait_until_finish()
所以根据这个:Apache Beam -如果我的函数没有返回任何内容,则跳过 Java 中的管道步骤,那么 apache_beam 应该跳过其余的步骤。如果我错了,请纠正我,但在 python 中这与返回 None 相同,因此我的pass
可以替换return None
为完全相同。pass
但是当我用or运行这段代码时,return None
结果确实会进入下一步。也就是说,None
当它不应该打印任何东西时,它会继续打印,因为它应该跳过所有后续步骤。任何帮助表示赞赏。
解决方案
有趣的是,一旦我发布了这个,我就在文档中找到了答案。所以看起来在我提供的链接中,我提供的等价物是像我一样使用 ParDo 而不是地图。所以真的应该是这样的:
import apache_beam as beam
import os
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = credentials
class TestFn(beam.DoFn):
def process(self, element):
print('hi')
pass
options = {
'streaming': True
}
runner = 'DirectRunner'
opts = beam.pipeline.PipelineOptions(flags=[], **options)
with beam.Pipeline(runner, options=opts) as p:
sub_message = (p | 'sub' >> beam.io.ReadFromPubSub(subscription=mysub))
result = (sub_message | 'foo' >> beam.ParDo(TestFn()))
result | 'print' >> beam.Map(print)
job = p.run()
if runner == 'DirectRunner':
job.wait_until_finish()
推荐阅读
- laravel - 雄辩的 'with()' 返回 null
- r - 对具有最少条目的变量的排名求和
- java - 我可以从创建通用类对象的主方法访问插入数据吗
- tensorflow - 如果在数据集或每个数据集样本上运行,keras model.predict() 是否返回相同的值?
- r - 根据另一个数据框中的数据删除行?
- python - 如何使用 PDFplumber 在 pdf 文件中仅提取没有表格的文本?
- c# - 如何读取数据库表并将其显示在文本框中
- php - PrestaShop:创建特定价格后如何刷新购物车
- ios - 如何将一个数组中的单个项目附加到另一个数组?
- java - 检测特定块何时中断 [Spigot Plugin]