rx-java - RxJava kafka 消费者
问题描述
我在使用 RxJava 中的 KafkaConsumer 时遇到问题
RxKafkaUtils.source(consumer, List.of("es_updatess")).subscribeOn(Schedulers.io())
.buffer(2000, TimeUnit.MILLISECONDS, 10)
.filter(b -> !b.isEmpty())
.doOnNext(x -> {
System.out.println("processing " + Thread.currentThread().getName());
Thread.sleep(2000);
System.out.println(x.stream().map(ConsumerRecord::value).map(b -> {
var bbs = (byte[]) b;
return new String(bbs);
}).collect(Collectors.toList()));
})
.doOnNext(x -> {
System.out.println("committing " + Thread.currentThread().getName());
var offsets = new HashMap < TopicPartition,
OffsetAndMetadata > ();
for (var cr: x) {
offsets.put(new TopicPartition(cr.topic(), cr.partition()), new OffsetAndMetadata(cr.offset() + 1));
}
consumer.commitSync(offsets);
})
.doOnNext(x -> {
System.out.println("subscribed" + Thread.currentThread().getName());
})
.subscribe();
RxKafkaUtils.source(...) {
Flowable. < ConsumerRecord, Iterator < ConsumerRecord < Object, Object >>> generate(
() -> {
consumer.subscribe(topics);
return ConsumerRecords.empty().iterator();
},
(state, emitter) -> {
if (state.hasNext()) {
System.out.println("reusing old iter");
emitter.onNext(state.next());
return state;
} else {
try {
System.out.println("polling " + Thread.currentThread().getName());
Iterator < ConsumerRecord < Object, Object >> iter = consumer.poll(3000).iterator();
System.out.println("polled " + Thread.currentThread().getName());
while (iter.hasNext() == false) {
System.out.println("polling " + Thread.currentThread().getName());
iter = consumer.poll(2000).iterator();
System.out.println("polled " + Thread.currentThread().getName());
}
emitter.onNext(iter.next());
return iter;
} catch (WakeupException w) {
emitter.onComplete();
return state;
} catch (Throwable t) {
emitter.onError(t);
return state;
}
}
},
x -> {
System.out.print("state had records?: ");
System.out.println(x.hasNext());
})
.onErrorComplete();
}
KafkaConsumer 不允许并发访问其方法和 throws ConcurrentModificationException
。这里的问题是buffer
它要求Long.MAX_VALUE
上游的元素数量(在本例中为 Kafka)。所以基本上我的源代码在我到达提交操作的同时运行
如果一切都在同一个调度程序上处理,这将非常有效
实施这种协调的正确方法是什么?
解决方案
推荐阅读
- ios - 如何检测用户是否在 iOS 中离开我的应用程序?
- azure - 使用 Azure CLI 创建 Cosmos DB 容器时出现“找不到资源”错误
- sql - 计算经验而不重叠
- javascript - DynamoDB:ValidationException:提供的关键元素与架构不匹配
- ajax - 如何在 ajax 中为 laravel 发送 id
- c# - 使用带有 asp.net cshtml 的数据库
- garbage-collection - -XX:+CMSIncrementalMode 是在应用程序线程还是在 GC 专用线程上运行?
- c++ - 解决异常字符串以对向量进行浮动操作?
- c - 函数不返回预期值且 printf 不返回任何值
- javascript - 如何让我的 React 组件及时呈现?