postgresql - spark将字符串转换为TimestampType
问题描述
我有一个数据框,我想在 Spark 中插入 Postgresql。在 spark 中 DateTimestamp 列是字符串格式。在 postgreSQL 中,它是没有时区的 TimeStamp。
在日期时间列上插入数据库时会出现 Spark 错误。我确实尝试更改数据类型,但插入仍然出错。我无法弄清楚为什么强制转换不起作用。如果我将相同的插入字符串粘贴到 PgAdmin 中并运行,则插入语句运行正常。
import java.text.SimpleDateFormat;
import java.util.Calendar
object EtlHelper {
// Return the current time stamp
def getCurrentTime() : String = {
val now = Calendar.getInstance().getTime()
val hourFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
return hourFormat.format(now)
}
}
在另一个文件中
object CreateDimensions {
def createDimCompany(spark:SparkSession, location:String, propsLocation :String):Unit = {
import spark.implicits._
val dimCompanyStartTime = EtlHelper.getCurrentTime()
val dimcompanyEndTime = EtlHelper.getCurrentTime()
val prevDimCompanyId = 2
val numRdd = 27
val AuditDF = spark.createDataset(Array(("dim_company", prevDimCompanyId,numRdd,dimCompanyStartTime,dimcompanyEndTime))).toDF("audit_tbl_name","audit_tbl_id","audit_no_rows","audit_tbl_start_date","audit_tbl_end_date")//.show()
AuditDF.withColumn("audit_tbl_start_date",AuditDF.col("audit_tbl_start_date").cast(DataTypes.TimestampType))
AuditDF.withColumn("audit_tbl_end_date",AuditDF.col("audit_tbl_end_date").cast(DataTypes.TimestampType))
AuditDF.printSchema()
}
}
root
|-- audit_tbl_name: string (nullable = true)
|-- audit_tbl_id: long (nullable = false)
|-- audit_no_rows: long (nullable = false)
|-- audit_tbl_start_date: string (nullable = true)
|-- audit_tbl_end_date: string (nullable = true)
这是我得到的错误
INSERT INTO etl.audit_master ("audit_tbl_name","audit_tbl_id","audit_no_rows","audit_tbl_start_date","audit_tbl_end_date") VALUES ('dim_company',27,2,'2018-05-02 12:15:54','2018-05-02 12:15:59') was aborted: ERROR: column "audit_tbl_start_date" is of type timestamp without time zone but expression is of type character varying
Hint: You will need to rewrite or cast the expression.
任何帮助表示赞赏。
谢谢
解决方案
AuditDF.printSchema()
正在使用原始AuditDF
数据框,因为您没有.withColumn
通过分配保存转换。数据帧是不可变的对象,可以转换为另一个数据帧,但不能改变自身。所以你总是需要一个任务来保存你应用的转换。
所以正确的方法是分配以保存更改
val transformedDF = AuditDF.withColumn("audit_tbl_start_date",AuditDF.col("audit_tbl_start_date").cast(DataTypes.TimestampType))
.withColumn("audit_tbl_end_date",AuditDF.col("audit_tbl_end_date").cast("timestamp"))
transformedDF.printSchema()
你会看到变化
root
|-- audit_tbl_name: string (nullable = true)
|-- audit_tbl_id: integer (nullable = false)
|-- audit_no_rows: integer (nullable = false)
|-- audit_tbl_start_date: timestamp (nullable = true)
|-- audit_tbl_end_date: timestamp (nullable = true)
.cast(DataTypes.TimestampType)
并且.cast("timestamp")
都相同
推荐阅读
- python - 如何使用 Tkinter 简单地更新标签
- flutter - 如何使用flutter包的firebase_ml_vision加速图像检测
- r - 如何使用 R 更新数据框?
- android - Android Ble 服务器 - 只允许以前连接的设备连接
- mysql - Finding the leaf category in a table using recursive procedure in mysql
- php - MySQL 服务器已消失 - 但没有连接错误
- eclipse - 目标定义选择 Orbit 源包,但不是 Tycho 构建中的二进制文件
- php - 我应该如何在 PHP 中正确命名我的命名空间?
- material-ui - 两行后的材质ui文本省略号
- javascript - 为什么用 javascript 模拟选择选项不进行 API 调用?