首页 > 解决方案 > 根据唯一键创建不同的会话

问题描述

我从一个向我发送 JSON 消息的 kafka 主题获取消息。我想从该 json 消息中提取一个字段(可以是一个 ID),并且我想为 'n' 个唯一设备 IDs 创建 'n' 个会话

我已经尝试为我收到的每个唯一 ID 创建一个新的会话实例,但是在创建新的会话窗口实例之后,即在管道中为每个 ID 创建一个新分支后,我无法将下一个即将到来的消息推送到相应的分支已经存在。

我想要的预期结果是,假设我们收到类似的消息

{ID:1,...}, {ID:2,...}, {ID:3,...},{ID:1,...}

将创建三个不同的会话,第四条消息将发送到设备 ID 1 的会话。有没有办法在 apache 梁编程范式或 Java 编程范式中做到这一点?任何帮助将不胜感激。

标签: javagoogle-cloud-dataflowapache-beam

解决方案


是的,如果您使用自定义WindowFn. 您可以对Sessions类进行子类化并对其进行修改,以根据每个元素的 ID 以不同的方式设置间隔持续时间。您可以在 中执行此操作assignWindows,如下所示Sessions

  @Override
  public Collection<IntervalWindow> assignWindows(AssignContext c) {
    // Assign each element into a window from its timestamp until gapDuration in the
    // future.  Overlapping windows (representing elements within gapDuration of
    // each other) will be merged.
    return Arrays.asList(new IntervalWindow(c.timestamp(), gapDuration));
  }

该类AssignContext可用于访问分配给此窗口的元素,这将允许您检索该元素的 ID。

听起来您还希望将具有不同 ID 的元素分组在不同的窗口中(即,如果元素 A 和 B 在间隙持续时间内进入但具有不同的 ID,则它们仍应位于不同的窗口中)。这可以通过GroupByKey将元素的 ID 作为键执行 a 来完成。如 Beam Programming Guide 中所述,会话窗口基于每个键应用,因此这将按 ID 分隔元素。


推荐阅读