azure - 将按时间排序的事件发送到 Kafka
问题描述
我正在使用 Autoloader(来自 Databricks)来摄取一些镶木地板文件,然后将它们发送到 Kafka 主题。
我能够毫无问题地读取和写入文件,但我对顺序有疑问。
这些文件timestamp
在有效负载内包含一个字段,指示文件的修改日期。
是否可以将我通过自动加载器收到的每个事件写入到该日期订购的 Kafka 接收器中?
我希望能够在此基础上用 Kafka 编写从最旧到最新的事件timestamp
。
我已经考虑定义一个将被调用的函数,在该函数中它为每个批次都foreachBatch
做了一个简单的操作。orderBy
像这样的东西:
def orderByFunc ( batchDF:DataFrame, batchID:Long ) : Unit = {
val rodered_df=batchDF.orderBy($"some_field".desc) // order by the timestamp field
rodered_df.write.format("kafka").option(...) // write into Kafka
}
streamingInputDF
.writeStream
.queryName(job_name)
.option("checkpointLocation", checkpoint_path)
.foreachBatch(orderByFunc _)
.start()
有没有更简单的方法?我错过了什么吗?
非常感谢大家
解决方案
推荐阅读
- java - @Version 在插入期间抛出 IncompatibleClassChangeError
- react-native - 在本机反应中连接来自两个提取的两个数据
- node.js - 为什么函数连接没有答案?“无法设置未定义的属性‘集合’”
- jakarta-ee - 需要帮助设置“工作场所”
- python - DataFrame interrows() 和 .to_csv:逐行写入
- reactjs - 如果项目尚未从状态加载,则阻止渲染尝试
- c# - lambda 表达式内的转换 - LINQ to Entities 无法识别方法“Int32 ToInt32(System.String)”方法
- javascript - 使用带有多个测试的 filter 方法过滤对象数组
- java - 使用 AsyncTask 时无法从 HttpURLConnection.getResponseCode 接收值?
- regex - IPv4 正则表达式捕获地址的不正确部分