scala - 使用 Option 作为输入参数定义 Spark scala UDF
问题描述
编写以下 UDF 旨在使其处理未定义参数的情况。下面是代码:
val addTimeFromCols: UserDefinedFunction = udf((year: String, month: String, day: String, hour: String) => {
Option(hour) match {
case None => (List(year, month, day).mkString(DASH_SEP)).concat(SPACE).concat(defaultHour)
case Some(x) => (List(year, month, day).mkString(DASH_SEP)).concat(SPACE).concat(hour)
}
})
def addTimestampFromFileCols(): DataFrame = df
.withColumn(COLUMN_TS, addTimeFromCols(col(COLUMN_YEAR), col(COLUMN_MONTH), col(COLUMN_DAY), col(COLUMN_HOUR)).cast(TimestampType))
我的目标是使此功能适用于所有用例(具有 HOUR 列的数据框和其他没有此列的数据框,在这种情况下,我默认定义一个值。不幸的是,当我再次测试不'没有我收到以下错误的列:
cannot resolve '`HOUR`' given input columns
请知道如何解决这个问题
解决方案
如果该列不存在,则必须通过 lit() 函数提供默认值,否则将引发错误。以下对我有用
scala> defaultHour
res77: String = 00
scala> :paste
// Entering paste mode (ctrl-D to finish)
def addTimestampFromFileCols(df:DataFrame) =
{
val hr = if( df.columns.contains("hour") ) col(COLUMN_HOUR) else lit(defaultHour)
df.withColumn(COLUMN_TS, addTimeFromCols(col(COLUMN_YEAR), col(COLUMN_MONTH), col(COLUMN_DAY), hr).cast(TimestampType))
}
// Exiting paste mode, now interpreting.
addTimestampFromFileCols: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
scala>
+ve 案例
scala> val df = Seq(("2019","01","10","09")).toDF("year","month","day","hour")
df: org.apache.spark.sql.DataFrame = [year: string, month: string ... 2 more fields]
scala> addTimestampFromFileCols(df).show(false)
+----+-----+---+----+-------------------+
|year|month|day|hour|tstamp |
+----+-----+---+----+-------------------+
|2019|01 |10 |09 |2019-01-10 09:00:00|
+----+-----+---+----+-------------------+
-ve 案例
scala> val df = Seq(("2019","01","10")).toDF("year","month","day")
df: org.apache.spark.sql.DataFrame = [year: string, month: string ... 1 more field]
scala> addTimestampFromFileCols(df).show(false)
+----+-----+---+-------------------+
|year|month|day|tstamp |
+----+-----+---+-------------------+
|2019|01 |10 |2019-01-10 00:00:00|
+----+-----+---+-------------------+
scala>
推荐阅读
- python - Python date.today() 不刷新
- react-query - 如何最好地从反应查询缓存中获取数据?
- hl7-fhir - 推荐用于 CDS 反馈的 FHIR 类型
- vue.js - vue js重定向到相同的路径,但后退按钮上有参数
- kubernetes - 使用亲和力将两个 Kubernetes pod 分开,标签值未知
- visual-studio-code - VS Code No Module Found 错误 - 需要帮助在本地运行 PySpark 代码
- ios - Crashlytics 和 XCode Crash Organizer 是否相互排斥?
- networking - 是否有任何支持 Multilink PPP 的嵌入式 IP 堆栈库?
- vaadin - 我想在悬停分离器时触发手柄受到影响
- django - 在模板中显示后存储消息