java - 根据唯一键创建不同的会话
问题描述
我从一个向我发送 JSON 消息的 kafka 主题获取消息。我想从该 json 消息中提取一个字段(可以是一个 ID),并且我想为 'n' 个唯一设备 IDs 创建 'n' 个会话。
我已经尝试为我收到的每个唯一 ID 创建一个新的会话实例,但是在创建新的会话窗口实例之后,即在管道中为每个 ID 创建一个新分支后,我无法将下一个即将到来的消息推送到相应的分支已经存在。
我想要的预期结果是,假设我们收到类似的消息
{ID:1,...}, {ID:2,...}, {ID:3,...},{ID:1,...}
将创建三个不同的会话,第四条消息将发送到设备 ID 1 的会话。有没有办法在 apache 梁编程范式或 Java 编程范式中做到这一点?任何帮助将不胜感激。
解决方案
是的,如果您使用自定义WindowFn
. 您可以对Sessions类进行子类化并对其进行修改,以根据每个元素的 ID 以不同的方式设置间隔持续时间。您可以在 中执行此操作assignWindows
,如下所示Sessions
:
@Override
public Collection<IntervalWindow> assignWindows(AssignContext c) {
// Assign each element into a window from its timestamp until gapDuration in the
// future. Overlapping windows (representing elements within gapDuration of
// each other) will be merged.
return Arrays.asList(new IntervalWindow(c.timestamp(), gapDuration));
}
该类AssignContext
可用于访问分配给此窗口的元素,这将允许您检索该元素的 ID。
听起来您还希望将具有不同 ID 的元素分组在不同的窗口中(即,如果元素 A 和 B 在间隙持续时间内进入但具有不同的 ID,则它们仍应位于不同的窗口中)。这可以通过GroupByKey
将元素的 ID 作为键执行 a 来完成。如 Beam Programming Guide 中所述,会话窗口基于每个键应用,因此这将按 ID 分隔元素。
推荐阅读
- c# - 如何检测 C# 应用程序中的内存泄漏?
- python - spaCy Matcher 条件或/和 Python
- python - 如何检查一个点是否在由另外两个点定义的线上?
- python - for循环中'for'前面的字母代表什么?
- stimulusjs - 具有相同控制器的嵌套元素
- python - tf.keras 子模型在训练、保存和加载中的行为是什么?
- google-apps-script - Google Sheets API / JS:在工作表中查找值(任何地方)
- reactjs - React:更新受控子组件
- python - matplotlib(mpl_connect) in for loop to create many interactive plots does not work
- mysql - MySQL - 使用外键将行复制到另一个表