首页 > 解决方案 > debezium 如何手动提交偏移量

问题描述

我使用 debezium 将数据从 Postgres 同步到 flink,并使用此代码创建引擎

this.engine = DebeziumEngine.create(Connect.class)
            .using(properties)
            .notifying(debeziumConsumer)
            .using((success, message, error) -> {
                if (!success && error != null) {
                    this.reportError(error);
                }
            })
            .build();

我想ChangeEventSourceCoordinator#commitOffset在 flink 做 checkpoint 的时候调用,但是coordinator是 private inBaseSourceTask并且task是 private in EmbeddedEngine,所以我不能调用commitOffset我的代码,有没有其他方法可以实现手动提交?

public final class EmbeddedEngine implements DebeziumEngine<SourceRecord>{
       private SourceTask task;
}
public abstract class BaseSourceTask extends SourceTask {
       private ChangeEventSourceCoordinator coordinator;
}

标签: debezium

解决方案


推荐阅读