java - 跨窗口复制数据以填充不定数量的点
问题描述
我正在尝试将数据记录复制到未来的窗口中。这解决的问题是每个窗口的计算统计数据将更加准确,因为这些数据是连续的(如温度)并且需要基线值。
在这些图中,每个框代表一个固定窗口。每个窗口中的数字代表来自源的 PCollection 中的数据。
这是一个示例输入 PCollection:
+---------+---------+---------+--------->
| 1 2 | 3 | | |
+---------+---------+---------+--------->
结果输出 PCollection:
+---------+---------+---------+--------->
| 1 2 | 2 3 | 3 | 3 |
+---------+---------+---------+--------->
请注意最新数据点(基于事件时间戳)如何转发到下一个窗口。如果有多个空窗口,则必须重新转发该值。
我已经解决了转发一次的问题,方法是通过一个有状态的 DoFn 运行窗口 PCollection,该 DoFn 发出一个额外的重复和修改的元素:
public class DupeFn extends DoFn<Datum, Datum> {
@StateId("latest")
private final StateSpec<ValueState<Datum>> latestStateSpec = StateSpecs.value();
@TimerId("emit")
private final TimerSpec emitSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
@ProcessElement
public void processElement(
@Element Datum element,
OutputReceiver<Datum> receiver,
IntervalWindow window,
@StateId("latest") ValueState<Datum> latest,
@TimerId("emit") Timer emit
) {
emit.set(window.maxTimestamp());
Datum prev = latest.read();
if (prev == null || element.timestamp > prev.timestamp) {
latest.write(element);
}
receiver.output(element);
}
@OnTimer("emit")
public void emitLatest(
OutputReceiver<Datum> receiver,
IntervalWindow window,
@StateId("latest") ValueState<Datum> latest
) {
Datum last = latest.read();
// modify the timestamp such that it lands in the next window
last.timestamp = window.end().getMillis() + 10;
last.id += " DUPED";
receiver.outputWithTimestamp(last, new Instant(last.timestamp));
}
}
现在的问题是,如果有一个空窗口,则不会将任何内容复制到进行中的窗口中。理想情况下,行为将如上图所述。
有没有办法做到这一点?
编辑
我发现了这篇相关的未发表的博客文章。
解决方案
目前 Beam 文档存在一个小问题,一旦修复,博客就会出现。循环计时器将为您提供解决此问题的部分方法。因为即使在没有数据的情况下,它也会确保每个间隔窗口中都有活动。
编辑:博客现在可以在这里使用链接到博客
下一部分需要使用全局窗口,这会带来更多的复杂性。下周的 Apache Beam 峰会上将对此进行讨论。
为了保持状态,您需要将固定窗口聚合流到 GlobalWindow。但是 GlobalWindow 不保证顺序,因此您需要遵循以下流程:
@ProcessElement
- 将元素保存到 BaggedState
- 如果尚未设置时间戳,则创建一个 EventTime 时间戳。 你需要这样的东西来解决这个问题
@OnTimer
- 按时间戳读取和排序 BaggedList
- 如果下一个聚合没有值(它是使用循环计时器而不是外部数据源制作的),则将每个聚合的最终值链接到下一个聚合中。
- 输出时间戳为 < 然后 OnTimer.Timestamp 的所有值
- 清除任何已处理元素的包列表,请注意今天这是低效的,因为您无法从列表中删除特定元素。如果您查看 Apache Beam 上的开发列表,将会有一个很好的讨论,关于未来对 Sorted Map 的请求,这将非常有用。
对不起,它不是一个简短的答案!
推荐阅读
- rest - REST GET 端点根据用户角色返回不同的模型
- amazon-web-services - 如何用 AWS RDS 替换最重要的数据库容器?
- angular - 从 Angular 6 中 API 发送的元数据创建类
- javascript - Mongoose .save() 未执行
- javascript - HTML 和 Javascript 中的多个隐藏的 div 函数
- c# - 在控制器之外将 HttpResponseMessage 转换为 HttpActionResult 的最简单方法
- batch-file - 在批处理脚本中使用或在 if 语句中
- c# - 升级包后的 Service Fabric 远程调用
- php - Rest Full API PHP - 获取回发事件
- php - 如何回显 php 错误 (ftp_get)