首页 > 解决方案 > 如何从 Debezium 创建的 avro 消息中获取字段?

问题描述

我想根据 ts_ms 时间过滤我的消息。问题是我无法从 avro 消息中获取 ts_ms。这是我精简的 avro .avsc 文件:

{
  "type": "record",
  "name": "Envelope",
  "namespace": "mysql.company.scores",
  "fields": [
    {
      "name": "before",
      "type": [
        "null",
        {
          "type": "record",
          "name": "Value",
          "fields": [
            <Some fields based on scores table>
          ],
          "connect.name": "mysql.company.scores.Value"
        }
      ],
      "default": null
    },
    {
      "name": "after",
      "type": [
        "null",
        "Value"
      ],
      "default": null
    },
    {
      "name": "source",
      "type": {
        "type": "record",
        "name": "Source",
        "namespace": "io.debezium.connector.mysql",
        "fields": [
          {
            "name": "version",
            "type": [
              "null",
              "string"
            ],
            "default": null
          },
          {
            "name": "connector",
            "type": [
              "null",
              "string"
            ],
            "default": null
          },
          {
            "name": "name",
            "type": "string"
          },
          {
            "name": "server_id",
            "type": "long"
          },
          {
            "name": "ts_sec",
            "type": "long"
          },
          {
            "name": "gtid",
            "type": [
              "null",
              "string"
            ],
            "default": null
          },
          {
            "name": "file",
            "type": "string"
          },
          {
            "name": "pos",
            "type": "long"
          },
          {
            "name": "row",
            "type": "int"
          },
          {
            "name": "snapshot",
            "type": [
              {
                "type": "boolean",
                "connect.default": false
              },
              "null"
            ],
            "default": false
          },
          {
            "name": "thread",
            "type": [
              "null",
              "long"
            ],
            "default": null
          },
          {
            "name": "db",
            "type": [
              "null",
              "string"
            ],
            "default": null
          },
          {
            "name": "table",
            "type": [
              "null",
              "string"
            ],
            "default": null
          },
          {
            "name": "query",
            "type": [
              "null",
              "string"
            ],
            "default": null
          }
        ],
        "connect.name": "io.debezium.connector.mysql.Source"
      }
    },
    {
      "name": "op",
      "type": "string"
    },
    {
      "name": "ts_ms",
      "type": [
        "null",
        "long"
      ],
      "default": null
    }
  ],
  "connect.name": "mysql.company.scores.Envelope"
}

我可以在之前或之后访问,但是当我可以使用 getTs_ms 进行以下方法时,我得到符号无法找到方法:

private boolean isRecordNew(mysql.company.scores.Envelope value){
        return value.getTs_ms() > 1580988600000L;
    }

这是我的 serde 类的相关部分:

public static Serde<mysql.company.scores.Envelope> getEnvelopeSerde() {
        SpecificAvroSerde<mysql.company.scores.Envelope> scoreSerde = new SpecificAvroSerde();
        scoreSerde.configure(
                Collections.singletonMap(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG,
                        schemaRegistryUrl), false);
        return scoreSerde;
    }

我应该能够使用相同的 serde 类访问 ts_ms 字段,还是应该更改它以将其包含在值中?

标签: javaavrodebezium

解决方案


正如@cricket_007 在评论中提到的,我查看了生成的类并命名了该字段,getTsMs()并通过使用此方法解决了它。


推荐阅读