python - pyspark:将一列数组解压缩成多列更有效?
问题描述
我有一列可以像这样创建的数组
df = spark.CreateDataFrame(["[{\"key\":1}, {\"key\":2}"], "tmp")
from pyspark.sql import functions as F
df = df.\
withColumn("tmp", F.from_json(in_col_name, "array<string>")).\
cache()
# obtain the maximum number of components in the array
max_arr_len = df.select(F.size(tmp)).rdd.max()[0]
for i in range(max_arr_len )
df = df.withColumn("tmp"+str(i), F.col("tmp").getItem(i))
想象一下,如果我在 1 亿行上运行它。我认为循环使用getItem
是低效的。有没有办法max_arr_len
一次获得所有列?
解决方案
在这种情况下,实际上循环并不是那么低效。getItem
是一种惰性转换,因此 Spark 能够优化您的代码并将所有循环步骤作为一个步骤执行。查看使用的计划df.explain()
:
== Physical Plan ==
*(1) Project [in_col_name#820, tmp#822, tmp#822[0] AS tmp0#945, tmp#822[1] AS tmp1#949]
+- InMemoryTableScan [in_col_name#820, tmp#822]
+- InMemoryRelation [in_col_name#820, tmp#822], StorageLevel(disk, memory, deserialized, 1 replicas)
+- Project [in_col_name#820, from_json(ArrayType(StructType(StructField(key,IntegerType,true)),true), in_col_name#820, Some(Etc/UTC)) AS tmp#822]
+- *(1) Scan ExistingRDD[in_col_name#820]
您会注意到所有元素提取都在同一行执行:
tmp#822[0] AS tmp0#945, tmp#822[1] AS tmp1#949
推荐阅读
- c++ - 线程和互斥锁的使用有什么问题
- python - Python windows app UI自动化测试:如何向winappdriver发送鼠标和键盘事件?
- angular - 为什么变量不更新?
- authentication - .NET CORE - JWT Auth - 用户是否有效/活跃?
- vaadin - 如何在 vaadin 流程中禁用 ComboBox 中的清除按钮?
- java - Java replaceAll 不起作用
- c# - 如何从 UserControl.ascx.cs SharePoint 访问解决方案列表项
- typescript - 如何使用ngrx和打字稿加入具有相等价值的属性列表项
- arrays - 如何使用codeigniter中的会话动态地将新数组添加到现有数组中
- sql-server - Kubernetes 与 SQL Server 建立连接时发生与网络相关或特定于实例的错误