apache-spark - 如何让 PySpark 在 Google Cloud Composer 上运行
问题描述
我发现 Google Cloud Composer 是非常有前途的托管 Apache Airflow 服务,但我不知道如何使用 Cloud Composer 来执行带有 PySpark 代码的管道。我能够安装其他 Python 包,例如 Pandas 并很好地使用 Cloud Composer。
任何指针都非常感谢。
解决方案
Cloud Composer 用于调度管道。
因此,要在 Cloud Composer 中运行 PySpark 代码,您需要创建一个 Dataproc 集群,因为 PySpark 作业在 Dataproc 集群中运行。在 DAG 中,您可以使用DataprocCreateClusterOperator安排创建 Dataproc 集群。创建集群后,您可以使用 DataprocSubmitJobOperator 将 PySpark 作业提交到 Dataproc集群。要将作业提交到集群,您需要提供作业源文件。您可以参考下面的代码以供参考。
PySpark 代码:
import pyspark
from operator import add
sc = pyspark.SparkContext()
data = sc.parallelize(list("Hello World"))
counts = data.map(lambda x:
(x, 1)).reduceByKey(add).sortBy(lambda x: x[1],
ascending=False).collect()
for (word, count) in counts:
print("{}: {}".format(word, count))
DAG 代码:
import os
import datetime
from airflow import models
from airflow.providers.google.cloud.operators.dataproc import (
DataprocCreateClusterOperator,
DataprocSubmitJobOperator
)
from airflow.providers.google.cloud.sensors.dataproc import DataprocJobSensor
from airflow.utils.dates import days_ago
PROJECT_ID = "give your project id"
CLUSTER_NAME = "your dataproc cluster name that you want to create"
REGION = "us-central1"
ZONE = "us-central1-a"
PYSPARK_URI = "GCS location of your PySpark Code i.e gs://[input file]"
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)
default_dag_args = {
'start_date': YESTERDAY,
}
# Cluster definition
# [START how_to_cloud_dataproc_create_cluster]
CLUSTER_CONFIG = {
"master_config": {
"num_instances": 1,
"machine_type_uri": "n1-standard-4",
"disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 1024},
},
"worker_config": {
"num_instances": 2,
"machine_type_uri": "n1-standard-4",
"disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 1024},
},
}
with models.DAG(
"dataproc",
schedule_interval=datetime.timedelta(days=1),
default_args=default_dag_args) as dag:
# [START how_to_cloud_dataproc_create_cluster_operator]
create_cluster = DataprocCreateClusterOperator(
task_id="create_cluster",
project_id=PROJECT_ID,
cluster_config=CLUSTER_CONFIG,
region=REGION,
cluster_name=CLUSTER_NAME,
)
PYSPARK_JOB = {
"reference": {"project_id": PROJECT_ID},
"placement": {"cluster_name": CLUSTER_NAME},
"pyspark_job": {"main_python_file_uri": PYSPARK_URI},
}
pyspark_task = DataprocSubmitJobOperator(
task_id="pyspark_task", job=PYSPARK_JOB, location=REGION, project_id=PROJECT_ID
)
create_cluster >> pyspark_task
推荐阅读
- linux - Apache/Linux - 以 root 身份部署网站?
- visual-studio-code - VSCode 是否可以配置为添加自定义“|” 在 Rust 中使用闭包时(管道)匹配?
- scala - Seq[Any] 上的 scala 中的模式匹配
- oracle - 使用HiLo EF Core 5 设置值到前一个
- amcharts4 - 3d列amcharts如何在较大值前面制作较小的值
- aws-api-gateway - 通过其域名调用 API 网关
- html - Bootstrap 4 轮播 - d-md-flex align-items-md-center 刹车 col 宽度
- java - 如何在应用程序运行时检测 sqlite 文件替换?
- python - Pandas 在推断 dtypes 时改变值
- python - 前馈神经网络总是输出随机但相似的值