首页 > 解决方案 > 使用 Spark 读取文本文件数据并使用逗号分割数据 - python

问题描述

我有以下格式的数据。

abc, x1, x2, x3  
def, x1, x3, x4,x8,x9   
ghi, x7, x10, x11  

我想要的输出是

0,abc, [x1, x2, x3]  
1,def, [x1, x3, x4,x8,x9]  
2,ghi, [x7, x10, x11]

标签: pythonapache-sparkpyspark

解决方案


您的数据不是 CSV 格式。CSV 表示具有固定模式的逗号分隔文本文件。您的数据的 CSV 将是:

abc,x1,x2,x3,,
def,x1,x3,x4,x8,x9
ghi,x7,x10,x11,,

请注意第 1 行和第 3 行中的尾随逗号,它们不在您的数据中。

由于您有一个不是 CSV 的文本文件,因此在 Spark 中获取所需架构的方法是在 Python 中读取整个文件,解析为您想要的内容,然后使用spark.crateDataFrame(). 或者,如果您在一个目录中有多个这样的文件,请使用SparkContext.wholeTextFiles,然后使用flatMap您的解析功能。

假设你已经做了类似open("Your File.txt").readlines的事情,剩下的很简单:

import re
from pyspark.sql import *

lines = [
  "abc, x1, x2, x3",
  "def, x1, x3, x4,x8,x9",
  "ghi, x7, x10, x11"
]

split = re.compile("\s*,\s*")
Line = Row("id", "first", "rest")

def parse_line(id, line):
  tokens = split.split(line.strip)
  return Line(id, tokens[0], tokens.pop(0))

def parse_lines(lines):
  return [parse_line(i, x) for i,x in enumerate(lines)]

spark.createDataFrame(parse_lines(lines))

推荐阅读