Pyspark: load all files from a list and print the distinct values of each column in parallel

Problem Description

I want to run all of the following in parallel: 1) read the files from a list, and 2) print the distinct values of every column of those tables. How can I replace the for loop below to do this in parallel?

alist = ['File_A', 'File_B']

mainfolder = 'hdfs://1.2.3.4/home'

def loadfile(filename):
    # read one CSV file from the main folder
    file = spark.read.csv(mainfolder + str(filename))
    columns = file.columns
    for i in columns:
        printcol(file, i)

def printcol(df, column):
    # print the distinct values of a single column via pandas
    print(df.toPandas()[column].unique())

for i in alist:
    loadfile(i)

Tags: apache-spark, pyspark, apache-spark-sql, pyspark-dataframes

Solution


You can get the actual distinct values of all columns in a single pass, without using pandas:

import os
from pyspark.sql import functions as F

def load_file(filename: str):
    """
    Prints distinct values for all columns of the file `filename`
    Example output:
        Distinct values per column in file 'filename':
        +---+----------+------------+
        |c1 |c2        |c3          |
        +---+----------+------------+
        |a  |a,b,c,d,e |1,2,3,5,7,8 |
        +---+----------+------------+
    """

    df = spark.read.csv(os.path.join(mainfolder, filename), header=True)
    df = df.select(
        *[F.array_join(F.collect_set(c), ",").alias(c) for c in df.columns]
    )

    print(f"Distinct values per column in file '{filename}':\n", df._jdf.showString(1, 0, False))

To run this function in parallel for multiple files, you can use a ThreadPool:

from multiprocessing.pool import ThreadPool

alist = ['File_A', 'File_B']

with ThreadPool(len(alist) or 1) as p:
    p.map(load_file, alist)
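
The threads here only submit jobs from the driver; the scanning and aggregation still run on the executors, so the Python GIL is not a limiting factor and Spark can schedule the per-file jobs concurrently. If you prefer the standard-library `concurrent.futures` interface over `multiprocessing.pool`, an equivalent sketch (reusing `load_file` and `alist` from above) would be:

from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=len(alist) or 1) as executor:
    # each map call submits one Spark job per file; list() forces evaluation
    # so any exception raised inside load_file surfaces here
    list(executor.map(load_file, alist))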
