首页 > 解决方案 > 将 Kafka Stream 字数示例转换为输出 JSON 对象

问题描述

我正在研究字数统计示例,目前有:

    stream.flatMapValues(value -> Arrays.asList(WORD_PATTERN.split(value.getMessage().toLowerCase())))
          .groupBy((key, value) -> value).count(Named.as("word_counts")).toStream()
          .to(KafkaTopicConfig.xxx3_REPLY_TOPIC);

我从文档中了解到,这将产生 aKTable<String, Long>并且回复主题中的内容将是:

the 3
cat 4
etc.

甚至不确定这是否是好的做法(如果不是,请告诉我哈哈)......但我正在尝试将其变成更“可用”的 json 格式:

  [
    { 
      "word": "the", 
      "count": "3" 
    },
    { 
      "word": "cat", 
      "count": "4" 
    }
  ]

这是可行的吗?我得到流永远不会结束,所以它会继续以这种格式发送 jsons ......

我已经定义了一个类 WordCountPairDto 来保存 String、Long 对,我想我正在尝试将流序列化为 aArrayList<WordCountPairDto>并让 Json serde 将其序列化为 Json。

任何指针将不胜感激!

标签: javaapache-kafkaapache-kafka-streams

解决方案


您的输出不会是 JSON 数组。这将是带有任何键的两条消息。

你可以像这样得到那个输出

stream.flatMapValues(value -> Arrays.asList(WORD_PATTERN.split(value.getMessage().toLowerCase())))
      .groupBy((key, value) -> value).count(Named.as("word_counts")).toStream()
      .map(this::createJSONMessage)
      .to(KafkaTopicConfig.xxx3_REPLY_TOPIC);

在哪里定义public KeyValueMapper<String, String, KeyValue<String, String>> createJSONMessage(String key, String value)方法以JSON 对象字符串作为值返回

您还可以Produced.with用来定义不同的 serdes。


推荐阅读