首页 > 解决方案 > Kafka Streams如何从scala中的kafka消息中获取时间戳

问题描述

我正在运行一个简单的 Kafka 流应用程序,它将使用 Node JS 记录的信息带到 Kafka 主题。

E.g. 
  Producer = kafka.Producer
  KeyedMessage = kafka.KeyedMessage
  client = new kafka.KafkaClient()
  producer = new Producer(client)
  km = new KeyedMessage('key', 'message')
  kafka_message = JSON.stringify({ id: req.session.data.toString(), url: article.info })
  payloads = [
    { topic: 'eventTopic', messages: kafka_message,timestamp:timestampNow}
  ];
  producer.send(payloads, function (err, data) {
    console.log(data);
  });

还需要注意的是,时间戳只是一个数字,表示自 1970 年 6 月以来的秒数。

我正在使用 Scala 中的 Kafka Stream 来使用这些数据。

例如

val builder = new StreamsBuilder

val stream = builder
    .stream[String, String]("TopicTest")
    .foreach((k:String, v:String) => {
     println(k)
     println(v) 
}

但是,我不确定如何将时间戳(我从 nodeJS 发送)提取到该流。

例如,如果我尝试做这样的事情

val stream = builder
    .stream[String, String,Long]("TopicTest")
    .foreach((k:String, v:String,timeStamp:Long) => {
     println(k)
     println(v) 
     println(timeStamp)
}

这会给出错误“无法解析符号流”。我想知道如何解决这个问题。仅供参考,这是我的流的拓扑和配置。val 拓扑 = builder.build

  import java.util.Properties
  val props = new Properties()
  import org.apache.kafka.streams.StreamsConfig

  val appId = this.getClass.getSimpleName.replace("$", "")
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, appId)
  props.put(StreamsConfig.CLIENT_ID_CONFIG, appId)
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, ":9092")
  props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")


  // Step 4. Create Kafka Streams Client
  import org.apache.kafka.streams.KafkaStreams
  val ks = new KafkaStreams(topology, props)


  ks.start

标签: node.jsscalaapache-kafkastreamtimestamp

解决方案


确实存在 TimeStampExtractor ( https://jaceklaskowski.gitbooks.io/mastering-kafka-streams/content/kafka-streams-TimestampExtractor.html )。但是,可以将时间戳作为任何常规 kafka 消息发送。我改变的第一件事是我的 NodeJS 代码。

kafka_message = JSON.stringify({ id: req.session.information.toString(), url: article.info,timestamp:timestampNow.toString() })
  payloads = [
    { topic: 'eventTopic', messages: kafka_message}
  ];
 
  producer.send(payloads, function (err, data) {
    console.log(data);
  });

我发送的 JSON 消息中现在有一个时间戳字段。

最后,我们可以使用 argonaut 解析 JSON 消息。

 val streamEvents = builder
    .stream[String, String]("testTopic")
    .foreach((k:String, json:String) => {
      println(k)
      println(json)
      println(Parse.parse(json))
      val url:String = Parse.parseWith(json, _.field("url").flatMap(_.string).getOrElse("Error!"), msg => msg)
      val id:String = Parse.parseWith(json, _.field("id").flatMap(_.string).getOrElse("Error!"), msg => msg)
      val timestamp:String = Parse.parseWith(json, _.field("timestamp").flatMap(_.string).getOrElse("Error!"), msg => msg)
      val timeStampInt:Long = timestamp.toLong

推荐阅读