apache-spark - Pyspark 从列表中加载所有文件并并行打印每列的不同值
问题描述
我想并行运行所有这些 1)从列表中读取文件 2)从这些表的每一列中打印不同的值,我如何替换下面的 for 循环以实现并行目的?
alist =['File_A','File_B']
mainfolder ='hdfs://1.2.3.4/home'
def loadfile(filename):
file = spark.read_csv(mainfolder+str(filename))
column = file.columns
for i in column :
printcol(i)
def printcol (column) :
print (s_df.toPandas()[column].unique())
for i in alist:
loaddfile(i)
解决方案
您可以在一次迭代中获得所有列的实际不同值,而无需使用 pandas:
import os
from pyspark.sql import functions as F
def load_file(filename: str):
"""
Prints distinct values for all colmuns of the file `filename`
Example output:
Distinct values per column in file 'filename':
+---+----------+------------+
|c1 |c2 |c3 |
+---+----------+------------+
|a |a,b,c,d,e |1,2,3,5,7,8 |
+---+----------+------------+
"""
df = spark.read.csv(os.path.join(mainfolder, filename), header=True)
df = df.select(
*[F.array_join(F.collect_set(c), ",").alias(c) for c in df.columns]
)
print(f"Distinct values per column in file '{filename}':\n", df._jdf.showString(1, 0, False))
要为多个文件并行运行此函数,您可以使用ThreadPool
:
from multiprocessing.pool import ThreadPool
alist =['File_A','File_B']
with ThreadPool(len(alist) or 1) as p:
p.map(load_file, alist)
推荐阅读
- c - 关于 C 中宏的数据类型
- sql-server - 从 Powershell 插入 SQL Server 数据库时,为什么我的(大)字符串会被截断?
- sap - grant_privilege 表中的 SAP HANA 系统权限和应用程序权限
- ios - React-native run-ios 命令 - 找不到 iphone 模拟器
- asp.net-mvc - 资产作为目录连接/硬链接 - 更改跟踪无法正常工作
- python - 使用chdir后找不到任何文件或目录
- vba - 在我已经被另一个过滤后按属性过滤块
- amazon-web-services - 使用 cli 上传到 s3 时添加 content-disposition 元数据
- android - Flutter google play 说应用程序已在调试模式下登录
- javascript - 使用切换按钮删除容器