scala - 将日期列与 spark sql 中的最大日期进行比较
问题描述
使用 Spark2.3.0 和 Scala
有一个如下表:
created_date mth ColA
2019-01-01 2019-01 a
2019-01-01 2019-01 b
2019-01-02 2019-01 a
2019-01-02 2019-01 b
.
.
2019-06-26 2019-01 a
架构看起来像:
root
|-- transaction_created_date: string (nullable = true)
|-- txn_mth: string (nullable = true)
|-- ColA: string (nullable = true)
想要将 created_date 列与 max_date 进行比较并创建一个新列
尝试如下:
var max_date = sparkVal.sql(s"""SELECT cast(max(created_date)
as DATE) from BASE_TABLE""").first()
val maxDateValue = max_date.get(0)
var day_counter=10
val data =spark.sql(s"""SELECT
created_date,
mth,
sum(if(date_add(created_date+$day_counter) > cast($maxDateValue as DATE) ),1,0))
as Total_arrival from BASE_TALE a""")
lets say max_date = 2019-06-29
想要像这样的输出
created_date mth Total_arrival
2019-01-01 2019-01 1
2019-01-01 2019-01 1
2019-01-01 2019-01 1
2019-01-02 2019-01 1
.
.
2019-06-26 2019-01 0
2019-06-27 2019-01 0
2019-06-28 2019-01 0
2019-06-29 2019-01 0
2019-06-30 2019-01 0
getting below error :
org.apache.spark.sql.AnalysisException:由于数据类型不匹配,无法解析“CAST(((2019-6)-26)AS DATE)”:无法将int转换为日期;第 43 行,第 106 条;
任何人都可以帮助转换 maxdate 以便它可以用于与日期列进行比较吗?
解决方案
一种实现可能如下:
object TestSO {
def main(args: Array[String]) : Unit = {
// dataset
implicit val spark: SparkSession =
SparkSession
.builder()
.master("local[1]")
.appName("Test")
.getOrCreate()
import org.apache.spark.sql.functions.{to_date, col, max, when, date_add, lit}
val data = Seq(Row("2019-01-01", "2019-01", "a"),
Row("2019-01-01", "2019-01", "b"),
Row("2019-01-02", "2019-01", "a"),
Row("2019-01-02", "2019-01", "b"))
val df = spark.createDataFrame(spark.sparkContext.parallelize(data), StructType(List(StructField("transaction_created_date", StringType, false),
StructField("txn_mth", StringType, false),
StructField("ColA", StringType, false))))
// Add a column with a new column as date. It could be done all in one line
val df_withdate = df.withColumn("transaction_created_date",
to_date(col("transaction_created_date")))
var day_counter=10
// Getting the max
val max_date = df_withdate
.select(max(col("transaction_created_date")))
.collect()(0)(0)
// Put 1, in rows where creation_date + day_counter > max_date
val result_df = df_withdate.withColumn("Total_arrival",
when(date_add(col("transaction_created_date"), day_counter) > to_date(lit(max_date)), 1)
.otherwise(0))
result_df.show()
}
}
它给:
+------------------------+-------+----+-------------+
|transaction_created_date|txn_mth|ColA|Total_arrival|
+------------------------+-------+----+-------------+
| 2019-01-01|2019-01| a| 1|
| 2019-01-01|2019-01| b| 1|
| 2019-01-02|2019-01| a| 1|
| 2019-01-02|2019-01| b| 1|
+------------------------+-------+----+-------------+
推荐阅读
- c# - 如何使用 DataSet 和 DataAdapter C#/asp.net 删除?
- django - 在 Django 中的 Serializer.is_valid() 中预取 ForeignKey
- google-chrome - Chrome 通知证书不是由第三方安全签名的
- apache-spark - 如何使用 pyspark 分解 Apache Spark DataFrame 中的嵌套结构?
- mysql - MYSQL 对 JOIN 查询的最佳使用索引
- javascript - 客户端获取请求头
- azure-container-service - 公司代理后面的私有 VNET 中的 AKS
- java - 如何放置两个不同版本的依赖项?- Maven - Java - 春天
- azure-devops - VSTS 构建寻找不存在的驱动器
- google-sheets - 将 Google 表格中最后日期的列标题作为数组公式返回