Merging Kafka record values from a single stream

Problem description

I have a topic that receives JSON records which may each contain only partial data. I want to merge this data so that the final record collects as much information as possible. For example:

 t1: { id: '1234', attribute1: 'foo' }
 t2: { id: '1234', attribute2: 'bar' }

Desired stream after merging the record values:

 t1: { id: '1234', attribute1: 'foo' }
 t2: { id: '1234', attribute1: 'foo', attribute2: 'bar' }

To achieve this, I tried the following:

 // the topic is keyed by id
 KStream<String, MyObject> input = ...
 return input.groupByKey()
         .reduce((current, newEvent) -> newEvent.merge(current))
         .toStream();

But this only ever produces a single entry, because groupByKey/reduce produces a KTable. Is there any way to achieve this?
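(For context, MyObject.merge in the snippet above is the asker's own helper and is not shown in the question; a minimal sketch of such a field-wise merge, assuming plain nullable String attributes, might look like the following. The class shape is purely illustrative.)

 public class MyObject {
     private String id;
     private String attribute1;
     private String attribute2;

     // Keep this (newer) event's fields, and fall back to the older
     // event's value wherever this event carries null.
     public MyObject merge(MyObject older) {
         if (older == null) {
             return this;
         }
         MyObject merged = new MyObject();
         merged.id = this.id != null ? this.id : older.id;
         merged.attribute1 = this.attribute1 != null ? this.attribute1 : older.attribute1;
         merged.attribute2 = this.attribute2 != null ? this.attribute2 : older.attribute2;
         return merged;
     }
 }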


EDIT: The stream definition is correct. By default, reduce does not forward every message downstream; it caches records before doing so. To disable this behavior, the configuration property

 cache.max.bytes.buffering: 0

must be set.
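(For reference, the same setting can be applied programmatically through StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG; a minimal sketch, with application id and broker address as placeholders. Note that newer Kafka Streams releases deprecate this key in favor of statestore.cache.max.bytes.)

 Properties props = new Properties();
 props.put(StreamsConfig.APPLICATION_ID_CONFIG, "merge-record-values-app"); // placeholder
 props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");       // placeholder
 // A cache size of 0 makes reduce() forward every per-key update downstream
 // instead of compacting updates between commits.
 props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);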

Tags: java, apache-kafka-streams, spring-kafka

Solution


Try this:

import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

import java.io.FileInputStream;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

// The exact home of SCHEMA_REGISTRY_URL_CONFIG depends on the Confluent
// serializer version; in recent releases it is
// io.confluent.kafka.serializers.AbstractKafkaSchemaRegistryClientConfig.
import static io.confluent.kafka.serializers.AbstractKafkaSchemaRegistryClientConfig.SCHEMA_REGISTRY_URL_CONFIG;

public class MergeStreams {

    public Topology buildTopology(Properties allProps) {
        final StreamsBuilder builder = new StreamsBuilder();

        final String rockTopic = allProps.getProperty("input.rock.topic.name");
        final String classicalTopic = allProps.getProperty("input.classical.topic.name");
        final String allGenresTopic = allProps.getProperty("output.topic.name");

        // SongEvent is an Avro-generated value class (not shown here).
        KStream<String, SongEvent> rockSongs = builder.stream(rockTopic);
        KStream<String, SongEvent> classicalSongs = builder.stream(classicalTopic);

        // merge() interleaves the two source streams into a single KStream.
        KStream<String, SongEvent> allSongs = rockSongs.merge(classicalSongs);

        allSongs.to(allGenresTopic);
        return builder.build();
    }

    public void createTopics(Properties allProps) {
        AdminClient client = AdminClient.create(allProps);

        List<NewTopic> topics = new ArrayList<>();

        topics.add(new NewTopic(
                allProps.getProperty("input.rock.topic.name"),
                Integer.parseInt(allProps.getProperty("input.rock.topic.partitions")),
                Short.parseShort(allProps.getProperty("input.rock.topic.replication.factor"))));

        topics.add(new NewTopic(
                allProps.getProperty("input.classical.topic.name"),
                Integer.parseInt(allProps.getProperty("input.classical.topic.partitions")),
                Short.parseShort(allProps.getProperty("input.classical.topic.replication.factor"))));

        topics.add(new NewTopic(
                allProps.getProperty("output.topic.name"),
                Integer.parseInt(allProps.getProperty("output.topic.partitions")),
                Short.parseShort(allProps.getProperty("output.topic.replication.factor"))));

        client.createTopics(topics);
        client.close();
    }

    public Properties loadEnvProperties(String fileName) throws IOException {
        Properties allProps = new Properties();
        FileInputStream input = new FileInputStream(fileName);
        allProps.load(input);
        input.close();

        return allProps;
    }

    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            throw new IllegalArgumentException("This program takes one argument: the path to an environment configuration file.");
        }

        MergeStreams ms = new MergeStreams();
        Properties allProps = ms.loadEnvProperties(args[0]);
        allProps.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        allProps.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
        allProps.put(SCHEMA_REGISTRY_URL_CONFIG, allProps.getProperty("schema.registry.url"));
        Topology topology = ms.buildTopology(allProps);

        ms.createTopics(allProps);

        final KafkaStreams streams = new KafkaStreams(topology, allProps);
        final CountDownLatch latch = new CountDownLatch(1);

        // Attach a shutdown handler to catch Control-C.
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close(Duration.ofSeconds(5));
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}
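For reference, loadEnvProperties expects a plain Java properties file. A minimal example covering the keys the code above reads (all values are placeholders) might be:

 application.id=merge-streams-app
 bootstrap.servers=localhost:9092
 schema.registry.url=http://localhost:8081

 input.rock.topic.name=rock-song-events
 input.rock.topic.partitions=1
 input.rock.topic.replication.factor=1

 input.classical.topic.name=classical-song-events
 input.classical.topic.partitions=1
 input.classical.topic.replication.factor=1

 output.topic.name=all-song-events
 output.topic.partitions=1
 output.topic.replication.factor=1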


