Python package error while running GCP Dataflow

Problem description

This error started happening suddenly; nothing in the Dataflow pipeline had changed. We are seeing the error "NameError: global name 'firestore' is not defined [while running 'generatedPtransform-12478']". It looks like there is a problem installing the packages on the worker nodes.

I tried the same pipeline locally with the "DirectRunner" and it works fine. We went through the "NameErrors" documentation linked at https://cloud.google.com/dataflow/docs/resources/faq#how-can-i-tell-what-version-of-the-cloud-dataflow-sdk-is-installedrunning-in-my-environment and tried the following:

1. The 'save_main_session': True pipeline option

2. Moving all package 'import' statements from global scope into function scope (a minimal sketch follows this list)
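
As a minimal sketch of item 2 (the question does not show the real DoFn bodies, so the client creation and the 'events' collection name below are assumptions), moving the firestore import into function scope looks like this:

    import apache_beam as beam

    class WriteToFS(beam.DoFn):
        def process(self, element):
            # The import is resolved on the worker at processing time
            # instead of relying on the pickled main session.
            from google.cloud import firestore
            db = firestore.Client()                # hypothetical: client created per element
            db.collection('events').add(element)   # 'events' is a placeholder collection name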

We have the corresponding packages listed in requirements.txt.
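
Judging from the imports in the pipeline code below, the file would presumably need to list at least the non-standard-library packages the pipeline imports (PyPI names inferred from the imports, versions omitted), with google-cloud-firestore being the one relevant to this error:

    google-cloud-firestore
    pyyaml
    python-dateutil

The pipeline code, slightly abbreviated: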

    import datetime
    import json
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    from google.cloud import firestore
    import yaml
    from functools import reduce
    from dateutil.parser import parse

    class PubSubToDict(beam.DoFn):
         <...to process elements>

    class WriteToFS(beam.DoFn):
         <...to write data to firestore>

    # PROJECT, BUCKET, JOB_NAME and TOPIC are defined elsewhere in the script.
    pipeline_options = {
        'project': PROJECT,
        'staging_location': 'gs://' + BUCKET + '/staging',
        'temp_location': 'gs://' + BUCKET + '/temp',
        'runner': 'DataflowRunner',
        'job_name': JOB_NAME,
        'disk_size_gb': 100,
        'save_main_session': True,
        'region': 'europe-west1',
        'requirements_file': 'requirements.txt',
        'streaming': True
    }

    # Turn the options dict into PipelineOptions for the runner.
    options = PipelineOptions.from_dictionary(pipeline_options)

    with beam.Pipeline(options=options) as p:

        lines = (p | "Read from PubSub" >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
                   | "Transformation" >> beam.ParDo(PubSubToDict()))

        FSWrite = (lines | 'Write To Firestore' >> beam.ParDo(WriteToFS()))

Tags: pip, streaming, google-cloud-dataflow

Solution
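
A common way to resolve this class of worker-side NameError (sketched here as an assumption, not necessarily the fix that was ultimately applied) is to stop depending on the pickled main session for Firestore at all: keep google-cloud-firestore in requirements.txt so it is installed on the workers, and do the import and client creation lazily inside the DoFn, for example in setup():

    import apache_beam as beam

    class WriteToFS(beam.DoFn):
        def setup(self):
            # Runs once per DoFn instance on the worker, after the worker
            # has installed the packages from requirements.txt.
            from google.cloud import firestore
            self.db = firestore.Client()

        def process(self, element):
            # Collection name and element shape are assumptions for the sketch.
            self.db.collection('events').add(element)
            yield element

Because the import happens on the worker after the dependencies are installed, the same code behaves the same way under DirectRunner and DataflowRunner, and the DoFn no longer relies on 'save_main_session' to find the firestore module.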

