java - 将 Kafka Stream 字数示例转换为输出 JSON 对象
问题描述
我正在研究字数统计示例,目前有:
stream.flatMapValues(value -> Arrays.asList(WORD_PATTERN.split(value.getMessage().toLowerCase())))
.groupBy((key, value) -> value).count(Named.as("word_counts")).toStream()
.to(KafkaTopicConfig.xxx3_REPLY_TOPIC);
我从文档中了解到,这将产生 aKTable<String, Long>
并且回复主题中的内容将是:
the 3
cat 4
etc.
甚至不确定这是否是好的做法(如果不是,请告诉我哈哈)......但我正在尝试将其变成更“可用”的 json 格式:
[
{
"word": "the",
"count": "3"
},
{
"word": "cat",
"count": "4"
}
]
这是可行的吗?我得到流永远不会结束,所以它会继续以这种格式发送 jsons ......
我已经定义了一个类 WordCountPairDto 来保存 String、Long 对,我想我正在尝试将流序列化为 aArrayList<WordCountPairDto>
并让 Json serde 将其序列化为 Json。
任何指针将不胜感激!
解决方案
您的输出不会是 JSON 数组。这将是带有任何键的两条消息。
你可以像这样得到那个输出
stream.flatMapValues(value -> Arrays.asList(WORD_PATTERN.split(value.getMessage().toLowerCase())))
.groupBy((key, value) -> value).count(Named.as("word_counts")).toStream()
.map(this::createJSONMessage)
.to(KafkaTopicConfig.xxx3_REPLY_TOPIC);
在哪里定义public KeyValueMapper<String, String, KeyValue<String, String>> createJSONMessage(String key, String value)
方法以将JSON 对象字符串作为值返回
您还可以Produced.with
用来定义不同的 serdes。
推荐阅读
- sql-server - 用于提取 SQL Server 元数据的 PostgreSQL TDS_FDW 连接
- uitableview - 在 iOS 13 上构建时,UITableViewCell selectedBackgroundView 的颜色不可见
- haskell - 如何使用 Conduit 组合器实现类似 takeWhile 的功能?
- java - Android 应用程序崩溃 java.lang.RuntimeException
- macos - 在 Visual Studio for Mac (Xamarin) 中签署 Mac 应用程序
- java-11 - 将 sun.reflect 包与 openjdk11 一起使用
- kotlin - 使用镜头更改数据类的多个属性
- perl - 在 perl 的映射中使用函数名
- amazon-web-services - 使用“Alexa”一词时,Alexa 技能未打开
- pandoc - Pandoc tex to html:如何处理自定义环境?