Installing a Python package from a Git branch on EMR

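Question

I usually install packages on EMR through Spark's install_pypi_package method. This limits where I can install packages from. How can I install a package from a specific GitHub branch? Is there a way to do this through install_pypi_package?

Tags: python, apache-spark, pip, amazon-emr

Answer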


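If you have access to the cluster creation step, you can install the package from GitHub with pip in a bootstrap action. (install_pypi_package is only needed once the cluster is already running, and at that point the package might not resolve on all nodes.)

Installing before the cluster is running:

A simple example of a bootstrap script that installs from GitHub with pip (for example, a bootstrap file named download.sh) is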

#!/bin/bash
# pip needs the git+ prefix for a Git URL; @<your-branch> selects the branch.
sudo pip install git+<your-repo>.git@<your-branch>

Then you can use this bash script as a bootstrap action:

aws emr create-cluster --name "Test cluster" --bootstrap-actions Path="s3://elasticmapreduce/bootstrap-actions/download.sh"
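
If the cluster is created from Python rather than from the CLI, the same bootstrap action can be passed to boto3's run_job_flow through its BootstrapActions parameter. The sketch below only builds that parameter. The helper name bootstrap_action and the default action name are illustrative and not part of the original answer; the S3 path is the one from the command above.

```python
def bootstrap_action(script_s3_path, name="install-from-git", args=None):
    """Build one entry for the BootstrapActions list of EMR's run_job_flow.

    Hypothetical helper: the function name and default action name are
    illustrative. Only the dictionary layout follows the EMR API.
    """
    return {
        "Name": name,
        "ScriptBootstrapAction": {
            "Path": script_s3_path,
            "Args": list(args or []),
        },
    }


action = bootstrap_action("s3://elasticmapreduce/bootstrap-actions/download.sh")
print(action)
# Pass it together with the other cluster settings, for example:
# boto3.client("emr").run_job_flow(..., BootstrapActions=[action])
```

The remaining run_job_flow arguments (release label, instances, roles) are left out here because they depend on the cluster being created.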

Or you can use pip3 in the bootstrap script:

sudo pip3 install git+<your-repo>.git@<your-branch>

Or just clone the repository and build it locally on EMR using its setup.py file:

#!/bin/bash
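# Clone the wanted branch into a known directory, then build from inside it.
git clone --branch <your-branch> <your-repo>.git package-src
cd package-src
sudo python setup.py install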
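
To target a specific branch, pip's VCS support accepts a requirement of the form git+<url>@<branch>. As a rough sketch, the snippet below builds that requirement string and the matching bootstrap script text. The repository URL, branch name, and both helper names are made up for illustration and do not come from the original answer.

```python
import shlex


def git_requirement(repo_url, branch=None):
    """Return a pip requirement that installs from a Git repository.

    branch may also be a tag or a commit hash; when omitted, pip uses the
    repository's default branch.
    """
    requirement = f"git+{repo_url}"
    if branch:
        requirement += f"@{branch}"
    return requirement


def bootstrap_script(requirement, pip="pip3"):
    """Return the text of a bash bootstrap script that installs requirement."""
    return f"#!/bin/bash\nsudo {pip} install {shlex.quote(requirement)}\n"


# Hypothetical repository and branch, used only as an example.
req = git_requirement(
    "https://github.com/example-org/example-repo.git", "my-feature-branch")
script = bootstrap_script(req)
print(script)
```

The generated text can be saved as download.sh and uploaded to S3 for use as the bootstrap action.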

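After the cluster is running (complex, not recommended)

If you still want to install or build a custom package while the cluster is already running, AWS documents an approach that uses AWS-RunShellScript to install the package on all core nodes. It says:

(I) Install the package on the master node (run pip install on the running cluster, from a shell or a Jupyter notebook).

(II) Run the following script locally, passing the cluster ID and the path to the bootstrap script (for example, download.sh above) as arguments.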

import argparse
import time
import boto3


def install_libraries_on_core_nodes(
        cluster_id, script_path, emr_client, ssm_client):
    """
    Copies and runs a shell script on the core nodes in the cluster.

    :param cluster_id: The ID of the cluster.
    :param script_path: The path to the script, typically an Amazon S3 object URL.
    :param emr_client: The Boto3 Amazon EMR client.
    :param ssm_client: The Boto3 AWS Systems Manager client.
    """
    core_nodes = emr_client.list_instances(
        ClusterId=cluster_id, InstanceGroupTypes=['CORE'])['Instances']
    core_instance_ids = [node['Ec2InstanceId'] for node in core_nodes]
    print(f"Found core instances: {core_instance_ids}.")

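    # File name of the script once copied to the node (last part of the S3 path),
    # so the run step works for any script name, not only install_libraries.sh.
    script_name = script_path.rsplit('/', 1)[-1]
    commands = [
        # Copy the shell script from Amazon S3 to each node instance.
        f"aws s3 cp {script_path} /home/hadoop",
        # Run the shell script to install libraries on each node instance.
        f"bash /home/hadoop/{script_name}"]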
    for command in commands:
        print(f"Sending '{command}' to core instances...")
        command_id = ssm_client.send_command(
            InstanceIds=core_instance_ids,
            DocumentName='AWS-RunShellScript',
            Parameters={"commands": [command]},
            TimeoutSeconds=3600)['Command']['CommandId']
        while True:
            # Verify the previous step succeeded before running the next step.
            cmd_result = ssm_client.list_commands(
                CommandId=command_id)['Commands'][0]
            if cmd_result['StatusDetails'] == 'Success':
                print("Command succeeded.")
                break
            elif cmd_result['StatusDetails'] in ['Pending', 'InProgress']:
                print(f"Command status is {cmd_result['StatusDetails']}, waiting...")
                time.sleep(10)
            else:
                print(f"Command status is {cmd_result['StatusDetails']}, quitting.")
                raise RuntimeError(
                    f"Command {command} failed to run. "
                    f"Details: {cmd_result['StatusDetails']}")


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('cluster_id', help="The ID of the cluster.")
    parser.add_argument('script_path', help="The path to the script in Amazon S3.")
    args = parser.parse_args()

    emr_client = boto3.client('emr')
    ssm_client = boto3.client('ssm')

    install_libraries_on_core_nodes(
        args.cluster_id, args.script_path, emr_client, ssm_client)


if __name__ == '__main__':
    main()
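
One caveat with the script above on larger clusters: it sends every core instance ID in a single send_command call. Assuming the Systems Manager limit of 50 instance IDs per call (worth checking against the current API reference), the IDs could be split into batches first. The chunked helper below is a hypothetical addition, not part of the AWS sample, and the instance IDs are made up.

```python
def chunked(items, size=50):
    """Yield consecutive slices of items with at most size elements each."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(items), size):
        yield items[start:start + size]


# Made-up instance IDs, only to show the batching.
core_instance_ids = [f"i-{n:04d}" for n in range(120)]
batches = list(chunked(core_instance_ids))
print([len(batch) for batch in batches])
```

Each batch would then be passed as InstanceIds to its own send_command call, with the same status polling as in the script.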
