Spark: detect and extract a pattern in column values

Problem description

I have a df like this:

    import spark.implicits._
    import org.apache.spark.sql.functions._

    // sample start/end messages per Id, with microsecond-precision timestamps
    val latenies = Seq(
        ("start","304875","2021-10-25 21:26:23.486027"),
        ("start","304875","2021-10-25 21:26:23.486670"),
        ("end","304875","2021-10-25 21:26:23.487590"),
        ("start","304875","2021-10-25 21:26:23.509683"),
        ("end","304875","2021-10-25 21:26:23.509689"),
        ("end","304875","2021-10-25 21:26:23.510154"),
        ("start","201345","2021-10-25 21:26:23.510156"),
        ("end","201345","2021-10-25 21:26:23.510159"),
        ("start","201345","2021-10-25 21:26:23.510333"),
        ("start","201345","2021-10-25 21:26:23.510335"),
        ("end","201345","2021-10-25 21:26:23.513177"),
        ("start","201345","2021-10-25 21:26:23.513187")
      )
    val latenies_df = latenies.toDF("Msg_name","Id_num","TimeStamp")
                            .withColumn("TimeStamp", to_timestamp(col("TimeStamp")))
    latenies_df.show(false)

which looks like this:

+--------+------+--------------------------+
|Msg_name|Id_num|TimeStamp                 |
+--------+------+--------------------------+
|start   |304875|2021-10-25 21:26:23.486027|
|start   |304875|2021-10-25 21:26:23.48667 |
|end     |304875|2021-10-25 21:26:23.48759 |
|start   |304875|2021-10-25 21:26:23.509683|
|end     |304875|2021-10-25 21:26:23.509689|
|end     |304875|2021-10-25 21:26:23.510154|
|start   |201345|2021-10-25 21:26:23.510156|
|end     |201345|2021-10-25 21:26:23.510159|
|start   |201345|2021-10-25 21:26:23.510333|
|start   |201345|2021-10-25 21:26:23.510335|
|end     |201345|2021-10-25 21:26:23.513177|
|start   |201345|2021-10-25 21:26:23.513187|
+--------+------+--------------------------+

Question: I want to extract a certain pattern from the Msg_name column: a start that is immediately followed by an end, when the data is partitioned by Id_num and ordered by TimeStamp. A message can have several consecutive starts or ends; I only want the start-end pairs, with nothing in between.

From this pattern I want to build a df like:

|patter_name|Timestamp_start           |Timestamp_end             |Id_num  |
|   pattern1|2021-10-25 21:26:23.486670|2021-10-25 21:26:23.487590|304875  |
|   pattern1|2021-10-25 21:26:23.509683|2021-10-25 21:26:23.509689|304875  |
|   pattern1|2021-10-25 21:26:23.510156|2021-10-25 21:26:23.510159|201345  |
|   pattern1|2021-10-25 21:26:23.510335|2021-10-25 21:26:23.513177|201345  |

What I did was shift the frame by one row, which, because of the nature of the Msg_name column, does not give me the correct answer.

    val window = org.apache.spark.sql.expressions.Window.partitionBy("Id_num").orderBy("TimeStamp")
    // shift the frame by one row: whenever the previous Msg_name differs, take its timestamp as the start;
    // this also pairs end followed by start, which is why the result below is wrong
    val df_only_pattern = latenies_df
      .withColumn("TimeStamp_start", when($"Msg_name" =!= lag($"Msg_name", 1).over(window), lag("TimeStamp", 1).over(window)).otherwise(lit(null)))
      .withColumn("latency_time", when($"TimeStamp_start".isNotNull, round((col("TimeStamp").cast("double") - col("TimeStamp_start").cast("double")) * 1e3, 2)).otherwise(lit(null)))
      .withColumnRenamed("TimeStamp", "TimeStamp_end")
      .withColumn("patter_name", lit("pattern1"))
      .na.drop()
    df_only_pattern.orderBy("TimeStamp_start").show(false)

This gives:

+--------+------+--------------------------+--------------------------+------------+-----------+
|Msg_name|Id_num|TimeStamp_end             |TimeStamp_start           |latency_time|patter_name|
+--------+------+--------------------------+--------------------------+------------+-----------+
|end     |304875|2021-10-25 21:26:23.48759 |2021-10-25 21:26:23.48667 |0.92        |pattern1   |
|start   |304875|2021-10-25 21:26:23.509683|2021-10-25 21:26:23.48759 |22.09       |pattern1   |
|end     |304875|2021-10-25 21:26:23.509689|2021-10-25 21:26:23.509683|0.01        |pattern1   |
|end     |201345|2021-10-25 21:26:23.510159|2021-10-25 21:26:23.510156|0.0         |pattern1   |
|start   |201345|2021-10-25 21:26:23.510333|2021-10-25 21:26:23.510159|0.17        |pattern1   |
|end     |201345|2021-10-25 21:26:23.513177|2021-10-25 21:26:23.510335|2.84        |pattern1   |
|start   |201345|2021-10-25 21:26:23.513187|2021-10-25 21:26:23.513177|0.01        |pattern1   |
+--------+------+--------------------------+--------------------------+------------+-----------+


I can get the wanted df in Python pandas with a groupby and a loop inside each group, which does not seem possible in Spark.
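For reference, that per-group scan can also be expressed in Spark with the typed Dataset API (groupByKey plus flatMapGroups). This is only a minimal sketch of the idea, not the accepted answer below; the case class Msg and the name pairs_df are introduced here purely for illustration:

    import java.sql.Timestamp

    // Hypothetical sketch: mimic the pandas "groupby + loop" by scanning each Id group in time order
    case class Msg(Msg_name: String, Id_num: String, TimeStamp: Timestamp)

    val pairs_df = latenies_df.as[Msg]
      .groupByKey(_.Id_num)
      .flatMapGroups { (id, rows) =>
        rows.toSeq
          .sortBy(m => (m.TimeStamp.getTime, m.TimeStamp.getNanos))  // keep microsecond ordering
          .sliding(2)                                                // adjacent rows within one Id
          .collect { case Seq(a, b) if a.Msg_name == "start" && b.Msg_name == "end" =>
            ("pattern1", a.TimeStamp, b.TimeStamp, id)               // keep only start -> end pairs
          }
      }
      .toDF("patter_name", "Timestamp_start", "Timestamp_end", "Id_num")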

Tags: scala, apache-spark, apache-spark-sql

Solution


Take each "end" message whose immediately preceding row (within the same window) is a "start":

    // `window` is the same Window.partitionBy("Id_num").orderBy("TimeStamp") defined in the question
    import org.apache.spark.sql.types.TimestampType

    latenies_df
      .withColumn("TimeStamp_start",
        // remember the previous row's timestamp only when that row was a "start"
        when(lag($"Msg_name", 1).over(window) === lit("start"), lag($"TimeStamp", 1).over(window))
          .otherwise(lit(null).cast(TimestampType))
      )
      // keep only "end" rows that immediately follow a "start"
      .where($"Msg_name" === lit("end"))
      .where($"TimeStamp_start".isNotNull)
      .select(
        lit("pattern1").alias("patter_name"),
        $"TimeStamp_start",
        $"TimeStamp".alias("Timestamp_end"),
        $"Id_num"
      )

Result:

+-----------+--------------------------+--------------------------+------+
|patter_name|TimeStamp_start           |Timestamp_end             |Id_num|
+-----------+--------------------------+--------------------------+------+
|pattern1   |2021-10-25 21:26:23.48667 |2021-10-25 21:26:23.48759 |304875|
|pattern1   |2021-10-25 21:26:23.509683|2021-10-25 21:26:23.509689|304875|
|pattern1   |2021-10-25 21:26:23.510156|2021-10-25 21:26:23.510159|201345|
|pattern1   |2021-10-25 21:26:23.510335|2021-10-25 21:26:23.513177|201345|
+-----------+--------------------------+--------------------------+------+
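If the latency_time column from the question's attempt is still wanted, it can be recomputed from the paired timestamps. A small sketch, assuming the result above has been assigned to a val named patterns_df (a name introduced here, not from the answer):

    // assumes the solution's output above was assigned to `patterns_df` (hypothetical name)
    val patterns_with_latency = patterns_df.withColumn("latency_time",
      round((col("Timestamp_end").cast("double") - col("TimeStamp_start").cast("double")) * 1e3, 2))  // milliseconds

    patterns_with_latency.orderBy("TimeStamp_start").show(false)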
