java - Kafka SpringBoot StreamListener - 如何按顺序消费多个主题?
问题描述
我有多个使用不同主题的 StreamListener 注释方法。但是其中一些主题需要从“最早”偏移量中读取以填充内存映射(类似于状态机),然后从其他主题中使用,这些主题中可能包含应针对“最新”执行的命令状态机。
当前代码如下所示:
@Component
@AllArgsConstructor
@EnableBinding({InputChannel.class, OutputChannel.class})
@Slf4j
public class KafkaListener {
@StreamListener(target = InputChannel.EVENTS)
public void event(Event event) {
// do something with the event
}
@StreamListener(target = InputChannel.COMMANDS)
public void command(Command command) {
// do something with the command only after all events have been processed
}
}
我试图添加一些可怕的代码,从传入的事件消息中获取 kafka 主题偏移元数据,然后使用信号量来阻止命令,直到事件达到总偏移量的某个百分比。它有点工作,但让我难过,一旦我们有 20 个左右的主题都相互依赖,维护起来会很糟糕。
SpringBoot / Spring Streams 是否有任何内置机制来执行此操作,或者是否有一些我不知道的人们使用的常见模式?
TL;DR:我如何在使用来自主题 B 的任何消息之前处理来自主题 A 的所有消息,而不做一些肮脏的事情,比如在主题 B 的消费者中粘贴 Thread.sleep(60000)?
解决方案
查看kafka 消费者绑定属性 resetOffsets
重置偏移量
是否将消费者的偏移量重置为
startOffset
. 如果提供了 a,则必须为 falseKafkaRebalanceListener
;请参阅使用 KafkaRebalanceListener。默认值:假。
开始偏移
新组的起始偏移量。允许值:
earliest
和latest
。如果为消费者“绑定”明确设置了消费者组(通过 spring.cloud.stream.bindings..group),则“startOffset”设置为最早。否则,匿名消费者组将其设置为最新。另请参阅resetOffsets
(此列表的前面部分)。默认值:null(相当于最早)。
您还可以添加一个KafkaBindingRebalanceListener并对消费者执行搜索。
编辑
您还可以autoStartup
在第二个侦听器上设置为 false,并在准备好时开始绑定。这是一个例子:
@SpringBootApplication
@EnableBinding(Sink.class)
public class Gitter55Application {
public static void main(String[] args) {
SpringApplication.run(Gitter55Application.class, args);
}
@Bean
public ConsumerEndpointCustomizer<KafkaMessageDrivenChannelAdapter<?, ?>> customizer() {
return (endpoint, dest, group) -> {
endpoint.setOnPartitionsAssignedSeekCallback((assignments, callback) -> {
assignments.keySet().forEach(tp -> callback.seekToBeginning(tp.topic(), tp.partition()));
});
};
}
@StreamListener(Sink.INPUT)
public void listen(String value, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) byte[] key) {
System.out.println(new String(key) + ":" + value);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template,
BindingsEndpoint bindings) {
return args -> {
while (true) {
template.send("gitter55", "foo".getBytes(), "bar".getBytes());
System.out.println("Hit enter to start");
System.in.read();
bindings.changeState("input", State.STARTED);
}
};
}
}
spring.cloud.stream.bindings.input.group=gitter55
spring.cloud.stream.bindings.input.destination=gitter55
spring.cloud.stream.bindings.input.content-type=text/plain
spring.cloud.stream.bindings.input.consumer.auto-startup=false
推荐阅读
- javascript - 在 Web Audio 中,为什么 contextTime 比 baseLatency 滞后 currentTime?
- oop - 实际类型随叫随到
- python - `setCalculateVarImportance` 更改导致 `cv2.ml.RTrees` 模型
- javascript - 我想在其子项处于活动状态时使父导航链接侧边栏处于活动状态(当前页面)
- sql - 使用联合联接查询需要 Oracle 联合澄清
- ng-zorro-antd - 有没有办法在反应形式中使用 nz-checkbox 或 nz-checkbox-group 或 nz-checkbox-wrapper
- php - 获取有关在 PHP LDAP 服务器上执行的任何更新的通知
- angular - 使用 Flex-Layout 的选定扩展面板的最大高度
- c - 这是对 realloc() 的正确使用吗?
- xcode - 有没有办法让 AppleScript 中的按钮和窗口透明?