kafka-consumer-api - 嵌入式 Kafka 未显示消费者偏移量
问题描述
有一个嵌入式 kafka 实例作为测试的一部分运行。我正在尝试验证是否已阅读所有消息,但从 kafka 管理客户端得到一个空结果。
Map<TopicPartition, OffsetAndMetadata> partitionOffset = embeddedKafkaRule.getEmbeddedKafka().doWithAdminFunction(admin -> {
try{
return admin.listConsumerGroupOffsets(COUNTER_GROUP).partitionsToOffsetAndMetadata().get();
}catch (Exception e){
throw new RuntimeException(e);
}
});
地图总是空的。我已经尝试设置 ack all 并设置 100ms autoOffsetCommit 等待看看这是否有任何区别,但没有运气。
System.setProperty("spring.cloud.stream.kafka.streams.binder.brokers", embeddedKafkaRule.getEmbeddedKafka()
.getBrokersAsString());
System.setProperty("spring.cloud.stream.bindings.enricher-in-0.destination", COUNTER_TOPIC);
System.setProperty("spring.cloud.stream.bindings.enricher-in-0.group", COUNTER_GROUP);
System.setProperty("spring.cloud.stream.bindings.enricher-out-0.destination", ENRICHED_COUNTER_TOPIC);
System.setProperty("spring.cloud.stream.kafka.bindings.enricher-in-0.consumer.ackEachRecord", "true");
System.setProperty("spring.cloud.stream.kafka.bindings.enricher-in-0.autoCommitOffset", "true"); System.setProperty("spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms", "100");
解决方案
你什么时候设置这些系统属性?您确定绑定使用的是嵌入式代理吗?
这对我来说很好。
@SpringBootApplication
public class So65329718Application {
public static void main(String[] args) {
SpringApplication.run(So65329718Application.class, args);
}
@Bean
Consumer<String> consume() {
return System.out::println;
}
}
spring.cloud.stream.bindings.consume-in-0.group=theGroup
@SpringBootTest
@EmbeddedKafka(bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class So65329718ApplicationTests {
@Autowired
KafkaTemplate<byte[], byte[]> template;
@Autowired
EmbeddedKafkaBroker broker;
@Test
void test() throws InterruptedException {
this.template.send("consume-in-0", "foo".getBytes());
Thread.sleep(10_000);
this.broker.doWithAdmin(admin -> {
try {
System.out.println(admin.listConsumerGroupOffsets("theGroup").partitionsToOffsetAndMetadata()
.get(10, TimeUnit.SECONDS));
}
catch (InterruptedException | ExecutionException | TimeoutException e) {
e.printStackTrace();
}
});
}
}
foo
...
{consume-in-0-0=OffsetAndMetadata{offset=1, leaderEpoch=null, metadata=''}}
(spring.kafka.bootstrap-servers
如果没有特定于粘合剂的属性,则使用粘合剂)。
推荐阅读
- javascript - 点击时仅显示一篇 tumblr 帖子的标签(jquery)
- python - 如何比较两个 JSON 文档并返回更改的内容?
- laravel - 使用 Vue.js 在 Laravel 8 中使用 axios 显示加载的 API 数据的问题
- html - jQuery 需要什么权限/设置来运行一个非常简单的 $.get?
- android - 使用 ByteBuffer 中的像素数据连续更新 SurfaceView 时性能不佳
- regex - 删除多行有条件重复的字符串
- sql - 在 postgresql 中,如何防止基于来自不同表的列的表上的重复
- google-api - reddit 如何在不使用 cookie 的情况下使 google auth 工作
- excel - VBA Excel - 对象不支持此属性或方法
- php - 如何运行不仅可以在我的电脑上也可以在 LAN 上运行的 Laravel 服务?