apache-spark - 如何使用聚合的输出作为 withColumn 的输入
问题描述
我正在尝试使用包含字符串、时间戳、整数和浮点数的 pyspark 数据帧来分析一些数据。
玩具df:
sdf1 =
|id1|id2|id3|
+---+---+---+
| 1 |"a"| 4 |
+---+---+---+
| 2 |"a"| 6 |
+---+---+---+
| 1 |"a"| 7 |
+---+---+---+
| 3 |"a"| 9 |
+---+---+---+
sdf2 =
|ids|
+---+
|id1|
+---+
|id2|
+---+
|id3|
+---+
我正在努力实现以下目标
agg_instructions = [F.max(x).alias("{0}".format(x)) for x in sdf1.columns]
sdf3 = sdf2.withColumn("max", sdf1.agg(*agg_instructions))
这将导致以下数据框。然而,这不起作用 - 任何解决方法?
sdf3 =
|ids|max|
+---+---+
|id1| 3 |
+---+---+
|id2|"a"|
+---+---+
|id3| 9 |
+---+---+
我收到以下错误:
() 7 中的 AssertionError Traceback (最近一次调用最后一次) agg_instructions = [F.max(x).alias("{0}".format(x)) for x in data_sdf.columns] 8 ----> 9 sdf3 = sdf2.withColumn("max", sdf1.agg(*agg_instructions)) 10 11 测试 = test.reset_index()
/databricks/spark/python/pyspark/sql/dataframe.py in withColumn(self, colName, col) 2011 2012 """ -> 2013 assert isinstance(col, Column), "col 应该是 Column" 2014 返回 DataFrame(self ._jdf.withColumn(colName, col._jc), self.sql_ctx) 2015
AssertionError: col 应该是 Column
解决方案
这对于您想要实现的目标来说太过分了。您可以从 just 获得所需的输出sdf1
。
一种方法是创建一个数组列,其中包含列名结构及其对应的最大值。然后分解它并选择结构字段。
这是一个例子:
data = [(1, "a", 4), (2, "a", 6), (1, "a", 7), (3, "a", 9)]
df = spark.createDataFrame(data, ["id1", "id2", "id3"])
agg_instructions = array(
*[struct(lit(c).alias("ids"), max(col(c)).cast("string").alias("max")) for c in df.columns]
)
df.agg(agg_instructions.alias("agg")) \
.withColumn("agg", explode(col("agg"))) \
.select("agg.*") \
.show()
#+---+---+
#|ids|max|
#+---+---+
#|id1|3 |
#|id2|a |
#|id3|9 |
#+---+---+
推荐阅读
- r - 如何在 R 中生成 5000 个合成数据集,每个数据集有 1000 个高斯观测值;
- java - 我想在Java中按下按钮时显示图像
- macos - Plaform.exit() 使 Java 崩溃
- postgresql - 当我的气流启动时,它会在数据库上打开很多连接
- android - glDrawArrays 不渲染到帧缓冲区(尽管 glClear 工作正常)?
- c - 如何使用旧版本的 CCS 头文件?
- google-sheets - 如何使用谷歌时间戳按日期计算条目
- r - 在 ifelse R 中粘贴0
- java - 如何在 Android 中制作上滑面板?
- android - 工具栏上的汉堡包图标充当后退按钮