apache-spark - 如何将数组拆分为块并找到块的总和并将输出作为数组存储在 pyspark 中
问题描述
我有一个数据框,如下所示:
+-----+------------------------+
|Index| finalArray |
+-----+------------------------+
|1 |[0, 2, 0, 3, 1, 4, 2, 7]|
|2 |[0, 4, 4, 3, 4, 2, 2, 5]|
+-----+------------------------+
我想将数组分成 2 个块,然后找到每个块的总和并将结果数组存储在 finalArray 列中。如下所示:
+-----+---------------------+
|Index| finalArray |
+-----+---------------------+
|1 |[2, 3, 5, 9] |
|2 |[4, 7, 6, 7] |
+-----+---------------------+
我可以通过创建 UDF 来做到这一点,但要寻找更好和优化的方法。如果我可以使用 withColumn 并传递 flagArray 来处理它,而不必编写 UDF,则最好。
@udf(ArrayType(DoubleType()))
def aggregate(finalArray,chunkSize):
n = int(chunkSize)
aggsum = []
final = [finalArray[i * n:(i + 1) * n] for i in range((len(finalArray) + n - 1) // n )]
for item in final:
agg = 0
for j in item:
agg += j
aggsum.append(agg)
return aggsum
我无法在 UDF 中使用以下表达式,因此我使用了循环
[sum(finalArray[x:x+2]) for x in range(0, len(finalArray), chunkSize)]
解决方案
对于 spark 2.4+,您可以尝试sequence + transform:
from pyspark.sql.function import expr
df = spark.createDataFrame([
(1, [0, 2, 0, 3, 1, 4, 2, 7]),
(2, [0, 4, 4, 3, 4, 2, 2, 5])
], ["Index", "finalArray"])
df.withColumn("finalArray", expr("""
transform(
sequence(0,ceil(size(finalArray)/2)-1),
i -> finalArray[2*i] + ifnull(finalArray[2*i+1],0))
""")).show(truncate=False)
+-----+------------+
|Index|finalArray |
+-----+------------+
|1 |[2, 3, 5, 9]|
|2 |[4, 7, 6, 7]|
+-----+------------+
对于任何 N 的块大小,使用聚合函数进行小计:
N = 3
sql_expr = """
transform(
/* create a sequence from 0 to number_of_chunks-1 */
sequence(0,ceil(size(finalArray)/{0})-1),
/* iterate the above sequence */
i ->
/* create a sequence from 0 to chunk_size-1
calculate the sum of values containing every chunk_size items by their indices
*/
aggregate(
sequence(0,{0}-1),
0L,
(acc, y) -> acc + ifnull(finalArray[i*{0}+y],0)
)
)
"""
df.withColumn("finalArray", expr(sql_expr.format(N))).show()
+-----+----------+
|Index|finalArray|
+-----+----------+
| 1| [2, 8, 9]|
| 2| [8, 9, 7]|
+-----+----------+
推荐阅读
- sql - 如何在 Teradata SQL 中的某些特定值之间仅选择字符串?
- botframework - 在 Ms 团队 composeExtensions.commands 中编辑描述
- django - Django 一个自定义模型字段到两个数据库列
- python - 如何控制后台脚本(python)执行某些任务?(最好使用终端命令)
- javascript - 切换到另一个视频时如何使视频停止播放?
- html - 如何从 FontAwesome 制作具有相同宽度的图标
- python-3.x - 使用 Pymodbus,我想将线圈状态绑定到我的服务器中的 GPIO 引脚,而不是来自客户端
- python - 我的 librosa MFCC 输出是否正确?我想我在使用 librosa MFCC 时得到了错误的帧数
- ansible - 为所有 ansible 相关命令使用密钥存储
- java - Amazon RDS Parquet 文件到本地 MySQL 数据库