python - 通过 python 代码在 jupyter notebook 中创建一个 EMR 步进函数
问题描述
现在,我将许多小的单个 spark parquet 文件从 EMR 传输到 S3。我目前这样做的方式是在集群步骤 UI 中创建一个步骤函数,这是 AWS 推荐的,下面是一个示例
JAR location :command-runner.jar
Main class :None
Arguments :/usr/bin/s3-dist-cp --src=/refoutput --dest=s3://***-us-east-1/bens-flattened-step/refs
Action on failure:Continue
我实际上需要根据正在上传的文件更改 --dest 参数的一部分。我想要做的是创建相同的步进函数,除了可以放入我的 Jupyter 笔记本的 python 代码,而不是使用 UI 执行此操作。这可能吗?此外,spark.parquet.write('s3 path') 在有很多小文件时会导致 S3 挂起,因此这不是一个可行的解决方案。
解决方案
在 Notebook 中,您可以使用 boto3 列出集群并使用 Cluster-ID,您可以提交 spark 步骤。
第一的。要安装 boto3,您可以将 pip 包安装为
sc.install_pypi_package("boto3")
#You can check it by using sc.list_packages()
使用 boto3,您可以列出集群 ID
import boto3
boto3 = boto3.session.Session(region_name='us-east-2')
emr = boto3.client('emr')
page_iterator = emr.get_paginator('list_clusters').paginate(
ClusterStates=['RUNNING','WAITING']
)
for page in page_iterator:
for item in page['Clusters']:
print(item['Id'])
或更简单
response = emr.list_clusters(
ClusterStates=[
'RUNNING','WAITING',
],
)
print(response)
您也可以过滤集群,例如使用日期参数list_cluster
CreatedAfter=datetime(2019, 11, 11),
CreatedBefore=datetime(2020, 1, 1)
手头没有cluster-id,您可以提交步骤add_job_flow_steps
例如,一个步骤可以是
newsteps =[
{
'Name': 'AWS S3 Copy',
'ActionOnFailure': 'CONTINUE',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
'Args':["aws","s3","cp","s3://XXX/","/home/hadoop/copy/","--recursive"],
}
}
]
使用您检索到的集群 ID 添加步骤(例如 j-xxxxxxxxxxx)
step_response = emr.add_job_flow_steps(JobFlowId="j-xxxxxxxxxxxxxxx", Steps=newsteps)
step_ids = step_response['StepIds']
print("Step IDs:", step_ids)