首页 > 解决方案 > 来自kafka的火花流如何指定轮询事件的截止时间

问题描述

我有火花流应用程序,它运行一天结束并消耗上游应用程序发送的 kafka 事件。目前上游应用程序整天都在推送新数据,而我的消费者最终消费了它。我想根据截止时间限制消耗的事件,比如每天下午 6 点。有没有办法指定截止时间来限制根据截止时间消耗的事件,比如 kafka 事件时间戳或其他东西。下面是消费者代码

  KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topicSet, kafkaParams))

标签: scalaapache-sparkapache-kafkaspark-streamingkafka-consumer-api

解决方案


您可以根据时间戳或时间或任何字段过滤掉处理过程中的事件。例如,假设您的事件是 JSON,并且它有一个名为 hour 的字段,它是事件时间小时值。您可以轻松选择仅在 6 之前创建的事件,如下所示。

directStream.foreachRDD { rdd =>
        val eventDfRDD = rdd.filter(record => {
          val option = JSON.parseFull(record).get.asInstanceOf[Map[String, String]]
          option.get("hour") < 1800
        })
      }

推荐阅读