首页 > 解决方案 > 如何从 udf 调用文件系统

问题描述

我所期望的

目标是为每一DataFrame行添加一个带有修改时间的列。

给定

val data = spark.read.parquet("path").withColumn("input_file_name", input_file_name())

+----+------------------------+
| id |        input_file_name |
+----+------------------------+
|  1 | hdfs://path/part-00001 |
|  2 | hdfs://path/part-00001 |
|  3 | hdfs://path/part-00002 |
+----+------------------------+

预期的

+----+------------------------+
| id |      modification_time |
+----+------------------------+
|  1 | 2000-01-01Z00:00+00:00 |
|  2 | 2000-01-01Z00:00+00:00 |
|  3 | 2000-01-02Z00:00+00:00 |
+----+------------------------+

我试过的

我写了一个函数来获取修改时间

def getModificationTime(path: String): Long = {
    FileSystem.get(spark.sparkContext.hadoopConfiguration)
        .getFileStatus(new org.apache.hadoop.fs.Path(path))
        .getModificationTime()
}

val modificationTime = getModificationTime("hdfs://srsdev/projects/khajiit/data/OfdCheques2/date=2020.02.01/part-00002-04b9e4c8-5916-4bb2-b9ff-757f843a0142.c000.snappy.parquet")

修改时间:Long = 1580708401253

...但它在查询中不起作用

def input_file_modification_time = udf((path: String) => getModificationTime(path))

data.select(input_file_modification_time($"input_file_name") as "modification_time").show(20, false)

org.apache.spark.SparkException:作业因阶段失败而中止:阶段 54.0 中的任务 0 失败 4 次,最近一次失败:阶段 54.0 中丢失任务 0.3(TID 408,srs-hdp-s1.dev.kontur.ru,执行器 3): org.apache.spark.SparkException: 无法执行用户定义的函数($anonfun$input_file_modification_time$1: (string) => bigint)

标签: scalaapache-spark

解决方案


问题在于sparkUDF 中为 null,因为它仅存在于驱动程序中。另一个问题是 hadoopsConfiguration是不可序列化的,所以你不能轻易地将它包含在 udf 中。但是有一个解决方法org.apache.spark.SerializableWritable

import org.apache.spark.SerializableWritable
import org.apache.hadoop.conf.Configuration

val conf = new SerializableWritable(spark.sparkContext.hadoopConfiguration)

def getModificationTime(path: String, conf:SerializableWritable[Configuration]): Long = {
    org.apache.hadoop.fs.FileSystem.get(conf.value)
        .getFileStatus(new org.apache.hadoop.fs.Path(path))
        .getModificationTime()
}

def input_file_modification_time(conf:SerializableWritable[Configuration]) = udf((path: String) => getModificationTime(path,conf))

data.select(input_file_modification_time(conf)($"input_file_name") as "modification_time").show(20, false)

推荐阅读