apache-kafka - 如何在 Quarkus 中注入 KafkaTemplate
问题描述
我正在尝试注入 aKafkaTemplate
以发送一条消息。我正在开发一个位于反应式方法之外的小功能。
我只能找到使用@Ingoing
和@Outgoing
来自 Smallrye 的示例,但我不需要KafkaStream
.
我尝试使用 Kafka-CDI,但无法注入SimpleKafkaProducer
.
有任何想法吗?
对于克莱门特的回答
这似乎是正确的方向,但执行orders.send("hello");
我收到此错误:
(vert.x-eventloop-thread-3) Unhandled exception:java.lang.IllegalStateException: Stream not yet connected
我正在通过命令行从我的主题中消费,Kafka 已启动并正在运行,如果我手动生成,我可以看到消费的消息。
这似乎与doc的这句话有关:
要将 Emitter 用于流 hello,您需要在代码(或配置)中的某处使用 @Incoming("hello") 。
我的课上有这段代码:
@Incoming("orders")
public CompletionStage<Void> consume(KafkaMessage<String, String> msg) {
log.info("Received message (topic: {}, partition: {}) with key {}: {}", msg.getTopic(), msg.getPartition(), msg.getKey(), msg.getPayload());
return msg.ack();
}
也许我忘记了一些配置?
解决方案
所以,你只需要使用一个Emitter
:
@Inject
@Stream("orders") // Emit on the channel 'orders'
Emitter<String> orders;
// ...
orders.send("hello");
在你的application.properties
,声明:
## Orders topic (WRITE)
mp.messaging.outgoing.orders.type=io.smallrye.reactive.messaging.kafka.Kafka
mp.messaging.outgoing.orders.topic=orders
mp.messaging.outgoing.orders.bootstrap.servers=localhost:9092
mp.messaging.outgoing.orders.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.orders.value.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.orders.acks=1
为避免Stream not yet connected
异常,如 doc 所建议的:
要将 Emitter 用于流 hello,您需要在代码(或配置)中的某处使用 @Incoming("hello") 。
假设你的 application.properties 中有这样的东西:
# Orders topic (READ)
smallrye.messaging.source.orders-r-topic.type=io.smallrye.reactive.messaging.kafka.Kafka
smallrye.messaging.source.orders-r-topic.topic=orders
smallrye.messaging.source.orders-r-topic.bootstrap.servers=0.0.0.0:9092
smallrye.messaging.source.orders-r-topic.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
smallrye.messaging.source.orders-r-topic.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
smallrye.messaging.source.orders-r-topic.group.id=my-group-id
添加如下内容:
@Incoming("orders-r-topic")
public CompletionStage<Void> consume(KafkaMessage<String, String> msg) {
log.info("Received message (topic: {}, partition: {}) with key {}: {}", msg.getTopic(), msg.getPartition(), msg.getKey(), msg.getPayload());
return msg.ack();
}
推荐阅读
- java - 我应该如何创建 ArrayList 的新实例?
- ios - 在移动到数组中的下一个项目之前完成项目的功能。iOS
- javascript - 使用 localStorage 记住弹出框的关闭状态
- excel - 使用循环插入新列并遇到错误 1004
- mongodb - 用于密码和其他敏感字段的 Nifi 安全参数存储
- c# - 将多个 ObjectList 合并为一个
- java - 屏蔽 CXF 记录 SOAP 请求中的敏感数据
- python - Python - 导入类模块时出现名称错误
- javascript - 为什么我的复选框在点击后没有更新?
- reactjs - 访问令牌的 Facebook 登录问题