java - Spring-Kafka:反序列化 kafka 消息时出现问题 - 类不在“受信任的包”中?
问题描述
我得到以下异常,因为我从一个项目生产,而消费者从另一个项目消费。我怎样才能解决这个问题。显然,包装不一样。那么如何确保有正确的 json 序列化。
The class 'com.lte.assessment.assessments.AssessmentAttemptRequest' is not in the trusted packages: [java.util, java.lang, com.lte.assessmentanalytics.model
消费者配置
@EnableKafka
@Configuration
public class KafkaConfig {
static Map<String, Object> config = new HashMap();
static {
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
}
@Bean
public ConsumerFactory<String, AssessmentAttemptRequest> assessmentAttemptDetailsEntityConsumerFactory() {
JsonDeserializer<AssessmentAttemptRequest> deserializer = new JsonDeserializer<>();
deserializer.addTrustedPackages("com.lte.assessment.assessments");
return new DefaultKafkaConsumerFactory(config, new StringDeserializer(), deserializer);
}
}
生产者配置
@Configuration
public class KafkaConfiguration {
@Bean
public ProducerFactory producerConfig() {
Map<String, Object> config = new HashMap();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory(config);
}
@Bean
public KafkaTemplate kafkaTemplate() {
return new KafkaTemplate(producerConfig());
}
@Bean
public ConcurrentKafkaListenerContainerFactory aaKafkaListenerFactory() {
ConcurrentKafkaListenerContainerFactory<String, AssessmentAttemptDetailsEntity> factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(assessmentAttemptDetailsEntityConsumerFactory());
return factory;
}
}
解决方案
您可以通过如下更改将您的包列入白名单assessmentAttemptDetailsEntityConsumerFactory()
:
@Bean
public ConsumerFactory<String, AssessmentAttemptDetailsEntity> assessmentAttemptDetailsEntityConsumerFactory() {
JsonDeserializer<AssessmentAttemptDetailsEntity>
deserializer = new JsonDeserializer<>();
deserializer.addTrustedPackages("com.lte.assessment.assessments");//your package
return new DefaultKafkaConsumerFactory(config,deserializer);
}
推荐阅读
- c# - 在 ASP.NET Core 3 MVC Web 应用程序中设置路由
- docker - 是否可以将 docker 映像与包含源文件的文件夹分开?
- graphql - ApolloGraphQL 中的 useQuery 在第一次调用时会更新缓存,但是 fetchMore 呢?
- unity3d - 在 Unity 中从 nuget 导入 HDF5DotNet 包
- android - 具有自定义 Swipe 视图的 Android Recycler
- gitlab - Argocd 与 GITLAB 存储库的连接
- asp.net - 模态中的动态复选框
- reactjs - 使用 BottomTabNavigator 防止在状态更改后重新渲染
- c# - C# - 将对象序列化为常量变量或直接在 NUnit 的测试用例中使用
- arrays - 如何将 numpy 数组转换为 Zarr 数组