TopologyException on left join

Problem description

I am trying to do a simple stream.leftJoin(table), but I get the following exception at runtime:

TopologyException: Invalid topology: StateStore null is not added yet

Here is roughly what my code looks like; I have commented out the implementation details to keep it short:

val streamsConfiguration: Properties = {
  val p = new Properties()
  // api config
  p.put(StreamsConfig.APPLICATION_ID_CONFIG /**/)
  p.put(StreamsConfig.CLIENT_ID_CONFIG /**/)
  // kafka broker
  p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  // local state store
  p.put(StreamsConfig.STATE_DIR_CONFIG, "./streams-state")
  p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
  // serdes
  p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, classOf[StringSerde])
  p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, classOf[StringSerde])
  p
}

val builder = new StreamsBuilderS()

val rawInfoTable: KTableS[String, String] = builder.table("station_info")
val infoTable: KTableS[String, StationInfo] = rawInfoTable.mapValues{jsonString =>
  /** */
}.filter(/** */)
  .mapValues(/** */)

val rawStatusStream: KStreamS[String, String] = builder.stream("station_status")
val statusStream: KStreamS[String, StationStatus] = rawStatusStream.flatMapValues{jsonString =>
  /** */
}

val outputStream: KStreamS[String, String] = statusStream
  .leftJoin(infoTable, calculateStats)
  .filter((_, availability) => {
    /** */
  })
  .map((stationId: String, availability) => {
    /** */
  })

outputStream.to("low_availability")

val streams = new KafkaStreams(builder.build(), streamsConfiguration)

streams.cleanUp()
streams.start()

I even tried adding a StateStore manually via:

val store = Stores.inMemoryKeyValueStore("my-store")
val storeBuilder = Stores.keyValueStoreBuilder(store, new StringSerde(), new StringSerde())
val builder = new StreamsBuilderS()
builder.addStateStore(storeBuilder)

But it does not seem to change anything. I am using the Kafka Streams wrapper from Lightbend: "com.lightbend" %% "kafka-streams-scala" % "0.2.1"

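None of the examples I looked at seem to bother with adding a state store, so I am a bit confused. Can someone point me in the right direction? Does this have anything to do with STATE_DIR_CONFIG? Or with the Kafka cluster I am running locally?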

Tags: scala, apache-kafka, apache-kafka-streams

Solution

