java - 在 ConsumerAwareRebalanceListener 上测试异常
问题描述
在我之前的问题How to test a ConsumerAwareRebalanceListener? ConsumerAwarereBalanceListener
,我能够使用 Spring Kafka 2.1.x设置一个。
但是,下面的代码
Map<TopicPartition, Long> queries = new HashMap<>();
final Long rewindTime = getRewindTime();
for (TopicPartition partition : partitions) {
queries.put(partition, rewindTime);
}
Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(queries);
result.forEach((key, value) -> consumer.seek(key, value.offset()));
NullPointerException
如果对主题的查询结果为空,则抛出 a 。在这种情况下,value
是null
。堆栈跟踪如下。
java.lang.NullPointerException: null
at org.rcardin.MyConsumerAwareRebalancedListener.lambda$onPartitionsAssigned$0(MyConsumerAwareRebalancedListener.java:40) ~[classes/:na]
at java.util.HashMap.forEach(HashMap.java:1288) ~[na:1.8.0_144]
at org.rcardin.MyConsumerAwareRebalancedListener.onPartitionsAssigned(MyConsumerAwareRebalancedListener.java:40) ~[classes/:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$1.onPartitionsAssigned(KafkaMessageListenerContainer.java:596) ~[spring-kafka-2.1.10.RELEASE.jar:2.1.10.RELEASE]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:264) [kafka-clients-1.0.2.jar:na]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367) [kafka-clients-1.0.2.jar:na]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316) [kafka-clients-1.0.2.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295) [kafka-clients-1.0.2.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146) [kafka-clients-1.0.2.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111) [kafka-clients-1.0.2.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:700) [spring-kafka-2.1.10.RELEASE.jar:2.1.10.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_144]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_144]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_144]
我正在尝试编写一个测试,其目的是验证这种行为。但是,由于异常发生在与主线程不同的线程上,因此我无法检索此类异常。
有什么建议吗?
非常感谢,
解决方案
根据offsetsForTimes()
Javadocs,这看起来像您的代码中的错误:
* @return a mapping from partition to the timestamp and offset of the first message with timestamp greater
* than or equal to the target timestamp. {@code null} will be returned for the partition if there is no
* such message.
所以,你绝对应该value.offset()
用空检查来保护你:
result.entrySet()
.stream()
.filter(e -> e.getValue() != null)
.forEach(e -> consumer.seek(e.getKey(), e.getValue().offset()));
推荐阅读
- momentjs - 如何在没有警告的情况下将日期“2019 年 4 月 2 日”格式化为“YYYY-MM-DD”?
- input - 如何验证材料输入中的输入文本长度?
- angular - Ionic 4:让表单随每个输入而自我更新
- archlinux - pgadmin4 显示 500(内部服务器错误)
- python - Python while 循环正在暂停
- python - 使用带有 Python 的数据库跟踪在线用户
- python - 无法安装聊天机器人库
- node.js - 如何捆绑 node js 和 express js 用于生产?
- html - 订购弹性物品
- numpy - 从二进制文件创建 2D numpy 数组?