java - 线程“主”org.apache.kafka.streams.errors.InvalidStateStoreException 中的异常:
问题描述
我正在尝试访问我在同一个 java 程序中创建的 inMemoryStore。但返回异常为“线程“主”org.apache.kafka.streams.errors.InvalidStateStoreException 中的异常:状态存储 storeName 可能已迁移到另一个实例。”
当我使用 persistentKeyValueStore 时,它工作正常并且能够创建存储并返回值。
package com.bakdata.streams_store.demo;
import java.util.Collection;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.stre7ams.state.Stores;
import org.apache.kafka.streams.state.StreamsMetadata;
public class InMemoryStore {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-id-0001");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
String storeName = "sample";
KeyValueBytesStoreSupplier stateStore = Stores.inMemoryKeyValueStore(storeName);
StoreBuilder<KeyValueStore<String, String>> storeBuilder = Stores.keyValueStoreBuilder(stateStore, Serdes.String(), Serdes.String());
StreamsBuilder builder = new StreamsBuilder();
builder.addStateStore(storeBuilder);
KStream<String, String> inputStream = builder.stream("material_test1");
KafkaStreams streams = new KafkaStreams(builder.build(), props);
try {
streams.start();
Thread.sleep(30000);
} catch (final Throwable e) {
System.exit(1);
}
final ReadOnlyKeyValueStore<String, String> keyValueStore = streams.store(storeName, QueryableStoreTypes.keyValueStore());
KeyValueIterator<String, String> range = keyValueStore.all();
while (range.hasNext()) {
KeyValue<String, String> next = range.next();
System.out.println("Key: " + next.key + ", value: " + next.value);
}
}
}
线程“主”org.apache.kafka.streams.errors.InvalidStateStoreException 中的异常:状态存储示例可能已迁移到另一个实例。在 org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:62) 在 org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:1067) 在 com.bakdata.streams_store.demo .InMemoryStore.main(InMemoryStore.java:59)
我期待打印 ReadOnlyStoreQuery 中的值。
解决方案
您不能在流上拥有 StateStore,因为单个键可能有多个值。您需要先将其变成 KTable ( streams.table(...)
) 或 GlobalKtable ( streams.globalTable(...)
)。
科特林示例:
val businessObjects = streamsBuilder.globalTable("topic", eventStore("store-name"))
哪里eventStore
是:
fun eventStore(name: String) = Materialized.`as`<String, String>(Stores.inMemoryKeyValueStore(name))
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.String())
在你开始流之后:
var store: ReadOnlyKeyValueStore<String, String> = streams.store("store-name", keyValueStore<String, String>())
KafkaStreams.StateListener
注意:当流准备好时,还有一个接口
override fun onChange(newState: KafkaStreams.State?, oldState: KafkaStreams.State?) =
Option.fromNullable(newState)
.filter { REBALANCING == oldState && RUNNING == it }
.map { store = streams.store("store-name", keyValueStore<String, String>()) }
.getOrElse { log.info("Waiting for Kafka being in REBALANCING -> RUNNING, but it is $oldState -> $newState") }
或者你也可以把你的流变成一个KTable
with
stream.groupByKey().reduce(...)
就像这里描述的那样。
推荐阅读
- excel - 用户将输入列范围,每列将保存在新工作簿中
- android - 在 Android Studio 的 Navigation Drawer 预览中呈现问题 java.lang.IllegalArgumentException: java.lang.ClassCastException@47d5a628
- csv - 从 Cloud Storage 中的 csv 到 Big Query 表的 Dataprep 流不完整(未加载所有记录)
- c++ - 模板类实例之间的类型转换
- javascript - 从多级嵌套对象中动态提取相同的对象名称
- git - 为什么全局 git config "remote.origin.push" 会覆盖本地 "remote.origin.push"?
- python - 如何将 python 控制台插入 tkinter 屏幕
- python - 为什么 ProcessPoolExecutor 和 Pool 在调用 super() 时会崩溃?
- python - 如何在 python 中读取 csv 文件的前 100 行附加逗号、序列号和句号?
- mysql - 在 MySQL 中导出 Excel 文件时出现“意外的子句排序”错误