首页 > 解决方案 > 通过 python 代码在 jupyter notebook 中创建一个 EMR 步进函数

问题描述

现在,我将许多小的单个 spark parquet 文件从 EMR 传输到 S3。我目前这样做的方式是在集群步骤 UI 中创建一个步骤函数,这是 AWS 推荐的,下面是一个示例

JAR location :command-runner.jar
Main class :None
Arguments :/usr/bin/s3-dist-cp --src=/refoutput --dest=s3://***-us-east-1/bens-flattened-step/refs
Action on failure:Continue

我实际上需要根据正在上传的文件更改 --dest 参数的一部分。我想要做的是创建相同的步进函数,除了可以放入我的 Jupyter 笔记本的 python 代码,而不是使用 UI 执行此操作。这可能吗?此外,spark.parquet.write('s3 path') 在有很多小文件时会导致 S3 挂起,因此这不是一个可行的解决方案。

标签: pythonamazon-web-servicesapache-sparkamazon-emraws-step-functions

解决方案


在 Notebook 中,您可以使用 boto3 列出集群并使用 Cluster-ID,您可以提交 spark 步骤。

第一的。要安装 boto3,您可以将 pip 包安装为

sc.install_pypi_package("boto3")
#You can check it by using sc.list_packages()

使用 boto3,您可以列出集群 ID

import boto3

boto3 = boto3.session.Session(region_name='us-east-2')
emr = boto3.client('emr')

page_iterator = emr.get_paginator('list_clusters').paginate(
    ClusterStates=['RUNNING','WAITING']
)

for page in page_iterator:
    for item in page['Clusters']:
        print(item['Id'])

或更简单

response = emr.list_clusters(
    ClusterStates=[
        'RUNNING','WAITING',
    ],
   
)
print(response)

您也可以过滤集群,例如使用日期参数list_cluster

CreatedAfter=datetime(2019, 11, 11),
CreatedBefore=datetime(2020, 1, 1)

手头没有cluster-id,您可以提交步骤add_job_flow_steps

例如,一个步骤可以是

newsteps =[
                 {
            'Name': 'AWS S3 Copy',
            'ActionOnFailure': 'CONTINUE',
            'HadoopJarStep': {
                'Jar': 'command-runner.jar',
                'Args':["aws","s3","cp","s3://XXX/","/home/hadoop/copy/","--recursive"],
                        }
              }
              ]

使用您检索到的集群 ID 添加步骤(例如 j-xxxxxxxxxxx)

step_response = emr.add_job_flow_steps(JobFlowId="j-xxxxxxxxxxxxxxx", Steps=newsteps)

step_ids = step_response['StepIds']

print("Step IDs:", step_ids)

推荐阅读