How to correctly package an Apache Beam project to run on Google Dataflow

Problem description

I'm struggling to find the best project/code structure for running my Python-based Apache Beam project on Google Dataflow. With my current setup everything deploys, but as soon as the pipeline receives data through Google's Pub/Sub it raises an exception like this:

... some more lines ...
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1582, in _create_pardo_operation
  dofn_data = pickler.loads(serialized_fn)
File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/pickler.py", line 294, in loads
  return dill.loads(s)
File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 275, in loads
  return load(file, ignore, **kwds)
File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 270, in load
  return Unpickler(file, ignore=ignore, **kwds).load()
File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 472, in load
  obj = StockUnpickler.load(self)
File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 462, in find_class
  return StockUnpickler.find_class(self, module, name)

ModuleNotFoundError: No module named 'coruscantbeam.coruscantbeam' passed through: ==> dist_proc/dax/workflow/worker/fnapi_service.cc:631
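The error happens while the worker unpickles the serialized DoFn: dill records the defining module's path at submission time, and the worker must be able to import that exact module (here `coruscantbeam.coruscantbeam`, which isn't installed as a package on the worker). A minimal stdlib-only illustration of the principle, not Beam-specific:

```python
import pickle

class MyDoFn:
    """Stand-in for a Beam DoFn; a plain class to keep this stdlib-only."""
    def process(self, element):
        return [element]

# pickle (and dill) store the class by module path + name, not by value.
payload = pickle.dumps(MyDoFn())

# Loading works here because the defining module is importable in this
# process; a Dataflow worker fails at this same step with
# ModuleNotFoundError if the recorded module path isn't installed there.
restored = pickle.loads(payload)
print(restored.process("msg"))  # ['msg']
```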

Here is my current project structure:

\coruscantbeam
  __init__.py
  setup.py
  README.md
  \coruscantbeam
    __init__.py
    dataflow.py
    \pardos
      __init__.py
      mypardo.py

To deploy the project to Google Dataflow, I invoke dataflow.py directly. The file dataflow.py looks like this:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import os

from .pardos.mypardo import MyPardo

# streaming and topic come from the real project's configuration
pipeline_options = PipelineOptions(
        streaming=streaming, save_main_session=True,
        setup_file=os.path.join(os.path.dirname(__file__), "..", "setup.py"),
)

def run_beam():
  with beam.Pipeline(options=pipeline_options) as p:
    p | beam.io.ReadFromPubSub(topic=topic) | beam.ParDo(MyPardo())

if __name__ == "__main__":
  run_beam()

This is somewhat "pseudo" code, but the original works locally and I can deploy it to the cloud; as mentioned, though, it fails with the module import error as soon as it tries to process data.

I've played around with various structures; this is the one that got me closest to a working run ;)

Tags: python, project, google-cloud-dataflow, apache-beam

Solution


I finally managed to package it correctly. Here is what it ended up looking like:

\coruscantbeam
  setup.py
  requirements.txt
  README.md
  \coruscantbeam
    __init__.py
    __main__.py
    main.py
    \pardos
      __init__.py
      mypardo.py

main.py looks like this:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
import os

from .pardos.mypardo import MyPardo

# streaming and topic come from the real project's configuration
pipeline_options = PipelineOptions(
        streaming=streaming, save_main_session=True,
        setup_file=os.path.join(os.path.dirname(__file__), "..", "setup.py"),
        requirements_file=os.path.join(os.path.dirname(__file__), "..", "requirements.txt"),
)

def run():
  p = beam.Pipeline(options=pipeline_options)
  p | beam.io.ReadFromPubSub(topic=topic) | beam.ParDo(MyPardo())

  pipeline_result = p.run()

  # Used while testing locally
  if pipeline_options.view_as(StandardOptions).runner == "DirectRunner":
    pipeline_result.wait_until_finish()

__main__.py looks like this:

import logging

if __name__ == "__main__":
    from .main import run
    logging.getLogger().setLevel(level=logging.INFO)
    run()

And finally, setup.py:

import setuptools

setuptools.setup(
    name='coruscantbeam',
    version='1.0',
    author="Indiana Jones",
    author_email="youremail@domain.tld",
    url="https://blabla.com",
    data_files = [('', ['coruscantbeam/some_schema.json'])],
    include_package_data=True,
    packages=setuptools.find_packages(),
)
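One caveat worth noting (my own assumption about setuptools packaging, not part of the original answer): for a non-Python file such as coruscantbeam/some_schema.json to reliably end up in the source distribution that Dataflow builds from setup.py, the usual fix when it goes missing on workers is to declare it in a MANIFEST.in next to setup.py:

```text
include coruscantbeam/some_schema.json
```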

Everything is started by calling: python -m coruscantbeam
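Since PipelineOptions is constructed without an explicit flags list, it parses sys.argv, so runner and project settings can be supplied on the command line. A hypothetical invocation (project, region, and bucket names are placeholders):

```shell
# Submit to Dataflow (names below are placeholders)
python -m coruscantbeam \
  --runner DataflowRunner \
  --project my-gcp-project \
  --region europe-west1 \
  --temp_location gs://my-bucket/tmp

# Run locally with the DirectRunner while testing
python -m coruscantbeam --runner DirectRunner
```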

I'm happy with the result, and it looks clean.

