How to change the default serdes of @KafkaStreamsStateStore in Spring Cloud Stream Kafka Streams

Question

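How can I change the default serdes of @KafkaStreamsStateStore? I know that for the newer 3.0.1 version of the Spring Cloud Stream Kafka Streams binder the approach is explained here: https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/3.0.1.RELEASE/reference/html/spring-cloud-stream-binder-kafka.html#_state_store. But since I am using 2.1.12, could you please help with a code example? I have searched in many places and found nothing.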

@KafkaStreamsStateStore(name = DEDUP_STORE, type = KafkaStreamsStateStoreProperties.StoreType.KEYVALUE, keySerde = "?????", valueSerde = "?????")

This did not help either:

https://www.bountysource.com/issues/87943127-consider-sharing-the-default-serdes-of-kafkastreamsstatestore

I tried:

@KafkaStreamsStateStore(name = DEDUP_STORE, type = KafkaStreamsStateStoreProperties.StoreType.KEYVALUE, keySerde = "VALUE_SERDE", valueSerde = "VALUE_SERDE")

public class CustomSerde {
    public static final class CustomSerdes extends WrapperSerde<Entity> {
        public CustomSerdes() {
            super(new JsonPOJOSerializer<Entity>(), new JsonPOJODeserializer<Entity>());
        }
    }
}

    public static final String VALUE_SERDE = "CustomSerde$CustomSerdes";

    public class JsonPOJODeserializer<T> implements Deserializer<T> {
        private ObjectMapper objectMapper = new ObjectMapper();
    
        private Class<T> tClass;
    
        /**
         * Default constructor needed by Kafka
         */
        public JsonPOJODeserializer() {
        }
    
        @SuppressWarnings("unchecked")
        @Override
        public void configure(Map<String, ?> props, boolean isKey) {
            tClass = (Class<T>) props.get("JsonPOJOClass");
        }
    
        @Override
        public T deserialize(String topic, byte[] bytes) {
            if (bytes == null)
                return null;
    
            T data;
            try {
                data = objectMapper.readValue(bytes, tClass);
            } catch (Exception e) {
                throw new SerializationException(e);
            }
    
            return data;
        }
    
        @Override
        public void close() {
    
        }
    }

    public class JsonPOJOSerializer<T> implements Serializer<T> {
        private final ObjectMapper objectMapper = new ObjectMapper();
    
        private Class<T> tClass;
    
        /**
         * Default constructor needed by Kafka
         */
        public JsonPOJOSerializer() {
    
        }
        @SuppressWarnings("unchecked")
        @Override
        public void configure(Map<String, ?> props, boolean isKey) {
            tClass = (Class<T>) props.get("JsonPOJOClass");
        }
    
        @Override
        public byte[] serialize(String topic, T data) {
            if (data == null)
                return null;
    
            try {
                return objectMapper.writeValueAsBytes(data);
            } catch (Exception e) {
                throw new SerializationException("Error serializing JSON message", e);
            }
        }
    
        @Override
        public void close() {
        }
    
    }

But it does not work. Please advise.

Tags: apache-kafka, apache-kafka-streams, spring-cloud-stream

Answer


In the code snippet above, you have this:

@KafkaStreamsStateStore(name = DEDUP_STORE, type = KafkaStreamsStateStoreProperties.StoreType.KEYVALUE, keySerde = "VALUE_SERDE", valueSerde = "VALUE_SERDE")

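What exactly is VALUE_SERDE? Is it a class that implements the Serde interface? As long as it implements the Serde interface correctly, this should work. The binder forwards this value to the StoreBuilder internally. Are you getting any errors? If you still run into problems, please share a small sample application with us and we can look into it further.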
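
One thing that may be worth checking, offered as a guess rather than a confirmed diagnosis: the annotation passes the quoted text "VALUE_SERDE", not the constant VALUE_SERDE. If the binder resolves the serde by loading the class named in that string (an assumption about the 2.1.x internals, not verified here), the quoted text would not name any class. The sketch below uses only the JDK to show the difference. SerdeNameCheck, its nested CustomSerdes stand-in, and the instantiate helper are hypothetical names for illustration and are not part of the binder or of Kafka.

```java
public class SerdeNameCheck {

    // Hypothetical stand-in for the custom serde class. A real one would
    // implement org.apache.kafka.common.serialization.Serde.
    public static final class CustomSerdes {
        public CustomSerdes() {
        }
    }

    // Binary name of the nested class. This file has no package declaration;
    // with a package, the name would need the package prefix as well.
    public static final String VALUE_SERDE = "SerdeNameCheck$CustomSerdes";

    // Loads a class by name and calls its public no-arg constructor.
    // Returns null when the class cannot be found or instantiated.
    public static Object instantiate(String className) {
        try {
            return Class.forName(className).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        // The quoted text "VALUE_SERDE" is just a string, not a class name.
        System.out.println("literal resolves:  " + (instantiate("VALUE_SERDE") != null));
        // The constant (no quotes) carries the actual class name.
        System.out.println("constant resolves: " + (instantiate(VALUE_SERDE) != null));
    }
}
```

In the annotation, the equivalent would be keySerde = VALUE_SERDE without quotes, which Java allows because a static final String initialized from a literal is a compile-time constant. The name in the constant would also need to include the package of CustomSerde.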

