pyspark - 计算 pyspark.sql 中与爆炸列匹配的记录数?
问题描述
我有一个使用 Spark 2.4 和 Yelp 数据集的一部分的作业。我们将从业务数据中使用的模式部分如下,并在同一个 DataFrame 中使用:
"business_id": string
"categories": comma delimited list of strings
"stars": double
我们应该创建一个新的 DataFrame,它按类别对业务进行分组,具有以下列:
"category": string exploded from "categories"
"businessCount": integer; number of businesses in that category
"averageStarRating": double; average rating of businesses in the category
"minStarRating": double; lowest rating of any restaurant in that category
"maxStarRating": double; highest rating of any restaurant in that category
到目前为止,我已经能够弄清楚如何使用explode命令将“类别”列分解为单独的记录并显示“business_id”、“category”和“stars”:
import from pyspark.sql functions as F
businessdf.select("business_id", F.explode(F.split("categories", ",")).alias("category"), "stars").show(5)
上面的命令给了我这个结果:
+--------------------+--------------+-----+
| business_id| category|stars|
+--------------------+--------------+-----+
|1SWheh84yJXfytovI...| Golf| 3.0|
|1SWheh84yJXfytovI...| Active Life| 3.0|
|QXAEGFB4oINsVuTFx...|Specialty Food| 2.5|
|QXAEGFB4oINsVuTFx...| Restaurants| 2.5|
|QXAEGFB4oINsVuTFx...| Dim Sum| 2.5|
+--------------------+--------------+-----+
only showing top 5 rows
我不知道该怎么做是使用聚合函数来创建其他列。我的教授说这一切都必须在一份声明中完成。到目前为止,我的所有尝试都导致了错误。
我的任务说,在进行任何聚合之前,我还需要删除新创建的“类别”列上的任何前导/尾随空格,但我的尝试都导致了错误。
我觉得这是我最接近的一次,但不知道接下来要尝试什么:
businessdf.select(F.explode(F.split("categories", ",")).alias("category")).groupBy("category").agg(F.count("category").alias("businessCount"), F.avg("stars").alias("averageStarRating"), F.min("stars").alias("minStarRating"), F.max("stars").alias("maxStarRating"))
这是该命令附带的错误:
`pyspark.sql.utils.AnalysisException: "cannot resolve '`stars`' given input columns: [category];;\n'Aggregate [category#337], [category#337, count(category#337) AS businessCount#342L, avg('stars) AS averageStarRating#344, min('stars) AS minStarRating#346, max('stars) AS maxStarRating#348]\n+- Project [category#337]\n +- Generate explode(split(categories#33, ,)), false, [category#337]\n +- Relation[address#30,attributes#31,business_id#32,categories#33,city#34,hours#35,is_open#36L,latitude#37,lo`ngitude#38,name#39,postal_code#40,review_count#41L,stars#42,state#43] json\n"
解决方案
没关系,发帖一定帮助我自己完成了它。我在上面发布的命令非常接近,但我忘记在 select 语句中添加“stars”列。正确的命令在这里:
businessdf.select(F.explode(F.split("categories", ",")).alias("category"), "stars").groupBy("category").agg(F.count("category").alias("businessCount"), F.avg("stars").alias("averageStarRating"), F.min("stars").alias("minStarRating"), F.max("stars").alias("maxStarRating")).show()
推荐阅读
- python - 具有最小唯一值的行的随机值 pandas
- php - WooCommerce:如何在后端没有显式字段的情况下将信息保存到变体数据
- c99 - 为什么我得到分段错误(核心转储)
- python - 如何正确地将 excel 表中的数据分配给 python/pyomo(具体模型)上的参数
- python - 如何找到从特定字符位置开始的单词而不是字符串中的单词位置?
- excel - 读取 Excel 数据的 VM Domino 服务器在 RDP 会话处于活动状态时工作
- python - 使用 plotly 在一个图中绘制多条 3d 线
- protocol-buffers - 对 pbtools 中的重复条目进行编码
- python - ldap python result3 函数结果中缺少 LDAP 属性,但出现在 ldapsearch 命令结果中
- oracle - 尝试执行查询时忽略 $