Is this the correct way to write an Apache Beam PCollection to multiple sinks?

Problem description

I am building a pipeline that ultimately writes to two sinks (MongoDB and BigQuery). I have included below the portion of the pipeline that is giving me trouble. Here is what happens: the file contents (JSON objects) are read into a PCollection called elements, then a series of transforms is applied, producing another PCollection called transformed. This transformed PCollection is written to MongoDB without any problems. Before writing transformed to BigQuery, I apply one additional transform to it. This is where the error occurs when I execute the pipeline:

TypeError: Cannot convert ObjectId('5ee110559926384724ff5a83') to a JSON value. [while running 'WriteToBigQuery/_StreamToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn)']

I found that when I write to MongoDB, it automatically adds an "_id" attribute to every inserted document (no problem there). But somehow, later, when I try to write to BigQuery, the elements in the transformed PCollection now carry this extra "_id" attribute. How strange is that? PCollections are supposed to be immutable, right?
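PCollections are immutable at the model level, but in the Python SDK the elements are ordinary dicts passed by reference, so any step (including a sink) that mutates an element in place can be seen by other consumers of the same PCollection. This is consistent with how pymongo behaves: inserting a dict adds an "_id" key to that very dict. A minimal plain-Python sketch of the aliasing, where fake_mongo_insert is a hypothetical stand-in for the sink's insert behaviour:

```python
import copy

def fake_mongo_insert(docs):
    # Hypothetical stand-in for a sink that, like pymongo,
    # tags each inserted dict with an "_id" field in place.
    for doc in docs:
        doc["_id"] = "ObjectId(...)"

elements = [{"col1": 1}, {"col1": 2}]

# Passing the original dicts mutates them for every later reader.
fake_mongo_insert(elements)
assert all("_id" in doc for doc in elements)

# Handing the sink deep copies leaves the originals untouched.
elements = [{"col1": 1}, {"col1": 2}]
fake_mongo_insert([copy.deepcopy(doc) for doc in elements])
assert all("_id" not in doc for doc in elements)
```

In a Beam pipeline, the equivalent defensive step would be something like a `beam.Map(copy.deepcopy)` inserted just before the MongoDB write, so the sink only ever sees copies; this is a sketch of the idea, not a confirmed fix for this exact pipeline.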

What I have tried so far: commenting out the part that writes to BigQuery to see what happens. When I do that, it successfully writes the transformed PCollection to MongoDB, but another strange error appears:

Exception in thread Thread-18:
Traceback (most recent call last):
  File "/Users/user/anaconda3/envs/project/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/Users/user/anaconda3/envs/project/lib/python3.7/threading.py", line 1177, in run
    self.function(*self.args, **self.kwargs)
  File "/Users/user/anaconda3/envs/project/lib/python3.7/site-packages/apache_beam/runners/direct/sdf_direct_runner.py", line 467, in initiate_checkpoint
    checkpoint_state.residual_restriction = tracker.checkpoint()
AttributeError: '_SDFBoundedSourceRestrictionTracker' object has no attribute 'checkpoint'

elements, files_read = (
                p
                | 'ReadFromGCS' >> beam.io.ReadFromTextWithFilename(file_pattern=file_pattern, coder=JsonCoder())
                | 'aTransformWithTaggedOutput' >> beam.ParDo(aTransform()).with_outputs('taggedOutputFilesRead',
                                                                        main='elements')
        )

deferred_side_input_1 = beam.pvalue.AsIter((
                p
                | 'QueryFromBigQueryTable' >> beam.io.Read(beam.io.BigQuerySource(query='SELECT col1 from dataset.table'))
        ))

deferred_side_input_2 = beam.pvalue.AsIter((
                p
                | 'ReadFromBigQueryTable' >> beam.io.Read(beam.io.BigQuerySource(dataset='bq_dataset', table='bq_table'))
        ))

transformed, tagged_output = (
                    elements
                    | 'Series' >> beam.ParDo(aTransform())
                    | 'of' >> beam.ParDo(anotherTransform())
                    | 'transforms' >> beam.ParDo(anotherTransform())
                    | '...' >> beam.ParDo(anotherTransform())
                    | '...' >> beam.ParDo(anotherTransform())
                    | '...' >> beam.ParDo(anotherTransform(), deferred_side_input_1)
                    | 'transformWithTaggedOutput' >> beam.ParDo(transformWithTaggedOutput(), deferred_side_input_2).with_outputs('tagged_output',
                                                                                 main='transformed')
            )


"""Write `transformed` PCollection to MongoDB"""
transformed | 'WriteToMongo' >> beam.io.WriteToMongoDB(uri='mongoURI',
                                                       db='mongoDB',
                                                       coll='mongoCollection')

"""Perform an additional transform to `transformed` PCollection, Write to BigQuery"""
_ = (
transformed
| 'AdditionalTransform' >> beam.ParDo(additionalTransform())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
                      table='bigqueryTable',
                      dataset='bigqueryDataset',
                      schema=beam.io.gcp.bigquery_tools.parse_table_schema_from_json(bq_schema),
                      create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                      write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                      validate=True)
            )
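Since the "_id" leaking into transformed is what trips up the BigQuery serializer, one workaround (a sketch under the assumption that the extra key is the only offending field, not a confirmed fix) is to strip it out with a `beam.Map` step before WriteToBigQuery. The drop_id helper below is hypothetical:

```python
def drop_id(row):
    # Return a copy of the row without the MongoDB-added "_id",
    # leaving the input dict itself unmodified.
    return {k: v for k, v in row.items() if k != "_id"}

# In the pipeline this would slot in just before the BigQuery write, e.g.:
#   transformed | 'DropMongoId' >> beam.Map(drop_id) | 'WriteToBigQuery' >> ...

row = {"_id": "ObjectId(...)", "col1": 1}
assert drop_id(row) == {"col1": 1}
assert "_id" in row  # the original element is not mutated
```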

"""(No issues with this write to BQ) Write `tagged_output` PCollection to BigQuery"""
tagged_output | 'WriteTaggedOutputToBigQuery' >> beam.io.WriteToBigQuery(
    table='other_bq_table',
    dataset='bq_dataset',
    schema=beam.io.gcp.bigquery_tools.parse_table_schema_from_json(other_bq_schema),
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    validate=True)

Tags: python, mongodb, apache-beam

Solution

