Spring Boot Kafka newbie question about serialization/deserialization

Problem description

I'm new to Kafka (using Spring Boot 2.2.4), and the KafkaTemplate examples I've seen are String, String — they just send a string. I'm looking into sending JSON objects, and I've seen 2 different approaches: some people use String, Object, and others use String, TheActualModelClass.

Are there pros and cons between the two? My assumption is that the main difference is that a typed template only works with one model, while Object lets you send any type to any topic. Is there anything beyond that?

Tags: java, spring, spring-boot, apache-kafka

Solution

Although I may be late in answering this, it may still be useful to those looking for a solution. You can see the detailed solution at https://github.com/CODINGSAINT/kafka-stream-spring
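On the original question of `KafkaTemplate<String, Object>` versus `KafkaTemplate<String, TheModelClass>`: both go through whatever value serializer you configure, so the difference is mostly compile-time type safety versus flexibility. A hedged sketch of the two styles (the bean names and the use of spring-kafka's `JsonSerializer` are illustrative assumptions, not from the repo above):

```java
// A generically-typed template can publish any payload type; the
// configured value serializer (e.g. spring-kafka's JsonSerializer)
// decides how each object is written to the wire.
@Bean
public KafkaTemplate<String, Object> genericTemplate(
        ProducerFactory<String, Object> pf) {
    return new KafkaTemplate<>(pf);
}

// A model-typed template restricts send() to Quote at compile time,
// which catches wrong-payload mistakes earlier but means one template
// (and producer factory) per model class.
@Bean
public KafkaTemplate<String, Quote> quoteTemplate(
        ProducerFactory<String, Quote> pf) {
    return new KafkaTemplate<>(pf);
}
```

In short, the typed template buys type safety; the Object template buys convenience when many model types share one topic setup.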

Suppose we have a custom Java bean:

public class Quote {
    private String content;
    private Set<String> tags;
    private String author;

    // getters and setters omitted for brevity
    // (the stream below relies on getTags())
}

You need to write the Kafka producer as well as consumer configuration:

/**
 * Configuration for Kafka Streams
 * @param kafkaProperties takes defaults from the application YAML or properties file under spring.kafka
 * @return the Kafka Streams configuration
 */
@Bean(name= KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kafkaConfiguration(final KafkaProperties kafkaProperties){
    Map<String, Object> config = new HashMap<>();
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, kafkaProperties.getClientId());
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, QuoteSerde.class.getName() );
    config.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);
    return new KafkaStreamsConfiguration(config);
}

/**
 * The stream which routes each incoming quote to the destination topics matching its tags
 * @param kStreamsBuilder the StreamsBuilder injected by Spring
 * @return the resulting KStream
 */
@Bean
public KStream<String,Quote> kStream(StreamsBuilder kStreamsBuilder){
    KStream<String,Quote> stream=kStreamsBuilder.stream(inputTopic);
    for(String topic:allTopics){
        stream.filter((s, quote) -> quote.getTags().contains(topic)).to(topic);
    }
    return stream;

}
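The routing rule inside that stream — forward a quote to every output topic whose name appears in the quote's tags — can be sketched in plain Java. The `routeTopics` helper below is purely illustrative, not part of the repo:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public class QuoteRouting {

    /** Returns the subset of allTopics that appear in the quote's tags. */
    static List<String> routeTopics(Set<String> quoteTags, List<String> allTopics) {
        List<String> destinations = new ArrayList<>();
        for (String topic : allTopics) {
            if (quoteTags.contains(topic)) {
                destinations.add(topic);
            }
        }
        return destinations;
    }

    public static void main(String[] args) {
        List<String> all = List.of("life", "love", "science");
        // A quote tagged "life" and "science" is forwarded to both topics
        System.out.println(routeTopics(Set.of("life", "science"), all));
        // → [life, science]
    }
}
```

Note that a quote matching several tags is duplicated to several topics, which is exactly what the `filter(...).to(topic)` loop above does.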

/**
 * Kafka ConsumerFactory configuration
 * @return the consumer factory for Quote messages
 */
@Bean
public ConsumerFactory<String, Quote> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            kafkaProperties.getBootstrapServers());
    props.put(
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class);
    // StringDeserializer pairs with the StringJsonMessageConverter below,
    // which performs the actual JSON-to-Quote conversion
    props.put(
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}

/**
 * Required configuration for JSON to POJO conversion
 * @return ConcurrentKafkaListenerContainerFactory
 */
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Quote>
kafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, Quote> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setMessageConverter(new StringJsonMessageConverter());
    return factory;
}

Then we need a serializer:

public class QuoteSerializer implements Serializer<Quote> {

    // Reuse one ObjectMapper; creating one per record is wasteful
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public byte[] serialize(String topic, Quote quote) {
        try {
            return objectMapper.writeValueAsBytes(quote);
        } catch (Exception e) {
            // org.apache.kafka.common.errors.SerializationException
            throw new SerializationException("Error serializing Quote", e);
        }
    }
}

And a deserializer:

public class QuoteDeserializer implements Deserializer<Quote> {

    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public Quote deserialize(String topic, byte[] bytes) {
        try {
            return mapper.readValue(bytes, Quote.class);
        } catch (Exception e) {
            // Throwing (rather than returning null) lets the configured
            // LogAndContinueExceptionHandler deal with bad records
            throw new SerializationException("Error deserializing Quote", e);
        }
    }
}

A Serde combining the Serializer and Deserializer:

public class QuoteSerde implements Serde<Quote> {
    public QuoteSerde() {
    }

    @Override
    public Serializer<Quote> serializer() {
        return new QuoteSerializer();
    }

    @Override
    public Deserializer<Quote> deserializer() {
        return new QuoteDeserializer();
    }
}
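As an aside, a dedicated Serde class is not strictly required; the same pairing can be built inline, or replaced entirely by spring-kafka's Jackson-based Serde if that dependency is acceptable. This is a sketch of the alternatives, not what the repo uses:

```java
// Build a Serde from the existing pair without a dedicated class
Serde<Quote> quoteSerde = Serdes.serdeFrom(new QuoteSerializer(), new QuoteDeserializer());

// Or let spring-kafka's JsonSerde handle both directions via Jackson
Serde<Quote> jsonSerde = new JsonSerde<>(Quote.class);
```

Either value could be passed to `DEFAULT_VALUE_SERDE_CLASS_CONFIG` indirectly (via a class name) or used per-topology via `Consumed.with(...)`.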

Now our listener can consume:

@Component
public class TopicConsumers {
    private static final Logger LOGGER = LoggerFactory.getLogger(TopicConsumers.class);

    @Value("#{'${kafka.topic.output}'.split(',')}")
    private List<String> allTopics;

    /**
     * For simplicity we listen to all topics with a single listener
     */

    @KafkaListener(id = "allTopics", topics = "#{'${kafka.topic.output}'.split(',')}",
            containerFactory = "kafkaListenerContainerFactory")
    public void consume(@Payload Quote quote,
                        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                        @Header(KafkaHeaders.RECEIVED_TOPIC) String incomingTopic,
                        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
    ) {
        LOGGER.info("Incoming quote {}-> {}", incomingTopic, quote);
    }
}
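For completeness, publishing to the input topic from the producer side could look like the following. The `QuoteService` class and constructor-injected template are illustrative assumptions, not part of the repo:

```java
@Service
public class QuoteService {

    private final KafkaTemplate<String, Quote> kafkaTemplate;

    public QuoteService(KafkaTemplate<String, Quote> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void publish(Quote quote) {
        // Key by author; the value-serializer configured in application.yml
        // (QuoteSerializer) turns the Quote into JSON bytes on the wire
        kafkaTemplate.send("quotes", quote.getAuthor(), quote);
    }
}
```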

Below is the application.yml file:

spring:
  kafka:
    listener:
      missing-topics-fatal: false
    client-id: quotes-app
    bootstrap-servers:
      - localhost:9091
      - localhost:9001
      - localhost:9092
    template:
      default-topic: quotes
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: com.codingsaint.learning.kafkastreamspring.QuoteSerializer
    consumer:
      properties:
        partition:
          assignment:
            strategy: org.apache.kafka.clients.consumer.RoundRobinAssignor
      group-id: random-consumer
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: com.codingsaint.learning.kafkastreamspring.QuoteDeserializer
---
kafka:
  topic:
    input: quotes
    output: business,education,faith,famous-quotes,friendship,future,happiness,inspirational,life,love,nature,politics,proverb,religion,science,success,technology
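The `#{'${kafka.topic.output}'.split(',')}` expression used by the listener simply splits that comma-separated `kafka.topic.output` value into a list of topic names. Plain Java reproduces the effect (a minimal stand-in for the SpEL evaluation, using a shortened topic string):

```java
import java.util.Arrays;
import java.util.List;

public class TopicListDemo {
    public static void main(String[] args) {
        // Shortened stand-in for the kafka.topic.output property above
        String output = "business,education,life,love,science";
        List<String> topics = Arrays.asList(output.split(","));
        System.out.println(topics.size());  // → 5
        System.out.println(topics.get(0));  // → business
    }
}
```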
