apache-camel - 带有 Spring FileStateRepository 的 Camel kafka
问题描述
我试图将Kafka偏移量保存到我使用Spring引导的文件中,似乎偏移量正在写入文件但没有读取,所以骆驼将在重启时从kafka主题的开头开始读取
@Component
public class Route extends RouteBuilder {
@Override
public void configure() throws Exception {
from(kafka())
.to("log:TEST?level=INFO")
.process(Route::commitKafka);
}
private String kafka() {
String kafkaEndpoint = "kafka:";
kafkaEndpoint += "topic";
kafkaEndpoint += "?brokers=";
kafkaEndpoint += "localhost:9092";
kafkaEndpoint += "&groupId=";
kafkaEndpoint += "TEST";
kafkaEndpoint += "&autoOffsetReset=";
kafkaEndpoint += "earliest";
kafkaEndpoint += "&autoCommitEnable=";
kafkaEndpoint += false;
kafkaEndpoint += "&allowManualCommit=";
kafkaEndpoint += true;
kafkaEndpoint += "&offsetRepository=";
kafkaEndpoint += "#fileStore";
return kafkaEndpoint;
}
@Bean(name = "fileStore")
private FileStateRepository fileStateRepository() {
FileStateRepository fileStateRepository = FileStateRepository.fileStateRepository(new File("/kafka/offset_repo/repo.dat"));
// This will be empty
// System.out.println(fileStateRepository.getCache());
return fileStateRepository;
}
private static void commitKafka(Exchange exchange) {
KafkaManualCommit manual = exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
manual.commitSync();
}
}
解决方案
我终于找到了一个解决方案,但它没有出现在文档中,必须调用 start 方法来在启动时初始化 repo
@Bean(name = "fileStore")
private FileStateRepository fileStateRepository() {
FileStateRepository fileStateRepository = FileStateRepository.fileStateRepository(new File("/kafka/offset_repo/repo.dat"));
try {
fileStateRepository.start();
} catch (Exception e) {
e.printStackTrace();
}
return fileStateRepository;
}
推荐阅读
- php - PHP 超全局 $_FILES[] 已设置,即使未设置
- python - 有没有一种有效的方法(不是 for 循环)在 numpy 中初始化一个数组,其中每个单元格都是前一个单元格的倍数?
- sql - 使用 SQL 根据给定规则生成 SessionID
- jquery - jQuery 选择器的父级和兄弟级
- crystal-reports - 在 Crystal Report 中对一列求和,何时为空 Sum = 0
- sql-server - 错误格式化查询,可能是无效参数
- google-sheets - 如何使用过滤后的结果进行 VLOOKUP
- c++ - 尝试打开并读取 .txt 文件,但它从原始文件中删除了实际文本(Qt GUI C++
- xcode - Xcode 11.2 和 11.2.1 GM SEED -- 无法安装到设备
- python - 更改全息视图热图分类刻度标签