首页 > 解决方案 > Kafka connect :停止任务,是否提交了偏移量

问题描述

我在 kafka connect SinkTask 实现中有如下代码。对于关键异常,我想stop()在任务中调用并停止任务。

我需要知道我是否转到stop()kafka connect 提交 kafka 主题分区中的偏移量?即,如果我处理我的异常,记录它,根据异常执行一些活动并调用stop(),是否提交了主题的偏移量(应用程序主题),因为没有异常put()?我假设因为没有成功运行put()或批处理没有完成(基于最大轮询记录)偏移量不应该完成,所以当我重试时,不会错过任何记录。如果没有提交偏移量,请告诉我是否有人可以提供建议?

代码 :

public class ExampleSinkTask extends SinkTask {
    @Override
    public void start(Map<String, String> map) {
    ....

    }

    @Override
    private void doSomeCriticalOperation()  {
        // call stop if critical operation fails. 

        ...

        // Exception handled, close the task (allow user interference in such cases) 
        stop();
    }

    @Override
    public void put(Collection<SinkRecord> collection)  {
        ..
        doSomeCriticalOperation();
        ..

    }

    @Override
    public void stop() {
        log.info("*** Stopping Kafka task completely ***");
        System.exit(0);
    }
}

标签: apache-kafkaapache-kafka-connect

解决方案


推荐阅读