scala - 左连接时的拓扑异常
问题描述
我正在尝试做一个简单stream.leftJoin(table)
的但在运行时得到以下异常:
TopologyException: Invalid topology: StateStore null is not added yet
这是我的代码大致的样子,我注释掉了实现细节以保持简短:
val streamsConfiguration: Properties = {
val p = new Properties()
// api config
p.put(StreamsConfig.APPLICATION_ID_CONFIG /**/)
p.put(StreamsConfig.CLIENT_ID_CONFIG /**/)
// kafka broker
p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
// local state store
p.put(StreamsConfig.STATE_DIR_CONFIG, "./streams-state")
p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
// serdes
p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, classOf[StringSerde])
p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, classOf[StringSerde])
p
}
val builder = new StreamsBuilderS()
val rawInfoTable: KTableS[String, String] = builder.table("station_info")
val infoTable: KTableS[String, StationInfo] = rawInfoTable.mapValues{jsonString =>
/** */
}.filter(/** */)
.mapValues((/** */)
val rawStatusStream: KStreamS[String, String] = builder.stream("station_status")
val statusStream: KStreamS[String, StationStatus] = rawStatusStream.flatMapValues{jsonString =>
/** */
}
val outputStream: KStreamS[String, String] = statusStream
.leftJoin(infoTable, calculateStats)
.filter((_, availability) => {
/** */
})
.map((stationId: String, availability) => {
/** */
})
outputStream.to("low_availability")
val streams = new KafkaStreams(builder.build(), streamsConfiguration)
streams.cleanUp()
streams.start()
我什至尝试手动添加一个StateStore
通过:
val store = Stores.inMemoryKeyValueStore("my-store")
val storeBuilder = Stores.keyValueStoreBuilder(store, new StringSerde(), new StringSerde())
val builder = new StreamsBuilderS()
builder.addStateStore(storeBuilder)
但它似乎并没有改变任何东西。我正在使用来自 lightbend 的 kafka 流包装器:"com.lightbend" %% "kafka-streams-scala" % "0.2.1"
我检查的所有示例似乎都不关心添加状态存储,所以我有点困惑。有人可以指出我正确的方向吗?这有什么关系STATE_DIR_CONFIG
吗?还是使用我在本地运行的 Kafka 集群?
解决方案
推荐阅读
- google-oauth - Google openID 客户端 userinfo 端点提供的 G Suite 自定义字段
- kotlin - 如何使用来自 kotlin 的第三方 jar 中的 internal 关键字访问类?
- python - Python非标准日期时间分组/选择
- node.js - “jest --watch”:错误:ENOSPC:已达到文件观察者数量的系统限制,请观看
- python - 如何使用 Altair Viz 在凹凸图中鼠标悬停上的线条着色?
- c - C:代码生成segfault non-cost point to string?
- spring - Spring JUnit 5 测试中的异常
- python - Sqlalchemy 不会为 Kaggle 笔记本加载
- python - 最大化所有护士在调度中的分布
- c# - 如何检查checkListBox中的一项,然后在同一个checkListBox中输出选中的框,但在Form2.cs中