首页 > 解决方案 > 进行表连接时如何获得最后一个结果(在flink sql中使用toRetractStream

问题描述

这是我使用 flink sql API 连接两个表的代码

tEnv.createTemporaryView("A", streamA,"speed_sum,cnt,window_start_time,window_end_time");
tEnv.createTemporaryView("B",streamB,"speed_sum,cnt,window_start_time,window_end_time");

String execSQL1 = "select A.speed_sum+COALESCE(B.speed_sum,0.0), " +
        "A.cnt+COALESCE(B.cnt,0), " +
        "A.window_start_time, A.window_end_time " +
        "from A " +
        "left join B on A.window_start_time = B.window_start_time ";
Table table = tEnv.sqlQuery(execSQL1);
DataStream<Tuple2<Boolean, Row>> streamResult = tEnv.toRetractStream(table, Row.class).;
streamResult.print("streamResult");

我的输出是这样的:

 streamA-----------(5078.000000,199,1635333650000,1635333660000)
 streamB-----------(1721.388891,111,1635333650000,1635333660000)
 streamResult:3> (true,5078.0,199,1635333650000,1635333660000) // drop
 streamResult:3> (false,5078.0,199,1635333650000,1635333660000) // drop
 streamResult:3> (true,6799.388891220093,310,1635333650000,1635333660000)  // want to save

如您所见,toRetractStreamAPI 将生成三条记录。我想知道如何获得最后一条记录,它正确地加起来了A.speed_sumB.speed_sumA.cntB.cnt)。

标签: apache-flinkflink-streamingflink-sqlflink-table-api

解决方案


一些流式 SQL 查询,例如您的 JOIN,会产生更新流。鉴于流的连续性、无限性,Flink 无法知道何时达到“最终”结果。

如果您在有界输入上执行此查询,您可以在批处理模式下执行它,然后只打印最终结果。

在某些流式处理用例中,您可以使用时间属性而不是时​​间戳,然后 Flink SQL 规划器能够推断某些查询的结果何时完成。例如,这就是 Flink SQL 中的窗口能够产生追加流而不是更新流的方式。您的查询几乎是一个区间连接。如果它是一个间隔连接,那么结果流将是一个附加流,您不必处理这些撤回。


推荐阅读