python - 通过依赖管道处理 Dataflow/Apache Beam 中的拒绝
问题描述
我有一个从 BigQuery 获取数据并将其写入 GCS 的管道,但是,如果我发现任何拒绝,我想将它们正确地写入 Bigquery 表。我将拒绝收集到全局列表变量中,然后将列表加载到 BigQuery 表中。当我在本地运行它时,这个过程运行良好,因为管道以正确的顺序运行。当我使用dataflowrunner运行它时,它不能保证顺序(我希望pipeline1在pipeline2之前运行。有没有办法使用python在Dataflow中拥有依赖管道?或者也请建议是否可以用更好的方法解决这个问题。提前致谢。
with beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline1:
data = (pipeline1
| 'get data' >> beam.io.Read(beam.io.BigQuerySource(query=...,use_standard_sql=True))
| 'combine output to list' >> beam.combiners.ToList()
| 'tranform' >> beam.Map(lambda x: somefunction) # Collecting rejects in the except block of this method to a global list variable
....etc
| 'to gcs' >> beam.io.WriteToText(output)
)
# Loading the rejects gathered in the above pipeline to Biquery
with beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline2:
rejects = (pipeline2
| 'create pipeline' >> beam.Create(reject_list)
| 'to json format' >> beam.Map(lambda data: {.....})
| 'to bq' >> beam.io.WriteToBigQuery(....)
)
解决方案
您可以做类似的事情,但只有 1 个管道,以及转换中的一些额外代码。
应该有两个输出:beam.Map(lambda x: somefunction)
一个被写入 GCS,被拒绝的元素最终将被写入 BigQuery。
为此,您的转换函数必须返回一个TaggedOutput
.
Beam 编程指南中有一个示例:https ://beam.apache.org/documentation/programming-guide/#multiple-outputs-dofn
第二个PCollection
,然后您可以写入 BigQuery。
您不需要Create
在管道的第二个分支中有一个。
管道将是这样的:
with beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline1:
data = (pipeline1
| 'get data' >> beam.io.Read(beam.io.BigQuerySource(query=...,use_standard_sql=True))
| 'combine output to list' >> beam.combiners.ToList()
| 'tranform' >> beam.Map(transform) # Tagged output produced here
pcoll_to_gcs = data.gcs_output
pcoll_to_bq = data.rejected
pcoll_to_gcs | "to gcs" >> beam.io.WriteToText(output)
pcoll_to_bq | "to bq" >> beam.io.WriteToBigQuery(....)
那么transform
函数将是这样的
def transform(element):
if something_is_wrong_with_element:
yield pvalue.TaggedOutput('rejected', element)
transformed_element = ....
yield pvalue.TaggedOutput('gcs_output', transformed_element)
推荐阅读
- jquery - 在表格ajax上显示json
- kubernetes - OKD 无法在通过 Jenkinsx 部署微服务后立即从内部注册表中提取较大的图像
- c++ - 如果用户输入对字符串变量无效,如何重复输入命令 - C++
- sql - 在 SQL Server 中连接多个表时如何正确创建别名
- angular - 如何以角度处理动态图片库
- visual-studio - 无法在 IIS Express 上运行为 Docker 准备的项目。CreateHostBuilder: DirectoryNotFoundException
- html - 如何移动这个盒子阴影?
- node.js - Sequelize v4 强制使用 writer db 实例进行 findAll 读取查询
- python - 在 Python 中将两个列表合二为一
- java - 如何使用 React js 在 SpringBoot 中的 DTO 中发送 MultipartFile