How to configure the Beam Python SDK with Spark in a Kubernetes environment

Problem description

TL;DR

How do I configure the Apache Beam pipeline options with "environment_type" = EXTERNAL or PROCESS?

Description

We currently have a standalone Spark cluster inside Kubernetes and, following this solution (and the setup), we launch a Beam pipeline that creates an embedded Spark job server on the Spark worker, which needs to run a Python SDK harness alongside it. Apache Beam allows the Python SDK to be run in 4 different ways (the environment types DOCKER, PROCESS, EXTERNAL and LOOPBACK).
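For reference, the same environment options can also be set programmatically rather than on the command line; a minimal sketch (assuming apache-beam==2.28.0 and a reachable cluster; the flag values simply mirror the commands used below):

# Minimal sketch: passing the portable environment options programmatically.
# All flag values are taken from the CLI invocations in this question.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    "--runner=SparkRunner",
    "--spark_submit_uber_jar",
    "--spark_job_server_jar=beam-runners-spark-job-server-2.28.0.jar",
    "--spark_master_url=spark://spark-master:7077",
    "--spark_rest_url=http://spark-master:6066",
    "--environment_type=EXTERNAL",           # or PROCESS / LOOPBACK / DOCKER
    "--environment_config=localhost:50000",  # worker pool address for EXTERNAL
])

with beam.Pipeline(options=options) as p:
    (p
     | beam.Create(["to be or not to be"])
     | beam.FlatMap(str.split)
     | beam.combiners.Count.PerElement()
     | beam.Map(print))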

Development

  1. Using "EXTERNAL" - implementing the Spark worker with the Python SDK on the same pod:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: spark-worker
  labels:
    app: spark-worker
spec:
  selector:
    matchLabels:
      app: spark-worker
  template:
    metadata:
      labels:
        app: spark-worker
    spec:
      containers:
      - name: spark-worker
        image: spark-py-custom:latest
        imagePullPolicy: Never
        ports:
        - containerPort: 8081
          protocol: TCP
        command: ["/bin/bash", "-c", "--"]
        args: ["/start-worker.sh"]
        resources:
          requests:
            cpu: 4
            memory: "5Gi"
          limits:
            cpu: 4
            memory: "5Gi"
        volumeMounts:
          - name: spark-jars
            mountPath: "/tmp"
      - name: python-beam-sdk
        image: apachebeam/python3.7_sdk:latest
        command: ["/opt/apache/beam/boot", "--worker_pool"]
        ports:
        - containerPort: 50000
        resources:
          limits:
            cpu: "1"
            memory: "1Gi"
      volumes:
        - name: spark-jars
          persistentVolumeClaim:
            claimName: spark-jars

And then, if we execute the command

python3 wordcount.py \
--output ./data_test/counts \
--runner=SparkRunner \
--spark_submit_uber_jar \
--spark_job_server_jar=beam-runners-spark-job-server-2.28.0.jar \
--spark_master_url=spark://spark-master:7077 \
--spark_rest_url=http://spark-master:6066 \
--environment_type=EXTERNAL \
--environment_config=localhost:50000

the terminal gets stuck in the "RUNNING" state:

INFO:apache_beam.internal.gcp.auth:Setting socket default timeout to 60 seconds.
INFO:apache_beam.internal.gcp.auth:socket default timeout is 60.0 seconds.
INFO:oauth2client.client:Timeout attempting to reach GCE metadata service.
WARNING:apache_beam.internal.gcp.auth:Unable to find default credentials to use: The Application Default Credentials are not available. They are available if running in Google Compute Engine. Otherwise, the environment variable GOOGLE_APPLICATION_CREDENTIALS must be defined pointing to a file defining the credentials. See https://developers.google.com/accounts/docs/application-default-credentials for more information.
Connecting anonymously.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
INFO:root:Default Python SDK image for environment is apache/beam_python3.7_sdk:2.28.0
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function lift_combiners at 0x7fc360c0b8c8> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function sort_stages at 0x7fc360c0f048> ====================
INFO:apache_beam.runners.portability.abstract_job_service:Artifact server started on port 36369
INFO:apache_beam.runners.portability.abstract_job_service:Running job 'job-2448721e-e686-41d4-b924-5f8c5ae73ac2'
INFO:apache_beam.runners.portability.spark_uber_jar_job_server:Submitted Spark job with ID driver-20210305172421-0000
INFO:apache_beam.runners.portability.portable_runner:Job state changed to STOPPED
INFO:apache_beam.runners.portability.portable_runner:Job state changed to RUNNING

In the Spark worker logs:

21/03/05 17:24:25 INFO ExecutorRunner: Launch command: "/usr/local/openjdk-8/bin/java" "-cp" "/opt/spark/conf/:/opt/spark/jars/*" "-Xmx1024M" "-Dspark.driver.port=45203" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "spark://CoarseGrainedScheduler@spark-worker-64fd4ddd6-tqdrs:45203" "--executor-id" "0" "--hostname" "172.18.0.20" "--cores" "3" "--app-id" "app-20210305172425-0000" "--worker-url" "spark://Worker@172.18.0.20:44365"

In the Python SDK logs:

2021/03/05 17:19:52 Starting worker pool 1: python -m apache_beam.runners.worker.worker_pool_main --service_port=50000 --container_executable=/opt/apache/beam/boot
Starting worker with command ['/opt/apache/beam/boot', '--id=1-1', '--logging_endpoint=', '--artifact_endpoint=', '--provision_endpoint=', '--control_endpoint=']
2021/03/05 17:24:32 No logging endpoint provided.

Checking the Spark worker stderr (on localhost:8081):

Spark Executor Command: "/usr/local/openjdk-8/bin/java" "-cp" "/opt/spark/conf/:/opt/spark/jars/*" "-Xmx1024M" "-Dspark.driver.port=45203" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "spark://CoarseGrainedScheduler@spark-worker-64fd4ddd6-tqdrs:45203" "--executor-id" "0" "--hostname" "172.18.0.20" "--cores" "3" "--app-id" "app-20210305172425-0000" "--worker-url" "spark://Worker@172.18.0.20:44365"
========================================

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
21/03/05 17:24:26 INFO CoarseGrainedExecutorBackend: Started daemon with process name: 230@spark-worker-64fd4ddd6-tqdrs
21/03/05 17:24:26 INFO SignalUtils: Registered signal handler for TERM
21/03/05 17:24:26 INFO SignalUtils: Registered signal handler for HUP
21/03/05 17:24:26 INFO SignalUtils: Registered signal handler for INT
21/03/05 17:24:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/03/05 17:24:27 INFO SecurityManager: Changing view acls to: root
21/03/05 17:24:27 INFO SecurityManager: Changing modify acls to: root
21/03/05 17:24:27 INFO SecurityManager: Changing view acls groups to: 
21/03/05 17:24:27 INFO SecurityManager: Changing modify acls groups to: 
21/03/05 17:24:27 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
21/03/05 17:24:27 INFO TransportClientFactory: Successfully created connection to spark-worker-64fd4ddd6-tqdrs/172.18.0.20:45203 after 50 ms (0 ms spent in bootstraps)
21/03/05 17:24:27 INFO SecurityManager: Changing view acls to: root
21/03/05 17:24:27 INFO SecurityManager: Changing modify acls to: root
21/03/05 17:24:27 INFO SecurityManager: Changing view acls groups to: 
21/03/05 17:24:27 INFO SecurityManager: Changing modify acls groups to: 
21/03/05 17:24:27 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
21/03/05 17:24:28 INFO TransportClientFactory: Successfully created connection to spark-worker-64fd4ddd6-tqdrs/172.18.0.20:45203 after 1 ms (0 ms spent in bootstraps)
21/03/05 17:24:28 INFO DiskBlockManager: Created local directory at /tmp/spark-bdffc2b3-f57a-42fa-a720-e22274b86b67/executor-f1eff7ca-d2cd-4ff4-b18b-c8d6a520f590/blockmgr-c61fb65f-ea97-4bd5-bf15-e0025845a251
21/03/05 17:24:28 INFO MemoryStore: MemoryStore started with capacity 366.3 MB
21/03/05 17:24:28 INFO CoarseGrainedExecutorBackend: Connecting to driver: spark://CoarseGrainedScheduler@spark-worker-64fd4ddd6-tqdrs:45203
21/03/05 17:24:28 INFO WorkerWatcher: Connecting to worker spark://Worker@172.18.0.20:44365
21/03/05 17:24:28 INFO TransportClientFactory: Successfully created connection to /172.18.0.20:44365 after 1 ms (0 ms spent in bootstraps)
21/03/05 17:24:28 INFO WorkerWatcher: Successfully connected to spark://Worker@172.18.0.20:44365
21/03/05 17:24:28 INFO CoarseGrainedExecutorBackend: Successfully registered with driver
21/03/05 17:24:28 INFO Executor: Starting executor ID 0 on host 172.18.0.20
21/03/05 17:24:28 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 42561.
21/03/05 17:24:28 INFO NettyBlockTransferService: Server created on 172.18.0.20:42561
21/03/05 17:24:28 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
21/03/05 17:24:28 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(0, 172.18.0.20, 42561, None)
21/03/05 17:24:28 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(0, 172.18.0.20, 42561, None)
21/03/05 17:24:28 INFO BlockManager: Initialized BlockManager: BlockManagerId(0, 172.18.0.20, 42561, None)

And there it hangs forever. Checking the Python SDK source code, we can see that "No logging endpoint provided" is fatal: it comes from the missing configuration sent to the boot executable (no logging/artifact/provision/control endpoints). If I try to add "--artifact_endpoint" to the Python command, I get gRPC errors about failed communication, because the job server creates its own artifact endpoint. In this setup all of these endpoints would need to be configured with fixed ports (probably on localhost, since the SDK and the worker are in the same pod), but I cannot find how to configure them. Checking Stack Overflow, I could find a related issue, but in that case the Python SDK gets its configuration automatically (maybe a Spark runner issue?).
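Since the SDK sidecar and the Spark worker share the pod, one quick sanity check from inside the spark-worker container is to confirm that the worker pool is actually listening on localhost:50000 (a debugging sketch using only the standard library; host and port are the values from the command above, and this does not fix the missing endpoints):

# Debugging sketch: verify the EXTERNAL worker pool advertised via
# --environment_config=localhost:50000 is reachable from the spark-worker
# container. Run it inside that container.
import socket

try:
    with socket.create_connection(("localhost", 50000), timeout=5) as s:
        print("worker pool reachable at", s.getpeername())
except OSError as e:
    print("worker pool NOT reachable:", e)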

  2. Using "PROCESS" - trying to run the Python SDK within a process, I built the Python SDK with ./gradlew :sdks:python:container:py37:docker, copied the sdks/python/container/build/target/launcher/linux_amd64/boot executable to /python_sdk/boot inside the Spark worker container, and used the command:
python3 wordcount.py \
--output ./data_test/counts \
--runner=SparkRunner \
--spark_submit_uber_jar \
--spark_master_url=spark://spark-master:7077 \
--spark_rest_url=http://spark-master:6066 \
--environment_type=PROCESS \
--spark_job_server_jar=beam-runners-spark-job-server-2.28.0.jar \
--environment_config='{"os":"linux","arch":"x84_64","command":"/python_sdk/boot"}'

which results in a runtime exception in the terminal:

INFO:apache_beam.runners.portability.portable_runner:Job state changed to FAILED
Traceback (most recent call last):
  File "wordcount.py", line 91, in <module>
    run()
  File "wordcount.py", line 86, in run
    output | "Write" >> WriteToText(known_args.output)
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/pipeline.py", line 581, in __exit__
    self.result.wait_until_finish()
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/runners/portability/portable_runner.py", line 608, in wait_until_finish
    raise self._runtime_exception
RuntimeError: Pipeline job-95c13aa5-96ab-4d1d-bc68-7f9d203c8251 failed in state FAILED: unknown error

And checking the Spark worker stderr logs again, I can see that the problem is java.lang.IllegalArgumentException: No filesystem found for scheme classpath, whose cause I don't know:

21/03/05 18:33:12 INFO Executor: Adding file:/opt/spark/work/app-20210305183309-0000/0/./javax.servlet-api-3.1.0.jar to class loader
21/03/05 18:33:12 INFO TorrentBroadcast: Started reading broadcast variable 0
21/03/05 18:33:12 INFO TransportClientFactory: Successfully created connection to spark-worker-89c5c4c87-5q45s/172.18.0.20:34783 after 1 ms (0 ms spent in bootstraps)
21/03/05 18:33:12 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 12.3 KB, free 366.3 MB)
21/03/05 18:33:12 INFO TorrentBroadcast: Reading broadcast variable 0 took 63 ms
21/03/05 18:33:12 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 32.5 KB, free 366.3 MB)
21/03/05 18:33:13 INFO MemoryStore: Block rdd_13_0 stored as values in memory (estimated size 16.0 B, free 366.3 MB)
21/03/05 18:33:13 INFO MemoryStore: Block rdd_17_0 stored as values in memory (estimated size 16.0 B, free 366.3 MB)
21/03/05 18:33:13 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 5427 bytes result sent to driver
21/03/05 18:33:14 ERROR SerializingExecutor: Exception while executing runnable org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed@5f917914
java.lang.IllegalArgumentException: No filesystem found for scheme classpath
    at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:467)
    at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:537)
    at org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService.getArtifact(ArtifactRetrievalService.java:125)
    at org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService.getArtifact(ArtifactRetrievalService.java:99)
    at org.apache.beam.model.jobmanagement.v1.ArtifactRetrievalServiceGrpc$MethodHandlers.invoke(ArtifactRetrievalServiceGrpc.java:327)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:817)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
21/03/05 18:33:16 ERROR SerializingExecutor: Exception while executing runnable org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed@67fb2b2c
java.lang.IllegalArgumentException: No filesystem found for scheme classpath
    at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:467)
    at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:537)
    at org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService.getArtifact(ArtifactRetrievalService.java:125)
    at org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService.getArtifact(ArtifactRetrievalService.java:99)
    at org.apache.beam.model.jobmanagement.v1.ArtifactRetrievalServiceGrpc$MethodHandlers.invoke(ArtifactRetrievalServiceGrpc.java:327)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:817)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
21/03/05 18:33:19 INFO ProcessEnvironmentFactory: Still waiting for startup of environment '/python_sdk/boot' for worker id 1-1

Probably some configuration parameter is missing.

Observations

If I execute the command

python3 wordcount.py \
--output ./data_test/counts \
--runner=SparkRunner \
--spark_submit_uber_jar \
--spark_job_server_jar=beam-runners-spark-job-server-2.28.0.jar \
--spark_master_url=spark://spark-master:7077 \
--spark_rest_url=http://spark-master:6066 \
--environment_type=LOOPBACK

inside our Spark worker (there is only one worker in the Spark cluster), we get a fully working Beam pipeline with these logs (presumably because LOOPBACK runs the SDK harness inside the submitting process itself, so none of the external worker endpoints are needed).

Tags: java, apache-spark, kubernetes, sdk, apache-beam

Solution


  1. Using "EXTERNAL" - This definitely sounds like a bug in Beam. The worker endpoints are supposed to be set up to use localhost; I don't think it is possible to configure them. I'm not sure why they would be missing; an educated guess is that the servers silently fail to start, leaving the endpoints empty. I filed a bug report for this issue (BEAM-11957).
  2. Using "PROCESS" - The scheme classpath corresponds to ClassLoaderFileSystem. This file system is usually loaded via AutoService, which depends on ClassLoaderFileSystemRegistrar being present on the classpath (it is unrelated to the name of the file system itself). The classpath of the job jar is based on spark_job_server_jar. Where did you get your beam-runners-spark-job-server-2.28.0.jar from? (One way to inspect the jar is sketched after this list.)
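
For point 2, one way to check whether the registrar is actually packaged in the jar (a standard-library sketch; the filename is taken from the commands in the question, adjust the path as needed):

# Sketch: check whether ClassLoaderFileSystemRegistrar is packaged in the job
# server jar. Its absence would explain "No filesystem found for scheme
# classpath". A jar is just a zip archive, so zipfile can list its entries.
import zipfile

JAR = "beam-runners-spark-job-server-2.28.0.jar"
with zipfile.ZipFile(JAR) as jar:
    hits = [n for n in jar.namelist() if "ClassLoaderFileSystemRegistrar" in n]

print(hits if hits else "registrar not found - the jar may be incomplete")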
