apache-kafka - Quarkus Kafka - 批量/批量消息消费者
问题描述
我想批处理。在我的用例中,发送 kafka 生产者消息是一一发送的。我想将它们作为消费者应用程序中的列表阅读。我可以在 Spring Kafka 图书馆做到这一点。Spring Kafka 批处理侦听器
有没有办法用quarkus-smallrye-reactive-messaging-kafka库做到这一点?
我尝试了下面的示例,但出现错误。
ERROR [io.sma.rea.mes.provider] (vert.x-eventloop-thread-3) SRMSG00200: The method org.MyConsumer#aggregate has thrown an exception: java.lang.ClassCastException: class org.TestConsumer cannot be cast to class io.smallrye.mutiny.Multi (org.TestConsumer is in unnamed module of loader io.quarkus.bootstrap.classloading.QuarkusClassLoader @6f2c0754; io.smallrye.mutiny.Multi is in unnamed module of loader io.quarkus.bootstrap.classloading.QuarkusClassLoader @4c1638b)
应用程序属性:
kafka.bootstrap.servers=hosts
mp.messaging.connector.smallrye-kafka.group.id=KafkaQuick
mp.messaging.connector.smallrye-kafka.auto.offset.reset=earliest
mp.messaging.incoming.test-consumer.connector=smallrye-kafka
mp.messaging.incoming.test-consumer.value.deserializer=org.TestConsumerDeserializer
TestConsumerDeserializer:
public class TestConsumerDeserializer extends JsonbDeserializer<TestConsumer>{
public TestConsumerDeserializer(){
// pass the class to the parent.
super(TestConsumer.class);
}
}
我的消费者:
@ApplicationScoped
public class MyConsumer {
@Incoming("test-consumer")
//@Outgoing("aggregated-channel")
public void aggregate(Multi<Message<TestConsumer>> in) {
System.out.println(in);
}
}
解决方案
我不明白问题中 ClassNotFoundException 的原因。但我找到了使用quarkus-smallrye-reactive-messaging-kafka
.
解决方案1:
@Incoming("test-consumer-topic")
@Outgoing("aggregated-channel")
public Multi<List<TestConsumer>> aggregate(Multi<TestConsumer> in) {
return in.groupItems().intoLists().every(Duration.ofSeconds(5));
}
@Incoming("aggregated-channel")
public void test(List<TestConsumer> test) {
System.out.println("size: "+ test.size());
}
解决方案2:
@Incoming("test-consumer-topic")
@Outgoing("events-persisted")
public Multi<Message<TestConsumer>> processPayloadStream(Multi<Message<TestConsumer>> messages) {
return messages
.groupItems().intoLists().of(4)
.emitOn(Infrastructure.getDefaultWorkerPool())
.flatMap(messages1 -> {
persist(messages1);
return Multi.createFrom().items(messages1.stream());
}).emitOn(Infrastructure.getDefaultExecutor());
}
public void persist(List<Message<TestConsumer>> messages){
System.out.println("messages size:"+ messages.size());
}
@Incoming("events-persisted")
public CompletionStage<Void> messageAcknowledging(Message<TestConsumer> message){
return message.ack();
}
注意:在问题中使用 application.properties 配置。
参考:
推荐阅读
- c - 把一个词变成***符号
- android - 如何在 Android 上使用 WebRTC 将相机预览渲染到 SurfaceTexture (openGL)
- javascript - VueJS 点击在移动设备上不起作用,未检测到点击监听器
- angular - 带管道的角度条件类
- javascript - 如果 URL 包含某个列表字符串,则显示 DIV
- javascript - chrome调试器打开时过滤简单数据表时出错
- sql - SQL 计算从当月的第一天到指定日期的差异
- bash - bash 中的勾股定理作为函数
- oauth - 其他人登录我的网站后,我如何在他们的 Google 帐户中远程创建活动?
- postgresql - Use functions or expressions in Postgres alias names - possible?