python - How to properly package an Apache Beam project to run on Google Dataflow
Problem description
I'm struggling to find the best project/code structure for running my Python-based Apache Beam project on Google Dataflow. With my current setup everything deploys, but as soon as my pipeline receives data via Google's Pub/Sub it raises an exception like this:
... some more lines ...
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1582, in _create_pardo_operation
    dofn_data = pickler.loads(serialized_fn)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/pickler.py", line 294, in loads
    return dill.loads(s)
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 275, in loads
    return load(file, ignore, **kwds)
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 270, in load
    return Unpickler(file, ignore=ignore, **kwds).load()
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 472, in load
    obj = StockUnpickler.load(self)
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 462, in find_class
    return StockUnpickler.find_class(self, module, name)
ModuleNotFoundError: No module named 'coruscantbeam.coruscantbeam' passed through: ==> dist_proc/dax/workflow/worker/fnapi_service.cc:631
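(For context, not from the original post: Beam serializes each DoFn with dill/pickle, which records the class *by reference*, i.e. by its module path and name, not by its source code. If that module path isn't importable on the Dataflow worker, unpickling fails exactly like the traceback above. A stdlib-only sketch of the mechanism, using a hypothetical stub class:)

```python
# Sketch (illustrative): pickle stores a class by reference --
# defining module + qualified name -- rather than by its code.
import pickle


class MyDoFnStub:
    """Stands in for a DoFn; defined at module level so pickle can reference it."""
    pass


payload = pickle.dumps(MyDoFnStub())

# The payload embeds the defining module and the class name. A Dataflow
# worker replays such a reference on unpickling, and raises
# ModuleNotFoundError if the recorded module isn't installed there.
print(b"MyDoFnStub" in payload)
```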
This is my current project structure:
\coruscantbeam
    __init__.py
    setup.py
    README.md
    \coruscantbeam
        __init__.py
        dataflow.py
        \pardos
            __init__.py
            mypardo.py
To deploy the project to Google Dataflow I invoke dataflow.py directly. The file dataflow.py looks like this:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import os

from .pardos.mypardo import MyPardo

pipeline_options = PipelineOptions(
    streaming=streaming,  # streaming and topic are defined elsewhere
    save_main_session=True,
    setup_file=os.path.join(os.path.dirname(__file__), "..", "setup.py"),
)

def run_beam():
    with beam.Pipeline(options=pipeline_options) as p:
        p | beam.io.ReadFromPubSub(topic=topic) | beam.ParDo(MyPardo())

if __name__ == "__main__":
    run_beam()
This is somewhat "pseudo" code, but the original runs fine locally, and I can deploy it to the cloud; as mentioned, though, it won't process any data because of the module import error.
I've played around with various structures, and this is the one that got me closest to something that runs ;)
Solution
I finally managed to package it correctly. This is what it ended up looking like:
\coruscantbeam
    setup.py
    requirements.txt
    README.md
    \coruscantbeam
        __init__.py
        __main__.py
        main.py
        \pardos
            __init__.py
            mypardo.py
main.py looks like this:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
import os

from .pardos.mypardo import MyPardo

pipeline_options = PipelineOptions(
    streaming=streaming,  # streaming and topic are defined elsewhere
    save_main_session=True,
    setup_file=os.path.join(os.path.dirname(__file__), "..", "setup.py"),
    requirements_file=os.path.join(os.path.dirname(__file__), "..", "requirements.txt"),
)

def run():
    p = beam.Pipeline(options=pipeline_options)
    p | beam.io.ReadFromPubSub(topic=topic) | beam.ParDo(MyPardo())
    pipeline_result = p.run()
    # Used while testing locally
    if pipeline_options.view_as(StandardOptions).runner == "DirectRunner":
        pipeline_result.wait_until_finish()
__main__.py looks like this:
import logging

if __name__ == "__main__":
    from .main import run

    logging.getLogger().setLevel(level=logging.INFO)
    run()
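(Why this works, as I understand it: python -m coruscantbeam runs the package's __main__.py as the entry point and sets __package__, so the relative import from .main import run resolves. A small stdlib sketch of that lookup, using a throwaway package whose names are purely illustrative:)

```python
# Sketch: `python -m pkg` executes pkg/__main__.py; runpy exposes the same lookup.
import os
import runpy
import sys
import tempfile

root = tempfile.mkdtemp()
pkg = os.path.join(root, "demo_pkg")
os.makedirs(pkg)
open(os.path.join(pkg, "__init__.py"), "w").close()
with open(os.path.join(pkg, "__main__.py"), "w") as f:
    f.write("print('running demo_pkg.__main__')\n")

sys.path.insert(0, root)
# Equivalent to `python -m demo_pkg`: the package's __main__ submodule runs.
runpy.run_module("demo_pkg", run_name="__main__")
```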
And finally, setup.py:
import setuptools

setuptools.setup(
    name='coruscantbeam',
    version='1.0',
    author="Indiana Jones",
    author_email="youremail@domain.tld",
    url="https://blabla.com",
    data_files=[('', ['coruscantbeam/some_schema.json'])],
    include_package_data=True,
    packages=setuptools.find_packages(),
)
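(A side note on setuptools.find_packages(): it only collects directories that contain an __init__.py, which is why both package levels in the tree above need one. A quick sketch with an illustrative throwaway tree:)

```python
# Sketch: find_packages() skips any directory without an __init__.py.
import os
import tempfile

from setuptools import find_packages

root = tempfile.mkdtemp()
for d in ("coruscantbeam", "coruscantbeam/pardos", "coruscantbeam/no_init"):
    os.makedirs(os.path.join(root, d))
for f in ("coruscantbeam/__init__.py", "coruscantbeam/pardos/__init__.py"):
    open(os.path.join(root, f), "w").close()

# no_init/ has no __init__.py, so it is not discovered as a package.
print(sorted(find_packages(where=root)))
```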
Everything is started by calling: python -m coruscantbeam
I'm quite happy with the result, and it looks clean.