Apache Beam on Python: TypeError: Receiver() takes no arguments

Problem description

I am trying to learn Apache Beam on Python, but none of my examples work.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions()
p = beam.Pipeline(options=options)
input_filename = "./data/kinglear.txt"

p | beam.io.ReadFromText(input_filename)

result = p.run()

The result is:

Traceback (most recent call last):
  File "C:/Projects/Perm/PythonPOC/Beam/Main.py", line 10, in <module>
    result = p.run()
  File "C:\ProgramData\Anaconda3\lib\site-packages\apache_beam\pipeline.py", line 416, in run
    self._options).run(False)
  File "C:\ProgramData\Anaconda3\lib\site-packages\apache_beam\pipeline.py", line 429, in run
    return self.runner.run_pipeline(self, self._options)
  File "C:\ProgramData\Anaconda3\lib\site-packages\apache_beam\runners\direct\direct_runner.py", line 135, in run_pipeline
    return runner.run_pipeline(pipeline, options)
  File "C:\ProgramData\Anaconda3\lib\site-packages\apache_beam\runners\portability\fn_api_runner.py", line 389, in run_pipeline
    default_environment=self._default_environment))
  File "C:\ProgramData\Anaconda3\lib\site-packages\apache_beam\runners\portability\fn_api_runner.py", line 396, in run_via_runner_api
    return self.run_stages(stage_context, stages)
  File "C:\ProgramData\Anaconda3\lib\site-packages\apache_beam\runners\portability\fn_api_runner.py", line 478, in run_stages
    stage_context.safe_coders)
  File "C:\ProgramData\Anaconda3\lib\site-packages\apache_beam\runners\portability\fn_api_runner.py", line 774, in _run_stage
    result, splits = bundle_manager.process_bundle(data_input, data_output)
  File "C:\ProgramData\Anaconda3\lib\site-packages\apache_beam\runners\portability\fn_api_runner.py", line 1811, in process_bundle
    part, expected_outputs), part_inputs):
  File "C:\ProgramData\Anaconda3\lib\concurrent\futures\_base.py", line 586, in result_iterator
    yield fs.pop().result()
  File "C:\ProgramData\Anaconda3\lib\concurrent\futures\_base.py", line 432, in result
    return self.__get_result()
  File "C:\ProgramData\Anaconda3\lib\concurrent\futures\_base.py", line 384, in __get_result
    raise self._exception
  File "C:\ProgramData\Anaconda3\lib\site-packages\apache_beam\utils\thread_pool_executor.py", line 42, in run
    self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
  File "C:\ProgramData\Anaconda3\lib\site-packages\apache_beam\runners\portability\fn_api_runner.py", line 1811, in <lambda>
    part, expected_outputs), part_inputs):
  File "C:\ProgramData\Anaconda3\lib\site-packages\apache_beam\runners\portability\fn_api_runner.py", line 1747, in process_bundle
    result_future = self._worker_handler.control_conn.push(process_bundle_req)
  File "C:\ProgramData\Anaconda3\lib\site-packages\apache_beam\runners\portability\fn_api_runner.py", line 1173, in push
    response = self.worker.do_instruction(request)
  File "C:\ProgramData\Anaconda3\lib\site-packages\apache_beam\runners\worker\sdk_worker.py", line 291, in do_instruction
    request.instruction_id)
  File "C:\ProgramData\Anaconda3\lib\site-packages\apache_beam\runners\worker\sdk_worker.py", line 317, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "C:\ProgramData\Anaconda3\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 675, in process_bundle
    data.transform_id].process_encoded(data.data)
  File "C:\ProgramData\Anaconda3\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 146, in process_encoded
    self.output(decoded_value)
  File "C:\ProgramData\Anaconda3\lib\site-packages\apache_beam\runners\worker\operations.py", line 259, in output
    cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
  File "C:\ProgramData\Anaconda3\lib\site-packages\Cython\Shadow.py", line 165, in cast
    return type(*args)
TypeError: Receiver() takes no arguments

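I cannot run any of the examples I have found on any forum; every one of them fails with the same error. I don't know what needs to be done to fix it. I am new to Apache Beam.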

Tags: python, apache-beam

Solution


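This issue is still present in Apache Beam. One workaround is to run the DirectRunner inside a virtualenv; the Apache Beam documentation already provides a short tutorial on setting one up. A freshly created virtualenv does not include Cython (the traceback ends in Cython\Shadow.py, which comes from the Cython package bundled with Anaconda), so the error is avoided.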
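As a quick sanity check before re-running the pipeline, it may help to confirm that the interpreter you are using really is inside a virtual environment and can no longer see Cython. The snippet below is only a minimal sketch using the standard library; the helper names module_is_importable and describe_environment are made up for illustration and are not part of Apache Beam.

```python
import importlib.util
import sys


def module_is_importable(name):
    """Return True if a top-level module called `name` can be found."""
    return importlib.util.find_spec(name) is not None


def describe_environment():
    """Report whether Cython is visible and whether a virtualenv is active."""
    # With venv and recent virtualenv releases, sys.prefix differs from
    # sys.base_prefix while the environment is active.
    in_virtualenv = sys.prefix != getattr(sys, "base_prefix", sys.prefix)
    return {
        "cython_importable": module_is_importable("Cython"),
        "in_virtualenv": in_virtualenv,
    }


if __name__ == "__main__":
    print(describe_environment())
```

If cython_importable is still True inside the new environment, Cython was probably installed into it explicitly, and the same TypeError may reappear.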

