首页 > 解决方案 > 如何使用聚合的输出作为 withColumn 的输入

问题描述

我正在尝试使用包含字符串、时间戳、整数和浮点数的 pyspark 数据帧来分析一些数据。

玩具df:

sdf1 = 
|id1|id2|id3|
+---+---+---+
| 1 |"a"| 4 |
+---+---+---+
| 2 |"a"| 6 |
+---+---+---+
| 1 |"a"| 7 |
+---+---+---+
| 3 |"a"| 9 |
+---+---+---+


sdf2 = 
|ids|
+---+
|id1|
+---+
|id2|
+---+
|id3|
+---+

我正在努力实现以下目标

agg_instructions = [F.max(x).alias("{0}".format(x)) for x in sdf1.columns]

sdf3 = sdf2.withColumn("max", sdf1.agg(*agg_instructions))

这将导致以下数据框。然而,这不起作用 - 任何解决方法?

sdf3 = 
|ids|max|
+---+---+
|id1| 3 |
+---+---+
|id2|"a"|
+---+---+
|id3| 9 |
+---+---+

我收到以下错误:

() 7 中的 AssertionError Traceback (最近一次调用最后一次) agg_instructions = [F.max(x).alias("{0}".format(x)) for x in data_sdf.columns] 8 ----> 9 sdf3 = sdf2.withColumn("max", sdf1.agg(*agg_instructions)) 10 11 测试 = test.reset_index()

/databricks/spark/python/pyspark/sql/dataframe.py in withColumn(self, colName, col) 2011 2012 """ -> 2013 assert isinstance(col, Column), "col 应该是 Column" 2014 返回 DataFrame(self ._jdf.withColumn(colName, col._jc), self.sql_ctx) 2015

AssertionError: col 应该是 Column

标签: apache-sparkpysparkapache-spark-sqlpyspark-dataframes

解决方案


这对于您想要实现的目标来说太过分了。您可以从 just 获得所需的输出sdf1

一种方法是创建一个数组列,其中包含列名结构及其对应的最大值。然后分解它并选择结构字段。

这是一个例子:

data = [(1, "a", 4), (2, "a", 6), (1, "a", 7), (3, "a", 9)]
df = spark.createDataFrame(data, ["id1", "id2", "id3"])

agg_instructions = array(
        *[struct(lit(c).alias("ids"), max(col(c)).cast("string").alias("max")) for c in df.columns]
    )

df.agg(agg_instructions.alias("agg")) \
  .withColumn("agg", explode(col("agg"))) \
  .select("agg.*") \
  .show()

#+---+---+
#|ids|max|
#+---+---+
#|id1|3  |
#|id2|a  |
#|id3|9  |
#+---+---+

推荐阅读