callback - Kafka Producer:如何使用 Reactive 和 Non-Blocking Producer 编写 onSuccess/onError 回调代码
问题描述
我正在尝试遵循Micronaut Kafka 指南。它显示了这段代码:
Single<Book> sendBook(
@KafkaKey String author,
Single<Book> book
);
我尝试以这种方式实施但没有成功
制片人
package com.tolearn.producer
import io.micronaut.configuration.kafka.annotation.KafkaClient
import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.messaging.annotation.Header
import io.reactivex.Single
@KafkaClient(
id = "demo-producer",
acks = KafkaClient.Acknowledge.ALL)
@Header(name = "X-Token", value = "\${my.application.token}")
public interface DemoProducer {
//Reactive and Non-Blocking
@Topic("demotopic")
fun sendDemoMsgReactive(
@KafkaKey key: String?,
msg: Single<String>): Single<String?>?
}
并从服务层调用它
package com.tolearn.service
import com.tolearn.producer.DemoProducer
import io.reactivex.Single
import io.reactivex.SingleOnSubscribe
import javax.inject.Inject
import javax.inject.Named
import javax.inject.Singleton
@Singleton
class DemoService {
@Inject
@Named("dp")
lateinit var dp : DemoProducer
fun postDemo(key: String, msg: String){
//Reactive and No-blocking
val singleReturned:Single<String> = dp.sendDemoMsgReactive(key, SingleOnSubscribe<String> msg ).subscribe()
singleReturned.doOnSuccess{
print("ok")
}
singleReturned.doOnError ((e)->print(e))
}
}
基本上,我想要的是使用 io.reactivex.Single 向 kafka“无阻塞”样式发布消息。我知道我必须订阅然后编写两个回调:onSuccess 和 onError。当然,我缺少一些关于 ReactiveX 的基本概念。请注意,任何线索将不胜感激。
解决方案
推荐阅读
- ruby-on-rails - Rails - 在 Nginx 上添加 SSL 证书后乘客不工作
- assembly - 如何简要解释 ADDM 指令后的时钟周期(Assembly - MIPS Architectue)?
- mongodb - 在一个集合中,有多个名称可能具有不同的值,我们如何找到具有差异的那些名称。MongoDB中的价值
- python - 给定数字的所有可能的子数字
- python - 如何获取 json 对象中的键值(Python)
- java - BOM 覆盖顺序(具有重叠 BOM)
- android - 如何导出 android 库以使其在另一台计算机上可用?(出现“403 禁止访问错误”)
- google-apps-script - 我在尝试运行代码时收到拒绝访问:Google 脚本上的 DriveApp 错误
- python - 无法使用 TCP 远程连接到 MariaDB 服务器
- java - Kotlin 不适用于 vs 代码我已经尝试了所有方法但没有任何效果