apache-beam - Apache BEAM 实现 UnboundedSource - BEAM 如何决定创建多少阅读器?
问题描述
我正在实现 UnboundedReader 以使用自定义数据源(基于公司内部、基于订阅的 Java API)。当我执行管道时,我注意到创建了多个 UnboundedReader 实例。BEAM 如何决定调用多少次
public abstract UnboundedSource.UnboundedReader<OutputT> createReader(PipelineOptions options, CheckPointMarkT checkpointMark)
UnboundedSource 的方法?
我的 split() 方法实现为:
public List<? extends UnboundedSource<MyRecord, MyCheckpointMark>> split(int desiredNumSplits, PipelineOptions options) throws Exception {
List<MySubscriptionSource> list = new ArrayList<>(1);
list.add(this);
return list;
}
有没有办法强制只创建一个阅读器?
解决方案
我做了一些挖掘并阅读了直接跑步者的来源。它被写入随机关闭现有阅读器(概率为 5%)并强制恢复检查点:https ://github.com/apache/beam/blob/a679d98cbcc49b01528c168cce8b578338a5bcdd/runners/direct-java/src/main/java/ org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L150
没有评论说明原因 - 我的猜测是这样做是为了模拟一些失败率
推荐阅读
- node.js - 尝试需要模块时遇到问题 - Node.js
- spreadsheet - Google Sheet - 在 A 列中搜索“Cat”或“Dog”,然后返回 B 列相应范围的最大值
- .net-core - C# MongoDB 驱动程序通过过滤或任何聚合框架管道替换递归调用
- python - 如何使用 AES 模式 CBC 实现 HMAC 验证
- angular - 动态生成的嵌套 mat-tab-group 未按预期工作
- python - 如何在一个进程中引发异常并在另一个进程中捕获它?
- javascript - 我想在我的项目中添加 gif,我正在编译文件加载器。但我在控制台 Minified React 错误 #130 上收到错误消息
- arrays - VueJs 使用 push 在 LocalStorage 中插入数据数组
- javascript - 检查浏览器是否支持 unadjustedMovement
- node.js - 如何在“包含 EJS 标签”中使用三元运算符