spring - Kafka Spring Deserialzer returnType 静态方法从未被调用
问题描述
这是我得到的错误:
org.apache.kafka.common.errors.SerializationException: Error deserializing
key/value for partition distance-0 at offset 0. If needed, please seek past
the record to continue consumption.
Caused by: java.lang.IllegalStateException: No type information in headers
and no default type provided
at org.springframework.util.Assert.state(Assert.java:73)
at org.
springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:370)
和应用程序-dev.yml:
spring:
json:
use:
type:
headers: false
value:
default:
type: Object
method: com.mycompany.mypackage.KafkaConfig.returnType
还尝试过:
consumer:
bootstrap-servers: 10.10.5.189:9092
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
enable-auto-commit: false
auto-offset-reset: earliest
properties:
spring:
json:
trusted:
packages: '*'
value:
default:
method: com.mycompany.mypackage.KafkaConfig.returnTypee
没有警告,但我将 printlin 放在静态方法中并且它从未触发过?是什么赋予了?
@KafkaListener(topics = "distance", groupId = "${kafka.myinfo.id}")
public void handle(CustDeletedEvent custDeletedEvent) {
log.debug("received jsonNode: "+ userDeletedEvent);
KafkaConfig.java
// NEVER CALLED!!!
public static JavaType returnType(byte[] data, Headers headers) {
System.out.println("return type called data.length="+data.length);
JavaType custDeletedEvent =
TypeFactory.defaultInstance().constructType(CustDeletedEvent.class);
return custDeletedEvent;
}
最新配置:
kafka:
bootstrap-servers: 10.10.5.189:9092
producer:
bootstrap-servers: 10.10.5.189:9092
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
consumer:
bootstrap-servers: 10.10.5.189:9092
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
enable-auto-commit: false
auto-offset-reset: earliest
properties:
spring:
json:
trusted:
packages: '*'
value:
method: mypackage.config.KafkaConfig.returnType
此处配置的 Spring Kafka
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
这是确切的yaml:
spring:
profiles:
active: dev
kafka:
bootstrap-servers: 10.10.5.189:9092
producer:
bootstrap-servers: 10.10.5.189:9092
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
consumer:
bootstrap-servers: 10.10.5.189:9092
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
enable-auto-commit: false
auto-offset-reset: earliest
properties:
spring:
json:
trusted:
packages: '*'
value:
method: com.service.cust.impl.returnType
解决方案
它需要进去spring.kafka.consumer.properties.spring.json...
。
推荐阅读
- python - 为什么两个 __dict__s 之间的比较是假的?
- wordpress - (未知) Uncaught ReferenceError: u is not defined
- yum - 使用 Centos 6 时,我无法再运行 'yum update',我收到此错误 'Cannot find a valid baseurl for repo: base'
- sql - 重新排序 SQL Server 表
- mariadb - Wildfly 22.0.1 无法使用 MariaDB 数据源部署 EAR
- c# - 将静态列表复制到本地临时列表
- python-3.x - pip 升级例外
- node.js - nodejs在登录时向所有页面显示用户名
- typescript - 访问器仅在针对 ECMAScript 5 及更高版本时可用 - 使用 TypeScript 时出现错误消息
- java - 识别冗余括号