apache-kafka - Spring Cloud Stream Kafka Stream 加入后未写入目标主题
问题描述
这是我的应用程序,它只是从客户主题(输入绑定)和订单主题(订单绑定)中获取对 KStream 的引用。然后它从客户主题创建一个 KTable 并执行与订单 KStream 的连接:
@Configuration
class ShippingKStreamConfiguration {
@StreamListener
@SendTo("output")
fun process(@Input("input") input: KStream<Int, Customer>, @Input("order") order: KStream<Int, Order>): KStream<Int, OrderShipped> {
val intSerde = Serdes.IntegerSerde()
val customerSerde = JsonSerde<Customer>(Customer::class.java)
val orderSerde = JsonSerde<Order>(Order::class.java)
val stateStore: Materialized<Int, Customer, KeyValueStore<Bytes, ByteArray>> =
Materialized.`as`<Int, Customer, KeyValueStore<Bytes, ByteArray>>("customer-store")
.withKeySerde(intSerde)
.withValueSerde(customerSerde)
val customerTable: KTable<Int, Customer> = input.groupByKey(Serialized.with(intSerde, customerSerde))
.reduce({ _, y -> y }, stateStore)
return (order.selectKey { key, value -> value.customerId } as KStream<Int, Order>)
.join(customerTable, { orderIt, customer ->
OrderShipped(orderIt.id)
},
Joined.with(intSerde, orderSerde, customerSerde))
}
}
假设这应该写入输出绑定 ( @SendTo("output")
),指向 ordershipment 主题。但是,没有消息写入该主题。
处理器配置:
interface ShippingKStreamProcessor {
@Input("input")
fun input(): KStream<Int, Customer>
@Input("order")
fun order(): KStream<String, Order>
@Input("output")
fun output(): KStream<String, OrderShipped>
}
**application.yml**
spring:
application:
name: spring-boot-shipping-service
cloud:
stream:
kafka:
streams:
binder:
configuration:
default:
key:
serde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
value:
serde: org.apache.kafka.common.serialization.Serdes$StringSerde
bindings:
input:
destination: customer
contentType: application/json
order:
destination: order
contentType: application/json
output:
destination: ordershipments
contentType: application/json
解决方案
处理器定义错误,这是使用@Output
而不是@Input
:
interface ShippingKStreamProcessor {
@Input("input")
fun input(): KStream<Int, Customer>
@Input("order")
fun order(): KStream<String, Order>
@Output("output")
fun output(): KStream<String, OrderShipped>
}
推荐阅读
- java - 如何将值从 application.yml 获取到 ehcache.xml?
- javascript - 使用状态值来引用另一个状态。(例如,{ this.state.[StateRefHere] } )
- css - ReactJS 生产构建 css 用 0 内部钳位函数替换 0px
- google-sheets - Google 表格查询在结果中返回自动增量
- java - 使用 Spring JPA 在 Mysql 中搜索多个字段
- mysql - 选择带有两个 IN(列表)参数的查询
- cvs - CVS 提交不起作用并且不提供任何反馈
- c# - 删除 WPF FlowDocument 左填充?
- javascript - 需要帮助才能在 Jquery 中使用 .append() 到 div 元素
- bayesian - 具有三个层次结构的多级 stan 模型