首页 > 解决方案 > 抑制 join (KStream,KStream) 的输出,直到超过 JoinWindow

问题描述

我正在使用 Java 的 KStreams-API。我正在尝试通过对结果集的点击来加入搜索查询。一个查询可以产生 0 到 n 次点击。点击和查询分别记录在一个主题中,并且可以通过请求 ID 加入。当我加入他们时,加入的对首先在带有点击信息的部分中为空。(显然,因为点击发生需要时间)。我将记录输出到 cassandra 以在之后对它们执行聚合(我知道我不是一个好孩子)。无论如何,我不想每次查询都先有一个“空连接”,我只想要超过 JoinWindow 之后的结果。那么有没有办法抑制 Join 的输出直到窗口结束?

这是连接的(Kotlin)代码:

// Consuming the query log topic with message key = request_id
val queryLogs = streamBuilder.stream("query_logs",
            Consumed.with(stringSerdes, querySerdes))

// Consuming the click log topic with message key = request_id
val clickLogs = streamBuilder.stream("click_logs",
            Consumed.with(stringSerdes, clickSerdes))

// Joining the click and the query log on request id to get the information which queries resulted in which clicks
val outerJoin = queryLogs.outerJoin(clickLogs, QueryClickJoiner(),
            JoinWindows.of(Duration.ofMinutes(30)),
            Joined.with(stringSerdes, clickSerdes, querySerdes))
outerJoin.to("joined_clicks", Produced.with(stringSerdes,queryClickSerdes!!))

生成的“query-click”将包含一个 query_log 字段和一个结果点击列表。我只希望在 30 分钟的加入窗口结束后得到结果,而不是在单击与查询连接时每次更新。

亲切的问候

标签: javacassandraapache-kafka-streams

解决方案


推荐阅读