apache-kafka - 向 Kafka 发送 Flux 响应
问题描述
我从 Flux 的 REST 端点的 url 列表中获取数据,
Flux.fromIterable(sensorUrls)
.publishOn(Schedulers.boundedElastic())
.map(url -> webClient.get()
.uri(url)
.retrieve()
.bodyToFlux(Object.class)
.retry()
.subscribe(this::sendMessage)
).subscribe();
发送消息方法需要将数据发送到 Kafka 系统。
我的 Kafka 配置如下,
public static KafkaSender<String, String> createKafkaSender(){
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producer");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
SenderOptions<String, String> senderOptions = SenderOptions.create(props);
return KafkaSender.create(senderOptions);
}
发送消息的方法是,
private void sendMessage(Object data) {
Flux<ProducerRecord<Integer, String>> flux = Flux.just(new ProducerRecord<>("test-topic", 2, gson.toJson(data)));
kafkaSender.createOutbound()
.sendTransactionally(flux) // ISSUE
.then()
.doOnError(a -> logger.error("error"))
.doOnSuccess(a -> logger.error("success"));
}
}
编译器错误告诉我,
Required type: Publisher <? extends Publisher<? extends ProducerRecord<String, String>>>
Provided: Flux<ProducerRecord<Integer, String>>
有任何建议或帮助转换为 Publisher 类型以使其工作吗?
解决方案
推荐阅读
- wordpress - Wordpress JWT 身份验证令牌问题
- events - 如何在 Nuxt.js 中的图标链接上正确实现悬停效果
- python-3.x - 我有一个 .CSV 文件,其中包含这些日期的日期和 gms 值。是否可以对此应用线性回归?
- python - 替换部分字符串 Python
- dart - 列中的定位小部件导致卡片溢出
- python - 如果需要多个标准输入,python asyncio 会死锁
- php - 仅比较数组的值并获得差异
- laravel - Laravel Notification 重复错误,通过(数据库,电子邮件)发送多个通知
- android - 对话框中 Spinner 和 EditText 不为空时启用按钮
- android - ExecutionException:com.google.android.apps.gsa.sidekick.main.hn:无法完成刷新条目的计划请求。客户端错误代码:3