apache-kafka - 我可以使用 Kafka 流读取和写入不同类型的消息吗?
问题描述
我正在编写一个使用 Kafka 流的应用程序。它从主题 A 读取,进行一些转换,然后写入主题 B。在转换期间,值按键分组,因此输出键、值类型与输入值类型不同。Kafka流使用特定类型的Serdes(例如String serdes对字符串进行序列化和反序列化)进行序列化和反序列化,因此在数据转换后它将不起作用。如何在 Streams API 中定义不同的序列化器和反序列化器?
解决方案
你当然可以
当您创建流、调用 groupBy 或将输出写入某个主题时,您可以提供Serde
或Serialized
. 例子:
Serde<String> stringSerde = Serdes.String();
Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
Produced<String, YourCustomItem> produced = Produced.with(stringSerde, new JsonSerde<>(YourCustomItem.class));
KStream<String, String> kStream = streamsBuilder.stream("sourceTopicName", consumed);
KStream<String, YourCustomItem> transformedKStream = kStream.mapValues((key, value) -> new YourCustomItem());
transformedKStream.to("destinationTopicName", produced);
transformedKStream.groupByKey(Serialized.with(Serdes.String(), new JsonSerde<>(YourCustomItem.class)));
从哪里来JsonSerde
的spring-kafka
依赖。或者您可以使用以下内容Serde
:
Serializer<JsonNode> jsonSerializer = new JsonSerializer();
Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
推荐阅读
- regex - 如何在scala中匹配包含加号(+)的字符串?
- arrays - 菜鸟需要帮助循环数组
- visual-studio-code - 禁用 Visual Studio Code 中特定文件的语法高亮显示
- postgresql - postgres 用子句创建表
- php - 有一个从 SQL Server 中提取的 PHP 表来呈现表数据。我想将列标题过滤器添加到列以缩小范围
- itext7 - 有没有办法删除 iText 7 中的所有 XMP 元数据?
- github - 为现有项目创建 GitHub 页面
- kubernetes - 无法访问 Kubernetes 服务
- javascript - Best way to keep certain n fresh values in a array
- kotlin - 带有两个变量 kotlin 的 when 语句