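java - Spring Boot Kafka newbie question on serialization/deserialization
Problem description
I'm new to Kafka (using Spring Boot 2.2.4). The KafkaTemplate examples I have seen are KafkaTemplate<String, String> and just send a string. I'm looking into sending JSON objects, and I see two different approaches: some people use <String, Object>, and others use <String, TheActualModelClass>.
Are there pros and cons between the two? I assume the main difference is that a typed template only works for one model, whereas the Object variant can send any type to any topic. Is there anything beyond that?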
Solution
I may be late in answering, but this might be useful to anyone looking for a solution. You can see the detailed solution at https://github.com/CODINGSAINT/kafka-stream-spring
Suppose we have a custom Java bean:
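public class Quote {
    private String content;
    private Set<String> tags;
    private String author;
    // Getters and setters omitted for brevity; Jackson and the stream filter (getTags()) rely on them.
}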
You need to write the Kafka producer and consumer configuration:
/**
 * Configuration for Kafka Streams
 * @param kafkaProperties takes its defaults from the spring.kafka section of the application YAML or properties file
 * @return kafkaConfiguration
 */
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kafkaConfiguration(final KafkaProperties kafkaProperties) {
    Map<String, Object> config = new HashMap<>();
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, kafkaProperties.getClientId());
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, QuoteSerde.class.getName());
    config.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);
    return new KafkaStreamsConfiguration(config);
}
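/**
 * The stream that forwards each incoming quote to every destination topic matching one of its tags.
 * inputTopic and allTopics are assumed to be injected fields of this configuration class (not shown).
 * @param kStreamsBuilder the StreamsBuilder provided by Spring
 * @return the stream reading from the input topic
 */
@Bean
public KStream<String, Quote> kStream(StreamsBuilder kStreamsBuilder) {
    KStream<String, Quote> stream = kStreamsBuilder.stream(inputTopic);
    for (String topic : allTopics) {
        // Guard against null records or missing tags so one bad quote cannot crash the stream thread.
        stream.filter((key, quote) -> quote != null && quote.getTags() != null
                && quote.getTags().contains(topic)).to(topic);
    }
    return stream;
}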
/**
 * Kafka ConsumerFactory configuration.
 * Values are read as raw Bytes; the StringJsonMessageConverter set on the
 * listener container factory converts them to Quote.
 * @return the consumer factory used by the listener container factory
 */
@Bean
public ConsumerFactory<String, Quote> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            kafkaProperties.getBootstrapServers());
    props.put(
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class);
    props.put(
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            BytesDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}
/**
 * Required configuration for converting JSON payloads to POJOs
 * @return ConcurrentKafkaListenerContainerFactory
 */
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Quote>
        kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Quote> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setMessageConverter(new StringJsonMessageConverter());
    return factory;
}
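To make the fan-out in the kStream bean concrete, here is a minimal plain-Java sketch of the same routing rule, runnable without Kafka on the classpath. TagRouter and targetTopics are hypothetical names, not part of the linked project. The sketch only assumes that a quote is forwarded to every topic whose name appears among its tags.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class TagRouter {

    // Returns every topic whose name appears in the quote's tags,
    // i.e. the topics for which the stream's filter predicate would be true.
    static List<String> targetTopics(Set<String> tags, List<String> allTopics) {
        List<String> targets = new ArrayList<>();
        if (tags == null) {
            return targets; // assumption: a quote without tags is routed nowhere
        }
        for (String topic : allTopics) {
            if (tags.contains(topic)) {
                targets.add(topic);
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        List<String> allTopics = Arrays.asList("business", "life", "love");
        Set<String> tags = new HashSet<>(Arrays.asList("love", "life", "unlisted"));
        // Order follows allTopics; "unlisted" is not a configured topic and is ignored.
        System.out.println(targetTopics(tags, allTopics)); // prints [life, love]
    }
}
```

A quote whose tags match none of the configured topics ends up in no output topic, which also holds for the stream version.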
Then we need a serializer:
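public class QuoteSerializer implements Serializer<Quote> {
    // ObjectMapper is thread-safe once configured, so one shared instance is enough.
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public byte[] serialize(String topic, Quote quote) {
        if (quote == null) {
            return null;
        }
        try {
            // writeValueAsBytes encodes as UTF-8 regardless of the platform default charset.
            return MAPPER.writeValueAsBytes(quote);
        } catch (JsonProcessingException e) {
            // Fail loudly instead of silently sending a null payload.
            throw new SerializationException("Failed to serialize Quote", e);
        }
    }
}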
and a deserializer:
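public class QuoteDeserializer implements Deserializer<Quote> {
    // ObjectMapper is thread-safe once configured, so one shared instance is enough.
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public Quote deserialize(String topic, byte[] bytes) {
        if (bytes == null) {
            return null;
        }
        try {
            return MAPPER.readValue(bytes, Quote.class);
        } catch (IOException e) {
            // Throwing lets the configured deserialization exception handler deal with bad records.
            throw new SerializationException("Failed to deserialize Quote", e);
        }
    }
}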
Combine the Serializer and Deserializer into a Serde:
public class QuoteSerde implements Serde<Quote> {
    public QuoteSerde() {
    }

    @Override
    public Serializer<Quote> serializer() {
        return new QuoteSerializer();
    }

    @Override
    public Deserializer<Quote> deserializer() {
        return new QuoteDeserializer();
    }
}
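A Serde is only useful if its two halves reverse each other, so a quick round-trip check is worth having. The sketch below shows the idea with plain-Java stand-ins: the nested Serializer and Deserializer interfaces, Pair, utf8Text and roundTrip are all hypothetical and only mimic the shape of the Kafka interfaces, so that the example runs without kafka-clients or Jackson. The sample JSON string is made up.

```java
import java.nio.charset.StandardCharsets;

public class SerdeRoundTrip {

    // Hypothetical stand-ins shaped like Kafka's Serializer/Deserializer,
    // declared here so the sketch compiles without kafka-clients.
    interface Serializer<T> { byte[] serialize(String topic, T data); }
    interface Deserializer<T> { T deserialize(String topic, byte[] data); }

    // A Serde-like holder: one serializer plus the deserializer that reverses it.
    static final class Pair<T> {
        final Serializer<T> serializer;
        final Deserializer<T> deserializer;

        Pair(Serializer<T> serializer, Deserializer<T> deserializer) {
            this.serializer = serializer;
            this.deserializer = deserializer;
        }
    }

    // Text codec with an explicit charset so both sides agree on the bytes.
    static Pair<String> utf8Text() {
        return new Pair<>(
                (topic, data) -> data == null ? null : data.getBytes(StandardCharsets.UTF_8),
                (topic, bytes) -> bytes == null ? null : new String(bytes, StandardCharsets.UTF_8));
    }

    // Serializes a value and immediately deserializes the resulting bytes.
    static <T> T roundTrip(Pair<T> pair, String topic, T value) {
        return pair.deserializer.deserialize(topic, pair.serializer.serialize(topic, value));
    }

    public static void main(String[] args) {
        // Made-up payload with a non-ASCII character to exercise the charset handling.
        String json = "{\"content\":\"caf\u00e9\",\"tags\":[\"life\"]}";
        System.out.println(json.equals(roundTrip(utf8Text(), "quotes", json))); // prints true
    }
}
```

With the real classes, the same check would presumably serialize a Quote through QuoteSerde and compare it with the deserialized result, which assumes Quote implements equals.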
Now our listener can consume the messages:
@Component
public class TopicConsumers {
    private static final Logger LOGGER = LoggerFactory.getLogger(TopicConsumers.class);

    @Value("#{'${kafka.topic.output}'.split(',')}")
    private List<String> allTopics;

    /**
     * For simplicity we listen to all topics with a single listener
     */
    @KafkaListener(id = "allTopics", topics = "#{'${kafka.topic.output}'.split(',')}",
            containerFactory = "kafkaListenerContainerFactory")
    public void consume(@Payload Quote quote,
                        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                        @Header(KafkaHeaders.RECEIVED_TOPIC) String incomingTopic,
                        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
    ) {
        LOGGER.info("Incoming quote {}-> {}", incomingTopic, quote);
    }
}
Below is the application.yml file:
spring:
  kafka:
    listener:
      missing-topics-fatal: false
    client-id: quotes-app
    bootstrap-servers:
      - localhost:9091
      - localhost:9001
      - localhost:9092
    template:
      default-topic: quotes
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: com.codingsaint.learning.kafkastreamspring.QuoteSerializer
    consumer:
      properties:
        partition:
          assignment:
            strategy: org.apache.kafka.clients.consumer.RoundRobinAssignor
      group-id: random-consumer
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: com.codingsaint.learning.kafkastreamspring.QuoteDeserializer
---
kafka:
  topic:
    input: quotes
    output: business,education,faith,famous-quotes,friendship,future,happiness,inspirational,life,love,nature,politics,proverb,religion,science,success,technology
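The SpEL expression used in @Value and @KafkaListener, '${kafka.topic.output}'.split(','), is simply String.split applied to the output property. A small plain-Java sketch (TopicList and parseTopics are hypothetical names, and the input is shortened to three of the topics) shows what the listener ends up subscribing to:

```java
import java.util.Arrays;
import java.util.List;

public class TopicList {

    // Splits a comma-separated property value into topic names, as String.split(",") does.
    static List<String> parseTopics(String property) {
        return Arrays.asList(property.split(","));
    }

    public static void main(String[] args) {
        List<String> topics = parseTopics("business,education,faith");
        System.out.println(topics); // prints [business, education, faith]
    }
}
```

Note that split does not trim whitespace, so writing the property as "business, education" would yield a topic named " education" with a leading space. Keeping the list free of spaces, as in the YAML above, avoids that.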