java - 如何从 Debezium 创建的 avro 消息中获取字段?
问题描述
我想根据 ts_ms 时间过滤我的消息。问题是我无法从 avro 消息中获取 ts_ms。这是我精简的 avro .avsc 文件:
{
"type": "record",
"name": "Envelope",
"namespace": "mysql.company.scores",
"fields": [
{
"name": "before",
"type": [
"null",
{
"type": "record",
"name": "Value",
"fields": [
<Some fields based on scores table>
],
"connect.name": "mysql.company.scores.Value"
}
],
"default": null
},
{
"name": "after",
"type": [
"null",
"Value"
],
"default": null
},
{
"name": "source",
"type": {
"type": "record",
"name": "Source",
"namespace": "io.debezium.connector.mysql",
"fields": [
{
"name": "version",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "connector",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "name",
"type": "string"
},
{
"name": "server_id",
"type": "long"
},
{
"name": "ts_sec",
"type": "long"
},
{
"name": "gtid",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "file",
"type": "string"
},
{
"name": "pos",
"type": "long"
},
{
"name": "row",
"type": "int"
},
{
"name": "snapshot",
"type": [
{
"type": "boolean",
"connect.default": false
},
"null"
],
"default": false
},
{
"name": "thread",
"type": [
"null",
"long"
],
"default": null
},
{
"name": "db",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "table",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "query",
"type": [
"null",
"string"
],
"default": null
}
],
"connect.name": "io.debezium.connector.mysql.Source"
}
},
{
"name": "op",
"type": "string"
},
{
"name": "ts_ms",
"type": [
"null",
"long"
],
"default": null
}
],
"connect.name": "mysql.company.scores.Envelope"
}
我可以在之前或之后访问,但是当我可以使用 getTs_ms 进行以下方法时,我得到符号无法找到方法:
private boolean isRecordNew(mysql.company.scores.Envelope value){
return value.getTs_ms() > 1580988600000L;
}
这是我的 serde 类的相关部分:
public static Serde<mysql.company.scores.Envelope> getEnvelopeSerde() {
SpecificAvroSerde<mysql.company.scores.Envelope> scoreSerde = new SpecificAvroSerde();
scoreSerde.configure(
Collections.singletonMap(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG,
schemaRegistryUrl), false);
return scoreSerde;
}
我应该能够使用相同的 serde 类访问 ts_ms 字段,还是应该更改它以将其包含在值中?
解决方案
正如@cricket_007 在评论中提到的,我查看了生成的类并命名了该字段,getTsMs()
并通过使用此方法解决了它。
推荐阅读
- react-native - 流程如何与本机反应和选项'all = true'一起使用?
- c# - 如何在调试应用程序时打开/编辑/保护本地 app.config 文件?
- delphi - Delphi cxGrid 如何防止在移动列时创建第二行标题?
- python - Python Regex:从字符串中解析键/值对
- laravel - Laravel s3存储:未定义的方法'url'
- python - 包含多个 np.multipy 语句的代码片段优化
- android - 如何将耀斑动画从几秒钟减慢到几小时
- xamarin - Xamarin.Forms iOS 添加联系人(通过依赖服务)
- amazon-web-services - AWS Cloudwatch 过滤日志事件
- html - 缩小边框大小并平均其他边框大小