首页 > 解决方案 > Spark Dataframe 复杂排序

问题描述

我有一个事件日志数据集,如下所示:

| patient | timestamp     | event_st       | extra_info |
| 1       | 1/1/2018 2:30 | urg_admission  | x          |
| 1       | 1/1/2018 3:00 | urg_discharge  | x          |
| 1       | 1/1/2018      | hosp_admission | y          |
| 1       | 1/10/2018     | hosp_discharge | y          | 

我想按patientand对所有行进行排序timestamp,但不幸的是,根据 event 的类型event_sttimestamp可能以分钟或天为单位。

因此,我将在 C++ 中使用的解决方案是定义一个复杂的<运算符,当时间粒度不同时,我将使用它event_st作为鉴别器。例如,对于显示的数据,当日期相同时,hosp_带有前缀的事件将始终排在带有前缀的事件之后urg_

是否有使用DataFrameAPI 或其他 Spark API 的等效方法?

非常感谢。

标签: apache-sparkapache-spark-sql

解决方案


一种选择是首先将所有时间戳标准化为某种标准形式,如 ddMMYY 或纪元。最简单的方法是使用 udf。

例如:如果您考虑将所有时间戳转换为纪元,那么您的代码将如下所示:

def convertTimestamp(timeStamp:String, event_st:String) : Long = {
    if(event_st == 'urg_admission') {
    ...// Add conversion logic
    }
    if(event_st == 'hosp_admission') {
    ...// Add conversion logic
    }
     ...
}

val df = spark.read.json("/path/to/log/dataset") // I am assuming json format
spark.register.udf("convertTimestamp", convertTimestamp _)
df.createOrReplaceTempTable("logdataset")
val df_normalized = spark.sql("select logdataset.*, convertTimestamp(timestamp,event_st) as normalized_timestamp from logdataset")

在此之后,您可以使用规范化的数据集形成后续操作。


推荐阅读