首页 > 解决方案 > Rest api 设计与 apache kafka java 和 micronaut 应用程序

问题描述

在此处输入图像描述

在使用 Micronaut 框架处理 Rest API CRUD 操作时,我有上图。我有单向流,控制器需要知道从 Kafka 消费者 API 执行的操作。

例如

  1. 要从数据库中获取所有项目列表,它是在消费者级别执行的
  2. 消费者级别的添加/更新/删除操作

我在消费者级别(侦听器)有以下 Micronaut 反应性代码,以从 mongo DB 获取产品列表

@Topic(ProductTopicConstants.GET_FREE_TEXT_SEARCH)
public Flowable<Product> findByFreeText(String text) {
    LOG.info(String.format("Listener --> Listening value = %s", text));
    return Flowable.fromPublisher(repository.getCollection("product", Product.class)
            .find(new Document("$text",
                    new Document("$search", text)
                            .append("$caseSensitive", false)
                            .append("$diacriticSensitive", false)
            )));
}

生产者界面

@KafkaClient
public interface IProductProducer {
    @Topic(ProductTopicConstants.GET_FREE_TEXT_SEARCH)
        Flowable<Product> findFreeText(String text);
}

生产者的实现

@Override
    public Flowable<ProductViewModel> findFreeText(String text) {
        LOG.info("Manager --> Finding all the products");
        List<ProductViewModel> model = new ArrayList<>();
        iProductProducer.findFreeText(text).subscribe(item -> {
            System.out.println(item);
        }, error ->{
            System.out.println(error);
        });
        return Flowable.just(new ProductViewModel());
    }

来自 Micronaut 文档https://micronaut-projects.github.io/micronaut-kafka/latest/guide/#kafkaListenerMethods

根据 Micronaut 文档接收和返回反应类型

@Topic("reactive-products")
public Single<Product> receive(
        @KafkaKey String brand,  
        Single<Product> productFlowable) { 
    return productFlowable.doOnSuccess((product) ->
            System.out.println("Got Product - " + product.getName() + " by " + brand) 
    );
}

在我的生产者实现中,我在订阅时从未收到任何值。

iProductProducer.findFreeText(text).subscribe(item -> {
                System.out.println(item);
            }, error ->{
                System.out.println(error);
            });

我想知道上述流程是否是使用 kafka 执行 REST API CRUD 操作的正确方法?

标签: javaapache-kafkarx-javamicronautmicronaut-kafka

解决方案


推荐阅读