首页 > 解决方案 > apache beam python sdk可以进行状态处理吗?

问题描述

我一直在关注使用 Apache Beam 进行及时(和有状态)处理的文章,虽然内容全面且写得很好,但它没有指定如何使用 python 实现相同的目标。更具体地说,它指出:

Beam 的 Python SDK 尚不支持状态和计时器。

虽然它没有说明原因......是否有先天的原因为什么这是不可能的?

我希望为我打算实现的信号处理系统实现一个重放缓冲区/窗口系统。因此,长度为 W 的特征的滑动窗口/历史帧缓冲区会随着最新窗口不断更新。

在 Java 中,它的实现如下所示:

静态类 FeatureFrameBuffer 扩展 DoFn,FeatureFrame> { Integer bufferSize;

    public FeatureFrameBuffer(Integer bufferSize) {
        this.bufferSize = bufferSize;
    }

    @StateId("buffer")
    private final StateSpec<BagState<KV<String, Double>>> bufferedFeatures = StateSpecs.bag();

    @StateId("count")
    private final StateSpec<ValueState<Integer>> countState = StateSpecs.value();

    @ProcessElement
    public void process(
                        ProcessContext context,
                        @StateId("buffer") BagState<KV<String, Double>> bufferState,
                        @StateId("count") ValueState<Integer> countState
                        ) {

        int count = firstNonNull(countState.read(), 0);
        count = count + 1;
        countState.write(count);
        bufferState.add(context.element());

        // Only output buffer if count is greater than bufferSize
        // Remove last element from buffer if count
        // greater than or equals buferSize
        if (count >= bufferSize) {
            bufferState.read();
            createFeatureFrame();
            context.output(featureFrame);
            bufferState.clear();
            countState.clear();
        }
    }
}

在我开始开发自定义实现之前,我想知道是否可以使用 python sdk 实现相同的目标。关于此事的一些建议会很棒。

标签: pythontensorflowgoogle-cloud-dataflowapache-beamstateful

解决方案


自 Beam 版本 2.9.0 起,用户状态和计时器可用。该文档尚未更新。


推荐阅读