首页 > 解决方案 > 如何在 Spark 结构化流中包含 kafka 时间戳值作为列?

问题描述

我正在寻找将 kafka 的时间戳值添加到我的 Spark 结构化流模式的解决方案。我已经从 kafka 中提取了 value 字段并制作了数据框。我的问题是,我还需要获取时间戳字段(来自 kafka)以及其他列。

这是我当前的代码:

val kafkaDatademostr = spark
  .readStream 
  .format("kafka")
  .option("kafka.bootstrap.servers","zzzz.xxx.xxx.xxx.com:9002")
  .option("subscribe","csvstream")
  .load

val interval = kafkaDatademostr.select(col("value").cast("string")).alias("csv")
  .select("csv.*")

val xmlData = interval.selectExpr("split(value,',')[0] as ddd" ,
    "split(value,',')[1] as DFW",
    "split(value,',')[2] as DTG",
    "split(value,',')[3] as CDF",
    "split(value,',')[4] as DFO",
    "split(value,',')[5] as SAD",
    "split(value,',')[6] as DER",
    "split(value,',')[7] as time_for",
    "split(value,',')[8] as fort")

如何从 kafka 获取时间戳并与其他列一起添加为列?

标签: scalaapache-sparkapache-kafkaspark-structured-streamingspark-streaming-kafka

解决方案


时间戳包含在源模式中。只需添加一个“选择时间戳”即可获得如下所示的时间戳。

val interval = kafkaDatademostr.select(col("value").cast("string").alias("csv"), col("timestamp")).select("csv.*", "timestamp")

推荐阅读