airflow - GCP Composer/Airflow calling Dataflow/Beam throws an error
Problem description
I have a GCP Cloud Composer environment, composer-1.10.0-airflow-1.10.6 to be precise, running Python 3 (3.6). I am using python_operator.PythonOperator to invoke an Apache Beam pipeline on Dataflow. Here is the code snippet that calls the pipeline function:
from airflow.operators import python_operator

test_greeting = python_operator.PythonOperator(
    task_id='python_pipeline',
    python_callable=run_pipeline
)
The pipeline function is as follows:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

def run_pipeline():
    print("Test Pipeline")
    pipeline_args = [
        "--runner", "DataflowRunner",
        "--project", "*****",
        "--temp_location", "gs://******/temp",
        "--region", "us-east1",
        "--job_name", "job1199",
        "--zone", "us-east1-b"
    ]
    pipeline_options = PipelineOptions(pipeline_args)
    pipe = beam.Pipeline(options=pipeline_options)
    # AverageFn is a CombineFn whose definition is not shown in the question.
    small_sum = (
        pipe
        | beam.Create([18, 5, 7, 7, 9, 23, 13, 5])
        | "Combine Globally" >> beam.CombineGlobally(AverageFn())
        | 'Write results' >> beam.io.WriteToText('gs://******/ouptut_from_pipline/combine')
    )
    run_result = pipe.run()
    run_result.wait_until_finish()
    return "True"
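The question does not show AverageFn. A minimal sketch, assuming it follows the averaging CombineFn from the Apache Beam programming guide: the accumulator is a (sum, count) pair, and the four methods create, extend, merge and finalize it. So that it runs without Beam installed, it is written as a plain class (in a real pipeline it would subclass beam.CombineFn), and combine_globally is a hypothetical helper that only imitates how a runner accumulates bundles separately and then merges them.

```python
class AverageFn(object):
    """Mirrors the CombineFn protocol; a real pipeline would subclass
    apache_beam.CombineFn instead of object (assumption)."""

    def create_accumulator(self):
        # Accumulator is a (running_sum, count) pair.
        return (0.0, 0)

    def add_input(self, accumulator, element):
        total, count = accumulator
        return total + element, count + 1

    def merge_accumulators(self, accumulators):
        # Partial results from different bundles are combined pairwise.
        totals, counts = zip(*accumulators)
        return sum(totals), sum(counts)

    def extract_output(self, accumulator):
        total, count = accumulator
        return total / count if count else float("nan")


def combine_globally(fn, elements, num_bundles=3):
    """Hypothetical helper: split the input into bundles, accumulate each
    bundle independently, then merge the partial accumulators."""
    bundles = [elements[i::num_bundles] for i in range(num_bundles)]
    partials = []
    for bundle in bundles:
        acc = fn.create_accumulator()
        for x in bundle:
            acc = fn.add_input(acc, x)
        partials.append(acc)
    return fn.extract_output(fn.merge_accumulators(partials))


if __name__ == "__main__":
    print(combine_globally(AverageFn(), [18, 5, 7, 7, 9, 23, 13, 5]))
```

For the question's input the result is 87 / 8 = 10.875 however the elements are split into bundles, which is why the accumulator carries a (sum, count) pair instead of a running mean.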
When I run it, the pipeline starts executing on Dataflow but fails with the following error:
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/dataflow_worker/batchworker.py", line 648, in do_work
    work_executor.execute()
  File "/usr/local/lib/python3.6/site-packages/dataflow_worker/executor.py", line 150, in execute
    test_shuffle_sink=self._test_shuffle_sink)
  File "/usr/local/lib/python3.6/site-packages/dataflow_worker/executor.py", line 116, in create_operation
    is_streaming=False)
  File "apache_beam/runners/worker/operations.py", line 1032, in apache_beam.runners.worker.operations.create_operation
  File "apache_beam/runners/worker/operations.py", line 845, in apache_beam.runners.worker.operations.create_pgbk_op
  File "apache_beam/runners/worker/operations.py", line 903, in apache_beam.runners.worker.operations.PGBKCVOperation.__init__
  File "/usr/local/lib/python3.6/site-packages/apache_beam/internal/pickler.py", line 290, in loads
    return dill.loads(s)
  File "/usr/local/lib/python3.6/site-packages/dill/_dill.py", line 275, in loads
    return load(file, ignore, **kwds)
  File "/usr/local/lib/python3.6/site-packages/dill/_dill.py", line 270, in load
    return Unpickler(file, ignore=ignore, **kwds).load()
  File "/usr/local/lib/python3.6/site-packages/dill/_dill.py", line 472, in load
    obj = StockUnpickler.load(self)
  File "/usr/local/lib/python3.6/site-packages/dill/_dill.py", line 462, in find_class
    return StockUnpickler.find_class(self, module, name)
ModuleNotFoundError: No module named 'unusual_prefix_162ac8b7030d5bd1ff5f128a26483932d3968a4d_python_bash'
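One possible reading of this traceback (an interpretation, not confirmed in the question): the worker fails while unpickling the combine function (PGBKCVOperation, then dill.loads), and the missing module name has the shape that, to my knowledge, Airflow 1.10's DagBag gives a DAG file when it imports it: 'unusual_prefix_' + SHA-1 of the file path + '_' + file stem (the hash in the error is 40 hex characters, i.e. SHA-1 length). A class such as AverageFn defined in that DAG file would then be pickled by reference to a module that never exists on the Dataflow workers. The stdlib-only sketch below reproduces that failure mode with plain pickle, assuming dill behaves like pickle for classes it treats as importable; the file path and all helper names are hypothetical.

```python
import hashlib
import os
import pickle
import sys
import types


def airflow_style_module_name(filepath):
    """Assumed Airflow 1.10 DagBag naming: 'unusual_prefix_' + sha1(path) + '_' + stem."""
    stem = os.path.splitext(os.path.basename(filepath))[0]
    digest = hashlib.sha1(filepath.encode("utf-8")).hexdigest()
    return "unusual_prefix_" + digest + "_" + stem


def register_fake_dag_module(mod_name):
    """Create and register a module holding a class, as a DAG import would."""
    mod = types.ModuleType(mod_name)

    class AverageFn(object):  # stand-in for a class defined in the DAG file
        pass

    AverageFn.__module__ = mod_name
    AverageFn.__qualname__ = "AverageFn"
    mod.AverageFn = AverageFn
    sys.modules[mod_name] = mod
    return mod


def unpickle_on_fresh_worker(filepath="/home/airflow/gcs/dags/python_bash.py"):
    """Pickle while the module exists, then unpickle without it.
    The default path is hypothetical, inferred from the '_python_bash' suffix."""
    mod_name = airflow_style_module_name(filepath)
    mod = register_fake_dag_module(mod_name)
    # Only the reference '<mod_name>.AverageFn' is stored, not the class body.
    payload = pickle.dumps(mod.AverageFn())
    # Simulate a worker process where that module was never imported.
    del sys.modules[mod_name]
    try:
        pickle.loads(payload)
    except ModuleNotFoundError as exc:
        return str(exc)
    return "unpickled without error"


if __name__ == "__main__":
    print(unpickle_on_fresh_worker())
```

If this reading is right, a direction worth testing is to move the CombineFn out of the DAG file into a module that is also installed on the workers (for example through Beam's --setup_file option); that is a suggestion to verify, not a confirmed fix.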
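The Beam version is Apache Beam Python 3.6 SDK 2.19.0. I suspect Python 3.6 may be the problem, because submitting the pipeline directly from my local machine works fine, and my local machine runs Python 3.7.
However, I haven't found a way to test this theory.
Any hints on how to resolve this would be helpful.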
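One hedged way to probe the version theory is to log the launching interpreter's version from inside run_pipeline (next to the existing print) and compare it with the SDK version Dataflow reports for each job, once for the Composer run and once for the local run. The helper names below are hypothetical, and the regex assumes the label keeps the "Python X.Y SDK" wording quoted above.

```python
import re
import sys


def sdk_python_version(sdk_label):
    """Return (major, minor) parsed from a label such as
    'Apache Beam Python 3.6 SDK 2.19.0'."""
    match = re.search(r"Python (\d+)\.(\d+) SDK", sdk_label)
    if match is None:
        raise ValueError("unrecognised SDK label: %r" % (sdk_label,))
    return int(match.group(1)), int(match.group(2))


def launcher_matches_sdk(sdk_label, launcher=None):
    """Compare the submitting interpreter's (major, minor) with the SDK label."""
    if launcher is None:
        launcher = tuple(sys.version_info[:2])
    return sdk_python_version(sdk_label) == tuple(launcher)


if __name__ == "__main__":
    label = "Apache Beam Python 3.6 SDK 2.19.0"  # as reported in the question
    print("launcher:", tuple(sys.version_info[:2]), "sdk:", sdk_python_version(label))
    print("same minor version:", launcher_matches_sdk(label))
```

A difference between the two runs would only show that they use different worker environments; on its own it would not prove the Python version is what breaks the job.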