java - 检索无法反序列化的 Kafka 消息的有效负载和标头
问题描述
我正在使用 spring-kafka 2.1.7 来使用 JSON 消息,并且我想处理无法正确反序列化的消息。
为了覆盖循环相同消息的默认行为,我扩展了 JsonDeserializer 以覆盖反序列化方法。
public class CustomKafkaJsonDeserializer<T> extends JsonDeserializer<T> {
public CustomKafkaJsonDeserializer(Class<T> targetType) {
super(targetType);
this.objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, true);
}
@Override
public T deserialize(String topic, byte[] data) {
try {
return super.deserialize(topic, data);
} catch (Exception e) {
log.error("Problem deserializing data " + new String(data) + " on topic " + topic, e.getMessage());
return null;
}
}
}
这是我的消费者及其配置:
@Service
public class Consumer {
@KafkaListener(topics = "${kafka.topic.out}", containerFactory = "kafkaListenerContainerFactory", errorHandler = "customKafkaListenerErrorHandler")
public void consume(@Payload Lines lines, @Headers MessageHeaders messageHeaders) {
//treatment
}
}
@Configuration
public class ConsumerConfig {
...
@Bean
public ConsumerFactory<String, Lines> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), new CustomKafkaJsonDeserializer<>(Lines.class));
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Lines>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Lines> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
private Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put("bootstrap.servers", this.bootstrapServers);
props.put("group.id", this.appName);
props.put("key.deserializer", StringDeserializer.class);
props.put("value.deserializer", CustomKafkaJsonDeserializer.class);
props.put("security.protocol", this.securityProtocol);
props.put("sasl.mechanism", this.saslMechanism);
props.put("sasl.jaas.config", this.saslJaasConfig);
return props;
}
}
最后,我实现了自己的错误处理程序,以便将错误数据发送到其他主题。
@Component
public class CustomKafkaListenerErrorHandler implements KafkaListenerErrorHandler {
@Autowired
private KafkaErrorService kafkaErrorService;
@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException exception) throws Exception {
log.error("error handler for message: {} [{}], exception: {}", message.getPayload(), message.getHeaders(), exception.getMessage());
kafkaErrorService.sendErrorToKafka(message.getPayload().toString(), exception.getMessage());
throw new RuntimeException(exception);
}
}
当我使用错误消息时会发生这种情况:
- CustomKafkaJsonDeserializer 尝试反序列化消息并捕获异常。
- 可以在 catch 块中检索有效负载,但不能在标头中检索。返回 null 以推进偏移量。
- 它到达错误处理程序的 handleError 方法。message.getHeaders() 返回正确的标头,但 message.getPayload() 返回一个 KafkaNull 对象。因此,我无法在此步骤中同时发送有效负载和标头。
关于如何实现这一目标的任何建议?
解决方案
与其返回 null,不如返回一个包含数据和标头的丰富对象。
推荐阅读
- microsoft-graph-api - 更新邮件和 Outlook Web 缓存?
- php - 在不使用 URL 的情况下,在 PHP 中将值从一个页面传递到另一个页面
- java - 如何解决依赖MongoDB作为Maven
- php - Codeigniter foreach 循环错误
- crystal-reports - 水晶报表查看器显示空白报表
- mysql - 使用批处理脚本将表格导出到 csv 时在末尾添加额外的文本
- apache-spark - spark提交应用程序中的Scala ScriptEngine问题
- php - WordPress 元查询喜欢奇怪的字符串/哈希
- javascript - Javascript 承诺在 React-Enzyme 测试中不返回模拟值
- java - 超过 ConcurrentWebSocketSessionDecorator bufferSizeLimit 时如何刷新缓冲区?