scala - 如何从 udf 调用文件系统
问题描述
我所期望的
目标是为每一DataFrame
行添加一个带有修改时间的列。
给定
val data = spark.read.parquet("path").withColumn("input_file_name", input_file_name())
+----+------------------------+
| id | input_file_name |
+----+------------------------+
| 1 | hdfs://path/part-00001 |
| 2 | hdfs://path/part-00001 |
| 3 | hdfs://path/part-00002 |
+----+------------------------+
预期的
+----+------------------------+
| id | modification_time |
+----+------------------------+
| 1 | 2000-01-01Z00:00+00:00 |
| 2 | 2000-01-01Z00:00+00:00 |
| 3 | 2000-01-02Z00:00+00:00 |
+----+------------------------+
我试过的
我写了一个函数来获取修改时间
def getModificationTime(path: String): Long = {
FileSystem.get(spark.sparkContext.hadoopConfiguration)
.getFileStatus(new org.apache.hadoop.fs.Path(path))
.getModificationTime()
}
val modificationTime = getModificationTime("hdfs://srsdev/projects/khajiit/data/OfdCheques2/date=2020.02.01/part-00002-04b9e4c8-5916-4bb2-b9ff-757f843a0142.c000.snappy.parquet")
修改时间:Long = 1580708401253
...但它在查询中不起作用
def input_file_modification_time = udf((path: String) => getModificationTime(path))
data.select(input_file_modification_time($"input_file_name") as "modification_time").show(20, false)
org.apache.spark.SparkException:作业因阶段失败而中止:阶段 54.0 中的任务 0 失败 4 次,最近一次失败:阶段 54.0 中丢失任务 0.3(TID 408,srs-hdp-s1.dev.kontur.ru,执行器 3): org.apache.spark.SparkException: 无法执行用户定义的函数($anonfun$input_file_modification_time$1: (string) => bigint)
解决方案
问题在于spark
UDF 中为 null,因为它仅存在于驱动程序中。另一个问题是 hadoopsConfiguration
是不可序列化的,所以你不能轻易地将它包含在 udf 中。但是有一个解决方法org.apache.spark.SerializableWritable
:
import org.apache.spark.SerializableWritable
import org.apache.hadoop.conf.Configuration
val conf = new SerializableWritable(spark.sparkContext.hadoopConfiguration)
def getModificationTime(path: String, conf:SerializableWritable[Configuration]): Long = {
org.apache.hadoop.fs.FileSystem.get(conf.value)
.getFileStatus(new org.apache.hadoop.fs.Path(path))
.getModificationTime()
}
def input_file_modification_time(conf:SerializableWritable[Configuration]) = udf((path: String) => getModificationTime(path,conf))
data.select(input_file_modification_time(conf)($"input_file_name") as "modification_time").show(20, false)
推荐阅读
- python - 过滤以多列为条件的数据框,根据列值具有不同的条件
- javascript - 试图获取表格点击的行的数据反应js
- reactjs - 为什么前两项在反应中不起作用?
- r - 在 R 中运行 spark 包不起作用,如何将 spark 包调用到 R 中?
- android - 工具栏中的双向菜单项未正确镜像
- python - 在 Python 中生成新进程时分配了多少内存
- php - 在随机时间内没有在服务器上获得 PHP soap api 响应
- sql - 使用立即执行在循环中分配多个字段
- sql - 在 SQL Server 2008R2 上以毫秒为单位返回 DATEDIFF
- python-3.x - 如何让命令行将 & 解释为字符串而不显式将其设为字符串