How to put all the elements into a single column in pyspark?

Problem Description

The CSV file has the author name in column 1 and that author's books in the remaining columns, like the data below:

david,c-first,c++-first,java_2_test,............ 100

smith,c_in_smit,d_programming_smit .............120

I need to put the first column into an author column and the rest into a books column.

from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)
# Load a text file and convert each line to a Row.
lines = sc.textFile("people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(author=p[0], books=(p[1] + "," + p[2])))
# Infer the schema, and register the DataFrame as a table.
schemaPeople = sqlContext.createDataFrame(people)
schemaPeople.createOrReplaceTempView("people")

But the problem is that an author can have one book or many.
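For example, with the fixed indices p[1] and p[2] above, a line that lists only one book breaks (a minimal sketch of the failure, using one of the sample lines):

p = "smith,c_in_smit".split(",")
p[0], p[1]   # 'smith', 'c_in_smit'
p[2]         # IndexError: list index out of range -- this author has only one book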

Tags: apache-spark, pyspark

Solution


You can try a different invocation of the split method, as shown below:

parts = lines.map(lambda l: l.split(',', maxsplit=1))
people = parts.map(lambda p: Row(author=p[0], books=p[1]))

Here the line is split only once, so the result contains at most maxsplit + 1 parts.

See the documentation of split for more information.
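For a quick illustration of maxsplit (a minimal, runnable sketch using a made-up sample string):

s = "david,c-first,c++-first,java_2_test"
s.split(",")              # ['david', 'c-first', 'c++-first', 'java_2_test']
s.split(",", maxsplit=1)  # ['david', 'c-first,c++-first,java_2_test']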

-: Edit :-

If you want a separate row for each book, like the following:

+-------+-----+
| author|books|
+-------+-----+
|author1|book1|
|author1|book2|
|author1|book3|
|author2|book4|
|author2|book5|
|author2|book6|
+-------+-----+

you can change your code as follows:

def create_rows(temp_data):
    # temp_data is [author, "book1,book2,..."]; emit one Row per book
    rows = []
    for book in temp_data[1].split(','):
        rows.append(Row(author=temp_data[0], books=book))
    return rows

parts = lines.map(lambda l: l.split(",", maxsplit=1)).collect()

people = []
for data in parts:
    people.extend(create_rows(data))

# `spark` here is the SparkSession (Spark 2.x+); with the SQLContext above,
# sqlContext.createDataFrame(people) works the same way.
schemaPeople = spark.createDataFrame(people)
schemaPeople.show()
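Note that collect() pulls every parsed line back to the driver. If the file is large, the same per-book expansion can stay distributed by using flatMap instead (a sketch of an alternative, not part of the original answer):

parts = lines.map(lambda l: l.split(",", maxsplit=1))
people = parts.flatMap(create_rows)  # one Row per book, built on the executors
schemaPeople = spark.createDataFrame(people)
schemaPeople.show()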
