python - Is this the correct way to write an Apache Beam PCollection to multiple sinks?
Problem description
I am building a pipeline that ultimately writes to two sinks (MongoDB and BigQuery). I have included the piece of the pipeline below that is giving me trouble. Here is what happens: the file contents (JSON objects) are read into a PCollection called `elements`, then a series of transforms is applied, producing another PCollection called `transformed`. This `transformed` PCollection is written to MongoDB without any problem.
Now, before writing the `transformed` PCollection to BigQuery, I apply one additional transform to it. This is where the error occurs when I execute the pipeline:
TypeError: Unable to convert ObjectId('5ee110559926384724ff5a83') to JSON value. [while running 'WriteToBigQuery/_StreamToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn)']
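The root of that TypeError is that bson's ObjectId has no JSON representation, and the BigQuery sink JSON-encodes every row for streaming inserts. A minimal sketch of the failure and one way around it (using a hypothetical `ObjectId` stand-in class so it runs without pymongo):

```python
import json

class ObjectId:
    """Hypothetical stand-in for bson.ObjectId -- just enough to show
    why JSON encoding of a row containing one fails."""
    def __init__(self, oid):
        self.oid = oid

doc = {"_id": ObjectId("5ee110559926384724ff5a83"), "col1": 1}

try:
    json.dumps(doc)  # essentially what BigQueryWriteFn does per row
    failed = False
except TypeError:
    failed = True    # ObjectId is not JSON-serializable

# Coercing the id to a plain string (or dropping the key) makes the row serializable
row = {k: (v.oid if isinstance(v, ObjectId) else v) for k, v in doc.items()}
json.dumps(row)
```
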
I found out that when I write to MongoDB, it automatically adds an '_id' attribute to every inserted document (no problem there). But somehow, later, when I try to write to BigQuery, the elements in the `transformed` PCollection now carry this extra '_id' attribute. How strange is that? PCollections are supposed to be immutable, right?
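PCollections are immutable in the Beam model, but the Python dicts inside them are not: a DoFn that mutates its input element in place leaks that mutation to every other consumer of the same PCollection, which matches what the Mongo sink's id assignment does here. A plain-Python sketch of the effect and the usual workaround of copying/stripping before the second sink (`mongo_sink_like` and `strip_id` are illustrative names, not Beam APIs):

```python
def mongo_sink_like(batch):
    """Hypothetical stand-in for the Mongo sink's DoFn: it assigns an
    id to documents that lack one, mutating the input dict in place."""
    for doc in batch:
        doc.setdefault("_id", "ObjectId(...)")

def strip_id(doc):
    """Return a copy without '_id', so the BigQuery branch never sees it."""
    return {k: v for k, v in doc.items() if k != "_id"}

transformed = [{"col1": 1}, {"col1": 2}]
mongo_sink_like(transformed)                  # '_id' leaks into the shared dicts
bq_rows = [strip_id(d) for d in transformed]  # clean copies for BigQuery
```
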
What I have tried so far: commenting out the part that writes to BigQuery to see what happens. When I do that, it successfully writes the `transformed` PCollection to MongoDB, but another strange error appears:
Exception in thread Thread-18:
Traceback (most recent call last):
  File "/Users/user/anaconda3/envs/project/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/Users/user/anaconda3/envs/project/lib/python3.7/threading.py", line 1177, in run
    self.function(*self.args, **self.kwargs)
  File "/Users/user/anaconda3/envs/project/lib/python3.7/site-packages/apache_beam/runners/direct/sdf_direct_runner.py", line 467, in initiate_checkpoint
    checkpoint_state.residual_restriction = tracker.checkpoint()
AttributeError: '_SDFBoundedSourceRestrictionTracker' object has no attribute 'checkpoint'
elements, files_read = (
p
| 'ReadFromGCS' >> beam.io.ReadFromTextWithFilename(file_pattern=file_pattern, coder=JsonCoder())
| 'aTransformWithTaggedOutput' >> beam.ParDo(aTransform()).with_outputs('taggedOutputFilesRead',
main='elements')
)
deferred_side_input_1 = beam.pvalue.AsIter((
p
| 'QueryFromBigQueryTable' >> beam.io.Read(beam.io.BigQuerySource(query='SELECT col1 from dataset.table'))
))
deferred_side_input_2 = beam.pvalue.AsIter((
p
| 'ReadFromBigQueryTable' >> beam.io.Read(beam.io.BigQuerySource(dataset='bq_dataset', table='bq_table'))
))
transformed, tagged_output = (
elements
| 'Series' >> beam.ParDo(aTransform())
| 'of' >> beam.ParDo(anotherTransform())
| 'transforms' >> beam.ParDo(anotherTransform())
| '...' >> beam.ParDo(anotherTransform())
| '...' >> beam.ParDo(anotherTransform())
| '...' >> beam.ParDo(anotherTransform(), deferred_side_input_1)
| 'transformWithTaggedOutput' >> beam.ParDo(transformWithTaggedOutput(), deferred_side_input_2).with_outputs('tagged_output',
main='transformed')
)
"""Write `transformed` PCollection to MongoDB"""
transformed | 'WriteToMongo' >> beam.io.WriteToMongoDB(uri='mongoURI',
db='mongoDB',
coll='mongoCollection')
"""Apply an additional transform to the `transformed` PCollection, then write to BigQuery"""
_ = (
transformed
| 'AdditionalTransform' >> beam.ParDo(additionalTransform())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
table='bigqueryTable',
dataset='bigqueryDataset',
schema=beam.io.gcp.bigquery_tools.parse_table_schema_from_json(bq_schema),
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
validate=True)
)
"""(No issues with this write to BQ) Write `tagged_output` PCollection to BigQuery"""
tagged_output | 'WriteTaggedOutputToBigQuery' >> beam.io.WriteToBigQuery(
table='other_bq_table',
dataset='bq_dataset',
schema=beam.io.gcp.bigquery_tools.parse_table_schema_from_json(other_bq_schema),
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
validate=True)
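A defensive variant of the same idea, sketched in plain Python: instead of stripping the one key you know about, hand each sink branch its own deep copy, so no in-place mutation in either branch can reach the other (`fan_out` is an illustrative helper, not a Beam API):

```python
import copy

def fan_out(elements, *branches):
    """Run each branch on its own deep copy of the elements, so in-place
    mutations in one branch (like a sink adding '_id') stay in that branch."""
    return [branch(copy.deepcopy(elements)) for branch in branches]

mongo_out, bq_out = fan_out(
    [{"col1": 1}],
    lambda es: [dict(e, _id="ObjectId(...)") for e in es],  # Mongo-like branch
    lambda es: es,                                          # BigQuery-like branch
)
```

In Beam terms this would correspond to inserting something like `beam.Map(copy.deepcopy)` (or a key-stripping `beam.Map`) between `transformed` and each sink.
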
Solution