Provide an alias for an aggregated column and round the result with groupBy in PySpark

Problem description

I have a PySpark DataFrame that contains a column named primary_use.

Here is the first row:

[screenshot of the DataFrame's first row omitted]

I want to group the DataFrame by primary_use, use the mean function for the aggregation, give the aggregated column an alias, and round it.

My code, however, raises an exception:

(building.groupBy('primary_use').agg({'square_feet' : 'mean'}).alias('avg_sqft')).select('avg_sqft' , round('avg_sqft', 0)).show()

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
C:\spark\spark-3.0.0-preview-bin-hadoop2.7\python\pyspark\sql\utils.py in deco(*a, **kw)
     97         try:
---> 98             return f(*a, **kw)
     99         except py4j.protocol.Py4JJavaError as e:

C:\spark\spark-3.0.0-preview-bin-hadoop2.7\python\lib\py4j-0.10.8.1-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:

Py4JJavaError: An error occurred while calling o731.select.
: org.apache.spark.sql.AnalysisException: cannot resolve '`avg_sqft`' given input columns: [avg_sqft.avg(square_feet), avg_sqft.primary_use];;
'Project ['avg_sqft, round('avg_sqft, 0) AS round(avg_sqft, 0)#4542]
+- SubqueryAlias `avg_sqft`
   +- Aggregate [primary_use#175], [primary_use#175, avg(square_feet#176) AS avg(square_feet)#4538]
      +- Project [site_id#173, building_id#174, primary_use#175, square_feet#176, year_built#177, floor_count#178]
         +- Project [site_id#173, building_id#174, primary_use#175, square_feet#176, year_built#177, floor_count#178, (2019 - year_built#177) AS Age#3378]
            +- RelationV2[site_id#173, building_id#174, primary_use#175, square_feet#176, year_built#177, floor_count#178] csv file:/D:/DIGITAL_LIBRARY/PySpark/building_metadata.csv

    at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:125)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:122)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$2(TreeNode.scala:310)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:310)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$transformExpressionsUp$1(QueryPlan.scala:97)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$1(QueryPlan.scala:109)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:109)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:120)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$3(QueryPlan.scala:125)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:125)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$4(QueryPlan.scala:130)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:214)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:130)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:97)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1(CheckAnalysis.scala:122)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1$adapted(CheckAnalysis.scala:90)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:154)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis(CheckAnalysis.scala:90)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis$(CheckAnalysis.scala:87)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:122)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:148)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:201)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:145)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:66)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
    at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:63)
    at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:63)
    at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:55)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:87)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:3436)
    at org.apache.spark.sql.Dataset.select(Dataset.scala:1413)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Unknown Source)


During handling of the above exception, another exception occurred:

AnalysisException                         Traceback (most recent call last)
<ipython-input-136-bc0f5a2281b1> in <module>
----> 1 (building.groupBy('primary_use').agg({'square_feet' : 'mean'}).alias('avg_sqft')).select('avg_sqft' , round('avg_sqft', 0)).show()

C:\spark\spark-3.0.0-preview-bin-hadoop2.7\python\pyspark\sql\dataframe.py in select(self, *cols)
   1320         [Row(name=u'Alice', age=12), Row(name=u'Bob', age=15)]
   1321         """
-> 1322         jdf = self._jdf.select(self._jcols(*cols))
   1323         return DataFrame(jdf, self.sql_ctx)
   1324 

C:\spark\spark-3.0.0-preview-bin-hadoop2.7\python\lib\py4j-0.10.8.1-src.zip\py4j\java_gateway.py in __call__(self, *args)
   1284         answer = self.gateway_client.send_command(command)
   1285         return_value = get_return_value(
-> 1286             answer, self.gateway_client, self.target_id, self.name)
   1287 
   1288         for temp_arg in temp_args:

C:\spark\spark-3.0.0-preview-bin-hadoop2.7\python\pyspark\sql\utils.py in deco(*a, **kw)
    100             converted = convert_exception(e.java_exception)
    101             if not isinstance(converted, UnknownException):
--> 102                 raise converted
    103             else:
    104                 raise

AnalysisException: cannot resolve '`avg_sqft`' given input columns: [avg_sqft.avg(square_feet), avg_sqft.primary_use];;
'Project ['avg_sqft, round('avg_sqft, 0) AS round(avg_sqft, 0)#4542]
+- SubqueryAlias `avg_sqft`
   +- Aggregate [primary_use#175], [primary_use#175, avg(square_feet#176) AS avg(square_feet)#4538]
      +- Project [site_id#173, building_id#174, primary_use#175, square_feet#176, year_built#177, floor_count#178]
         +- Project [site_id#173, building_id#174, primary_use#175, square_feet#176, year_built#177, floor_count#178, (2019 - year_built#177) AS Age#3378]
            +- RelationV2[site_id#173, building_id#174, primary_use#175, square_feet#176, year_built#177, floor_count#178] csv file:/D:/DIGITAL_LIBRARY/PySpark/building_metadata.csv

What could be the problem?

Tags: python-3.x, group-by, apache-spark-sql, rounding

Solution


I ran into the same problem and solved it with the SQL-functions style.

You can dig further with help(building.groupBy) in the Spark shell.
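For context on why the original call fails: DataFrame.alias() names the whole DataFrame (which is why the plan above shows a SubqueryAlias `avg_sqft` node), not the aggregated column, so that column keeps its auto-generated name avg(square_feet) and 'avg_sqft' cannot be resolved in the following select. A minimal sketch of that check, with the column names taken from the error message:

# DataFrame.alias names the relation (mainly useful for self-joins), not a column
grouped = building.groupBy('primary_use').agg({'square_feet': 'mean'}).alias('avg_sqft')
print(grouped.columns)  # ['primary_use', 'avg(square_feet)'] -- there is no 'avg_sqft' column

Column.alias(), applied to the aggregate expression itself inside agg(), is what actually names the column, which is what the fix below does: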

from pyspark.sql.functions import mean, round

# Alias the aggregated Column inside agg(), and apply round there as well
building.groupBy(building.primary_use).agg(round(mean("square_feet"), 0).alias("avg_sqft")).show()
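If you would rather keep the dictionary-style agg from the question, a sketch of an alternative that renames the auto-generated column before rounding (the name avg(square_feet) is Spark's default for this aggregate, as seen in the error message):

from pyspark.sql.functions import round, col

# Rename the auto-generated aggregate column, then round it in a select
(building.groupBy('primary_use')
    .agg({'square_feet': 'mean'})
    .withColumnRenamed('avg(square_feet)', 'avg_sqft')
    .select('primary_use', round(col('avg_sqft'), 0).alias('avg_sqft'))
    .show())

Either way, the alias has to be attached to the column (or applied as a rename) before select tries to resolve it by name.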
