首页 > 解决方案 > 具有极高计算步骤的气流过程

问题描述

我有一个步骤涉及需要在 Python 中完成的大量数据(目前使用 pandas)。我只是好奇地建议一个,以确保我有足够的资源来执行大型数据操作,用于几种不同的客户端配置,以及两个如何使这个过程更有效(即使用 Pyspark 或其他工具,这些都是新的,所以请耐心等待任何后续问题)。感谢您的帮助,如果需要只是想保持这个相当高的水平,我会尝试添加更多细节。

Pipeline 目前是一些 PythonOps、一些 BashOperators 和一些 BigQuery 运算符(通过 GCP Composer 运行)

标签: pythongoogle-cloud-platformairflowgoogle-cloud-composer

解决方案


Cloud Composer 用于调度管道。

对于第一个查询:

根据您使用的资源,如果多个任务在一个 DAG 中运行,并且操作员的数量更多,那么您需要配置您的 Composer 环境以满足条件。

  • 节点计数的数量可以根据任务实例增加。
  • 由于可以在单个 DAG 中安排多个任务,因此可以将机器类型更改为工作负载优化的机器和磁盘大小,以最大程度地减少环境速度。

对于第二个查询:

PySpark 支持 Apache Spark,它通过在并行和批处理系统中处理大量数据来处理它们。因此,Pyspark 可用于提高数据处理的效率。

因此,要在 Cloud Composer 中运行 PySpark 代码,您需要创建一个 Dataproc 集群,因为 PySpark 作业在 Dataproc 集群中运行。在 DAG 中,您可以使用 DataprocCreateClusterOperator 安排创建 Dataproc 集群。创建集群后,您可以使用 DataprocSubmitJobOperator 将 PySpark 作业提交到 Dataproc集群。要将作业提交到集群,您需要提供作业源文件。您可以参考下面的代码以供参考。

PySpark 工作代码:


import pyspark
from operator import add
sc = pyspark.SparkContext()

data = sc.parallelize(list("Hello World"))
counts = data.map(lambda x: 
    (x, 1)).reduceByKey(add).sortBy(lambda x: x[1],
     ascending=False).collect()

for (word, count) in counts:
    print("{}: {}".format(word, count))


DAG 代码:


import os
import datetime
from airflow import models
from airflow.providers.google.cloud.operators.dataproc import (
   DataprocCreateClusterOperator,
   DataprocSubmitJobOperator
)
from airflow.providers.google.cloud.sensors.dataproc import DataprocJobSensor
from airflow.utils.dates import days_ago

PROJECT_ID = "give your project id"
CLUSTER_NAME =  "your dataproc cluster name that you want to create"
REGION = "us-central1"
ZONE = "us-central1-a"
PYSPARK_URI = "GCS location of your PySpark Code i.e gs://[input file]"

YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_dag_args = {
    'start_date': YESTERDAY,
}

# Cluster definition
# [START how_to_cloud_dataproc_create_cluster]

CLUSTER_CONFIG = {
   "master_config": {
       "num_instances": 1,
       "machine_type_uri": "n1-standard-4",
       "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 1024},
   },
   "worker_config": {
       "num_instances": 2,
       "machine_type_uri": "n1-standard-4",
       "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 1024},
   },
}
with models.DAG(
   "dataproc",

   schedule_interval=datetime.timedelta(days=1),
   default_args=default_dag_args) as dag:

   # [START how_to_cloud_dataproc_create_cluster_operator]
   create_cluster = DataprocCreateClusterOperator(
       task_id="create_cluster",
       project_id=PROJECT_ID,
       cluster_config=CLUSTER_CONFIG,
       region=REGION,
       cluster_name=CLUSTER_NAME,
   )
   PYSPARK_JOB = {
   "reference": {"project_id": PROJECT_ID},
   "placement": {"cluster_name": CLUSTER_NAME},
   "pyspark_job": {"main_python_file_uri": PYSPARK_URI},
   }

   pyspark_task = DataprocSubmitJobOperator(
       task_id="pyspark_task", job=PYSPARK_JOB, location=REGION, project_id=PROJECT_ID
   )

   create_cluster >>  pyspark_task

Dataflow还可用于批处理和流式数据的数据处理。


推荐阅读