python - pyspark 将 RDD 转换回 DF
问题描述
我正在尝试读取一个 csv 文件,使用迭代器对其进行分区,修改 Row() 数据并将其返回。我的df1
作品,因为我可以读取文件中的数据。我的辅助方法用于修改每个 pyspark 行中的数据(不能直接修改行数据)。但是,即使我尝试过ordf2
也不起作用。这是代码:toDF()
createDataFrame()
COL = 1
BATCH_LIMIT = 100
def ops(iterator):
records = list(iterator)
processed_records = []
batch = []
for i, row in enumerate(records):
# Skip column with empty data
if row[COL] == "":
continue
batch.append(row)
if i == len(records) - 1 or len(batch) == BATCH_LIMIT:
for j, item in enumerate(batch):
converted_obj = convert_to_obj(item)
new_item = modify_obj(converted_obj)
batch[j] = Row(**OrderedDict(sorted(new_item.items())))
# print("\n\n========BATCH==========\n\n")
# print(batch)
processed_records.extend(batch)
# Reset batch
batch = []
# print(processed_records)
return [processed_records]
df1 = spark.read.option("delimiter", ",").option("header", True).csv(path)
print("\n\n==== DF1 =====\n\n")
df1.show(10)
df2 = df1.rdd.mapPartitions(ops).cache().toDF()
print("\n\n===== DF2 =====\n\n")
df2.show(10)
解决方案
推荐阅读
- r - 如何使用 R 向另一个程序发送命令?
- swift - UIBezierPath 的快速替代方案
- gremlin - 在java中将属性添加到已经存在的顶点
- python - 在 Python 中将 Json Dict 对象转换为 DataFrame
- asp.net - 从另一个控制器 (Web api) 将新记录发布到 AspNetUsers
- r - 对多个列表元素应用一组指令
- angular - 可视化或以图形表示我的应用程序中的路线?
- powershell - 仅 PowerShell Get-Childitem 顶级文件夹
- python - 循环遍历 pandas 数据框,将收件人和数据框插入 Outlook 电子邮件
- reactjs - 提前输入,反应表带