google-cloud-dataflow - 如何(单元)在 python 的 apache-beam 中测试流管道?
问题描述
我写了一些流式传输管道(从 Pub/Sub 开始),我想向它添加一些窗口机制。我现在想以某种适当的方式对其进行测试,那么如何创建一些“虚拟”流?
我的代码:
pipeline_options = PipelineOptions()
pipeline_options.view_as(SetupOptions).save_main_session = True
pipeline_options.view_as(StandardOptions).streaming = True
p = TestPipeline(options=pipeline_options, runner=DirectRunner())
xmls_beam = beam.Create(xmls)
x = p | xmls_beam | beam.FlatMap(process_xmls) | beam.ParDo(FilterTI()) | beam.WindowInto(window.FixedWindows(200)) | beam.GroupByKey()
result = p.run()
result.wait_until_finish()
解决方案
您可以使用 TimestampedValue 的 PCollection 模拟“虚拟流”。
例如,如果您的输入是:
l = [window.TimestampedValue('a', 100), window.TimestampedValue('b', 300)]
pc = p | beam.Create(l) | ...
在您的情况下(宽度为 200 的固定窗口),您可以期望输出元素“a”落入第一个窗口,“b”落入第二个窗口。
推荐阅读
- javascript - How do I sort by alphabetical order this array of objects
- google-cloud-platform - what kind of bigtable replication do my datastore entities use?
- amazon-web-services - How to setup automated deployment from CodeCommit to a Lambda Function?
- c++ - C++ program to find prime numbers in a fibonacci series
- flutter - How to get publicKey in RSA_encrypt key-pair generation algorithm in STRING format in flutter
- java - How to find all properties that i can use in the properties file in intellij
- amazon-web-services - How can I set up an Ingress to connect to a ClusterIP service?
- reactjs - React Native Component Exception
- pygame - Pygame- enemies
- mysql - 使用 group by 从子查询中获取聚合值