How to Run PySpark on Google Cloud Composer

Problem Description

Google Cloud Composer looks like a very promising managed Apache Airflow service, but I don't know how to use it to run a pipeline that contains PySpark code. I have been able to install other Python packages, such as Pandas, and use them with Cloud Composer without any trouble.

Any pointers would be greatly appreciated.

Tags: apache-spark, google-cloud-platform, google-cloud-composer

Solution


Cloud Composer is used to schedule and orchestrate pipelines.

Therefore, to run PySpark code from Cloud Composer you need a Dataproc cluster, because PySpark jobs execute on Dataproc. In the DAG you can create the Dataproc cluster with DataprocCreateClusterOperator. Once the cluster has been created, you can submit the PySpark job to it with DataprocSubmitJobOperator. To submit the job you have to provide the job's source file, i.e. the Cloud Storage (GCS) location of your PySpark script. You can use the code below as a reference.

PySpark code:


import pyspark
from operator import add

sc = pyspark.SparkContext()

# Count how many times each character of "Hello World" occurs,
# sorted by count in descending order.
data = sc.parallelize(list("Hello World"))
counts = (
    data.map(lambda x: (x, 1))
    .reduceByKey(add)
    .sortBy(lambda x: x[1], ascending=False)
    .collect()
)

for (word, count) in counts:
    print("{}: {}".format(word, count))


DAG code:


import datetime

from airflow import models
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateClusterOperator,
    DataprocSubmitJobOperator,
)

PROJECT_ID = "your project id"
CLUSTER_NAME = "name of the Dataproc cluster you want to create"
REGION = "us-central1"
ZONE = "us-central1-a"
PYSPARK_URI = "GCS location of your PySpark code, e.g. gs://[bucket]/[path to file]"

YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_dag_args = {
    'start_date': YESTERDAY,
}

# Cluster definition
# [START how_to_cloud_dataproc_create_cluster]

CLUSTER_CONFIG = {
    "master_config": {
        "num_instances": 1,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 1024},
    },
    "worker_config": {
        "num_instances": 2,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 1024},
    },
}

with models.DAG(
    "dataproc",
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

    # [START how_to_cloud_dataproc_create_cluster_operator]
    create_cluster = DataprocCreateClusterOperator(
        task_id="create_cluster",
        project_id=PROJECT_ID,
        cluster_config=CLUSTER_CONFIG,
        region=REGION,
        cluster_name=CLUSTER_NAME,
    )

    PYSPARK_JOB = {
        "reference": {"project_id": PROJECT_ID},
        "placement": {"cluster_name": CLUSTER_NAME},
        "pyspark_job": {"main_python_file_uri": PYSPARK_URI},
    }

    pyspark_task = DataprocSubmitJobOperator(
        task_id="pyspark_task", job=PYSPARK_JOB, region=REGION, project_id=PROJECT_ID
    )

    # Create the cluster first, then submit the PySpark job to it.
    create_cluster >> pyspark_task
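
If the cluster is only needed for this job, it is common to also tear it down once the job has finished so it does not keep running (and billing). A minimal sketch of such a cleanup task, assuming the same provider package; the trigger_rule choice is mine and not part of the original answer:

# Additional imports at the top of the DAG file:
from airflow.providers.google.cloud.operators.dataproc import DataprocDeleteClusterOperator
from airflow.utils.trigger_rule import TriggerRule

# Inside the same `with models.DAG(...)` block as the tasks above:
delete_cluster = DataprocDeleteClusterOperator(
    task_id="delete_cluster",
    project_id=PROJECT_ID,
    cluster_name=CLUSTER_NAME,
    region=REGION,
    # Delete the cluster even if the PySpark job fails.
    trigger_rule=TriggerRule.ALL_DONE,
)

create_cluster >> pyspark_task >> delete_cluster

Once the DAG file is ready, copy it into the dags/ folder of your Composer environment's Cloud Storage bucket; Composer picks it up and starts scheduling it automatically.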
