python-3.x - 为聚合列提供别名并在 pyspark 中使用 groupBy 对结果进行四舍五入
问题描述
我有一个 pyspark DataFrame,其中包含一个名为 primary_use 的列。
这是第一行:
我想按 primary_use 列对 DataFrame 分组,使用 mean 函数作为聚合方式,给聚合列起一个别名(alias),并对结果进行四舍五入(round)。
我的代码虽然创建了一个异常:
(building.groupBy('primary_use').agg({'square_feet' : 'mean'}).alias('avg_sqft')).select('avg_sqft' , round('avg_sqft', 0)).show()
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
C:\spark\spark-3.0.0-preview-bin-hadoop2.7\python\pyspark\sql\utils.py in deco(*a, **kw)
97 try:
---> 98 return f(*a, **kw)
99 except py4j.protocol.Py4JJavaError as e:
C:\spark\spark-3.0.0-preview-bin-hadoop2.7\python\lib\py4j-0.10.8.1-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
Py4JJavaError: An error occurred while calling o731.select.
: org.apache.spark.sql.AnalysisException: cannot resolve '`avg_sqft`' given input columns: [avg_sqft.avg(square_feet), avg_sqft.primary_use];;
'Project ['avg_sqft, round('avg_sqft, 0) AS round(avg_sqft, 0)#4542]
+- SubqueryAlias `avg_sqft`
+- Aggregate [primary_use#175], [primary_use#175, avg(square_feet#176) AS avg(square_feet)#4538]
+- Project [site_id#173, building_id#174, primary_use#175, square_feet#176, year_built#177, floor_count#178]
+- Project [site_id#173, building_id#174, primary_use#175, square_feet#176, year_built#177, floor_count#178, (2019 - year_built#177) AS Age#3378]
+- RelationV2[site_id#173, building_id#174, primary_use#175, square_feet#176, year_built#177, floor_count#178] csv file:/D:/DIGITAL_LIBRARY/PySpark/building_metadata.csv
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:125)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:122)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$2(TreeNode.scala:310)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:310)
at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$transformExpressionsUp$1(QueryPlan.scala:97)
at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$1(QueryPlan.scala:109)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:109)
at org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:120)
at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$3(QueryPlan.scala:125)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at scala.collection.TraversableLike.map(TraversableLike.scala:238)
at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:125)
at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$4(QueryPlan.scala:130)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:214)
at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:130)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:97)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1(CheckAnalysis.scala:122)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1$adapted(CheckAnalysis.scala:90)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:154)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis(CheckAnalysis.scala:90)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis$(CheckAnalysis.scala:87)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:122)
at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:148)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:201)
at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:145)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:66)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:63)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:63)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:55)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:87)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:3436)
at org.apache.spark.sql.Dataset.select(Dataset.scala:1413)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Unknown Source)
During handling of the above exception, another exception occurred:
AnalysisException Traceback (most recent call last)
<ipython-input-136-bc0f5a2281b1> in <module>
----> 1 (building.groupBy('primary_use').agg({'square_feet' : 'mean'}).alias('avg_sqft')).select('avg_sqft' , round('avg_sqft', 0)).show()
C:\spark\spark-3.0.0-preview-bin-hadoop2.7\python\pyspark\sql\dataframe.py in select(self, *cols)
1320 [Row(name=u'Alice', age=12), Row(name=u'Bob', age=15)]
1321 """
-> 1322 jdf = self._jdf.select(self._jcols(*cols))
1323 return DataFrame(jdf, self.sql_ctx)
1324
C:\spark\spark-3.0.0-preview-bin-hadoop2.7\python\lib\py4j-0.10.8.1-src.zip\py4j\java_gateway.py in __call__(self, *args)
1284 answer = self.gateway_client.send_command(command)
1285 return_value = get_return_value(
-> 1286 answer, self.gateway_client, self.target_id, self.name)
1287
1288 for temp_arg in temp_args:
C:\spark\spark-3.0.0-preview-bin-hadoop2.7\python\pyspark\sql\utils.py in deco(*a, **kw)
100 converted = convert_exception(e.java_exception)
101 if not isinstance(converted, UnknownException):
--> 102 raise converted
103 else:
104 raise
AnalysisException: cannot resolve '`avg_sqft`' given input columns: [avg_sqft.avg(square_feet), avg_sqft.primary_use];;
'Project ['avg_sqft, round('avg_sqft, 0) AS round(avg_sqft, 0)#4542]
+- SubqueryAlias `avg_sqft`
+- Aggregate [primary_use#175], [primary_use#175, avg(square_feet#176) AS avg(square_feet)#4538]
+- Project [site_id#173, building_id#174, primary_use#175, square_feet#176, year_built#177, floor_count#178]
+- Project [site_id#173, building_id#174, primary_use#175, square_feet#176, year_built#177, floor_count#178, (2019 - year_built#177) AS Age#3378]
+- RelationV2[site_id#173, building_id#174, primary_use#175, square_feet#176, year_built#177, floor_count#178] csv file:/D:/DIGITAL_LIBRARY/PySpark/building_metadata.csv
可能是什么问题?
解决方案
我也遇到过同样的问题。问题在于 agg({'square_feet': 'mean'}) 这种字典写法无法直接为聚合列指定别名——.alias('avg_sqft') 实际上是给整个 DataFrame 起了别名,而不是给聚合列。应改用列表达式风格,在 agg 中直接对列应用 round 和 alias:
您可以在 spark shell 中通过 help(building.groupBy) 进一步查看用法。
from pyspark.sql.functions import mean, round
building.groupBy(building.primary_use).agg(round(mean("square_feet"), 0).alias("avg_sqft")).show()
推荐阅读
- css - 为什么我在 1 页上获得多个图像
- python - 无法在 PyCharm 中创建解释器
- ios - 使用 AVAudioPlayer 播放音频
- r - R Data.Table 检查组条件
- javascript - 我在使用 JavaScript 重置表单时遇到了这个问题
- webpack - MongoDB Stitch + Gatsby Buid: WebpackError: ReferenceError: self is not defined with
- python - 如何在 RobotFramework 的浏览器警报中输入值
- selenium - 如何找到包含特定子元素的元素?
- javascript - 如何使图表水平滚动(使用 Chart.js 时)
- azure - 为什么 Azure 中的所有 VM 都显示为灰色且无法使用?如何选择虚拟机?