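java - Consumer reads data twice from Avro schema
Problem description
I have a streaming application that listens for some data, transforms it, and pushes it to a new topic. I use an Avro schema to read and write the data on the topics. The problem shows up when I consume data from the final destination topic with the command below. My data is somewhat complex, containing arrays and nested JSON, and I suspect my Avro schema may not be suited to it. There are no errors, and I can see all of the data on the final topic, but the "Pets" field is duplicated for some reason and I don't understand why. In fact, I only add one new field (job_id) to the existing data in the Avro schema, and I don't make any major changes to it during the transformation.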
./bin/kafka-console-consumer --topic my_topic \
--bootstrap-server localhost:9092 \
This is the JSON data I have
{
"Person":{
"id":"104440",
"Name":"William",
"LastName":"Dorsey",
"archived":false,
"Timezone":"America/Los_Angeles",
"brandCompanyName":"Twitter",
"brandID":"cf545a7b",
"creatorID":"1234",
"currency":"USD",
"dateCreated":"2020-09-07T02:56:22Z",
"dateModified":"2020-09-07T02:57:24Z",
"disabled":false,
"endDate":"2020-11-29T19:51:00-08:00",
"startDate":"2020-08-31T20:55:00-07:00",
"totalBudget":0
},
"Pets":[
{
"Name":"Pawny",
"Id":"4214",
"budget":"0",
"adoptionDate":"2020-09-07T02:56:22Z",
"year":"2",
"type":"Golden",
"gender":"male"
}
],
"CreationTime":"1604036638"
}
My Avro schema
{
"name": "MyClass",
"type": "record",
"namespace": "com.acme.avro",
"fields": [
{
"name": "Person",
"type": {
"name": "Person",
"type": "record",
"fields": [
{
"name": "id",
"type": "string"
},
{
"name": "Name",
"type": "string"
},
{
"name": "LastName",
"type": "string"
},
{
"name": "archived",
"type": "boolean"
},
{
"name": "Timezone",
"type": "string"
},
{
"name": "brandCompanyName",
"type": "string"
},
{
"name": "brandID",
"type": "string"
},
{
"name": "creatorID",
"type": "string"
},
{
"name": "currency",
"type": "string"
},
{
"name": "dateCreated",
"type": "int",
"logicalType": "date"
},
{
"name": "dateModified",
"type": "int",
"logicalType": "date"
},
{
"name": "disabled",
"type": "boolean"
},
{
"name": "endDate",
"type": "int",
"logicalType": "date"
},
{
"name": "startDate",
"type": "int",
"logicalType": "date"
},
{
"name": "totalBudget",
"type": "int"
}
]
}
},
{
"name": "Pets",
"type": {
"type": "array",
"items": {
"name": "Pets_record",
"type": "record",
"fields": [
{
"name": "Name",
"type": "string"
},
{
"name": "Id",
"type": "string"
},
{
"name": "budget",
"type": "string"
},
{
"name": "adoptionDate",
"type": "int",
"logicalType": "date"
},
{
"name": "year",
"type": "string"
},
{
"name": "type",
"type": "string"
},
{
"name": "gender",
"type": "string"
}
]
}
}
},
{
"name": "CreationTime",
"type": "string"
},
{
"name":"jobID",
"type":"string"
}
]
}
The output on my topic when I consume it. The Pets field is duplicated for some reason, and I don't know why:
{
"id":"104440",
"Name":"William",
"LastName":"Dorsey",
"archived":false,
"Timezone":"America/Los_Angeles",
"brandCompanyName":"Twitter",
"brandID":"cf545a7b",
"creatorID":"1234",
"currency":"USD",
"dateCreated":"2020-09-07T02:56:22Z",
"dateModified":"2020-09-07T02:57:24Z",
"disabled":false,
"endDate":"2020-11-29T19:51:00-08:00",
"startDate":"2020-08-31T20:55:00-07:00",
"totalBudget":0,
"Pets":[
{
"Name":"Pawny",
"Id":"4214",
"budget":"0",
"adoptionDate":"2020-09-07T02:56:22Z",
"year":"2",
"type":"Golden",
"gender":"male"
}
],
"CreationTime":1604036638,
"jobID":12512,
"pets":[
{
"Name":"Pawny",
"Id":"4214",
"budget":"0",
"adoptionDate":"2020-09-07T02:56:22Z",
"year":"2",
"type":"Golden",
"gender":"male"
}
]
}
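Solution
This happened because I was using capitalized names for my fields. After going around in circles for 24 hours I finally figured it out, so in case anyone else runs into the same problem: use lowercase letters for your field names. When I changed the field name to "pets", the duplicate disappeared.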
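To apply that advice to a larger schema, it can help to list the field names that begin with an uppercase letter before renaming them. The following is only a minimal sketch: the class `FieldNameCheck` and its methods are hypothetical helpers, not part of Avro or Kafka, and the field names are typed in by hand from the schema in the question rather than parsed from it. It does not explain why the duplicate appears; it only flags the names the answer suggests changing.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of any Avro or Kafka API.
public class FieldNameCheck {

    // Returns the names whose first character is an uppercase letter.
    public static List<String> findCapitalized(List<String> fieldNames) {
        List<String> flagged = new ArrayList<>();
        for (String name : fieldNames) {
            if (!name.isEmpty() && Character.isUpperCase(name.charAt(0))) {
                flagged.add(name);
            }
        }
        return flagged;
    }

    // Lowercases only the first character, e.g. "Pets" becomes "pets".
    public static String lowerFirst(String name) {
        if (name.isEmpty()) {
            return name;
        }
        return Character.toLowerCase(name.charAt(0)) + name.substring(1);
    }

    public static void main(String[] args) {
        // Top-level field names copied by hand from the schema in the question.
        List<String> topLevel = List.of("Person", "Pets", "CreationTime", "jobID");
        for (String name : findCapitalized(topLevel)) {
            System.out.println(name + " -> " + lowerFirst(name));
        }
    }
}
```

Renaming a field changes the schema, so producers and consumers of the topic would need to agree on the new names.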