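java - Kafka Streams: implementing a simple KeyValueStore where I can put and get data
Problem description
I have a Kafka Streams application that operates on incoming state and needs to store that state before writing to the next topic. The write should happen only after the state has been updated in the local store.
Something like this: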
stream.map(this::getAndUpdateState)
.map(this::processStateAndEvent)
.to("topicname");
So in getAndUpdateState() I could do this:
state = store.get(key); // or new if null
state = updateState(state, event); // update changes to state
store.put(key, state); // write back the state
return state;
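The get, update, put sequence above can be pinned down without Kafka at all. The following is only a minimal sketch: a HashMap stands in for the state store, the class name GetAndUpdateSketch is hypothetical, and the update rule (adding the event's length to a running total) is an assumption chosen so the example has something concrete to compute.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the question's getAndUpdateState(); a HashMap
// plays the role of the state store so the sketch runs without Kafka.
class GetAndUpdateSketch {
    private final Map<String, Long> store = new HashMap<>();

    // Read the current state (or start from 0), fold the event in, write the
    // result back, and only then return the updated state to the caller.
    long getAndUpdateState(String key, String event) {
        Long state = store.get(key);        // get
        if (state == null) {
            state = 0L;                     // no state stored yet for this key
        }
        state = state + event.length();     // assumed update rule: sum of event lengths
        store.put(key, state);              // put
        return state;
    }

    public static void main(String[] args) {
        GetAndUpdateSketch sketch = new GetAndUpdateSketch();
        System.out.println(sketch.getAndUpdateState("user-1", "login"));   // prints 5
        System.out.println(sketch.getAndUpdateState("user-1", "logout"));  // prints 11
    }
}
```

Because the value is returned only after the put, anything downstream of this call sees state that has already been written back, which is the ordering the pipeline needs.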
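How do I implement simple get() and put() operations on a Kafka state store? I have tried using a KeyValueStore, but it is awkward because I have to add a source processor, a sink processor, and so on for it.
Alternatively, a way to get and put data in Kafka using a KTable or some other concept would also work.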
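Solution
Thanks to user152468 and Matthias J. Sax for the suggestions.
I was able to do stateful processing in Kafka Streams using the transform() method. Complete working code, based on the original Pipe example, is given below.
Pipe.java: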
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
public class Pipe {
    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        // Basic configuration: application id, broker address, default serdes.
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // StreamsBuilder used to assemble the topology.
        final StreamsBuilder builder = new StreamsBuilder();
        // KStream that continuously reads records from the source topic "streams-plaintext-input".
        KStream<String, String> source = builder.stream("streams-plaintext-input");
        // Persistent key-value store holding a running Long count per String key.
        StoreBuilder<KeyValueStore<String, Long>> wordCountsStore = Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("WordCountsStore"),
                Serdes.String(),
                Serdes.Long())
                .withCachingEnabled();
        // Register the store with the topology so a transformer can be connected to it by name.
        builder.addStateStore(wordCountsStore);
        // Re-key every record to the constant "key", update the store inside transform(),
        // and only then write the transformed stream to "streams-pipe-output".
        source.map((k, v) -> KeyValue.pair("key", v))
                .peek((k, s) -> System.out.printf("After keying: %s, value: %s\n", k, s))
                .transform(new SampleTransformSupplier(wordCountsStore.name()), wordCountsStore.name())
                .peek((k, s) -> System.out.printf("After transform: %s, value: %s\n", k, s))
                .to("streams-pipe-output");
        // Build the topology and print its description.
        final Topology topology = builder.build();
        System.out.print(topology.describe());
        // Construct the streams client from the topology and the properties.
        final KafkaStreams streams = new KafkaStreams(topology, properties);
        final CountDownLatch latch = new CountDownLatch(1);
        // Shutdown hook: close the streams client cleanly when the JVM exits.
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });
        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            // Report the failure instead of exiting silently.
            e.printStackTrace();
            System.exit(1);
        }
        System.exit(0);
    }

    // Supplies one Transformer instance per stream task; each instance looks up the store by name.
    private static class SampleTransformSupplier implements TransformerSupplier<String, String, KeyValue<String, String>> {
        private final String stateStoreName;

        public SampleTransformSupplier(String stateStoreName) {
            this.stateStoreName = stateStoreName;
        }

        @Override
        public Transformer<String, String, KeyValue<String, String>> get() {
            return new Transformer<String, String, KeyValue<String, String>>() {
                private KeyValueStore<String, Long> stateStore;

                @SuppressWarnings("unchecked")
                @Override
                public void init(ProcessorContext processorContext) {
                    // The store must have been connected by name in the transform() call.
                    stateStore = (KeyValueStore<String, Long>) processorContext.getStateStore(stateStoreName);
                }

                @Override
                public KeyValue<String, String> transform(String key, String value) {
                    // get: read the current state, starting from 0 if the key is new.
                    Long countSoFar = stateStore.get(key);
                    if (countSoFar == null) {
                        System.out.println("Initializing count so far. This message should be printed only once.");
                        countSoFar = 0L;
                    }
                    // update: add the length of the incoming value to the running total.
                    countSoFar += value.length();
                    System.out.printf(" Key: %s, Value: %s, Count: %d\n\n", key, value, countSoFar);
                    // put: write the state back before the record is forwarded downstream.
                    stateStore.put(key, countSoFar);
                    return KeyValue.pair(key, value);
                }

                @Override
                public void close() {
                    // Nothing to close here; Kafka Streams manages the store's lifecycle.
                }
            };
        }
    }
}
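To make the transformer's bookkeeping easier to follow, here is a plain-Java trace of what it stores for a couple of records. This is only a sketch under stated assumptions: the map is a stand-in for "WordCountsStore", the class name TransformTraceSketch is hypothetical, and the input lines are made-up sample data rather than output from a real run.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java trace of the transformer's get/update/put cycle; no Kafka involved.
class TransformTraceSketch {
    // Returns the running total that would be stored after each input line.
    static List<Long> runningTotals(List<String> lines) {
        Map<String, Long> store = new HashMap<>();   // stand-in for the state store
        List<Long> totals = new ArrayList<>();
        for (String line : lines) {
            String key = "key";                      // same constant key as the map() step
            Long countSoFar = store.get(key);
            if (countSoFar == null) {
                countSoFar = 0L;                     // reached only for the first record
            }
            countSoFar += line.length();
            store.put(key, countSoFar);
            totals.add(countSoFar);
        }
        return totals;
    }

    public static void main(String[] args) {
        // "hello" has 5 characters and "kafka streams" has 13, so this prints [5, 18].
        System.out.println(runningTotals(Arrays.asList("hello", "kafka streams")));
    }
}
```

Since every record is re-keyed to the same constant, the store ends up holding a single entry whose value grows with each record. With real keys, each key would get its own entry.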