python - 使用 Spark 读取文本文件数据并使用逗号分割数据 - python
问题描述
我有以下格式的数据。
abc, x1, x2, x3
def, x1, x3, x4,x8,x9
ghi, x7, x10, x11
我想要的输出是
0,abc, [x1, x2, x3]
1,def, [x1, x3, x4,x8,x9]
2,ghi, [x7, x10, x11]
解决方案
您的数据不是 CSV 格式。CSV 表示具有固定模式的逗号分隔文本文件。您的数据的 CSV 将是:
abc,x1,x2,x3,,
def,x1,x3,x4,x8,x9
ghi,x7,x10,x11,,
请注意第 1 行和第 3 行中的尾随逗号,它们不在您的数据中。
由于您有一个不是 CSV 的文本文件,因此在 Spark 中获取所需架构的方法是在 Python 中读取整个文件,解析为您想要的内容,然后使用spark.crateDataFrame()
. 或者,如果您在一个目录中有多个这样的文件,请使用SparkContext.wholeTextFiles
,然后使用flatMap
您的解析功能。
假设你已经做了类似open("Your File.txt").readlines
的事情,剩下的很简单:
import re
from pyspark.sql import *
lines = [
"abc, x1, x2, x3",
"def, x1, x3, x4,x8,x9",
"ghi, x7, x10, x11"
]
split = re.compile("\s*,\s*")
Line = Row("id", "first", "rest")
def parse_line(id, line):
tokens = split.split(line.strip)
return Line(id, tokens[0], tokens.pop(0))
def parse_lines(lines):
return [parse_line(i, x) for i,x in enumerate(lines)]
spark.createDataFrame(parse_lines(lines))
推荐阅读
- halide - Halide:如何避免不必要的断言
- sql - 在算术表达式中使用多个表时如何防止在 SQL 中除以零
- c# - 修复数字接近 2^5 和 2^7 的不精确问题
- java - 使用递归使用新公式将数字提高到幂
- python - 一个 Django DetailView 中的两个模型并按它们之间的关系进行过滤
- android - 是否可以在没有 sdk 的情况下从其他应用程序启动 Paytm 进行付款?如果是,如何?
- html - 我应该为元素使用什么选择器,这取决于伪元素的悬停行为?
- oracle - Oracle - 生成 Package.Procedure 调用
- javascript - 如何使用javascript将数据绑定到多个元素
- python-3.x - 本网站(nseindia.com)的动态数据未导入 python pandas