python - 具有极高计算步骤的气流过程
问题描述
我有一个步骤涉及需要在 Python 中完成的大量数据(目前使用 pandas)。我只是好奇地建议一个,以确保我有足够的资源来执行大型数据操作,用于几种不同的客户端配置,以及两个如何使这个过程更有效(即使用 Pyspark 或其他工具,这些都是新的,所以请耐心等待任何后续问题)。感谢您的帮助,如果需要只是想保持这个相当高的水平,我会尝试添加更多细节。
Pipeline 目前是一些 PythonOps、一些 BashOperators 和一些 BigQuery 运算符(通过 GCP Composer 运行)
解决方案
Cloud Composer 用于调度管道。
对于第一个查询:
根据您使用的资源,如果多个任务在一个 DAG 中运行,并且操作员的数量更多,那么您需要配置您的 Composer 环境以满足条件。
- 节点计数的数量可以根据任务实例增加。
- 由于可以在单个 DAG 中安排多个任务,因此可以将机器类型更改为工作负载优化的机器和磁盘大小,以最大程度地减少环境速度。
对于第二个查询:
PySpark 支持 Apache Spark,它通过在并行和批处理系统中处理大量数据来处理它们。因此,Pyspark 可用于提高数据处理的效率。
因此,要在 Cloud Composer 中运行 PySpark 代码,您需要创建一个 Dataproc 集群,因为 PySpark 作业在 Dataproc 集群中运行。在 DAG 中,您可以使用 DataprocCreateClusterOperator 安排创建 Dataproc 集群。创建集群后,您可以使用 DataprocSubmitJobOperator 将 PySpark 作业提交到 Dataproc集群。要将作业提交到集群,您需要提供作业源文件。您可以参考下面的代码以供参考。
PySpark 工作代码:
import pyspark
from operator import add
sc = pyspark.SparkContext()
data = sc.parallelize(list("Hello World"))
counts = data.map(lambda x:
(x, 1)).reduceByKey(add).sortBy(lambda x: x[1],
ascending=False).collect()
for (word, count) in counts:
print("{}: {}".format(word, count))
DAG 代码:
import os
import datetime
from airflow import models
from airflow.providers.google.cloud.operators.dataproc import (
DataprocCreateClusterOperator,
DataprocSubmitJobOperator
)
from airflow.providers.google.cloud.sensors.dataproc import DataprocJobSensor
from airflow.utils.dates import days_ago
PROJECT_ID = "give your project id"
CLUSTER_NAME = "your dataproc cluster name that you want to create"
REGION = "us-central1"
ZONE = "us-central1-a"
PYSPARK_URI = "GCS location of your PySpark Code i.e gs://[input file]"
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)
default_dag_args = {
'start_date': YESTERDAY,
}
# Cluster definition
# [START how_to_cloud_dataproc_create_cluster]
CLUSTER_CONFIG = {
"master_config": {
"num_instances": 1,
"machine_type_uri": "n1-standard-4",
"disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 1024},
},
"worker_config": {
"num_instances": 2,
"machine_type_uri": "n1-standard-4",
"disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 1024},
},
}
with models.DAG(
"dataproc",
schedule_interval=datetime.timedelta(days=1),
default_args=default_dag_args) as dag:
# [START how_to_cloud_dataproc_create_cluster_operator]
create_cluster = DataprocCreateClusterOperator(
task_id="create_cluster",
project_id=PROJECT_ID,
cluster_config=CLUSTER_CONFIG,
region=REGION,
cluster_name=CLUSTER_NAME,
)
PYSPARK_JOB = {
"reference": {"project_id": PROJECT_ID},
"placement": {"cluster_name": CLUSTER_NAME},
"pyspark_job": {"main_python_file_uri": PYSPARK_URI},
}
pyspark_task = DataprocSubmitJobOperator(
task_id="pyspark_task", job=PYSPARK_JOB, location=REGION, project_id=PROJECT_ID
)
create_cluster >> pyspark_task
Dataflow还可用于批处理和流式数据的数据处理。
推荐阅读
- c# - Do While 循环中的随机数
- python - Tkinter:输出小部件不显示文本
- scala - ZIO [声明式] 事务管理
- javascript - 如何为 nextJS 创建静态生成的标头
- html - 在画布中一一着色矩形
- jsf - 使用 org.omnifaces.cdi.ViewScoped 时,我的 javax.servlet.Filter 在单个页面视图上同时接收到 GET 和 POST
- android - 这里有什么问题?为什么代码中出现红色下划线?
- flutter - 如何设置标签栏颤动的样式
- javascript - addEventListeners 自动触发
- python - DataFrames结构的dict来比较xlsx表