首页 > 解决方案 > spark execution - 在驱动程序和执行程序中访问文件内容的单一方法

问题描述

根据这个问题 - pyspark 中的 --files 选项不起作用 sc.addFiles 选项应该适用于访问驱动程序和执行程序中的文件。但我无法让它对执行者起作用

测试.py

from pyspark import SparkContext, SparkConf
from pyspark import SparkFiles

conf = SparkConf().setAppName("File access test")
sc = SparkContext(conf=conf)
sc.addFile("file:///home/hadoop/uploads/readme.txt")

with open(SparkFiles.get('readme.txt')) as test_file:
    lines = [line.strip() for line in test_file]
print(lines) # this works
print('********************')
lines = sc.textFile(SparkFiles.get('readme.txt')) # run in the executors. this errors
print(lines.collect())

命令

spark-submit --master yarn --deploy-mode client test.py

readme.txt/home/hadoop/uploads主节点下

我在日志中看到以下内容

21/01/27 15:03:30 INFO SparkContext: Added file file:///home/hadoop/uploads/readme.txt at spark://ip-10-133-70-121.sysco.net:44401/files/readme.txt with timestamp 1611759810247
21/01/27 15:03:30 INFO Utils: Copying /home/hadoop/uploads/readme.txt to /mnt/tmp/spark-f929a1e2-e7e8-401e-8e2e-dcd1def3ee7b/userFiles-fed4d5bf-3e31-4e1e-b2ae-3d4782ca265c/readme.txt

所以它将它复制到一些火花目录并挂载(我对火花世界还是比较陌生)。如果我使用 --files 标志并传递文件,它还会将其复制到执行程序可以读取的 hdfs:// 路径。

这是因为 addFile 要求文件也存在于本地的执行程序上。目前在readme.txt主节点上。如果是这样,有没有办法将它从主服务器传播到执行器。

我正在尝试找到一种访问文件的统一方式。我能够将文件从本地机器推送到主节点。然而,在火花代码中,我想要一种访问文件内容的单一方式,无论是驱动程序还是执行程序

目前,为了让代码的执行程序部分工作,我还必须在 --files 标志(spark-submit --master yarn --deploy-mode client --files uploads/readme.txt test.py)中传递文件并使用类似以下的内容

path = f'hdfs://{sc.getConf().get("spark.driver.host")}:8020/user/hadoop/.sparkStaging/{sc.getConf().get("spark.app.id")}/readme.txt'
lines = sc.textFile(path)

标签: apache-sparkpysparkamazon-emr

解决方案


您可以使用 --archives 在驱动程序和执行程序之间共享文件。

将您的存档以以下格式保存在 s3 中。

references.zip 
 |_file1.txt
 |_file2.txt
 |_reference.ini


spark-submit --deploy-mode cluster --master yarn --archives s3://bucket/references.zip#references s3://bucket/spark_script.py

在此处使用#references 将解压缩references/ 目录下的所有文件。

您可以在执行程序/驱动程序中访问这样的文件:

with open('references/file1.txt') as f:
    data1 = f.read()

config = configparser.ConfigParser()
config.read('references/reference.ini')

推荐阅读