首页 > 解决方案 > 设置sourceOffset,唯一键,日志压缩后Kafka Connect在主题中复制消息

问题描述

我尝试执行以下操作:

public List<SourceRecord> poll() throws InterruptedException {
  List<SourceRecord> records = new ArrayList<>();

  JSONArray jsonRecords = getRecords(0, 3);

  for (Object jsonRecord: jsonRecords) {
   JSONObject j = new JSONObject(jsonRecord.toString());

   Map sourceOffset = Collections.singletonMap("block", j.get("block").toString());
   Object value = j.get("data").toString();

   records.add(new SourceRecord(
    Collections.singletonMap("samesourcepartition", "samesourcepartition"), // sourcePartition
    sourceOffset, // sourceOffset
    "mytopic", // topic
    Schema.STRING_SCHEMA, // keySchema
    j.get("block").toString, // key: "0", "1", "2", "3"
    Schema.STRING_SCHEMA, // valueSchema
    value // value
   ));

   log.info("added record for block: " + j.get("block"));
  }

  log.info("Returning {} records", records.size());

  return records;
}

我对如何使用感到困惑sourceOffset。(https://docs.confluent.io/current/connect/devguide.html#task-example-source-task

的一个例子block可能是"3"。我希望如果 Kafka 已经读过这个sourceOffset,它不应该再读它。但它似乎完全忽略了这一点,offset继续增长超过 3 并在无限循环中不断重复相同的 0-3 数据。例如,如果我查看 Confluent 仪表板 > 主题 > 检查,我希望最高offset记录key为“3”,但它超过 100 多个重复的键和值。

我的 poll() 是否需要增加 0->3 以便知道何时“停止”?当前的行为不断重复 0->3, 0->3, ... 添加new SourceRecord(),但我想sourceOffset和唯一key应该是幂等的。

我确定我误解了一些东西。我也尝试打开log compaction,但即使使用相同的键仍然会重复。sourceOffset有人可以展示每个/的消息的正确用途key吗?

标签: javaapache-kafkaapache-kafka-connect

解决方案


推荐阅读