首页 > 解决方案 > 将按时间排序的事件发送到 Kafka

问题描述

我正在使用 Autoloader(来自 Databricks)来摄取一些镶木地板文件,然后将它们发送到 Kafka 主题。

我能够毫无问题地读取和写入文件,但我对顺序有疑问。

这些文件timestamp在有效负载内包含一个字段,指示文件的修改日期。

是否可以将我通过自动加载器收到的每个事件写入到该日期订购的 Kafka 接收器中?

我希望能够在此基础上用 Kafka 编写从最旧到最新的事件timestamp

我已经考虑定义一个将被调用的函数,在该函数中它为每个批次都foreachBatch做了一个简单的操作。orderBy像这样的东西:


def orderByFunc ( batchDF:DataFrame, batchID:Long ) : Unit = {

  val rodered_df=batchDF.orderBy($"some_field".desc) // order by the timestamp field
  rodered_df.write.format("kafka").option(...) // write into Kafka
 
}

streamingInputDF
                .writeStream
                .queryName(job_name)
                .option("checkpointLocation", checkpoint_path)
                .foreachBatch(orderByFunc _)
                .start()

有没有更简单的方法?我错过了什么吗?

非常感谢大家

标签: azureapache-kafkaspark-structured-streamingazure-databricks

解决方案


推荐阅读