java - KStreams:你如何获得记录的(起源)主题?
问题描述
我有以下
//Config setup
Properties props = ...; //setup
List<String> topicList = Arrays.asList({"A", "B", "C"});
StreamBuilder builder = new StreamBuilder();
KStream<String, String> source = builder.stream(topicList);
source
.map((k,v) -> {
//How can i get the topic of the record here
})
.to((k,v,r) -> {//busy code for topic routing});
new KafkaStream(builder.build(), properties).start();
解决方案
您可以使用ProcessorContext.topic()获取所需的主题名称。要获得对 ProcessorContext 的访问权限,请使用 KStream.process() 为其提供适当的处理器实现。
您也可以使用 KStream.transform():
KStream<InputKeyType, InputValueType> stream2 = stream.transform(new TransformerSupplier<InputKeyType, InputValueType, KeyValue<OutputKeyType, OutputValueType>>() {
@Override
public Transformer<InputKeyType, InputValueType, KeyValue<OutputKeyType, OutputValueType>> get() {
return new Transformer<InputKeyType, InputValueType, KeyValue<OutputKeyType, OutputValueType>>() {
private ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public KeyValue<OutputKeyType, OutputValueType> transform(InputKeyType key, InputValueType value) {
this.context.topic() // topic name you need
// logic here
return new KeyValue<>(OutputKeyType key, OutputValueType value);
}
@Override
public void close() {
}
};
}
});
推荐阅读
- microsoft-teams - Microsoft Teams - 更新选项卡 URL
- google-sheets - 按月份分组的数据透视表中的堆积条形图?
- reactjs - 在父组件的 onClick 上一次又一次地添加子组件,每次都使用新的道具到子组件
- verilog - 时钟域交叉信号和抖动要求
- email - 自定义域电子邮件不适用于 Heroku
- newrelic - Quarkus 原生容器中的 Newrelic 支持
- emacs - 将光标定位在 yasnippet 中的指定位置
- keras - ValueError:输入数组应具有与目标数组 LSTM Keras 相同数量的样本
- java - Java“空指针异常” - responseEntity.getBody().isNull()
- c++ - 字符串数组C ++中的字符识别不正确