首页 > 解决方案 > 结构流式传输:前 n 行

问题描述

最近,我在实时数据工程中遇到了结构流中的“前 n 行”问题。我需要获取 50 个最新的事件时间记录作为输出,但是结构流式传输给了我一个完整的无界表或几个更新的结果。我在网上搜索了很多,有以下几种方法:

(1)使用TTL,但我认为它是基于摄取时间的,这不是我想要的事件时间;

(2) 使用 Flink 捕获最新的事件时间记录。同时使用 flink 和结构流是一件很麻烦的事情。如下,我试过用flink 1.6,statics是表吗?我不知道如何处理,因为没有输出。

val source: KafkaTableSource = Kafka010JsonTableSource.builder()
.forTopic("BINANCE_BTCUSDT_RESULT")
.withKafkaProperties(properties)
.withSchema(TableSchema.builder()
 .field("timestamp", Types.SQL_TIMESTAMP)
 .field("future_max", Types.DOUBLE)
 .field("future_min", Types.DOUBLE)
 .field("close",Types.DOUBLE)
 .field("quantities",Types.DOUBLE).build())
.fromEarliest()
.build()
tableEnv.registerTableSource("statics", source)
val statics = tableEnv.scan("statics")
statics.?

任何人都可以告诉我更多关于前 n 行问题的解决方法吗?如果问题解决了,如何将数据框发布到 url 中?

标签: apache-sparkspark-streamingapache-flink

解决方案


我推荐你使用 Flink 1.5,因为 1.6 还不稳定(其实 1.5 刚刚发布)。

在 Flink 中使用事件时间时,Flink 需要知道你的时间戳,并且它需要水印,这表明事件时间的流向。要使用 Kafka010JsonTableSource 执行此操作,您应该配置rowtime 属性

请注意,fetch() 仅在以批处理模式使用 Flink SQL 时可用。


推荐阅读