apache-spark - PYSPARK DF MAP:获取火花图中给定键的值
问题描述
我有带有地图的“国家”数据框:
+--------------------+
| map|
+--------------------+
|[1-> Spain |
|[2-> Germany ...|
|[3-> Czech Repu...|
|[4-> Malta ...|
我如何使用键访问地图中的值,然后如何使用地图数据框从其他数据框中的列中映射值。
所以从这样的“销售”数据框中:
+--------------------+
|country_id | Sale |
+--------------------+
|1 |200 |
|2 |565 |
country_id 值将映射到国家(我们将删除 country_id 列):
+--------------------+
|country | Sale |
+--------------------+
|Spain |200 |
|Germany |565 |
我知道使用连接或字典映射等替代方法,但这里的问题仅与火花映射有关。尝试了 element_at 之类的功能,但它没有正常工作。
解决方案
如果您从示例中所示的两个数据框开始,则获得所需输出的惯用方法是通过连接。(我假设您的地图 DataFrame 相对于 Sale DataFrame 较小,您可能可以使用broadcast
连接。)
from pyspark.sql.functions import broadcast, col, explode,
from pyspark.sql.types import IntegerType, MapType, StringType
from pyspark.sql.types import StructType, StructField
# set up data
map_df = spark.createDataFrame(
[({1: "Spain"},),({2: "Germany"},),({3: "Czech Republic"},),({4: "Malta"},)],
schema=StructType([StructField("map", MapType(IntegerType(), StringType()))])
)
sale_df = spark.createDataFrame([(1, 200), (2, 565)], ["country_id","Sale"])
# join
sale_df.join(
broadcast(map_df.select(explode("map").alias("country_id", "country"))),
on="country_id",
how="left"
).select("country", "Sale").show()
#+-------+----+
#|country|Sale|
#+-------+----+
#| Spain| 200|
#|Germany| 565|
#+-------+----+
相反,如果您将映射作为单个MapType
,则可以通过在执行计划中向上推映射的评估来避免连接。
from pyspark.sql.functions import array, map_from_arrays, lit
my_dict = {1: "Spain", 2: "Germany", 3: "Czech Republic", 4: "Malta"}
my_map = map_from_arrays(
array(*map(lit, my_dict.keys())),
array(*map(lit, my_dict.values()))
)
print(my_map)
#Column<map_from_arrays(array(1, 2, 3, 4), array(Spain, Germany, Czech Republic, Malta))>
现在getItem
在您的选择语句中使用:
sale_df.select(my_map.getItem(col("country_id")).alias("country"), "Sale").show()
#+-------+----+
#|country|Sale|
#+-------+----+
#| Spain| 200|
#|Germany| 565|
#+-------+----+
以及执行计划:
sale_df.select(my_map.getItem(col("country_id")).alias("country"), "Sale").explain()
#== Physical Plan ==
#*(1) Project [keys: [1,2,3,4], values: [Spain,Germany,Czech Republic,Malta][cast(country_id#6L as int)] AS country#62, Sale#7L]
#+- Scan ExistingRDD[country_id#6L,Sale#7L]
您能否将第一种方法(DataFrame)中的数据转换为第二种方法?是的 - 但几乎可以肯定,这样做不值得。
推荐阅读
- python-3.x - Scipy.optimize fmin 不优化四轴飞行器的功能
- facebook - 通过 Facebook Pagebox 事件在客户喜欢页面后添加回调 url
- dataframe - 将组特定函数应用于 Julia 数据帧
- vercel - 如何将节点服务器部署到 Vercel?
- asp.net-core - 基于 UI 的角色管理 asp.net core 5.0 razor pages
- mysql - 每次我构建容器时 docker-compose 清除数据库
- python - 使用 python 在应用程序中执行重复性任务
- prolog - 域声明中的 CLPFD "OR" 条件
- function - 不能在方案编程中定义这样的功能
- android-activity - 对活动生命周期感到困惑