python - How to run Apache Beam on Dataproc?
Problem description
I'm trying to run the Apache Beam sample code on Google Dataproc with the Apache Spark Runner, following this page.
The sample code is the word_count.py below.
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""A word-counting workflow."""
# pytype: skip-file
import argparse
import logging
import re

import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions, PortableOptions


class WordExtractingDoFn(beam.DoFn):
    """Parse each line of input text into words."""

    def process(self, element):
        """Returns an iterator over the words of this element.

        The element is a line of text. If the line is blank, note that, too.

        Args:
          element: the element being processed

        Returns:
          The processed element.
        """
        return re.findall(r'[\w\']+', element, re.UNICODE)


def run(argv=None, save_main_session=True):
    """Main entry point; defines and runs the wordcount pipeline."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        dest='input',
        default='gs://dataflow-samples/shakespeare/kinglear.txt',
        help='Input file to process.')
    parser.add_argument(
        '--output',
        dest='output',
        required=True,
        help='Output file to write results to.')
    known_args, pipeline_args = parser.parse_known_args(argv)

    hoge = [
        "--runner=PortableRunner",
        "--job_endpoint=localhost:8099",
        "--environment_type=LOOPBACK"
    ]
    pipeline_args = pipeline_args + hoge

    # We use the save_main_session option because one or more DoFn's in this
    # workflow rely on global context (e.g., a module imported at module level).
    pipeline_options = PipelineOptions(pipeline_args)
    print(pipeline_args)
    print(known_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

    # The pipeline will be run on exiting the with block.
    with beam.Pipeline(options=pipeline_options) as p:
        # Read the text file[pattern] into a PCollection.
        lines = p | 'Read' >> ReadFromText(known_args.input)

        counts = (
            lines
            | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
            | 'PairWIthOne' >> beam.Map(lambda x: (x, 1))
            | 'GroupAndSum' >> beam.CombinePerKey(sum))

        # Format the counts into a PCollection of strings.
        def format_result(word, count):
            return '%s: %d' % (word, count)

        output = counts | 'Format' >> beam.MapTuple(format_result)

        # Write the output using a "Write" transform that has side effects.
        # pylint: disable=expression-not-assigned
        output | 'Write' >> WriteToText(known_args.output)


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
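To sanity-check the shards written by `WriteToText`, the same counting logic can be run without Beam on a small input. This is only a local sketch (the helper name `word_count_lines` is hypothetical): it reuses the regex from `WordExtractingDoFn` and the format from `format_result`. The Beam pipeline writes unordered, sharded output, so the sketch sorts its result to make comparison easier.

```python
import re
from collections import Counter


def word_count_lines(lines):
    """Plain-Python stand-in for Split -> PairWIthOne -> GroupAndSum -> Format."""
    counts = Counter()
    for line in lines:
        # Same pattern as WordExtractingDoFn.process; blank lines yield no words.
        counts.update(re.findall(r"[\w']+", line, re.UNICODE))
    # Same "word: count" format as format_result, sorted for easy diffing.
    return sorted('%s: %d' % (word, count) for word, count in counts.items())
```

For example, `word_count_lines(open('kinglear.txt'))` on a local copy of the input should contain the same lines as the concatenated output shards, in a different order.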
After SSHing into the Dataproc cluster (version: 1.5.46-debian10), I followed the instructions.
First, I started the job server:
docker run --net=host apache/beam_spark_job_server:latest
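The script expects the job server started above to be reachable at `localhost:8099`, the `--job_endpoint` hard-coded in `run()`. A quick stdlib check might look like this; it only confirms that something accepts TCP connections on that port, not that it is a Beam job service, and the function name `job_server_listening` is hypothetical.

```python
import socket


def job_server_listening(host='localhost', port=8099, timeout=2.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        # create_connection raises OSError (e.g. ConnectionRefusedError) on failure.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```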
Then I ran the pipeline with:
python -m word_count --output=gs://<bucket>/<path_to_dir> --spark_master_url='yarn' --spark_rest_url=spark://localhost:6066 --environment_type=LOOPBACK
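To see which flag values the pipeline actually ends up with, the sketch below replays the command above through the same steps as `run()`: `parse_known_args` peels off `--input`/`--output`, then the hard-coded `hoge` list is appended. Beam's `PipelineOptions` parses flags with an `argparse`-based parser; the second parser here is a simplified stand-in, not Beam's own, and `resolve_args` is a hypothetical helper.

```python
import argparse
import shlex


def resolve_args(command_line, hardcoded):
    """Split args the way run() does, then show which pipeline flag values survive."""
    # Step 1: run()'s own parser keeps --input/--output; everything else passes through.
    parser = argparse.ArgumentParser()
    parser.add_argument('--input', default='gs://dataflow-samples/shakespeare/kinglear.txt')
    parser.add_argument('--output', required=True)
    known_args, pipeline_args = parser.parse_known_args(shlex.split(command_line))

    # Step 2: the hard-coded flags are appended after the command-line flags.
    pipeline_args = pipeline_args + hardcoded

    # Step 3: simplified stand-in for option parsing; argparse keeps the last value given.
    opts = argparse.ArgumentParser()
    for flag in ('--runner', '--job_endpoint', '--environment_type',
                 '--spark_master_url', '--spark_rest_url'):
        opts.add_argument(flag)
    pipeline_opts, _ = opts.parse_known_args(pipeline_args)
    return known_args, vars(pipeline_opts)
```

With this ordering, the hard-coded `--runner=PortableRunner` and `--job_endpoint=localhost:8099` win over anything passed on the command line. `--spark_master_url='yarn'` is parsed, but when `PortableRunner` submits to an already-running job server, which Spark master executes the job is, as far as I know, decided by that job server's own configuration (the `apache/beam_spark_job_server` image accepts a `--spark-master-url` flag, which I believe defaults to an embedded local master). It may be worth checking how the container was started.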
I got the following output:
INFO:apache_beam.internal.gcp.auth:Setting socket default timeout to 60 seconds.
INFO:apache_beam.internal.gcp.auth:socket default timeout is 60.0 seconds.
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:apache_beam.runners.worker.worker_pool_main:Listening for workers at localhost:44113
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
INFO:root:Default Python SDK image for environment is apache/beam_python3.7_sdk:2.28.0
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function lift_combiners at 0x7f1309e92680> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function sort_stages at 0x7f1309e92dd0> ====================
INFO:apache_beam.runners.portability.portable_runner:Environment "LOOPBACK" has started a component necessary for the execution. Be sure to run the pipeline using
with Pipeline() as p:
p.apply(..)
This ensures that the pipeline finishes before this program exits.
INFO:apache_beam.runners.portability.portable_runner:Job state changed to STOPPED
INFO:apache_beam.runners.portability.portable_runner:Job state changed to STARTING
INFO:apache_beam.runners.portability.portable_runner:Job state changed to RUNNING
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 0
INFO:apache_beam.runners.worker.sdk_worker:Creating insecure control channel for localhost:33767.
INFO:apache_beam.runners.worker.sdk_worker:Control channel established.
INFO:apache_beam.runners.worker.sdk_worker:Initializing SDKHarness with unbounded number of workers.
INFO:apache_beam.runners.worker.sdk_worker:Creating insecure state channel for localhost:45575.
INFO:apache_beam.runners.worker.sdk_worker:State channel established.
INFO:apache_beam.runners.worker.data_plane:Creating client data channel for localhost:33121
INFO:apache_beam.runners.worker.sdk_worker:No more requests from control plane
INFO:apache_beam.runners.worker.sdk_worker:SDK Harness waiting for in-flight requests to complete
INFO:apache_beam.runners.worker.data_plane:Closing all cached grpc data channels.
INFO:apache_beam.runners.worker.sdk_worker:Closing all cached gRPC state handlers.
INFO:apache_beam.runners.worker.sdk_worker:Done consuming work.
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 0
INFO:apache_beam.runners.worker.sdk_worker:Creating insecure control channel for localhost:36907.
INFO:apache_beam.runners.worker.sdk_worker:Control channel established.
INFO:apache_beam.runners.worker.sdk_worker:Initializing SDKHarness with unbounded number of workers.
INFO:apache_beam.runners.worker.sdk_worker:Creating insecure state channel for localhost:45469.
INFO:apache_beam.runners.worker.sdk_worker:State channel established.
INFO:apache_beam.runners.worker.data_plane:Creating client data channel for localhost:46297
INFO:apache_beam.runners.worker.sdk_worker:No more requests from control plane
INFO:apache_beam.runners.worker.sdk_worker:SDK Harness waiting for in-flight requests to complete
INFO:apache_beam.runners.worker.data_plane:Closing all cached grpc data channels.
INFO:apache_beam.runners.worker.sdk_worker:Closing all cached gRPC state handlers.
INFO:apache_beam.runners.worker.sdk_worker:Done consuming work.
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 0
INFO:apache_beam.runners.worker.sdk_worker:Creating insecure control channel for localhost:35957.
INFO:apache_beam.runners.worker.sdk_worker:Control channel established.
INFO:apache_beam.runners.worker.sdk_worker:Initializing SDKHarness with unbounded number of workers.
INFO:apache_beam.runners.worker.sdk_worker:Creating insecure state channel for localhost:40055.
INFO:apache_beam.runners.worker.sdk_worker:State channel established.
INFO:apache_beam.runners.worker.data_plane:Creating client data channel for localhost:38615
INFO:apache_beam.runners.worker.sdk_worker:No more requests from control plane
INFO:apache_beam.runners.worker.sdk_worker:SDK Harness waiting for in-flight requests to complete
INFO:apache_beam.runners.worker.data_plane:Closing all cached grpc data channels.
INFO:apache_beam.runners.worker.sdk_worker:Closing all cached gRPC state handlers.
INFO:apache_beam.runners.worker.sdk_worker:Done consuming work.
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 0
INFO:apache_beam.runners.worker.sdk_worker:Creating insecure control channel for localhost:37573.
INFO:apache_beam.runners.worker.sdk_worker:Control channel established.
INFO:apache_beam.runners.worker.sdk_worker:Initializing SDKHarness with unbounded number of workers.
INFO:apache_beam.runners.worker.sdk_worker:Creating insecure state channel for localhost:39507.
INFO:apache_beam.runners.worker.sdk_worker:State channel established.
INFO:apache_beam.runners.worker.data_plane:Creating client data channel for localhost:46455
INFO:apache_beam.runners.worker.sdk_worker:No more requests from control plane
INFO:apache_beam.runners.worker.sdk_worker:SDK Harness waiting for in-flight requests to complete
INFO:apache_beam.runners.worker.data_plane:Closing all cached grpc data channels.
INFO:apache_beam.runners.worker.sdk_worker:Closing all cached gRPC state handlers.
INFO:apache_beam.runners.worker.sdk_worker:Done consuming work.
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 0
INFO:apache_beam.runners.worker.sdk_worker:Creating insecure control channel for localhost:40157.
INFO:apache_beam.runners.worker.sdk_worker:Control channel established.
INFO:apache_beam.runners.worker.sdk_worker:Initializing SDKHarness with unbounded number of workers.
INFO:apache_beam.runners.worker.sdk_worker:Creating insecure state channel for localhost:40969.
INFO:apache_beam.runners.worker.sdk_worker:State channel established.
INFO:apache_beam.runners.worker.data_plane:Creating client data channel for localhost:39675
INFO:apache_beam.runners.worker.sdk_worker:No more requests from control plane
INFO:apache_beam.runners.worker.sdk_worker:SDK Harness waiting for in-flight requests to complete
INFO:apache_beam.runners.worker.data_plane:Closing all cached grpc data channels.
INFO:apache_beam.runners.worker.sdk_worker:Closing all cached gRPC state handlers.
INFO:apache_beam.runners.worker.sdk_worker:Done consuming work.
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 0
INFO:apache_beam.runners.worker.sdk_worker:Creating insecure control channel for localhost:38337.
INFO:apache_beam.runners.worker.sdk_worker:Control channel established.
INFO:apache_beam.runners.worker.sdk_worker:Initializing SDKHarness with unbounded number of workers.
INFO:apache_beam.runners.worker.sdk_worker:Creating insecure state channel for localhost:41695.
INFO:apache_beam.runners.worker.sdk_worker:State channel established.
INFO:apache_beam.runners.worker.data_plane:Creating client data channel for localhost:33821
INFO:apache_beam.runners.worker.sdk_worker:No more requests from control plane
INFO:apache_beam.runners.worker.sdk_worker:SDK Harness waiting for in-flight requests to complete
INFO:apache_beam.runners.worker.data_plane:Closing all cached grpc data channels.
INFO:apache_beam.runners.worker.sdk_worker:Closing all cached gRPC state handlers.
INFO:apache_beam.runners.worker.sdk_worker:Done consuming work.
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 0
INFO:apache_beam.runners.worker.sdk_worker:Creating insecure control channel for localhost:43021.
INFO:apache_beam.runners.worker.sdk_worker:Control channel established.
INFO:apache_beam.runners.worker.sdk_worker:Initializing SDKHarness with unbounded number of workers.
INFO:apache_beam.runners.worker.sdk_worker:Creating insecure state channel for localhost:33591.
INFO:apache_beam.runners.worker.sdk_worker:State channel established.
INFO:apache_beam.runners.worker.data_plane:Creating client data channel for localhost:42095
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished listing 5 files in 0.06402087211608887 seconds.
WARNING:apache_beam.io.filebasedsink:Deleting 4 existing files in target path matching: -*-of-%(num_shards)05d
INFO:apache_beam.runners.worker.sdk_worker:No more requests from control plane
INFO:apache_beam.runners.worker.sdk_worker:SDK Harness waiting for in-flight requests to complete
INFO:apache_beam.runners.worker.data_plane:Closing all cached grpc data channels.
INFO:apache_beam.runners.worker.sdk_worker:Closing all cached gRPC state handlers.
INFO:apache_beam.runners.worker.sdk_worker:Done consuming work.
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 0
INFO:apache_beam.runners.worker.sdk_worker:Creating insecure control channel for localhost:39545.
INFO:apache_beam.runners.worker.sdk_worker:Control channel established.
INFO:apache_beam.runners.worker.sdk_worker:Initializing SDKHarness with unbounded number of workers.
INFO:apache_beam.runners.worker.sdk_worker:Creating insecure state channel for localhost:35987.
INFO:apache_beam.runners.worker.sdk_worker:State channel established.
INFO:apache_beam.runners.worker.data_plane:Creating client data channel for localhost:44969
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished listing 4 files in 0.061638832092285156 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished listing 1 files in 0.07926607131958008 seconds.
INFO:apache_beam.io.filebasedsink:Starting finalize_write threads with num_shards: 4 (skipped: 0), batches: 1, num_threads: 4
INFO:apache_beam.io.filebasedsink:Renamed 4 shards in 0.20 seconds.
INFO:apache_beam.runners.worker.sdk_worker:No more requests from control plane
INFO:apache_beam.runners.worker.sdk_worker:SDK Harness waiting for in-flight requests to complete
INFO:apache_beam.runners.worker.data_plane:Closing all cached grpc data channels.
INFO:apache_beam.runners.worker.sdk_worker:Closing all cached gRPC state handlers.
INFO:apache_beam.runners.worker.sdk_worker:Done consuming work.
INFO:apache_beam.runners.portability.portable_runner:Job state changed to DONE
There seem to be no errors, and the job finished successfully. However, this job does not appear in the Spark job history web UI, which is strange.
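One way to check programmatically which applications the cluster's Spark History Server knows about is its REST API (`/api/v1/applications`). The sketch below assumes the history server listens on its default port 18080 on the master node; the helper names `parse_apps` and `list_spark_apps` are hypothetical.

```python
import json
import urllib.request


def parse_apps(payload):
    """Reduce the /api/v1/applications JSON list to (id, name) pairs."""
    return [(app.get('id'), app.get('name')) for app in payload]


def list_spark_apps(history_url='http://localhost:18080'):
    """Fetch the application list from the Spark History Server REST API."""
    url = history_url.rstrip('/') + '/api/v1/applications'
    with urllib.request.urlopen(url, timeout=10) as resp:
        return parse_apps(json.load(resp))
```

If the Beam job never shows up in `list_spark_apps()`, that would be consistent with the job never having been submitted to the cluster's YARN-backed Spark at all.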
I may be missing something. Could you give me some advice?
Thanks!