首页 > 解决方案 > 如何忽略从同一主题读取和写入不同事件类型的 Kafka Streams 应用程序中的某些类型的消息

问题描述

假设 Spring Cloud Stream 应用程序KStreamorder topic. OrderCreated {"id":x, "productId": y, "customerId": z}它对事件感兴趣。一旦到达,它就会对其进行处理并生成一个输出事件OrderShipped {"id":x, "productId": y, "customerName": <, "customerAddress": z}到相同的order topic.

我面临的问题是,由于它从/向同一主题读取和写入,Kafka Stream 应用程序正在尝试处理自己的写入,这没有任何意义。

如何阻止此应用程序处理它生成的事件?

更新:正如 Artem Bilan 和 sobychako 指出的那样,我曾考虑过使用KStream.filter(),但有一些细节让我怀疑如何处理这个问题:

现在 KStream 应用程序看起来像这样:

interface ShippingKStreamProcessor {
    ...
    @Input("order")
    fun order(): KStream<String, OrderCreated>

    @Output("output")
    fun output(): KStream<String, OrderShipped>

KStream 配置

    @StreamListener
    @SendTo("output")
    fun process(..., @Input("order") order: KStream<Int, OrderCreated>): KStream<Int, OrderShipped> {

订单和输出绑定都指向作为目的地的订单主题。

订单创建类:

data class OrderCreated(var id: Int?, var productId: Int?, var customerId: Int?) {
    constructor() : this(null, null, null)
}

OrderShipped 类

data class OrderShipped(var id: Int?, var productId: Int?, var customerName: String?, var customerAddress: String?) {
    constructor() : this(null, null, null, null)
}

我使用JSON作为消息格式,所以消息如下所示:

考虑到这一点,我正在寻找过滤掉不需要的消息的最佳方法

如果我KStream.filter()现在就使用,当我得到{"id":1, "productId": 7, "customerName": "X", "customerAddress": "Y"}我的时候,我KStream<Int, OrderCreated>会将 OrderShipped 事件解组为带有一些空字段的 OrderCreated 对象:OrderCreated(id:1, productId: 7, customerId: null)。检查空字段听起来并不可靠。

一种可能的解决方案eventType = OrderCreated|OrderShipped是向使用该主题的每种消息/类添加另一个字段 , 。即使在这种情况下,我最终也会拥有一个带有 eventType=OrderShipped 属性的 OrderCreated 类(请记住 KStream< Int,OrderCreated >)。这看起来像一个丑陋的解决方法。有什么改进的办法吗?

还有另一种更自动的方法来处理这个问题吗?例如,如果消息不符合预期的模式(OrderCreated) ,另一种序列化( AVRO ?)会阻止消息被处理吗?根据这篇文章,这种在同一主题中支持多个模式(事件类型)的方式似乎是一个很好的做法:https ://www.confluent.io/blog/put-several-event-types-kafka-topic/ 但是目前尚不清楚如何解组/反序列化不同类型。

标签: apache-kafkaspring-cloudavroapache-kafka-streamsspring-cloud-stream

解决方案


我已经接受布鲁诺的回答作为解决这个问题的有效方法。但是,我认为我已经提出了一种更直接/合乎逻辑的方式,使用带有注释的事件层次结构 JsonTypeInfo

首先,您需要 Order 事件的基类并指定所有子类。请注意,将有一个 type 属性添加到 JSON 文档中,这将有助于 Jackson 编组/解组 DTO:

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type")
@JsonSubTypes(value = [
    JsonSubTypes.Type(value = OrderCreatedEvent::class, name = "orderCreated"),
    JsonSubTypes.Type(value = OrderShippedEvent::class, name = "orderShipped")
])
abstract class OrderEvent

data class OrderCreatedEvent(var id: Int?, var productId: Int?, var customerId: Int?) : OrderEvent() {
    constructor() : this(null, null, null)
}

data class OrderShippedEvent(var id: Int?, var productId: Int?, var customerName: String?, var customerAddress: String?) : OrderEvent () {
    constructor() : this(null, null, null, null)
}

有了这个,OrderCreatedEvent 对象的生产者将生成如下消息:

key: 1 value: {"type":"orderCreated","id":1,"productId":24,"customerId":1}

现在轮到KStream了。我已将签名更改为,KStream<Int, OrderEvent>因为它可以接收 OrderCreatedEvent 或 OrderShippedEvent。在接下来的两行...

orderEvent.filter { _, value -> value is OrderCreatedEvent }
                .map { key, value -> KeyValue(key, value as OrderCreatedEvent) }

...我过滤以仅保留 OrderCreatedEvent 类的消息并将它们映射以将其KStream<Int, OrderEvent>转换为KStream<Int, OrderCreatedEvent>

完整的 KStream 逻辑:

@StreamListener
@SendTo("output")
fun process(@Input("input") input: KStream<Int, Customer>, @Input("order") orderEvent: KStream<Int, OrderEvent>): KStream<Int, OrderShippedEvent> {

        val intSerde = Serdes.IntegerSerde()
        val customerSerde = JsonSerde<Customer>(Customer::class.java)
        val orderCreatedSerde = JsonSerde<OrderCreatedEvent>(OrderCreatedEvent::class.java)

        val stateStore: Materialized<Int, Customer, KeyValueStore<Bytes, ByteArray>> =
                Materialized.`as`<Int, Customer, KeyValueStore<Bytes, ByteArray>>("customer-store")
                        .withKeySerde(intSerde)
                        .withValueSerde(customerSerde)

        val customerTable: KTable<Int, Customer> = input.groupByKey(Serialized.with(intSerde, customerSerde))
                .reduce({ _, y -> y }, stateStore)


        return (orderEvent.filter { _, value -> value is OrderCreatedEvent }
                .map { key, value -> KeyValue(key, value as OrderCreatedEvent) }
                .selectKey { _, value -> value.customerId } as KStream<Int, OrderCreatedEvent>)
                .join(customerTable, { orderIt, customer ->
                    OrderShippedEvent(orderIt.id, orderIt.productId, customer.name, customer.address)
                }, Joined.with(intSerde, orderCreatedSerde, customerSerde))
                .selectKey { _, value -> value.id }
                //.to("order", Produced.with(intSerde, orderShippedSerde))
    }

在此过程之后,我将在订单主题中生成一条新消息key: 1 value: {"type":"orderShipped","id":1,"productId":24,"customerName":"Anna","customerAddress":"Cipress Street"},但这将被流过滤掉。


推荐阅读