首页 > 解决方案 > Apache BEAM 实现 UnboundedSource - BEAM 如何决定创建多少阅读器?

问题描述

我正在实现 UnboundedReader 以使用自定义数据源(基于公司内部、基于订阅的 Java API)。当我执行管道时,我注意到创建了多个 UnboundedReader 实例。BEAM 如何决定调用多少次

public abstract UnboundedSource.UnboundedReader<OutputT> createReader(PipelineOptions options, CheckPointMarkT checkpointMark)

UnboundedSource 的方法?

我的 split() 方法实现为:

public List<? extends UnboundedSource<MyRecord, MyCheckpointMark>> split(int desiredNumSplits, PipelineOptions options) throws Exception {
    List<MySubscriptionSource> list = new ArrayList<>(1);
    list.add(this);
    return list;
}

有没有办法强制只创建一个阅读器?

标签: apache-beamapache-beam-io

解决方案


我做了一些挖掘并阅读了直接跑步者的来源。它被写入随机关闭现有阅读器(概率为 5%)并强制恢复检查点:https ://github.com/apache/beam/blob/a679d98cbcc49b01528c168cce8b578338a5bcdd/runners/direct-java/src/main/java/ org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L150

没有评论说明原因 - 我的猜测是这样做是为了模拟一些失败率


推荐阅读