首页 > 解决方案 > 如何根据来自另一个主题的响应从 Kafka 主题中读取消息

问题描述

我在 Kafka 中有 2 个主题:MetaData 和 MasterData。我正在使用 Kafka Listners 实时读取数据。有2种情况:

  1. 我必须先阅读 MetaData 主题,然后再阅读 MasterData 主题。
  2. 也有可能 MetaData 主题中没有新的消息,但 MasterData 主题中插入了一条消息。在这种情况下,MasterData 主题的消费应该继续进行。

蚂蚁建议如何实现这一目标?

标签: javaspring-bootapache-kafkakafka-consumer-apispring-kafka

解决方案


在评论中澄清之后,这是我所理解的,你想要实现的目标:

你有一个类似数据库的系统,来自 kafka 主题的元数据用于调整表结构。然后实际数据进入您想要插入到表中的主 kafka 主题。换句话说:您想使用 kafka 定义数据的结构。

我不确定,如果这就是我所说的 Kafka 的常规用例,因为它旨在成为交换事件(即数据)的接口。您尝试做的是使用系统作为定义数据结构的手段。但这只是我的意见。

正如前面在评论中所说,实际上没有办法说在一个主题中不会有消息。你只能说:还没有消息“还”。你“可以”做的大概是:当master主题中的消息到达时,您首先检查meta主题,是否也有消息。如果有,则应用数据结构的更改,然后从主主题导入消息。

另一种选择是使用 Kafka Streams 而不是原始消费者来将两个主题结合在一起,join例如。

无论如何,要定义数据结构,通常会使用 Avro 之类的东西,它可以为您提供模式演变等等。然后编写您的应用程序以了解架构更改并相应地将它们应用于您的数据库表。

更新:如何使用 Kafka Streams 解决

如上所述,使用 Kafka Streams 可能是您的情况的一种解决方案。让我解释一下,我在一些伪 kafka 流代码中的意思是:

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.GlobalKTable;

import java.nio.charset.StandardCharsets;

/**
 * Just an incomplete Kafka Streams Code Demo to show how the problem could
 * be solved with this framework instead of using the consumers directly.
 * Kafka Streams Boilerplate code not included.
 */
public class Example {
    private static final String META_TOPIC = "meta";
    private static final String MASTER_TOPIC = "master";

    // You have to make sure, the meta data is stored under this
    // specific key in the meta topic.
    private static final byte[] META_KEY = MetaEvent.class.getName().getBytes(StandardCharsets.UTF_8);

    public static void main(String[] args) {
        new Example().createTopology();
    }

    public void createTopology() {

        final StreamsBuilder builder = new StreamsBuilder();

        final GlobalKTable<byte[], MetaEvent> metaTable = builder.globalTable(META_TOPIC);

        builder.<byte[], MasterEvent>stream(MASTER_TOPIC)
                .leftJoin(
                        metaTable,
                        (k, v) -> META_KEY,
                        MasterWithMeta::new)
                .foreach(this::handleEvent);
    }

    private void handleEvent(byte[] key, MasterWithMeta masterWithMeta) {
        // 1) check if meta has changed, if so, apply changes to database
        // 2) import master data to database
    }
}

class MasterWithMeta {
    private final MasterEvent master;
    private final MetaEvent meta;

    public MasterEvent getMaster() {
        return master;
    }

    public MetaEvent getMeta() {
        return meta;
    }

    public MasterWithMeta(MasterEvent master, MetaEvent meta) {
        this.master = master;
        this.meta = meta;
    }

    public static MasterWithMeta create(MasterEvent master, MetaEvent meta) {
        return new MasterWithMeta(master, meta);
    }
}

class MetaEvent {
    // ...
}

class MasterEvent {
    // ...
}


推荐阅读