python - apache beam python sdk可以进行状态处理吗?
问题描述
我一直在关注使用 Apache Beam 进行及时(和有状态)处理的文章,虽然内容全面且写得很好,但它没有指定如何使用 python 实现相同的目标。更具体地说,它指出:
Beam 的 Python SDK 尚不支持状态和计时器。
虽然它没有说明原因......是否有先天的原因为什么这是不可能的?
我希望为我打算实现的信号处理系统实现一个重放缓冲区/窗口系统。因此,长度为 W 的特征的滑动窗口/历史帧缓冲区会随着最新窗口不断更新。
在 Java 中,它的实现如下所示:
静态类 FeatureFrameBuffer 扩展 DoFn,FeatureFrame> { Integer bufferSize;
public FeatureFrameBuffer(Integer bufferSize) {
this.bufferSize = bufferSize;
}
@StateId("buffer")
private final StateSpec<BagState<KV<String, Double>>> bufferedFeatures = StateSpecs.bag();
@StateId("count")
private final StateSpec<ValueState<Integer>> countState = StateSpecs.value();
@ProcessElement
public void process(
ProcessContext context,
@StateId("buffer") BagState<KV<String, Double>> bufferState,
@StateId("count") ValueState<Integer> countState
) {
int count = firstNonNull(countState.read(), 0);
count = count + 1;
countState.write(count);
bufferState.add(context.element());
// Only output buffer if count is greater than bufferSize
// Remove last element from buffer if count
// greater than or equals buferSize
if (count >= bufferSize) {
bufferState.read();
createFeatureFrame();
context.output(featureFrame);
bufferState.clear();
countState.clear();
}
}
}
在我开始开发自定义实现之前,我想知道是否可以使用 python sdk 实现相同的目标。关于此事的一些建议会很棒。
解决方案
自 Beam 版本 2.9.0 起,用户状态和计时器可用。该文档尚未更新。
推荐阅读
- javascript - 在反应中从 API 调用渲染嵌套对象属性
- excel - 将 VBA 函数和宏移至网络驱动器外的新工作簿,但现在返回 #Name?而不是适当的价值
- r - 在 txt 文件的段落之间添加额外的行
- python - X-WWW 将请求列表放入 Dict
- javascript - 如何创建使用 react-navigation 的泛型的新泛型对象类型?
- javascript - 正则表达式匹配 / 和 .jsp 之间的字符串
- docker - Chromedriver:Chrome 连接因 Docker 容器中的硒而失败
- c++builder - 任何人都知道/知道 C++Builder 的“通用”字符串类吗?
- swift - SwiftUI:使用非 @State var 时出现意外动画
- python - 如何响应 NetfilterQueue 中的传入数据包?