RxJava Kafka consumer

Problem description

I'm having trouble using KafkaConsumer with RxJava:

RxKafkaUtils.source(consumer, List.of("es_updatess")).subscribeOn(Schedulers.io())
  .buffer(2000, TimeUnit.MILLISECONDS, 10)
  .filter(b -> !b.isEmpty())
  .doOnNext(x -> {
    System.out.println("processing " + Thread.currentThread().getName());
    Thread.sleep(2000);
    System.out.println(x.stream().map(ConsumerRecord::value).map(b -> {
      var bbs = (byte[]) b;
      return new String(bbs);
    }).collect(Collectors.toList()));
  })
  .doOnNext(x -> {
    System.out.println("committing " + Thread.currentThread().getName());
    var offsets = new HashMap<TopicPartition, OffsetAndMetadata>();
    for (var cr: x) {
      offsets.put(new TopicPartition(cr.topic(), cr.partition()), new OffsetAndMetadata(cr.offset() + 1));
    }
    consumer.commitSync(offsets);
  })
  .doOnNext(x -> {
    System.out.println("subscribed" + Thread.currentThread().getName());
  })
  .subscribe();


RxKafkaUtils.source(...) {
  return Flowable.<ConsumerRecord<Object, Object>, Iterator<ConsumerRecord<Object, Object>>>generate(
    () -> {
      consumer.subscribe(topics);
      return ConsumerRecords.empty().iterator();
    },
    (state, emitter) -> {
      if (state.hasNext()) {
        System.out.println("reusing old iter");
        emitter.onNext(state.next());
        return state;
      } else {
        try {
          System.out.println("polling " + Thread.currentThread().getName());
          Iterator<ConsumerRecord<Object, Object>> iter = consumer.poll(3000).iterator();
          System.out.println("polled " + Thread.currentThread().getName());
          while (!iter.hasNext()) {
            System.out.println("polling " + Thread.currentThread().getName());
            iter = consumer.poll(2000).iterator();
            System.out.println("polled " + Thread.currentThread().getName());
          }
          emitter.onNext(iter.next());
          return iter;
        } catch (WakeupException w) {
          emitter.onComplete();
          return state;
        } catch (Throwable t) {
          emitter.onError(t);
          return state;
        }
      }
    },
    x -> {
      System.out.print("state had records?: ");
      System.out.println(x.hasNext());
    })
  .onErrorComplete();
}

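KafkaConsumer does not allow concurrent access to its methods and throws ConcurrentModificationException when that happens. The problem here is that buffer requests Long.MAX_VALUE elements from upstream (Kafka, in this case), so my source keeps polling at the same time as the pipeline reaches the commit step.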
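The collision described above can be reproduced in miniature without Kafka. The sketch below uses a hypothetical GuardedClient class (an assumption for illustration, not the real KafkaConsumer) that imitates a "one thread at a time" access check. Its poll and commit methods and the commitDuringPollFails helper are made-up names; the point is only that a commit issued from a second thread while a poll is still in progress gets rejected.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in, not the real KafkaConsumer: it only imitates a
// "one thread at a time" access check.
final class GuardedClient {
    private final AtomicReference<Thread> owner = new AtomicReference<>();

    private void acquire() {
        // Fails if another thread is currently inside a guarded method.
        if (!owner.compareAndSet(null, Thread.currentThread())) {
            throw new ConcurrentModificationException("not safe for multi-threaded access");
        }
    }

    private void release() {
        owner.set(null);
    }

    // Simulated blocking poll: holds the guard until `leave` is counted down.
    void poll(CountDownLatch entered, CountDownLatch leave) throws InterruptedException {
        acquire();
        try {
            entered.countDown(); // signal that the poll is now in progress
            leave.await();       // stand-in for blocking network I/O
        } finally {
            release();
        }
    }

    void commit() {
        acquire();
        release();
    }

    // Returns true if commit() from a second thread is rejected while poll() is running.
    static boolean commitDuringPollFails() {
        GuardedClient client = new GuardedClient();
        CountDownLatch entered = new CountDownLatch(1);
        CountDownLatch leave = new CountDownLatch(1);
        Thread poller = new Thread(() -> {
            try {
                client.poll(entered, leave);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        poller.start();
        boolean rejected = false;
        try {
            entered.await(); // wait until the poller holds the guard
            try {
                client.commit();
            } catch (ConcurrentModificationException e) {
                rejected = true;
            }
            leave.countDown(); // let the simulated poll finish
            poller.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return rejected;
    }
}
```

In the pipeline from the question, the polling thread plays the role of the poller here, and the thread that runs the commit doOnNext plays the role of the caller of commit().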

This would work fine if everything were processed on the same scheduler.

What is the correct way to implement this kind of coordination?

Tags: rx-java

Solution
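One possible approach, offered as a sketch under assumptions rather than as a verified answer: keep every consumer call on the polling thread, and have the downstream stage hand its offsets back through a thread-safe queue instead of committing directly. The polling loop then drains that queue and commits before its next poll. The code below shows the hand-off pattern with plain JDK classes only. ConfinedClient is a hypothetical stand-in for a client that rejects access from a second thread, and CommitHandOffSketch, run, toProcess and toCommit are illustrative names that do not come from Kafka or RxJava.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;

final class CommitHandOffSketch {

    // Hypothetical stand-in for a client that only one thread may ever touch.
    static final class ConfinedClient {
        private final AtomicReference<Thread> owner = new AtomicReference<>();
        private final List<Long> committed = new ArrayList<>();
        private long nextOffset = 0;

        private void check() {
            Thread me = Thread.currentThread();
            // The first caller becomes the owner; any other thread is rejected.
            if (!owner.compareAndSet(null, me) && owner.get() != me) {
                throw new ConcurrentModificationException("touched by a second thread");
            }
        }

        long poll() { check(); return nextOffset++; }

        void commit(long offset) { check(); committed.add(offset); }
    }

    // Polls `records` offsets, processes them on a worker thread, and returns
    // the committed offsets. Every commit is issued by the polling thread.
    static List<Long> run(int records) {
        ConfinedClient client = new ConfinedClient();
        BlockingQueue<Long> toProcess = new LinkedBlockingQueue<>();
        Queue<Long> toCommit = new ConcurrentLinkedQueue<>();

        Thread poller = new Thread(() -> {
            int polled = 0;
            while (client.committed.size() < records) {
                // Drain commits requested by the worker before polling again.
                Long offset;
                while ((offset = toCommit.poll()) != null) {
                    client.commit(offset);
                }
                if (polled < records) {
                    toProcess.add(client.poll());
                    polled++;
                } else {
                    Thread.yield(); // nothing left to poll; wait for remaining commits
                }
            }
        });

        Thread worker = new Thread(() -> {
            try {
                for (int i = 0; i < records; i++) {
                    long offset = toProcess.take(); // "processing" would happen here
                    toCommit.add(offset + 1);       // request a commit; never call the client
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        poller.start();
        worker.start();
        try {
            poller.join();
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return client.committed;
    }
}
```

Applied to the question's pipeline, this would mean the commit doOnNext only adds its offsets map to a queue, and the generate callback calls consumer.commitSync on anything queued before its next consumer.poll. That mapping is an assumption and has not been tested against Kafka. One trade-off is that commits are delayed until the next poll cycle.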

