scala - 如何在 Spark 结构化流中包含 kafka 时间戳值作为列?
问题描述
我正在寻找将 kafka 的时间戳值添加到我的 Spark 结构化流模式的解决方案。我已经从 kafka 中提取了 value 字段并制作了数据框。我的问题是,我还需要获取时间戳字段(来自 kafka)以及其他列。
这是我当前的代码:
val kafkaDatademostr = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers","zzzz.xxx.xxx.xxx.com:9002")
.option("subscribe","csvstream")
.load
val interval = kafkaDatademostr.select(col("value").cast("string")).alias("csv")
.select("csv.*")
val xmlData = interval.selectExpr("split(value,',')[0] as ddd" ,
"split(value,',')[1] as DFW",
"split(value,',')[2] as DTG",
"split(value,',')[3] as CDF",
"split(value,',')[4] as DFO",
"split(value,',')[5] as SAD",
"split(value,',')[6] as DER",
"split(value,',')[7] as time_for",
"split(value,',')[8] as fort")
如何从 kafka 获取时间戳并与其他列一起添加为列?
解决方案
时间戳包含在源模式中。只需添加一个“选择时间戳”即可获得如下所示的时间戳。
val interval = kafkaDatademostr.select(col("value").cast("string").alias("csv"), col("timestamp")).select("csv.*", "timestamp")
推荐阅读
- java - Java 扫描仪错误:java.util.NoSuchElementException:找不到行 -- java.base/java.util.Scanner.nextLine(Scanner.java:1651))
- c# - 需要帮助制定英里步行计划
- c# - 正则表达式 - 如何匹配块注释
- sql-server - T-SQL 获取给定组的最后一个值
- ios - 为什么 UINavigationBar.items 是一个数组?
- java - 有没有办法使用“for”循环遍历变量?
- python - 在 discord.py 中嵌入颜色
- python - 如何使用 PYSimplGUI 更新 python 中连续线程中使用的值?
- django - Django注销方法不起作用
- reactjs - 有条件地在特定路线和视口高度上渲染组件