apache-kafka-streams - 从转换中的数据更新全局存储
问题描述
我目前有一个简单的拓扑:
KStream<String, Event> eventsStream = builder.stream(sourceTopic);
eventsStream.transformValues(processorSupplier, "nameCache")
.to(destinationTopic);
我的事件有时有一个键/值对,而其他时候只有键。我希望能够将值添加到那些缺少值的事件中。我可以在本地状态存储中正常工作,但是当我添加更多任务时,有时键/值事件和值事件位于不同的线程中,因此它们没有正确更新。
我想为此使用全局状态存储,但我很难弄清楚当新的键/值对进入时如何更新全局存储。我使用以下代码创建了一个全局状态存储:
builder.addGlobalStore(stateStore, "global_store", Consumed.with(Serdes.String(), Serdes.String()), new ProcessorSupplier<String, String>() {
@Override
public Processor<String, String> get() {
return new Processor<String, String>() {
private ProcessorContext context;
@Override
public void init(final ProcessorContext processorContext) {
this.context = processorContext;
}
@Override
public void process(final String key, final String value) {
context.forward(key, value);
}
@Override
public void close() {
}
};
}
});
据我所知,它正在工作,但由于该主题中没有数据,我不确定。
所以我的问题是如何从 transformValues 内部更新全局存储? store.put()
失败并显示全局存储为只读错误。
我在 Kafka Streams 上找到了 Write to GlobalStateStore,但接受的答案只是说要更新基础主题,但我不知道该怎么做,因为该主题不在我的流中。
---已编辑---
我在接受的答案中更新了 #1 的代码。我看到新的键/值对出现在global_store
. 但是 globalStore 似乎没有看到新键。如果我重新启动应用程序,它会用主题中的数据填充缓存,但在我停止/启动应用程序之前,新键不可见。
我process(String, String)
在全局存储处理器中添加了日志记录,它显示正在处理的新密钥。有任何想法吗?
解决方案
- 你只能在transformValues中获得对全局状态存储的真正访问权限,如果你想更新一个全局状态存储,是的,你必须将更新发送到全局状态存储的底层输入主题,你的状态会更新消费此更新消息时的值。这背后的原因是,全局状态存储在所有应用程序实例上填充,并使用此输入主题进行容错。您可以通过对拓扑进行分支来做到这一点:
KStream<String, Event> eventsStream = builder.stream(sourceTopic);
//processing message as normal
eventsStream.transformValues(processorSupplier, "nameCache")
.to(destinationTopic);
//this transform to the updated message to global state
eventsStream.transform(updateGlobalStateProcessorSupplier, "nameCache")
.to("global_store");
- 使用低级 API手动构建拓扑,因此您可以使用接收器处理器的名称将消息转发到
destinationTopic
主题和global_state
主题,ProcessorContext.forward
以将消息转发到接收器处理器节点。
推荐阅读
- swift - 用户凭证 - 使用 Apple 登录 (SwiftUI)
- outlook - 在 Outlook 加载项中获取浏览器 URL
- c# - 我的通用 Windows 平台软件是否能够获得启动关机所需的权限?
- physics - 1e-26 到 1e-15 的大数量级的参数缩放?
- javascript - 我应该为我的 JS 数据使用 JSON 还是多维数组?
- html - 如何动态打破 FlexBox 列以开始新列
- python - 为什么写入变量会改变其范围?
- python - 有没有一种直接的方法可以使用 sklearn 或同类方法将数据拟合到圆形/螺旋曲线?
- video - 打开/关闭视频流
- reactjs - React-Table 使一个单元格成为链接