apache-spark - Pyspark 根据列值复制行
问题描述
我想根据每行上给定列的值复制我的 DataFrame 中的所有行,然后索引每个新行。假设我有:
Column A Column B
T1 3
T2 2
我希望结果是:
Column A Column B Index
T1 3 1
T1 3 2
T1 3 3
T2 2 1
T2 2 2
我能够使用固定值进行类似的操作,但不能使用列上的信息。我当前的固定值工作代码是:
idx = [lit(i) for i in range(1, 10)]
df = df.withColumn('Index', explode(array( idx ) ))
我试图改变:
lit(i) for i in range(1, 10)
到
lit(i) for i in range(1, df['Column B'])
并将其添加到我的 array() 函数中:
df = df.withColumn('Index', explode(array( lit(i) for i in range(1, df['Column B']) ) ))
但它不起作用(TypeError: 'Column' 对象不能被解释为整数)。
我应该如何实现这个?
解决方案
不幸的是,您不能像这样遍历 Column。您始终可以使用 a ,但如果您使用的是 Spark 2.1 或更高版本udf
,我确实有一个非 udf hack解决方案应该适合您。
诀窍是利用pyspark.sql.functions.posexplode()
来获取索引值。我们通过重复逗号Column B
时间创建一个字符串来做到这一点。然后我们用逗号分割这个字符串,并用它posexplode
来获取索引。
df.createOrReplaceTempView("df") # first register the DataFrame as a temp table
query = 'SELECT '\
'`Column A`,'\
'`Column B`,'\
'pos AS Index '\
'FROM ( '\
'SELECT DISTINCT '\
'`Column A`,'\
'`Column B`,'\
'posexplode(split(repeat(",", `Column B`), ",")) '\
'FROM df) AS a '\
'WHERE a.pos > 0'
newDF = sqlCtx.sql(query).sort("Column A", "Column B", "Index")
newDF.show()
#+--------+--------+-----+
#|Column A|Column B|Index|
#+--------+--------+-----+
#| T1| 3| 1|
#| T1| 3| 2|
#| T1| 3| 3|
#| T2| 2| 1|
#| T2| 2| 2|
#+--------+--------+-----+
注意:您需要将列名包含在反引号中,因为它们中有空格,如本文所述:如何在 Spark SQL 中表达名称中包含空格的列
推荐阅读
- angular7 - 当我处于开发阶段时,如何从角度项目外部提供 img src 路径
- python-3.x - 为什么 pandas.to_csv 在尝试保留 NaN 时会丢弃数字?
- flutter - 如何在 Flutter 中本地化服务器数据?
- javascript - 获取输入的值并将其转换为某种数组格式
- reactjs - 创建反应应用程序 - 没有打字稿,得到错误:无法加载解析器'@typescript-eslint/parser'
- javascript - 将 socket.io 与快速路由器一起使用
- javascript - 使用 XHR 提交 HTML 表单在基于 iOS/iPadOS 的设备上不起作用
- laravel-5 - 无法通过 larvel 5.5 的作曲家下载任何 pdf 包
- amazon-web-services - 哪个时间序列数据库用于一致的开发产品 (aws) 奇偶校验
- c# - C# 使用参数从 Post 中检索数据