首页 > 解决方案 > 如何在 Dataproc 上运行 Apache Beam?

问题描述

我正在尝试通过 Apache Spark Runner 在 Google Dataproc 上运行 Apache Beam 的示例代码,所以我指的是这个页面

示例代码是下面的 word_count.py。

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""A word-counting workflow."""

# pytype: skip-file

import argparse
import logging
import re

import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions, PortableOptions


class WordExtractingDoFn(beam.DoFn):
  """Parse each line of input text into words."""
  def process(self, element):
    """Returns an iterator over the words of this element.

    The element is a line of text.  If the line is blank, note that, too.

    Args:
      element: the element being processed

    Returns:
      The processed element.
    """
    return re.findall(r'[\w\']+', element, re.UNICODE)


def run(argv=None, save_main_session=True):
  """Main entry point; defines and runs the wordcount pipeline."""
  parser = argparse.ArgumentParser()
  parser.add_argument(
      '--input',
      dest='input',
      default='gs://dataflow-samples/shakespeare/kinglear.txt',
      help='Input file to process.')
  parser.add_argument(
      '--output',
      dest='output',
      required=True,
      help='Output file to write results to.')
  known_args, pipeline_args = parser.parse_known_args(argv)
  hoge = [
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",
    "--environment_type=LOOPBACK"
    ]
  pipeline_args = pipeline_args + hoge

  # We use the save_main_session option because one or more DoFn's in this
  # workflow rely on global context (e.g., a module imported at module level).
  pipeline_options = PipelineOptions(pipeline_args)
  print(pipeline_args)
  print(known_args)
  pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

  # The pipeline will be run on exiting the with block.
  with beam.Pipeline(options=pipeline_options) as p:

    # Read the text file[pattern] into a PCollection.
    lines = p | 'Read' >> ReadFromText(known_args.input)

    counts = (
        lines
        | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
        | 'PairWIthOne' >> beam.Map(lambda x: (x, 1))
        | 'GroupAndSum' >> beam.CombinePerKey(sum))

    # Format the counts into a PCollection of strings.
    def format_result(word, count):
      return '%s: %d' % (word, count)

    output = counts | 'Format' >> beam.MapTuple(format_result)

    # Write the output using a "Write" transform that has side effects.
    # pylint: disable=expression-not-assigned
    output | 'Write' >> WriteToText(known_args.output)


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()

在 ssh 到 Dataproc 集群(版本:)之后,我按照说明进行操作1.5.46-debian10

第一的,

docker run --net=host apache/beam_spark_job_server:latest

然后,我用

python -m word_count --output=gs://<bucket>/<path_to_dir> --spark_master_url='yarn' --spark_rest_url=spark://localhost:6066 --environment_type=LOOPBACK

我得到了输出。

INFO:apache_beam.internal.gcp.auth:Setting socket default timeout to 60 seconds.
INFO:apache_beam.internal.gcp.auth:socket default timeout is 60.0 seconds.
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:apache_beam.runners.worker.worker_pool_main:Listening for workers at localhost:44113
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
INFO:root:Default Python SDK image for environment is apache/beam_python3.7_sdk:2.28.0
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function lift_combiners at 0x7f1309e92680> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function sort_stages at 0x7f1309e92dd0> ====================
INFO:apache_beam.runners.portability.portable_runner:Environment "LOOPBACK" has started a component necessary for the execution. Be sure to run the pipeline using
  with Pipeline() as p:
    p.apply(..)
This ensures that the pipeline finishes before this program exits.
INFO:apache_beam.runners.portability.portable_runner:Job state changed to STOPPED
INFO:apache_beam.runners.portability.portable_runner:Job state changed to STARTING
INFO:apache_beam.runners.portability.portable_runner:Job state changed to RUNNING
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 0
INFO:apache_beam.runners.worker.sdk_worker:Creating insecure control channel for localhost:33767.
INFO:apache_beam.runners.worker.sdk_worker:Control channel established.
INFO:apache_beam.runners.worker.sdk_worker:Initializing SDKHarness with unbounded number of workers.
INFO:apache_beam.runners.worker.sdk_worker:Creating insecure state channel for localhost:45575.
INFO:apache_beam.runners.worker.sdk_worker:State channel established.
INFO:apache_beam.runners.worker.data_plane:Creating client data channel for localhost:33121
INFO:apache_beam.runners.worker.sdk_worker:No more requests from control plane
INFO:apache_beam.runners.worker.sdk_worker:SDK Harness waiting for in-flight requests to complete
INFO:apache_beam.runners.worker.data_plane:Closing all cached grpc data channels.
INFO:apache_beam.runners.worker.sdk_worker:Closing all cached gRPC state handlers.
INFO:apache_beam.runners.worker.sdk_worker:Done consuming work.
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 0
INFO:apache_beam.runners.worker.sdk_worker:Creating insecure control channel for localhost:36907.
INFO:apache_beam.runners.worker.sdk_worker:Control channel established.
INFO:apache_beam.runners.worker.sdk_worker:Initializing SDKHarness with unbounded number of workers.
INFO:apache_beam.runners.worker.sdk_worker:Creating insecure state channel for localhost:45469.
INFO:apache_beam.runners.worker.sdk_worker:State channel established.
INFO:apache_beam.runners.worker.data_plane:Creating client data channel for localhost:46297
INFO:apache_beam.runners.worker.sdk_worker:No more requests from control plane
INFO:apache_beam.runners.worker.sdk_worker:SDK Harness waiting for in-flight requests to complete
INFO:apache_beam.runners.worker.data_plane:Closing all cached grpc data channels.
INFO:apache_beam.runners.worker.sdk_worker:Closing all cached gRPC state handlers.
INFO:apache_beam.runners.worker.sdk_worker:Done consuming work.
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 0
INFO:apache_beam.runners.worker.sdk_worker:Creating insecure control channel for localhost:35957.
INFO:apache_beam.runners.worker.sdk_worker:Control channel established.
INFO:apache_beam.runners.worker.sdk_worker:Initializing SDKHarness with unbounded number of workers.
INFO:apache_beam.runners.worker.sdk_worker:Creating insecure state channel for localhost:40055.
INFO:apache_beam.runners.worker.sdk_worker:State channel established.
INFO:apache_beam.runners.worker.data_plane:Creating client data channel for localhost:38615
INFO:apache_beam.runners.worker.sdk_worker:No more requests from control plane
INFO:apache_beam.runners.worker.sdk_worker:SDK Harness waiting for in-flight requests to complete
INFO:apache_beam.runners.worker.data_plane:Closing all cached grpc data channels.
INFO:apache_beam.runners.worker.sdk_worker:Closing all cached gRPC state handlers.
INFO:apache_beam.runners.worker.sdk_worker:Done consuming work.
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 0
INFO:apache_beam.runners.worker.sdk_worker:Creating insecure control channel for localhost:37573.
INFO:apache_beam.runners.worker.sdk_worker:Control channel established.
INFO:apache_beam.runners.worker.sdk_worker:Initializing SDKHarness with unbounded number of workers.
INFO:apache_beam.runners.worker.sdk_worker:Creating insecure state channel for localhost:39507.
INFO:apache_beam.runners.worker.sdk_worker:State channel established.
INFO:apache_beam.runners.worker.data_plane:Creating client data channel for localhost:46455
INFO:apache_beam.runners.worker.sdk_worker:No more requests from control plane
INFO:apache_beam.runners.worker.sdk_worker:SDK Harness waiting for in-flight requests to complete
INFO:apache_beam.runners.worker.data_plane:Closing all cached grpc data channels.
INFO:apache_beam.runners.worker.sdk_worker:Closing all cached gRPC state handlers.
INFO:apache_beam.runners.worker.sdk_worker:Done consuming work.
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 0
INFO:apache_beam.runners.worker.sdk_worker:Creating insecure control channel for localhost:40157.
INFO:apache_beam.runners.worker.sdk_worker:Control channel established.
INFO:apache_beam.runners.worker.sdk_worker:Initializing SDKHarness with unbounded number of workers.
INFO:apache_beam.runners.worker.sdk_worker:Creating insecure state channel for localhost:40969.
INFO:apache_beam.runners.worker.sdk_worker:State channel established.
INFO:apache_beam.runners.worker.data_plane:Creating client data channel for localhost:39675
INFO:apache_beam.runners.worker.sdk_worker:No more requests from control plane
INFO:apache_beam.runners.worker.sdk_worker:SDK Harness waiting for in-flight requests to complete
INFO:apache_beam.runners.worker.data_plane:Closing all cached grpc data channels.
INFO:apache_beam.runners.worker.sdk_worker:Closing all cached gRPC state handlers.
INFO:apache_beam.runners.worker.sdk_worker:Done consuming work.
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 0
INFO:apache_beam.runners.worker.sdk_worker:Creating insecure control channel for localhost:38337.
INFO:apache_beam.runners.worker.sdk_worker:Control channel established.
INFO:apache_beam.runners.worker.sdk_worker:Initializing SDKHarness with unbounded number of workers.
INFO:apache_beam.runners.worker.sdk_worker:Creating insecure state channel for localhost:41695.
INFO:apache_beam.runners.worker.sdk_worker:State channel established.
INFO:apache_beam.runners.worker.data_plane:Creating client data channel for localhost:33821
INFO:apache_beam.runners.worker.sdk_worker:No more requests from control plane
INFO:apache_beam.runners.worker.sdk_worker:SDK Harness waiting for in-flight requests to complete
INFO:apache_beam.runners.worker.data_plane:Closing all cached grpc data channels.
INFO:apache_beam.runners.worker.sdk_worker:Closing all cached gRPC state handlers.
INFO:apache_beam.runners.worker.sdk_worker:Done consuming work.
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 0
INFO:apache_beam.runners.worker.sdk_worker:Creating insecure control channel for localhost:43021.
INFO:apache_beam.runners.worker.sdk_worker:Control channel established.
INFO:apache_beam.runners.worker.sdk_worker:Initializing SDKHarness with unbounded number of workers.
INFO:apache_beam.runners.worker.sdk_worker:Creating insecure state channel for localhost:33591.
INFO:apache_beam.runners.worker.sdk_worker:State channel established.
INFO:apache_beam.runners.worker.data_plane:Creating client data channel for localhost:42095
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished listing 5 files in 0.06402087211608887 seconds.
WARNING:apache_beam.io.filebasedsink:Deleting 4 existing files in target path matching: -*-of-%(num_shards)05d
INFO:apache_beam.runners.worker.sdk_worker:No more requests from control plane
INFO:apache_beam.runners.worker.sdk_worker:SDK Harness waiting for in-flight requests to complete
INFO:apache_beam.runners.worker.data_plane:Closing all cached grpc data channels.
INFO:apache_beam.runners.worker.sdk_worker:Closing all cached gRPC state handlers.
INFO:apache_beam.runners.worker.sdk_worker:Done consuming work.
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 0
INFO:apache_beam.runners.worker.sdk_worker:Creating insecure control channel for localhost:39545.
INFO:apache_beam.runners.worker.sdk_worker:Control channel established.
INFO:apache_beam.runners.worker.sdk_worker:Initializing SDKHarness with unbounded number of workers.
INFO:apache_beam.runners.worker.sdk_worker:Creating insecure state channel for localhost:35987.
INFO:apache_beam.runners.worker.sdk_worker:State channel established.
INFO:apache_beam.runners.worker.data_plane:Creating client data channel for localhost:44969
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished listing 4 files in 0.061638832092285156 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished listing 1 files in 0.07926607131958008 seconds.
INFO:apache_beam.io.filebasedsink:Starting finalize_write threads with num_shards: 4 (skipped: 0), batches: 1, num_threads: 4
INFO:apache_beam.io.filebasedsink:Renamed 4 shards in 0.20 seconds.
INFO:apache_beam.runners.worker.sdk_worker:No more requests from control plane
INFO:apache_beam.runners.worker.sdk_worker:SDK Harness waiting for in-flight requests to complete
INFO:apache_beam.runners.worker.data_plane:Closing all cached grpc data channels.
INFO:apache_beam.runners.worker.sdk_worker:Closing all cached gRPC state handlers.
INFO:apache_beam.runners.worker.sdk_worker:Done consuming work.
INFO:apache_beam.runners.portability.portable_runner:Job state changed to DONE

似乎没有错误,并且工作已成功完成。然而,在网络spark-job-history上没有这项工作的历史,这很奇怪。

我可能错过了什么。你能给我一些建议吗?

谢谢!

标签: pythonapache-beamgoogle-cloud-dataproc

解决方案


推荐阅读