python - pyspark 从数据帧迭代 N 行到每次执行
问题描述
def fun_1(csv):
# returns int[] of length = Number of New Lines in String csv
def fun_2(csv): # My WorkArround to Pass one CSV Line at One Time
return fun_1(csv)[0]
输入数据框是 df
+----+----+-----+
|col1|col2|CSVs |
+----+----+-----+
| 1| a|2,0,1|
| 2| b|2,0,2|
| 3| c|2,0,3|
| 4| a|2,0,1|
| 5| b|2,0,2|
| 6| c|2,0,3|
| 7| a|2,0,1|
+----+----+-----+
下面是一个有效但需要很长时间的代码片段
from pyspark.sql.functions import udf
from pyspark.sql import functions as sf
funudf = udf(fun_2) # wish it could be fun_1
df=df.withColumn( 'pred' , funudf(sf.col('csv')))
fun_1
,存在内存问题,一次最多只能处理 50000 行。我希望使用 funudf = udf(fun_1)
. 因此,如何将 PySpark DF 拆分为 50000 行的段,调用funudf ->fun_1
. 输出有两个列,来自输入的 'col1' 和 'funudf return value' 。
解决方案
groupByKey
通过使用RDD API 中公开的方法,您可以实现强制 PySpark 对固定批次的行进行操作的预期结果。使用groupByKey
将强制 PySpark 将单个密钥的所有数据洗牌到单个执行程序。
注意:出于同样的原因groupByKey
,由于网络成本,通常不鼓励使用。
战略:
- 添加一列,将您的数据分组到所需的批次大小,然后
groupByKey
- 定义一个重现 UDF 逻辑的函数(并返回一个 id 以供稍后加入)。这操作
pyspark.resultiterable.ResultIterable
,结果groupByKey
。将功能应用到您的组使用mapValues
- 将生成的 RDD 转换为 DataFrame 并重新加入。
例子:
# Synthesize DF
data = {'_id': range(9), 'group': ['a', 'b', 'c', 'a', 'b', 'c', 'a', 'b', 'c'], 'vals': [2.0*i for i in range(9)]}
df = spark.createDataFrame(pd.DataFrame(data))
df.show()
##
# Step - 1 Convert to rdd and groupByKey to force each group to separate executor
##
kv = df.rdd.map(lambda r: (r.group, [r._id, r.group, r.vals]))
groups = kv.groupByKey()
##
# Step 2 - Calulate function
##
# Dummy function taking
def mult3(ditr):
data = ditr.data
ids = [v[0] for v in data]
vals = [3*v[2] for v in data]
return zip(ids, vals)
# run mult3 and flaten results
mv = groups.mapValues(mult3).map(lambda r: r[1]).flatMap(lambda r: r) # rdd[(id, val)]
##
# Step 3 - Join results back into base DF
##
# convert results into a DF and join back in
schema = t.StructType([t.StructField('_id', t.LongType()), t.StructField('vals_x_3', t.FloatType())])
df_vals = spark.createDataFrame(mv, schema)
joined = df.join(df_vals, '_id')
joined.show()
>>>
+---+-----+----+
|_id|group|vals|
+---+-----+----+
| 0| a| 0.0|
| 1| b| 2.0|
| 2| c| 4.0|
| 3| a| 6.0|
| 4| b| 8.0|
| 5| c|10.0|
| 6| a|12.0|
| 7| b|14.0|
| 8| c|16.0|
+---+-----+----+
+---+-----+----+--------+
|_id|group|vals|vals_x_3|
+---+-----+----+--------+
| 0| a| 0.0| 0.0|
| 7| b|14.0| 42.0|
| 6| a|12.0| 36.0|
| 5| c|10.0| 30.0|
| 1| b| 2.0| 6.0|
| 3| a| 6.0| 18.0|
| 8| c|16.0| 48.0|
| 2| c| 4.0| 12.0|
| 4| b| 8.0| 24.0|
+---+-----+----+--------+
推荐阅读
- python - 如何使用二维数组作为输入?预期 flatten_input 有 3 个维度,但得到了形状为 (1, 1, 2, 5) 的数组
- ios - UITableView 单元格布局与 iPhone 12 pro 和 promax 的区别
- python - 如何改进 Python 中列表的模式匹配
- java - 如何在 HDFS 中找到文件之间的对称差异?
- datastage - 是否有关于 Datastage Designer/Directory 命令行选项的文档?
- google-cloud-platform - 将自定义域指向没有 Firebase 的 GCP 云功能
- javascript - 如何读取 json 文件并在单击按钮后写入 DOM?
- svelte - 在苗条中调整大小的平滑容器
- javascript - 从 C# 执行 javascript
- javascript - 使用 ejs 生成由选项选择元素触发的动态 html 表单的最佳方法是什么?