首页 > 解决方案 > pyspark 中的 dataframe.write.csv 非常慢

问题描述

我有一个数据框,它是由行联合创建的,

def group_acc_num_and_make_metrics(acc_list: List[str], data_frame: DataFrame):
    for acc_no in acc_list:
        rows = rows.union(metrics_df(data_frame.filter(data_frame.SOURCE_ACCOUNT_NBR == acc_no), acc_no))
    return row

metrics_df 创建了一个包含 109 列的新数据框,因此该函数的返回类型是一个包含 109 列和一些行的数据框

现在,当我想将此数据帧保存到 csv 时,需要花费大量时间,此数据帧中的行数仅为 70,将其写入 csv 文件大约需要 10 分钟。生成的分区csv文件的数量也是70个。重新分区/合并也是一个非常耗时的操作。以下是将其保存到 csv 的代码

def split_test_train(data_frame: DataFrame, number_of_rows):
    golden_ratio = 0.1
    test_rows = int(number_of_rows * golden_ratio)
    train_rows = number_of_rows - test_rows
    test_x_df = data_frame.sample(golden_ratio)
    train_x_df = data_frame.sample(1-golden_ratio)
    test_y_df = test_x_df.select(test_x_df.san, test_x_df.is_fraud)
    train_y_df = train_x_df.select(test_x_df.san, test_x_df.is_fraud)
    def_path = data_output_path
    path = def_path + str(int(datetime.now().timestamp() * 1000))
    test_x_df.drop(test_x_df.is_fraud).write.format('com.databricks.spark.csv').save(path + "/test_x")
    train_x_df.drop(test_x_df.is_fraud).write.format('com.databricks.spark.csv').save(path + "/train_x")
    test_y_df.write.format('com.databricks.spark.csv').save(path + "test_y")
    train_y_df.write.format('com.databricks.spark.csv').save(path + "/train_y")

标签: apache-sparkpysparkapache-spark-sql

解决方案


推荐阅读