GCP Composer/Airflow calling a Dataflow/Beam pipeline throws an error

Problem description

I have a GCP Cloud Composer environment, composer-1.10.0-airflow-1.10.6 to be precise (i.e. Airflow 1.10.6) with Python 3.6. I am using python_operator.PythonOperator, and the operator calls an apache-beam pipeline on Dataflow. Here are the code snippets.

Calling the pipeline function:

from airflow.operators import python_operator

# PythonOperator task that launches the Beam pipeline on Dataflow
test_greeting = python_operator.PythonOperator(
    task_id='python_pipeline',
    python_callable=run_pipeline
)
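
For context, this operator sits inside a DAG definition; a minimal sketch of that wrapper is below (the dag_id, start_date and schedule are placeholders, not the real ones):

from datetime import datetime

from airflow import DAG
from airflow.operators import python_operator

# Placeholder DAG wrapper; the real dag_id, start_date and schedule differ.
with DAG(
        dag_id='dataflow_beam_example',
        start_date=datetime(2020, 1, 1),
        schedule_interval=None) as dag:

    test_greeting = python_operator.PythonOperator(
        task_id='python_pipeline',
        python_callable=run_pipeline
    )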

The pipeline function is as follows:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def run_pipeline():
    print("Test Pipeline")

    pipeline_args = [
        "--runner", "DataflowRunner",
        "--project", "*****",
        "--temp_location", "gs://******/temp",
        "--region", "us-east1",
        "--job_name", "job1199",
        "--zone", "us-east1-b"
    ]

    pipeline_options = PipelineOptions(pipeline_args)
    pipe = beam.Pipeline(options=pipeline_options)

    # AverageFn is a custom CombineFn; a sketch of it follows this snippet.
    small_sum = (
        pipe
        | beam.Create([18, 5, 7, 7, 9, 23, 13, 5])
        | "Combine Globally" >> beam.CombineGlobally(AverageFn())
        | 'Write results' >> beam.io.WriteToText('gs://******/ouptut_from_pipline/combine')
    )

    run_result = pipe.run()
    run_result.wait_until_finish()

    return "True"
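
AverageFn is a custom CombineFn that computes the mean of the input elements; the exact implementation doesn't matter here, but it looks something like this sketch:

import apache_beam as beam

class AverageFn(beam.CombineFn):
    # Standard CombineFn keeping a (sum, count) accumulator
    # and emitting the mean of all input elements.
    def create_accumulator(self):
        return (0.0, 0)

    def add_input(self, accumulator, element):
        total, count = accumulator
        return total + element, count + 1

    def merge_accumulators(self, accumulators):
        totals, counts = zip(*accumulators)
        return sum(totals), sum(counts)

    def extract_output(self, accumulator):
        total, count = accumulator
        return total / count if count else float('NaN')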

When I run it, the pipeline execution starts on Dataflow, but it fails with the following error:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/dataflow_worker/batchworker.py", line 648, in do_work
    work_executor.execute()
  File "/usr/local/lib/python3.6/site-packages/dataflow_worker/executor.py", line 150, in execute
    test_shuffle_sink=self._test_shuffle_sink)
  File "/usr/local/lib/python3.6/site-packages/dataflow_worker/executor.py", line 116, in create_operation
    is_streaming=False)
  File "apache_beam/runners/worker/operations.py", line 1032, in apache_beam.runners.worker.operations.create_operation
  File "apache_beam/runners/worker/operations.py", line 845, in apache_beam.runners.worker.operations.create_pgbk_op
  File "apache_beam/runners/worker/operations.py", line 903, in apache_beam.runners.worker.operations.PGBKCVOperation.__init__
  File "/usr/local/lib/python3.6/site-packages/apache_beam/internal/pickler.py", line 290, in loads
    return dill.loads(s)
  File "/usr/local/lib/python3.6/site-packages/dill/_dill.py", line 275, in loads
    return load(file, ignore, **kwds)
  File "/usr/local/lib/python3.6/site-packages/dill/_dill.py", line 270, in load
    return Unpickler(file, ignore=ignore, **kwds).load()
  File "/usr/local/lib/python3.6/site-packages/dill/_dill.py", line 472, in load
    obj = StockUnpickler.load(self)
  File "/usr/local/lib/python3.6/site-packages/dill/_dill.py", line 462, in find_class
    return StockUnpickler.find_class(self, module, name)
ModuleNotFoundError: No module named 'unusual_prefix_162ac8b7030d5bd1ff5f128a26483932d3968a4d_python_bash'

The Beam version is Apache Beam Python 3.6 SDK 2.19.0. I suspect the Python version (3.6) might be the problem, because calling the pipeline directly (as the runner) from my local system works fine, and my local system runs Python 3.7.

However, I can't find a way to test this theory.

Any hints on how to fix this would be helpful.

Tags: airflow, google-cloud-dataflow, apache-beam, google-cloud-composer

Solution
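
Reading the traceback, the missing module name (unusual_prefix_..._python_bash) looks like the mangled module name that Composer/Airflow assigns to the DAG file when importing it, so anything pickled out of that file (such as AverageFn) cannot be re-imported on the Dataflow workers. Under that assumption, two workarounds commonly suggested for this class of error are worth trying: pass --save_main_session so main-module definitions are pickled along with the job, or move AverageFn and the pipeline code into their own module and ship it to the workers (for example via --setup_file). A minimal sketch of the pipeline options with those flags (paths are placeholders):

from apache_beam.options.pipeline_options import PipelineOptions

pipeline_args = [
    "--runner", "DataflowRunner",
    "--project", "*****",
    "--temp_location", "gs://******/temp",
    "--region", "us-east1",
    "--job_name", "job1199",
    "--zone", "us-east1-b",
    # Pickle the launching process's main session so that functions/classes
    # defined next to the pipeline (e.g. AverageFn) are available on workers.
    "--save_main_session",
    # Alternatively (or additionally), package the custom code and ship it:
    # "--setup_file", "/home/airflow/gcs/dags/setup.py",   # placeholder path
]

pipeline_options = PipelineOptions(pipeline_args)

The more robust variant is the second one: keep the DAG file limited to Airflow wiring and import run_pipeline and AverageFn from a regular, installable Python module, so their pickled references point at a module name that also exists on the workers.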

