java - Reactor Kafka 中基于分区排序的并发处理
问题描述
我正在开发一个示例应用程序,该应用程序将从 Kafka 主题的不同分区读取,同时处理基于分区排序的记录并将记录写入另一个主题的不同分区。这是我写的示例代码
public class MetricsTransposer {
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
static abstract class SetKafkaProperties {
final String SOURCE_TOPIC;
final String DESTINATION_TOPIC;
final Map<String, Object> consumerProps;
final Map<String, Object> producerProps;
SetKafkaProperties(Map<String, Object> consumerPropsOverride, Map<String, Object> producerPropsOverride, String bootstrapServers, String sourceTopic, String destTopic) {
SOURCE_TOPIC = sourceTopic;
DESTINATION_TOPIC = destTopic;
consumerProps = new HashMap<String, Object>();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "reactive-group-" + System.currentTimeMillis());
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "0");
if(consumerPropsOverride != null) {
consumerProps.putAll(consumerPropsOverride);
}
producerProps = new HashMap<String, Object>();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "0");
producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, String.valueOf(Long.MAX_VALUE));
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringDeserializer.class);
if(producerPropsOverride != null) {
producerProps.putAll(producerPropsOverride);
}
}
}
static class ReactiveTranspose extends SetKafkaProperties {
SenderOptions<Integer, String> senderOptions =
SenderOptions.<Integer, String>create(producerProps)
.maxInFlight(1024);
KafkaSender<Integer, String> sender = KafkaSender.create(senderOptions);
ReceiverOptions<Integer, String> receiverOptions =
ReceiverOptions.<Integer, String>create(consumerProps)
.subscription(Collections.singleton(SOURCE_TOPIC));
ReactiveTranspose(Map<String, Object> consumerPropsOverride, Map<String, Object> producerPropsOverride, String bootstrapServers, String sourceTopic, String destTopic) {
super(consumerPropsOverride, producerPropsOverride, bootstrapServers, sourceTopic, destTopic);
}
public Disposable ReadProcessWriteRecords() {
Scheduler scheduler = Schedulers.newBoundedElastic(60, 60, "writerThreads");
return KafkaReceiver.create(receiverOptions)
.receive()
.doOnNext( r -> System.out.printf("Record received: {0}", r.value()))
.groupBy(m -> m.receiverOffset().topicPartition())
.flatMap(partitionFlux ->
partitionFlux.publishOn(scheduler)
.map(r -> processRecord(partitionFlux.key(), r))
.sample(Duration.ofMillis(5000))
.concatMap(offset -> offset.commit()))
.subscribe();
}
private ReceiverOffset processRecord(TopicPartition topicPartition, ReceiverRecord<Integer, String> message) {
System.out.printf("Processing record {} from partition {} in thread{}",
message.value(), topicPartition, Thread.currentThread().getName());
return message.receiverOffset();
}
}
public static void RunReactiveTranformProcess(String sourceTopic, String destinationTopic) {
ReactiveTranspose transpose = new ReactiveTranspose(null, null, BOOTSTRAP_SERVERS, sourceTopic, destinationTopic);
transpose.ReadProcessWriteRecords();
}
public static void main(String[] args) throws Exception {
String sourceTopic = "metrics";
String destinationTopic = "cleanmetrics";
RunReactiveTranformProcess(sourceTopic, destinationTopic);
}
}
当我运行应用程序时,我没有在控制台中看到打印语句。我确实有要在该主题中使用的数据。所以我想知道代码是否与主题有关。我正在寻求帮助,以了解如何检查它是否连接到主题并阅读消息或这里可能出现的问题。
我是 Java、反应式编程和 Kafka 的新手。这是一个自学项目,我很可能遗漏了一些简单明了的东西。
更多信息:这是我的日志的快照。我有一个名为 metrics 的主题,有 3 个分区
更新:我没有看到我的打印语句,因为我的主题中有数据,但 auto.offset.reset 设置为最新。将其更改为最早消耗现有数据。
解决方案
你的问题在这里:
public void ReadProcessWriteRecords() {
Scheduler scheduler = Schedulers.newBoundedElastic(60, 60, "writerThreads");
// Here you are ignoring the return
// Nothing happens until you subscribe
// So this is merly a statement not a execution.
KafkaReceiver.create(receiverOptions)
.receive()
.doOnNext( r -> System.out.printf("Record received: {0}", r.value()))
.groupBy(m -> m.receiverOffset().topicPartition())
.flatMap(partitionFlux ->
partitionFlux.publishOn(scheduler)
.map(r -> processRecord(partitionFlux.key(), r))
.sample(Duration.ofMillis(5000))
.concatMap(offset -> offset.commit()));
}
反应式文档在入门部分中对此进行了介绍nothing happens until you subscribe
。在上面的代码中,您正在创建一个反应流,但是没有人订阅它。
由于您的应用程序是流的使用者,您应该在subscribe
某处添加一条语句。
我个人不会返回 void (您通常会尽量避免在反应式编程中使用 void 函数,因为这些通常会导致副作用并且难以测试),我会producer
一直返回到 main 函数,以便可以对代码进行单元测试.
这样生成的主要功能看起来像这样。
public static void main(String[] args) throws Exception {
String sourceTopic = "metrics";
String destinationTopic = "cleanmetrics";
RunReactiveTranformProcess(sourceTopic, destinationTopic).subscribe();
}
推荐阅读
- c# - 了解 C# 中的资源/资源文件及其用途的问题
- mariadb - mariadb.service:无法设置挂载命名空间:权限被拒绝/在命名空间生成步骤中失败
- python - pandas multiindex - 根据子索引的数量删除行
- dictionary - 如何将 JuMP 变量的值存储到 julia 中的嵌套字典中
- google-apps-script - 我可以使用 DriveApp searchFiles 功能来查找祖父母而不是父文件夹吗?
- flutter - Flutter:对话的语义——VoiceOver 读出“dismiss”
- reactjs - 在 setState 的回调中调用 setState 是否可以接受?
- python - 如何在python列表元素前面附加一个字母而不指示''
- java - 如何使用 WebView 从 Google Drive 下载 PDF 文件
- sql - 使用数据框和 dbSendQuery 更新表