python - Apache Beam on Python: TypeError: Receiver() takes no arguments
Question
I am trying to learn Apache Beam on Python, but none of my examples work.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
options = PipelineOptions()
p = beam.Pipeline(options=options)
input_filename = "./data/kinglear.txt"
p | beam.io.ReadFromText(input_filename)
result = p.run()
The result is:
Traceback (most recent call last):
File "C:/Projects/Perm/PythonPOC/Beam/Main.py", line 10, in <module>
result = p.run()
File "C:\ProgramData\Anaconda3\lib\site-packages\apache_beam\pipeline.py", line 416, in run
self._options).run(False)
File "C:\ProgramData\Anaconda3\lib\site-packages\apache_beam\pipeline.py", line 429, in run
return self.runner.run_pipeline(self, self._options)
File "C:\ProgramData\Anaconda3\lib\site-packages\apache_beam\runners\direct\direct_runner.py", line 135, in run_pipeline
return runner.run_pipeline(pipeline, options)
File "C:\ProgramData\Anaconda3\lib\site-packages\apache_beam\runners\portability\fn_api_runner.py", line 389, in run_pipeline
default_environment=self._default_environment))
File "C:\ProgramData\Anaconda3\lib\site-packages\apache_beam\runners\portability\fn_api_runner.py", line 396, in run_via_runner_api
return self.run_stages(stage_context, stages)
File "C:\ProgramData\Anaconda3\lib\site-packages\apache_beam\runners\portability\fn_api_runner.py", line 478, in run_stages
stage_context.safe_coders)
File "C:\ProgramData\Anaconda3\lib\site-packages\apache_beam\runners\portability\fn_api_runner.py", line 774, in _run_stage
result, splits = bundle_manager.process_bundle(data_input, data_output)
File "C:\ProgramData\Anaconda3\lib\site-packages\apache_beam\runners\portability\fn_api_runner.py", line 1811, in process_bundle
part, expected_outputs), part_inputs):
File "C:\ProgramData\Anaconda3\lib\concurrent\futures\_base.py", line 586, in result_iterator
yield fs.pop().result()
File "C:\ProgramData\Anaconda3\lib\concurrent\futures\_base.py", line 432, in result
return self.__get_result()
File "C:\ProgramData\Anaconda3\lib\concurrent\futures\_base.py", line 384, in __get_result
raise self._exception
File "C:\ProgramData\Anaconda3\lib\site-packages\apache_beam\utils\thread_pool_executor.py", line 42, in run
self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
File "C:\ProgramData\Anaconda3\lib\site-packages\apache_beam\runners\portability\fn_api_runner.py", line 1811, in <lambda>
part, expected_outputs), part_inputs):
File "C:\ProgramData\Anaconda3\lib\site-packages\apache_beam\runners\portability\fn_api_runner.py", line 1747, in process_bundle
result_future = self._worker_handler.control_conn.push(process_bundle_req)
File "C:\ProgramData\Anaconda3\lib\site-packages\apache_beam\runners\portability\fn_api_runner.py", line 1173, in push
response = self.worker.do_instruction(request)
File "C:\ProgramData\Anaconda3\lib\site-packages\apache_beam\runners\worker\sdk_worker.py", line 291, in do_instruction
request.instruction_id)
File "C:\ProgramData\Anaconda3\lib\site-packages\apache_beam\runners\worker\sdk_worker.py", line 317, in process_bundle
bundle_processor.process_bundle(instruction_id))
File "C:\ProgramData\Anaconda3\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 675, in process_bundle
data.transform_id].process_encoded(data.data)
File "C:\ProgramData\Anaconda3\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 146, in process_encoded
self.output(decoded_value)
File "C:\ProgramData\Anaconda3\lib\site-packages\apache_beam\runners\worker\operations.py", line 259, in output
cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
File "C:\ProgramData\Anaconda3\lib\site-packages\Cython\Shadow.py", line 165, in cast
return type(*args)
TypeError: Receiver() takes no arguments
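I cannot run any of the examples I have found on forums; they all fail with the same error. I do not know what needs to be done to fix it, and I am new to Apache Beam.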
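Answer
This is still an open issue in Apache Beam. A workaround is to run the DirectRunner pipeline inside a virtualenv; the Apache Beam documentation already provides a short tutorial on setting one up. A freshly created virtualenv does not automatically pick up Cython (the package that appears at the bottom of the traceback, loaded from the Anaconda site-packages), so the error is avoided.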
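To check whether an environment matches the situation described above, a small diagnostic along the following lines may help. It is only a sketch: the helper names cython_is_importable and in_virtualenv are made up for illustration and are not part of Apache Beam, and it only reports what the current interpreter can see, not whether Beam itself will fail.

```python
import importlib.util
import sys


def cython_is_importable():
    """Return True if a Cython package is visible to this interpreter.

    Hypothetical helper, not part of Apache Beam.
    """
    return importlib.util.find_spec("Cython") is not None


def in_virtualenv():
    """Return True if the interpreter appears to run inside a venv/virtualenv.

    Hypothetical helper. Older virtualenv releases set sys.real_prefix;
    the venv module and newer virtualenv releases make sys.prefix differ
    from sys.base_prefix.
    """
    if hasattr(sys, "real_prefix"):
        return True
    return sys.prefix != getattr(sys, "base_prefix", sys.prefix)


if __name__ == "__main__":
    print("Interpreter:", sys.executable)
    print("Inside a virtualenv:", in_virtualenv())
    print("Cython importable:", cython_is_importable())
```

If this reports that Cython is importable and the interpreter is not inside a virtualenv, the setup resembles the Anaconda installation in the traceback, and running the same pipeline from a fresh virtualenv with only apache-beam installed is the workaround suggested in the answer.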