Consumer reads data twice from Avro schema

Problem description

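I have a streaming application that listens for some data, transforms it, and pushes the result to a new topic. I use an Avro schema to read and write the data to and from the topics. The problem shows up when I consume the data from the final destination topic with the command below. My data is somewhat complex, with arrays and nested JSON inside it, so I suspect my Avro schema may not be suited to my purpose. There are no errors: I can see all the data on my final topic, but the "Pets" field is duplicated for some reason and I don't understand why. In fact, I only added one new field (job_id) to the existing data in the Avro schema, and I don't make any big changes to the data while transforming it.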

./bin/kafka-console-consumer --topic my_topic \
--bootstrap-server localhost:9092

This is the JSON data I have:

{
   "Person":{
      "id":"104440",
      "Name":"William",
      "LastName":"Dorsey",
      "archived":false,
      "Timezone":"America/Los_Angeles",
      "brandCompanyName":"Twitter",
      "brandID":"cf545a7b",
      "creatorID":"1234",
      "currency":"USD",
      "dateCreated":"2020-09-07T02:56:22Z",
      "dateModified":"2020-09-07T02:57:24Z",
      "disabled":false,
      "endDate":"2020-11-29T19:51:00-08:00",
      "startDate":"2020-08-31T20:55:00-07:00",
      "totalBudget":0
   },
   "Pets":[
      {
         "Name":"Pawny",
         "Id":"4214",
         "budget":"0",
         "adoptionDate":"2020-09-07T02:56:22Z",
         "year":"2",
         "type":"Golden",
         "gender":"male"
      }
   ],
   "CreationTime":"1604036638"
}

My Avro schema:

{
  "name": "MyClass",
  "type": "record",
  "namespace": "com.acme.avro",
  "fields": [
    {
      "name": "Person",
      "type": {
        "name": "Person",
        "type": "record",
        "fields": [
          {
            "name": "id",
            "type": "string"
          },
          {
            "name": "Name",
            "type": "string"
          },
          {
            "name": "LastName",
            "type": "string"
          },
          {
            "name": "archived",
            "type": "boolean"
          },
          {
            "name": "Timezone",
            "type": "string"
          },
          {
            "name": "brandCompanyName",
            "type": "string"
          },
          {
            "name": "brandID",
            "type": "string"
          },
          {
            "name": "creatorID",
            "type": "string"
          },
          {
            "name": "currency",
            "type": "string"
          },
          {
            "name": "dateCreated",
            "type": "int",
            "logicalType": "date"
          },
          {
            "name": "dateModified",
            "type": "int",
            "logicalType": "date"
          },
          {
            "name": "disabled",
            "type": "boolean"
          },
          {
            "name": "endDate",
            "type": "int",
            "logicalType": "date"
          },
          {
            "name": "startDate",
            "type": "int",
            "logicalType": "date"
          },
          {
            "name": "totalBudget",
            "type": "int"
          }
        ]
      }
    },
    {
      "name": "Pets",
      "type": {
        "type": "array",
        "items": {
          "name": "Pets_record",
          "type": "record",
          "fields": [
            {
              "name": "Name",
              "type": "string"
            },
            {
              "name": "Id",
              "type": "string"
            },
            {
              "name": "budget",
              "type": "string"
            },
            {
              "name": "adoptionDate",
              "type": "int",
              "logicalType": "date"
            },
            {
              "name": "year",
              "type": "string"
            },
            {
              "name": "type",
              "type": "string"
            },
            {
              "name": "gender",
              "type": "string"
            }
          ]
        }
      }
    },
    {
      "name": "CreationTime",
      "type": "string"
    },
    {
      "name":"jobID",
      "type":"string"
    }
  ]
}

This is the output when I consume the topic. The Pets field is duplicated for some reason, and I don't know why:

{
      "id":"104440",
      "Name":"William",
      "LastName":"Dorsey",
      "archived":false,
      "Timezone":"America/Los_Angeles",
      "brandCompanyName":"Twitter",
      "brandID":"cf545a7b",
      "creatorID":"1234",
      "currency":"USD",
      "dateCreated":"2020-09-07T02:56:22Z",
      "dateModified":"2020-09-07T02:57:24Z",
      "disabled":false,
      "endDate":"2020-11-29T19:51:00-08:00",
      "startDate":"2020-08-31T20:55:00-07:00",
      "totalBudget":0,
      "Pets":[
      {
         "Name":"Pawny",
         "Id":"4214",
         "budget":"0",
         "adoptionDate":"2020-09-07T02:56:22Z",
         "year":"2",
         "type":"Golden",
         "gender":"male"
      }
   ],
   "CreationTime":1604036638,
   "jobID":12512,
   "pets":[
      {
         "Name":"Pawny",
         "Id":"4214",
         "budget":"0",
         "adoptionDate":"2020-09-07T02:56:22Z",
         "year":"2",
         "type":"Golden",
         "gender":"male"
      }
   ]
}

Tags: java, apache-kafka, stream, schema, avro

Solution


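This happened because I used uppercase names for my fields. I spent 24 hours going around in circles before I finally figured it out, so I'm posting this in case anyone runs into the same problem. Please read here and use lowercase letters for your field names. When I changed the field name to "pets", the duplicate disappeared.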
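
The answer does not say which component produced the second key. One common way this can happen in Java (an assumption here, not something the post confirms) is that a serializer emits both the declared field name, such as Pets, and the JavaBeans property name it derives from a getter such as getPets(), which is pets. The sketch below uses the standard java.beans.Introspector.decapitalize method to list field names that would change under that convention. The class name FieldNameCheck and its two helper methods are hypothetical, made up for illustration.

```java
import java.beans.Introspector;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Flags field names that JavaBeans naming conventions would rewrite. */
class FieldNameCheck {

    /** Property name that JavaBeans conventions derive from a getter like getPets(). */
    static String beanPropertyName(String fieldName) {
        // decapitalize lowercases the first character, unless the first two
        // characters are both uppercase (for example "URL" stays "URL").
        return Introspector.decapitalize(fieldName);
    }

    /** Returns "declared -> derived" for every name the convention would change. */
    static List<String> conflictingNames(List<String> fieldNames) {
        List<String> conflicts = new ArrayList<>();
        for (String name : fieldNames) {
            String derived = beanPropertyName(name);
            if (!name.equals(derived)) {
                conflicts.add(name + " -> " + derived);
            }
        }
        return conflicts;
    }

    public static void main(String[] args) {
        // Top-level field names taken from the schema in the question.
        List<String> names = Arrays.asList("Person", "Pets", "CreationTime", "jobID");
        for (String conflict : conflictingNames(names)) {
            System.out.println(conflict);
        }
    }
}
```

With the schema's top-level names, this flags Person, Pets, and CreationTime and leaves jobID alone. The question only reports Pets as duplicated, so treat the flagged names as candidates to rename, not as a confirmed diagnosis.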

