首页 > 解决方案 > 当 KStream 拓扑中的自定义类中发生错误时,消费者偏移量会发生什么情况?

问题描述

我知道您可以以拓扑的形式定义流处理 Kafka 应用程序,该拓扑隐式了解哪些记录已成功通过,因此可以正确提交消费者偏移量,以便在必须重新启动微服务时,它将继续阅读输入主题而不会丢失消息。

但是当我将自己的处理类引入流时会发生什么?例如,也许我需要将输入记录中的信息提交到启动时间较长的 Web 服务。所以我编写了自己的处理器类,它会累积 1000 条消息,然后向外部服务提交批处理请求,就像这样。

    KStream<String, Prediction> stream = new StreamsBuilder()
        .stream(inputTopic, Consumed.with(Serdes.String(), new MessageSerde()))

        // talk to web service
        .map((k, v) -> new KeyValue<>("", wrapper.consume(v.getPayload())))
        .flatMapValues((ValueMapper<List<Prediction>, Iterable<Prediction>>) value -> value);

        // send downstream
        stream.peek((k, v) -> metrics.countOutgoingMessage())
        .to(outputTopic, Produced.with(Serdes.String(), new PredictionSerde()));

假设外部服务可以为每个输入发出零个、一个或多个某种类型的预测,并且我的包装器分批提交输入以增加吞吐量。在我看来,KStream 不可能跟踪哪个输入记录对应哪个输出记录,因此无论它如何实现,它都不能保证输入主题的正确消费者偏移量被提交。

那么在这个范例中,我怎样才能给库提示哪些消息已成功处理?或者如果做不到这一点,我如何才能访问主题的消费者偏移量并明确执行提交,以免发生数据丢失?

标签: apache-kafkaapache-kafka-streams

解决方案


如果您使用地图,我认为您可能会遇到问题。不建议在 DSL 运营商中组合远程调用。您可能想研究使用处理器 API文档。有了ProcessorContext您可以forwardcommit可以为您提供所需的灵活性。


推荐阅读