python - How to connect to an AWS EMR Notebook using Airflow
Problem description
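I want to connect my Airflow to an EMR Notebook running on a cluster. So far I have successfully connected to the AWS EMR cluster, but I can't connect to the notebook. Please help.
In the code below, I load some files into an S3 bucket and then run some steps on my cluster. I would also like to run a pre-made notebook on the EMR cluster, which is the part I can't get to work. Please help, thanks.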
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.hooks.S3_hook import S3Hook
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.emr_create_job_flow_operator import (
    EmrCreateJobFlowOperator,
)
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.contrib.operators.emr_terminate_job_flow_operator import (
    EmrTerminateJobFlowOperator,
)
# Configurations
BUCKET_NAME = "as*****************" # replace this with your bucket name
local_data = "./dags/data/movie_review.csv"
s3_data = "data/movie_review.csv"
local_script = "./dags/scripts/spark/random_text_classification.py"
s3_script = "scripts/random_text_classification.py"
s3_clean = "clean_data/"
SPARK_STEPS = [  # Note the params values are supplied to the operator
    {
        "Name": "Move raw data from S3 to HDFS",
        "ActionOnFailure": "CANCEL_AND_WAIT",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "s3-dist-cp",
                "--src=s3://{{ params.BUCKET_NAME }}/data",
                "--dest=/movie",
            ],
        },
    },
    {
        "Name": "Classify movie reviews",
        "ActionOnFailure": "CANCEL_AND_WAIT",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "spark-submit",
                "--deploy-mode",
                "client",
                "s3://{{ params.BUCKET_NAME }}/{{ params.s3_script }}",
            ],
        },
    },
    {
        "Name": "Move clean data from HDFS to S3",
        "ActionOnFailure": "CANCEL_AND_WAIT",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "s3-dist-cp",
                "--src=/output",
                "--dest=s3://{{ params.BUCKET_NAME }}/{{ params.s3_clean }}",
            ],
        },
    },
]
# helper function
def _local_to_s3(filename, key, bucket_name=BUCKET_NAME):
    s3 = S3Hook()
    s3.load_file(filename=filename, bucket_name=bucket_name, replace=True, key=key)
default_args = {
    "owner": "airflow",
    "depends_on_past": True,
    "wait_for_downstream": True,
    "start_date": datetime(2020, 10, 17),
    "email": ["airflow@airflow.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}
dag = DAG(
    "spark_submit_airflow",
    default_args=default_args,
    schedule_interval="0 10 * * *",
    max_active_runs=1,
)
start_data_pipeline = DummyOperator(task_id="start_data_pipeline", dag=dag)
data_to_s3 = PythonOperator(
    dag=dag,
    task_id="data_to_s3",
    python_callable=_local_to_s3,
    op_kwargs={"filename": local_data, "key": s3_data},
)
script_to_s3 = PythonOperator(
    dag=dag,
    task_id="script_to_s3",
    python_callable=_local_to_s3,
    op_kwargs={"filename": local_script, "key": s3_script},
)
# Add your steps to the EMR cluster
step_adder = EmrAddStepsOperator(
    task_id="add_steps",
    job_flow_id="j-***********",  # cluster id
    aws_conn_id="aws_default",
    steps=SPARK_STEPS,
    params={  # these params fill the parameterized values in the SPARK_STEPS json
        "BUCKET_NAME": BUCKET_NAME,
        "s3_data": s3_data,
        "s3_script": s3_script,
        "s3_clean": s3_clean,
    },
    dag=dag,
)
last_step = len(SPARK_STEPS) - 1
# wait for the steps to complete
step_checker = EmrStepSensor(
    task_id="watch_step",
    job_flow_id="j-*************",  # cluster ID
    step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')["
    + str(last_step)
    + "] }}",
    aws_conn_id="aws_default",
    dag=dag,
)
end_data_pipeline = DummyOperator(task_id="end_data_pipeline", dag=dag)
start_data_pipeline >> [data_to_s3, script_to_s3] >> step_adder >> step_checker >> end_data_pipeline
Solution
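As far as I know, there is no EMR operator for notebooks yet.
To run a pre-made EMR notebook, you can use the start_notebook_execution method of the boto3 EMR client, passing it the ID of the notebook and the path of the notebook file.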
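Write a custom Python task that calls this method and add it to your pipeline. The task needs the cluster ID; in your case that is the same job_flow_id you already pass to EmrAddStepsOperator (step_adder). Note that step_adder itself returns a list of step IDs, not the cluster ID. If the cluster were created by EmrCreateJobFlowOperator instead, that operator's return value (pushed to XCom) is the cluster ID.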
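Because it is easy to pass a step ID (or the operator object itself) where the cluster ID is expected, a small check before calling the API can save a confusing failure. This is a minimal sketch, assuming EMR cluster IDs have the form "j-" followed by uppercase letters and digits, as EMR usually displays them; notebook_execution_kwargs is a hypothetical helper, not part of boto3 or Airflow. It only builds the keyword arguments that start_notebook_execution accepts (EditorId, RelativePath, ExecutionEngine, ServiceRole):

```python
import re

# Assumption: EMR cluster (job flow) IDs look like "j-" + uppercase letters/digits,
# while the step IDs returned by EmrAddStepsOperator look like "s-...".
CLUSTER_ID_PATTERN = re.compile(r"j-[A-Z0-9]+")


def notebook_execution_kwargs(editor_id, relative_path, cluster_id,
                              service_role="EMR_Notebooks_DefaultRole"):
    """Hypothetical helper: validate the cluster ID, then build the keyword
    arguments for emr.start_notebook_execution."""
    if not isinstance(cluster_id, str) or not CLUSTER_ID_PATTERN.fullmatch(cluster_id):
        raise ValueError(
            f"expected an EMR cluster ID such as 'j-XXXXXXXX', got {cluster_id!r}"
        )
    return {
        "EditorId": editor_id,                     # notebook ID, e.g. "e-..."
        "RelativePath": relative_path,             # notebook file name, e.g. "x.ipynb"
        "ExecutionEngine": {"Id": cluster_id, "Type": "EMR"},
        "ServiceRole": service_role,
    }
```

Inside the task you would then call emr.start_notebook_execution(**notebook_execution_kwargs(...)). The masked IDs shown in the question ("j-***...") would be rejected by this check, so pass the real cluster ID.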
import boto3

REGION = "YOUR_REGION"  # replace with the AWS region of your EMR cluster


def start_execution(cluster_id, **context):
    emr = boto3.client("emr", region_name=REGION)
    start_nb = emr.start_notebook_execution(
        EditorId="YOUR_NOTEBOOK_ID",
        RelativePath="YOUR_NOTEBOOK_FILE_NAME",
        ExecutionEngine={"Id": cluster_id, "Type": "EMR"},
        ServiceRole="EMR_Notebooks_DefaultRole",
    )
    execution_id = start_nb["NotebookExecutionId"]
    print("Started an execution: " + execution_id)
    # The return value is pushed to XCom, so downstream tasks can read it
    return execution_id

Call this function from a PythonOperator:

start_nb_execution = PythonOperator(
    task_id="start_nb_execution",
    python_callable=start_execution,
    provide_context=True,
    op_kwargs={"cluster_id": "j-***********"},  # the same cluster ID used by step_adder
    dag=dag,
)
Now you can add it to your pipeline:
start_data_pipeline >> [data_to_s3, script_to_s3] >> step_adder >> step_checker >> start_nb_execution >> end_data_pipeline
There is a good tutorial on this that also includes a sensor example for notebooks.
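The tutorial's sensor is not reproduced here, but the idea can be sketched by polling the boto3 EMR client's describe_notebook_execution method until the execution reaches a final status. This is a minimal sketch, assuming the EMR client is passed in (so it can be created inside the task at run time, or replaced by a fake in tests); wait_for_notebook_execution is a hypothetical name, and treating STOPPED as a failure is a design choice you may want to change:

```python
import time

# Final values of NotebookExecution.Status as reported by describe_notebook_execution.
SUCCESS_STATES = {"FINISHED"}
FAILURE_STATES = {"FAILED", "STOPPED"}  # assumption: a stopped run counts as failed


def wait_for_notebook_execution(emr_client, execution_id, poll_seconds=30,
                                timeout_seconds=3600, sleep=time.sleep,
                                clock=time.monotonic):
    """Poll until the notebook execution finishes; return its final status.

    Raises RuntimeError if it fails or is stopped, TimeoutError if it is
    still running after timeout_seconds.
    """
    deadline = clock() + timeout_seconds
    while True:
        response = emr_client.describe_notebook_execution(
            NotebookExecutionId=execution_id
        )
        status = response["NotebookExecution"]["Status"]
        if status in SUCCESS_STATES:
            return status
        if status in FAILURE_STATES:
            raise RuntimeError(
                f"Notebook execution {execution_id} ended with status {status}"
            )
        if clock() >= deadline:
            raise TimeoutError(
                f"Notebook execution {execution_id} still {status} after "
                f"{timeout_seconds} seconds"
            )
        sleep(poll_seconds)  # wait before asking EMR again
```

In the DAG, a PythonOperator callable could create the client with boto3.client("emr", region_name=REGION), read the execution ID returned by the start_nb_execution task via context["ti"].xcom_pull(task_ids="start_nb_execution"), and pass both to this function. Placing that task between start_nb_execution and end_data_pipeline makes the pipeline wait for the notebook to finish.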