python - 如何将数据框中的连接值插入 Pyspark 中的另一个数据框中?
问题描述
我正在创建一个time_interval列并将其添加到 Pyspark 中的现有数据框中。理想情况下,time_interval 将采用“ HHmm ”格式,分钟向下舍入到最接近的 15 分钟标记(815、830、845、900 等)。
我有为我执行逻辑的 spark sql 代码,但我如何获取连接为字符串列的值并将其插入现有的数据框?
time_interval = sqlContext.sql("select extract(hour from current_timestamp())||floor(extract(minute from current_timestamp())/15)*15")
time_interval.show()
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|concat(CAST(hour(current_timestamp()) AS STRING), CAST((FLOOR((CAST(minute(current_timestamp()) AS DOUBLE) / CAST(15 AS DOUBLE))) * CAST(15 AS BIGINT)) AS STRING))|
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| 1045|
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+
baseDF = sqlContext.sql("select * from test_table")
newBase = baseDF.withColumn("time_interval", lit(str(time_interval)))
newBase.select("time_interval").show()
+--------------------+
| time_interval|
+--------------------+
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
+--------------------+
only showing top 20 rows
所以实际的预期结果应该只是在我正在创建的新列中显示实际的字符串值,而不是来自数据帧的这个连接值。如下所示:
newBase.select("time_interval").show(1)
+-------------+
|time_interval|
+-------------+
| 1045 |
+-------------+
解决方案
作为time_interval
数据帧类型,对于这种情况需要collect
和extract the required value out from dataframe
.
试试这个方法:
newBase = baseDF.withColumn("time_interval", lit(str(time_interval.collect()[0][0])))
newBase.show()
(或者)
通过使用select(expr())
功能:
newBase = baseDF.select("*",expr("string(extract(hour from current_timestamp())||floor(extract(minute from current_timestamp())/15)*15) AS time_interval"))
正如评论中提到的pault,使用selectExpr()
函数:
newBase = baseDF.selectExpr("*","string(extract(hour from current_timestamp())||floor(extract(minute from current_timestamp())/15)*15) AS time_interval")
例子:
>>> from pyspark.sql.functions import *
>>> from pyspark.sql.types import IntegerType
>>> time_interval = spark.sql("select extract(hour from current_timestamp())||floor(extract(minute from current_timestamp())/15)*15")
>>> baseDF=spark.createDataFrame([1,2,3,4],IntegerType())
>>> newBase = baseDF.withColumn("time_interval", lit(str(time_interval.collect()[0][0])))
>>> newBase.show()
+-----+-------------+
|value|time_interval|
+-----+-------------+
| 1| 1245|
| 2| 1245|
| 3| 1245|
| 4| 1245|
+-----+-------------+
推荐阅读
- c++ - c++模板
删除 - ruby-on-rails - 带有 Ruby 2.5.1 控制台的 Rails 5.2.0 - `warning:` `already` 已初始化常量 FileUtils::VERSION
- zsh - Zsh 中的 DRYer 提示和分配
- javascript - 单元测试 ECMAScript 模块 (ESM) 和模拟本地状态?
- c# - Unity:断言失败:表达式断言失败:'CompareApproximately(det, 1.0F, .005f)'
- c++ - zlib 添加的各种类型的目的是什么,我该如何使用它们?
- javascript - Next.js - 头部元素不起作用
- visual-c++ - 如何抑制 Word 自动化中的“太多拼写错误”?
- css - 如何使用 React.js 中的 Reactstrap 将导航栏折叠到移动视图中的侧边栏?
- php - 将复选框值数组插入数据库,包括未选中