apache-kafka - 如何更改@KafkaStreamsStateStore kafka Stream Cloud 的默认 serdes
问题描述
如何更改@KafkaStreamsStateStore 的默认 serdes?我知道在 Kafka 流云中的新版本 3.0.1 中,这里解释了这种方式:https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/3.0.1。发布/参考/html/spring-cloud-stream-binder-kafka.html#_state_store。但是由于我使用的是 2.1.12,请您帮忙提供一些代码示例。我搜索了很多地方都没有找到。
@KafkaStreamsStateStore(name = DEDUP_STORE, type = KafkaStreamsStateStoreProperties.StoreType.KEYVALUE, keySerde = "?????", valueSerde = "?????") 这也没有帮助。
我试过:
@KafkaStreamsStateStore(name = DEDUP_STORE, type = KafkaStreamsStateStoreProperties.StoreType.KEYVALUE, keySerde = "VALUE_SERDE", valueSerde = "VALUE_SERDE")
public class CustomSerde {
static public final class CustomSerdes extends WrapperSerde<Entity> {
public CustomSerdes () {
super(new JsonPOJOSerializer<Entity>(), new JsonPOJODeserializer<Entity>());
}
}
}
public static final String VALUE_SERDE = "CustomSerde$CustomSerdes";
public class JsonPOJODeserializer<T> implements Deserializer<T> {
private ObjectMapper objectMapper = new ObjectMapper();
private Class<T> tClass;
/**
* Default constructor needed by Kafka
*/
public JsonPOJODeserializer() {
}
@SuppressWarnings("unchecked")
@Override
public void configure(Map<String, ?> props, boolean isKey) {
tClass = (Class<T>) props.get("JsonPOJOClass");
}
@Override
public T deserialize(String topic, byte[] bytes) {
if (bytes == null)
return null;
T data;
try {
data = objectMapper.readValue(bytes, tClass);
} catch (Exception e) {
throw new SerializationException(e);
}
return data;
}
@Override
public void close() {
}
}
public class JsonPOJOSerializer<T> implements Serializer<T> {
private final ObjectMapper objectMapper = new ObjectMapper();
private Class<T> tClass;
/**
* Default constructor needed by Kafka
*/
public JsonPOJOSerializer() {
}
@SuppressWarnings("unchecked")
@Override
public void configure(Map<String, ?> props, boolean isKey) {
tClass = (Class<T>) props.get("JsonPOJOClass");
}
@Override
public byte[] serialize(String topic, T data) {
if (data == null)
return null;
try {
return objectMapper.writeValueAsBytes(data);
} catch (Exception e) {
throw new SerializationException("Error serializing JSON
message", e);
}
}
@Override
public void close() {
}
}
但不工作。请指教。
解决方案
在上面的代码片段中,你有这个:
@KafkaStreamsStateStore(name = DEDUP_STORE, type = KafkaStreamsStateStoreProperties.StoreType.KEYVALUE, keySerde = "VALUE_SERDE", valueSerde = "VALUE_SERDE")
究竟是VALUE_SERDE
什么?是实现Serde
接口吗?只要它实现了正确的Serde
接口,那应该可以工作。活页夹将此值转发到StoreBuilder
内部。你有任何错误吗?如果您仍然遇到问题,请与我们分享一个小示例应用程序,我们可以进一步研究它。
推荐阅读
- html - 带阴影的球体
- mysql - 在MySQL中,你可以在表中的一行中插入多个值吗
- php - 目标类 [DatabaseSeeder] 不存在
- regex - 带有“3”参数的powershell“ParseExact”:“字符串未被识别为有效的日期时间
- python - 如何在Django Rest Framework中序列化三个具有多对多关系的模型
- javascript - 续集:TypeError:尝试在 DB 上创建数据时无法读取未定义的属性“长度”
- python-3.x - Numpy 的 filterwarning 设置为忽略不会关闭警告,Python 的警告过滤器仍然会捕获它们
- python - 遵循 StackAbuse 指南,但它不起作用
- php - wordpress 允许管理员使用自定义字段编辑用户配置文件
- gstreamer - 如何构建 gstreamer 管道以将图像文件发送到 V4L2 接收器?