node.js - Kafka Streams如何从scala中的kafka消息中获取时间戳
问题描述
我正在运行一个简单的 Kafka 流应用程序,它将使用 Node JS 记录的信息带到 Kafka 主题。
E.g.
Producer = kafka.Producer
KeyedMessage = kafka.KeyedMessage
client = new kafka.KafkaClient()
producer = new Producer(client)
km = new KeyedMessage('key', 'message')
kafka_message = JSON.stringify({ id: req.session.data.toString(), url: article.info })
payloads = [
{ topic: 'eventTopic', messages: kafka_message,timestamp:timestampNow}
];
producer.send(payloads, function (err, data) {
console.log(data);
});
还需要注意的是,时间戳只是一个数字,表示自 1970 年 6 月以来的秒数。
我正在使用 Scala 中的 Kafka Stream 来使用这些数据。
例如
val builder = new StreamsBuilder
val stream = builder
.stream[String, String]("TopicTest")
.foreach((k:String, v:String) => {
println(k)
println(v)
}
但是,我不确定如何将时间戳(我从 nodeJS 发送)提取到该流。
例如,如果我尝试做这样的事情
val stream = builder
.stream[String, String,Long]("TopicTest")
.foreach((k:String, v:String,timeStamp:Long) => {
println(k)
println(v)
println(timeStamp)
}
这会给出错误“无法解析符号流”。我想知道如何解决这个问题。仅供参考,这是我的流的拓扑和配置。val 拓扑 = builder.build
import java.util.Properties
val props = new Properties()
import org.apache.kafka.streams.StreamsConfig
val appId = this.getClass.getSimpleName.replace("$", "")
props.put(StreamsConfig.APPLICATION_ID_CONFIG, appId)
props.put(StreamsConfig.CLIENT_ID_CONFIG, appId)
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, ":9092")
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
// Step 4. Create Kafka Streams Client
import org.apache.kafka.streams.KafkaStreams
val ks = new KafkaStreams(topology, props)
ks.start
解决方案
确实存在 TimeStampExtractor ( https://jaceklaskowski.gitbooks.io/mastering-kafka-streams/content/kafka-streams-TimestampExtractor.html )。但是,可以将时间戳作为任何常规 kafka 消息发送。我改变的第一件事是我的 NodeJS 代码。
kafka_message = JSON.stringify({ id: req.session.information.toString(), url: article.info,timestamp:timestampNow.toString() })
payloads = [
{ topic: 'eventTopic', messages: kafka_message}
];
producer.send(payloads, function (err, data) {
console.log(data);
});
我发送的 JSON 消息中现在有一个时间戳字段。
最后,我们可以使用 argonaut 解析 JSON 消息。
val streamEvents = builder
.stream[String, String]("testTopic")
.foreach((k:String, json:String) => {
println(k)
println(json)
println(Parse.parse(json))
val url:String = Parse.parseWith(json, _.field("url").flatMap(_.string).getOrElse("Error!"), msg => msg)
val id:String = Parse.parseWith(json, _.field("id").flatMap(_.string).getOrElse("Error!"), msg => msg)
val timestamp:String = Parse.parseWith(json, _.field("timestamp").flatMap(_.string).getOrElse("Error!"), msg => msg)
val timeStampInt:Long = timestamp.toLong
推荐阅读
- xcode - Xcode 11.0 出现安装额外所需组件未知错误
- java - 如何使用 Selenium 和 Java 从多选列表中获取所选选项的文本
- html - 如何将我的图像浮动到许多段落的右侧?
- android - 防止在触摸外部时关闭加载有导航组件的对话框片段
- javascript - Angular:将对象转换为日期和时刻
- python - 将 subproccess.os 与正则表达式结合以获得过滤后的目录/文件列表
- python-3.x - 如何在不打印负载处理器消息的情况下加载 Stanfordnlp 管道
- java - 使用 Kotlin 的 Moshi 1.8.0 将 HashMaps 列表从/到 JSON 的转换问题失败
- json - Angular 6:SyntaxError:JSON.parse 中位置 0 处的 JSON 中的意外标记 O 具有有效的 JSON
- laravel - Vue + Laravel + tinymce 上传图片被 CORS 策略阻止