pyspark convert RDD back to DF

Problem description

I'm trying to read a CSV file, partition it using an iterator, modify the Row() data, and return it. My df1 works, since I can read the data from the file. My helper methods modify the data in each PySpark Row (row data cannot be modified directly). However, df2 does not work, even though I tried both toDF() and createDataFrame(). Here is the code:

from collections import OrderedDict
from pyspark.sql import Row

COL = 1
BATCH_LIMIT = 100

def ops(iterator):
    records = list(iterator)
    processed_records = []
    batch = []
    for i, row in enumerate(records):
        # Skip rows whose target column is empty
        if row[COL] == "":
            continue
        batch.append(row)
        # Flush when the batch is full or the last record is reached
        if i == len(records) - 1 or len(batch) == BATCH_LIMIT:
            for j, item in enumerate(batch):
                # Rows are immutable, so the helpers build modified copies
                converted_obj = convert_to_obj(item)
                new_item = modify_obj(converted_obj)
                batch[j] = Row(**OrderedDict(sorted(new_item.items())))
            # print("\n\n========BATCH==========\n\n")
            # print(batch)
            processed_records.extend(batch)
            # Reset batch
            batch = []
    # print(processed_records)
    return [processed_records]

df1 = spark.read.option("delimiter", ",").option("header", True).csv(path)
print("\n\n==== DF1 =====\n\n")
df1.show(10)
df2 = df1.rdd.mapPartitions(ops).cache().toDF()
print("\n\n===== DF2 =====\n\n")
df2.show(10)

Tags: python, pyspark

Solution

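The most likely culprit is the return value of ops(). mapPartitions() expects the function to hand back an iterable with one element per output record, but return [processed_records] produces an iterable with exactly one element: a Python list holding every Row. Spark then treats that whole list as a single record, and schema inference in toDF() / createDataFrame() fails. A minimal sketch of the fix, leaving the rest of ops() as posted:

    # Last line of ops(): return the Rows themselves, one per element,
    # rather than a single-element list that wraps all of them
    return iter(processed_records)

df2 = df1.rdd.mapPartitions(ops).cache().toDF()
df2.show(10)

One more thing to watch: if the last record of a partition has an empty value in COL, the continue skips the flush check, so any Rows still sitting in batch are silently dropped. Flushing a non-empty batch once more after the loop closes that gap.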

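If toDF() still fails to infer a schema (it can, for instance, when a partition comes back empty after the filtering), passing an explicit schema to createDataFrame() avoids inference altogether. A sketch with hypothetical field names; the real ones are whatever keys modify_obj() produces, listed in sorted order to match the Row(**OrderedDict(sorted(...))) construction:

from pyspark.sql.types import StructType, StructField, StringType

# Hypothetical column names; replace with the sorted keys of the
# dicts returned by modify_obj()
schema = StructType([
    StructField("col_a", StringType(), True),
    StructField("col_b", StringType(), True),
])
df2 = spark.createDataFrame(df1.rdd.mapPartitions(ops), schema)
df2.show(10)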