首页 > 解决方案 > 如何在kafka中加入两个不同的主题

问题描述

我有一个名为topic1的主题和一个名为topic2的主题。我有以下代码:

-topic1 的

KStream<Long, byte[]> events = builder.stream("topic1", Consumed.with(Serdes.Long(), Serdes.ByteArray()));

- 主题表2

KTable<Long, byte[]> table = builder.table("topic2",
        Consumed.with(Serdes.Long(), Serdes.ByteArray()));

当我与topic1的生产者一起制作时,一切都很好。topic1topic2的值不同,我想使用此流“事件”使用来自第一个主题生产者的生成记录,然后将此事件附加到秒主题表并保存同一键的第二个主题值的状态(key很长,第一个和第二个主题的数字相同)。换句话说,我想基于相同的键和更新表将流与表连接起来。

我的代码:

StoreBuilder<KeyValueStore<Long, byte[]>> store = Stores
        .keyValueStoreBuilder(Stores.persistentKeyValueStore(STORE_TOPIC2), Serdes.Long(), Serdes.ByteArray())
        .withLoggingEnabled(new HashMap<>());

    builder.addStateStore(store);

    events.join(table, KeyValue::new, Joined.with(Serdes.Long(), Serdes.ByteArray(), Serdes.ByteArray()))
        .transform(Update::new, STORE_TOPIC2)
        .to("topic2", Produced.with(Serdes.Long(), Serdes.ByteArray()));

在最后一行中,我为 topic2 生成了连接事件,但在该主题上没有生成任何内容,我的转换器“更新”如下所示:

private static class Update implements Transformer<Long, KeyValue<byte[], byte[]>, KeyValue<Long, byte[]>> {

    private KeyValueStore<Long, byte[]> store1;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        store1 = (KeyValueStore<Long, byte[]>) context.getStateStore(store);
    }

    @Override
    public KeyValue<Long, byte[]> transform(final Long key, final KeyValue<byte[], byte[]> updates) {

        System.out.println("Inside event transformer for key: " + key);
        System.out.println("Last produced graph: " + store.get(key));

        CustomClass c = null;
        try {
        c = deserializeModel(store.get(key));
        } catch (IOException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
        }

        if (c == null) {
        c = new CustomClass(...);
        }

        try {
        return KeyValue.pair(companyKey, serializeModel(companyNetwork));
        } catch (IOException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
        return null;
        }
    }

标签: javaapache-kafkaapache-kafka-streams

解决方案


推荐阅读