apache-spark - 结构流式传输:前 n 行
问题描述
最近,我在实时数据工程中遇到了结构流中的“前 n 行”问题。我需要获取 50 个最新的事件时间记录作为输出,但是结构流式传输给了我一个完整的无界表或几个更新的结果。我在网上搜索了很多,有以下几种方法:
(1)使用TTL,但我认为它是基于摄取时间的,这不是我想要的事件时间;
(2) 使用 Flink 捕获最新的事件时间记录。同时使用 flink 和结构流是一件很麻烦的事情。如下,我试过用flink 1.6,statics是表吗?我不知道如何处理,因为没有输出。
val source: KafkaTableSource = Kafka010JsonTableSource.builder()
.forTopic("BINANCE_BTCUSDT_RESULT")
.withKafkaProperties(properties)
.withSchema(TableSchema.builder()
.field("timestamp", Types.SQL_TIMESTAMP)
.field("future_max", Types.DOUBLE)
.field("future_min", Types.DOUBLE)
.field("close",Types.DOUBLE)
.field("quantities",Types.DOUBLE).build())
.fromEarliest()
.build()
tableEnv.registerTableSource("statics", source)
val statics = tableEnv.scan("statics")
statics.?
任何人都可以告诉我更多关于前 n 行问题的解决方法吗?如果问题解决了,如何将数据框发布到 url 中?
解决方案
我推荐你使用 Flink 1.5,因为 1.6 还不稳定(其实 1.5 刚刚发布)。
在 Flink 中使用事件时间时,Flink 需要知道你的时间戳,并且它需要水印,这表明事件时间的流向。要使用 Kafka010JsonTableSource 执行此操作,您应该配置rowtime 属性。
请注意,fetch() 仅在以批处理模式使用 Flink SQL 时可用。
推荐阅读
- python - Django ORM:使用 F、MAX 和 GROUP BY 的组合
- python - 将对象分配给 django 中的特定用户?
- elasticsearch - Elasticsearch 改进滚动
- tcl - 自动显示TK标签中的TCL子字段
- excel - 如果单元格包含字符串,则VBA复制单元格
- node.js - 如何为首次构建电子设置 package.json
- reactjs - jsx 函数中使用的括号渲染和卷曲
- android - 将数据发布到 RESTful-webservice - 为什么此代码在 Eclipse 中有效,但在 android 中无效?
- php - 我无法使用从 .txt 获得的数据连接到数据库
- javascript - javascript - 将按钮单击的处理推迟到页面加载后