java - 设置sourceOffset,唯一键,日志压缩后Kafka Connect在主题中复制消息
问题描述
我尝试执行以下操作:
public List<SourceRecord> poll() throws InterruptedException {
List<SourceRecord> records = new ArrayList<>();
JSONArray jsonRecords = getRecords(0, 3);
for (Object jsonRecord: jsonRecords) {
JSONObject j = new JSONObject(jsonRecord.toString());
Map sourceOffset = Collections.singletonMap("block", j.get("block").toString());
Object value = j.get("data").toString();
records.add(new SourceRecord(
Collections.singletonMap("samesourcepartition", "samesourcepartition"), // sourcePartition
sourceOffset, // sourceOffset
"mytopic", // topic
Schema.STRING_SCHEMA, // keySchema
j.get("block").toString, // key: "0", "1", "2", "3"
Schema.STRING_SCHEMA, // valueSchema
value // value
));
log.info("added record for block: " + j.get("block"));
}
log.info("Returning {} records", records.size());
return records;
}
我对如何使用感到困惑sourceOffset
。(https://docs.confluent.io/current/connect/devguide.html#task-example-source-task)
的一个例子block
可能是"3"
。我希望如果 Kafka 已经读过这个sourceOffset
,它不应该再读它。但它似乎完全忽略了这一点,offset
继续增长超过 3 并在无限循环中不断重复相同的 0-3 数据。例如,如果我查看 Confluent 仪表板 > 主题 > 检查,我希望最高offset
记录key
为“3”,但它超过 100 多个重复的键和值。
我的 poll() 是否需要增加 0->3 以便知道何时“停止”?当前的行为不断重复 0->3, 0->3, ... 添加new SourceRecord()
,但我想sourceOffset
和唯一key
应该是幂等的。
我确定我误解了一些东西。我也尝试打开log compaction
,但即使使用相同的键仍然会重复。sourceOffset
有人可以展示每个/的消息的正确用途key
吗?