python - 如何在 pyspark 中读取大的 zip 文件
问题描述
我在 s3 上有 n 个 .zip 文件,我想处理并从中提取一些数据。zip 文件包含一个 json 文件。在 Spark 中,我们可以读取 .gz 文件,但我没有找到任何方法来读取 .zip 文件中的数据。有人可以帮助我如何使用 python 在 spark 上处理大型 zip 文件。我遇到了一些像 newAPIHadoopFile 这样的选项,但没有得到任何运气,也没有找到在 pyspark 中实现它们的方法。请注意 zip 文件大于 1G,有些也是 20G。
下面是我使用的代码:
import zipfile
import io
file_name = "s3 file path for zip file"
def zip_extract(x):
in_memory_data = io.BytesIO(x[1])
file_obj = zipfile.ZipFile(in_memory_data, "r")
files = [i for i in file_obj.namelist()]
return dict(zip(files, [file_obj.open(file).read() for file in files]))
zips = sc.binaryFiles(file_name)
files_data = zips.map(zip_extract)
但由于以下原因,它失败了。我使用的实例是 r42x.large。
Exit code: 52
Stack trace: ExitCodeException exitCode=52:
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0
解决方案
我确实以块的形式阅读了 zip 文件的内容,并使用 spark 处理了这些块。这对我有用,并帮助我阅读大小超过 10G 的 zip 文件。下面是示例集:
max_data_length=10000
z = zipfile.ZipFile(zip_file)
data = []
counter=1
with z.open(z.infolist()[0]) as f:
line_counter=0
for line in f:
# Append file contents to list
data.append(line)
line_counter=line_counter+1
# Reset counters if record count hit max-data-length threshold
# Create spark dataframes
if not line_counter % max_data_length:
# Spark processing like:
df_rdd = spark.sparkContext.parallelize(data)
# Reset Counters and data-list
counter=counter+1
line_counter=0
data= []
推荐阅读
- android - 如何在listview android中加粗特定单词
- verilog - 如果我没有在 Verilog/SystemVerilog 中为未知状态 x 指定大小和基本格式会怎样?
- google-analytics - bigquery 中的 GA 未显示正确结果
- javascript - 原语上的 isPrototypeOf() 函数
- c# - 在 WEB API 应用程序中,当 Connect with Excel 出现错误时,我只是创建了与 excel 的连接,当尝试打开它时出现错误
- javascript - HTML 表单输入标签未捕获 iOS (iPhone / iPad) 上的所有文本字符
- python - Python 3 中的元素未从列表中删除
- c++ - 关于字符串到字符的转换*
- java - Java 中的 OdataV3 实现
- functional-programming - 功能性语言本质上很难与非功能性语言建立接口,这是真的吗