Sending a Flux response to Kafka

Problem description

I am fetching data from a list of REST endpoint URLs with Flux:

 Flux.fromIterable(sensorUrls)
                .publishOn(Schedulers.boundedElastic())
                .map(url -> webClient.get()
                        .uri(url)
                        .retrieve()
                        .bodyToFlux(Object.class)
                        .retry()
                        .subscribe(this::sendMessage)
                ).subscribe();

The sendMessage method needs to send the data to Kafka.

My Kafka configuration is as follows:

 public static KafkaSender<String, String> createKafkaSender(){
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producer");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        SenderOptions<String, String> senderOptions = SenderOptions.create(props);
        return KafkaSender.create(senderOptions);
    }

The method that sends the message is:

 private void sendMessage(Object data) {
        Flux<ProducerRecord<Integer, String>> flux = Flux.just(new ProducerRecord<>("test-topic", 2, gson.toJson(data)));
        kafkaSender.createOutbound()
                .sendTransactionally(flux) // ISSUE
                .then()
                .doOnError(a -> logger.error("error"))
                .doOnSuccess(a -> logger.error("success"));
    }

The compiler error says:

Required type: Publisher <? extends Publisher<? extends ProducerRecord<String, String>>>
Provided: Flux<ProducerRecord<Integer, String>>

Any suggestions on how to convert this to the required Publisher type so that it works?

Tags: apache-kafka, spring-webflux, project-reactor

Solution
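
The error message points at two mismatches. First, sendTransactionally expects a publisher of publishers, where each inner publisher is the batch of records for one transaction, but the code passes a flat Flux of records. Second, the sender is declared as KafkaSender<String, String> while the key serializer and the records use Integer keys. Separately, the chain built in sendMessage is never subscribed, so even once it compiles nothing would be sent.

The following is a minimal sketch of one way to address all three, not a definitive implementation. It assumes the key type should be Integer (matching IntegerSerializer in the question). The class name KafkaSendSketch, the helper toRecord, and the transactional.id value "sample-tx" are hypothetical names introduced for illustration. To my knowledge a transactional.id has to be configured before sendTransactionally can be used; if transactions are not needed, the plain send variant avoids that requirement.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;

public class KafkaSendSketch {

    // Key type is Integer everywhere, so the serializer, the sender,
    // and the records all agree.
    static KafkaSender<Integer, String> createKafkaSender() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producer");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Assumption: only needed for the transactional variant; the id is hypothetical.
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "sample-tx");
        SenderOptions<Integer, String> senderOptions = SenderOptions.create(props);
        return KafkaSender.create(senderOptions);
    }

    // Hypothetical helper: same topic and key as in the question.
    static ProducerRecord<Integer, String> toRecord(String json) {
        return new ProducerRecord<>("test-topic", 2, json);
    }

    // Transactional variant: the outer Flux emits one inner Flux,
    // and that inner Flux is sent as a single transaction.
    static Mono<Void> sendTransactionally(KafkaSender<Integer, String> sender, String json) {
        Flux<ProducerRecord<Integer, String>> oneTransaction = Flux.just(toRecord(json));
        return sender.createOutbound()
                .sendTransactionally(Flux.just(oneTransaction))
                .then();
    }

    // Non-transactional variant: a flat publisher of records is enough.
    static Mono<Void> sendAll(KafkaSender<Integer, String> sender, Flux<String> jsonPayloads) {
        return sender.createOutbound()
                .send(jsonPayloads.map(KafkaSendSketch::toRecord))
                .then();
    }
}
```

Both methods return a Mono<Void> that does nothing until it is subscribed. In the pipeline from the question, that suggests replacing the map with the nested subscribe by a flatMap over the bodyToFlux call, converting each element with gson.toJson, passing the resulting Flux<String> to sendAll, and subscribing once at the end. The doOnError and doOnSuccess callbacks can then be attached to the returned Mono.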

