apache-spark - PySpark - 列中的 to_date 格式
问题描述
我目前正在尝试弄清楚如何通过列参数将字符串格式参数传递给 to_date pyspark 函数。
具体来说,我有以下设置:
sc = SparkContext.getOrCreate()
df = sc.parallelize([('a','2018-01-01','yyyy-MM-dd'),
('b','2018-02-02','yyyy-MM-dd'),
('c','02-02-2018','dd-MM-yyyy')]).toDF(
["col_name","value","format"])
我目前正在尝试添加一个新列,其中列 F.col("value") 中的每个日期,它是一个字符串值,都被解析为一个日期。
分别为每种格式,这可以用
df = df.withColumn("test1",F.to_date(F.col("value"),"yyyy-MM-dd")).\
withColumn("test2",F.to_date(F.col("value"),"dd-MM-yyyy"))
然而,这给了我 2 个新列 - 但我希望有 1 个包含两个结果的列 - 但是使用 to_date 函数调用该列似乎是不可能的:
df = df.withColumn("test3",F.to_date(F.col("value"),F.col("format")))
这里抛出了一个错误“Column object not callable”。
是否可以为所有可能的格式提供通用方法(这样我就不必为每种格式手动添加新列)?
解决方案
您可以使用列值作为参数而不udf
使用 spark-sql 语法:
Spark 2.2 及以上版本
from pyspark.sql.functions import expr
df.withColumn("test3",expr("to_date(value, format)")).show()
#+--------+----------+----------+----------+
#|col_name| value| format| test3|
#+--------+----------+----------+----------+
#| a|2018-01-01|yyyy-MM-dd|2018-01-01|
#| b|2018-02-02|yyyy-MM-dd|2018-02-02|
#| c|02-02-2018|dd-MM-yyyy|2018-02-02|
#+--------+----------+----------+----------+
或者等效地使用 pyspark-sql:
df.createOrReplaceTempView("df")
spark.sql("select *, to_date(value, format) as test3 from df").show()
Spark 1.5 及以上版本
旧版本的 spark 不支持为函数提供format
参数to_date
,因此您必须使用unix_timestamp
and from_unixtime
:
from pyspark.sql.functions import expr
df.withColumn(
"test3",
expr("from_unixtime(unix_timestamp(value,format))").cast("date")
).show()
或者等效地使用 pyspark-sql:
df.createOrReplaceTempView("df")
spark.sql(
"select *, cast(from_unixtime(unix_timestamp(value,format)) as date) as test3 from df"
).show()
推荐阅读
- sql-server - 链接服务器测试连接不工作(SQL Server 到 Oracle 数据库)
- amazon-web-services - AWS Glue 映射对来自不同工作流的作业的依赖关系
- node.js - 在 Node.js 中调用 SOAP 请求操作
- kotlin - 访问 NotificationManager 服务类实例
- docker - 从 Docker 连接 grafana 和 prometheus 数据
- android - 是否可以向已订阅的用户提供促销代码?
- sparql - 在 wikidata SPARQL 上获取度数坐标?
- ruby-on-rails - 如何替换红宝石范围内的最后一次出现?
- javascript - 将javascript注入iframe
- javascript - 更改 redux 存储时如何重新运行函数(在安装组件时运行)?