java - 如何在kafka中加入两个不同的主题
问题描述
我有一个名为topic1的主题和一个名为topic2的主题。我有以下代码:
-topic1 的流:
KStream<Long, byte[]> events = builder.stream("topic1", Consumed.with(Serdes.Long(), Serdes.ByteArray()));
- 主题表2:
KTable<Long, byte[]> table = builder.table("topic2",
Consumed.with(Serdes.Long(), Serdes.ByteArray()));
当我与topic1的生产者一起制作时,一切都很好。topic1和topic2的值不同,我想使用此流“事件”使用来自第一个主题生产者的生成记录,然后将此事件附加到秒主题表并保存同一键的第二个主题值的状态(key很长,第一个和第二个主题的数字相同)。换句话说,我想基于相同的键和更新表将流与表连接起来。
我的代码:
StoreBuilder<KeyValueStore<Long, byte[]>> store = Stores
.keyValueStoreBuilder(Stores.persistentKeyValueStore(STORE_TOPIC2), Serdes.Long(), Serdes.ByteArray())
.withLoggingEnabled(new HashMap<>());
builder.addStateStore(store);
events.join(table, KeyValue::new, Joined.with(Serdes.Long(), Serdes.ByteArray(), Serdes.ByteArray()))
.transform(Update::new, STORE_TOPIC2)
.to("topic2", Produced.with(Serdes.Long(), Serdes.ByteArray()));
在最后一行中,我为 topic2 生成了连接事件,但在该主题上没有生成任何内容,我的转换器“更新”如下所示:
private static class Update implements Transformer<Long, KeyValue<byte[], byte[]>, KeyValue<Long, byte[]>> {
private KeyValueStore<Long, byte[]> store1;
@Override
@SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
store1 = (KeyValueStore<Long, byte[]>) context.getStateStore(store);
}
@Override
public KeyValue<Long, byte[]> transform(final Long key, final KeyValue<byte[], byte[]> updates) {
System.out.println("Inside event transformer for key: " + key);
System.out.println("Last produced graph: " + store.get(key));
CustomClass c = null;
try {
c = deserializeModel(store.get(key));
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
if (c == null) {
c = new CustomClass(...);
}
try {
return KeyValue.pair(companyKey, serializeModel(companyNetwork));
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
return null;
}
}
解决方案
推荐阅读
- wordpress - 按钮链接在 WP 面包店的定价平面形式中不起作用
- swagger - 不应该有额外的属性 additionalProperty content
- scala - 无法解决来自 Nexus Repository Manager sbt 的快照依赖关系
- reactjs - 如何在 ReactJs 中使用不同的文件进行不同的构建
- android - 如何阻止 PDF 查看应用程序将以前的 PDF 保存在堆栈中?
- function - 将 char** 映射到列表
使用 SWIG - python - 使用 boto3 附加的 EBS 卷没有 NVMe 块设备
- javascript - 从退出应用程序后离开的屏幕开始 -RN
- python - 如何在 django 测试中设置 ldap_user?
- python - Python Tensorflow“X”值无穷无尽