首页 > 解决方案 > Kafka Producer:如何使用 Reactive 和 Non-Blocking Producer 编写 onSuccess/onError 回调代码

问题描述

我正在尝试遵循Micronaut Kafka 指南。它显示了这段代码:

Single<Book> sendBook(
    @KafkaKey String author,
    Single<Book> book
);

我尝试以这种方式实施但没有成功

制片人

package com.tolearn.producer

import io.micronaut.configuration.kafka.annotation.KafkaClient
import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.messaging.annotation.Header
import io.reactivex.Single

@KafkaClient(
        id = "demo-producer",
        acks = KafkaClient.Acknowledge.ALL)
        @Header(name = "X-Token", value = "\${my.application.token}")
public interface DemoProducer {

    //Reactive and Non-Blocking
    @Topic("demotopic")
    fun sendDemoMsgReactive(
            @KafkaKey key: String?,
            msg: Single<String>): Single<String?>?
}

并从服务层调用它

package com.tolearn.service

import com.tolearn.producer.DemoProducer
import io.reactivex.Single
import io.reactivex.SingleOnSubscribe
import javax.inject.Inject
import javax.inject.Named
import javax.inject.Singleton

@Singleton
class DemoService {
    @Inject
    @Named("dp")
    lateinit var dp : DemoProducer

    fun postDemo(key: String, msg: String){

        //Reactive and No-blocking
        val singleReturned:Single<String> = dp.sendDemoMsgReactive(key, SingleOnSubscribe<String> msg ).subscribe()

        singleReturned.doOnSuccess{
            print("ok")
        }
        singleReturned.doOnError ((e)->print(e))
    }
}

基本上,我想要的是使用 io.reactivex.Single 向 kafka“无阻塞”样式发布消息。我知道我必须订阅然后编写两个回调:onSuccess 和 onError。当然,我缺少一些关于 ReactiveX 的基本概念。请注意,任何线索将不胜感激。

标签: callbackreactive-programmingmicronautreactivexmicronaut-kafka

解决方案


推荐阅读