apache-kafka - Kafka 连接器配置错误:filter.condition:定义的 json 路径无效
问题描述
我正在尝试将 Confluent 的Filter SMT与 Debezium 示例 unwrap -smt一起使用。
我将以下配置添加到源连接器(Debezium MySQL)配置:
"transforms": "route,csFilter",
...
...
"transforms.csFilter.type": "io.confluent.connect.transforms.Filter$Value",
"transforms.csFilter.filter.condition": "$.payload.after.source == 2",
"transforms.csFilter.filter.type": "exclude",
"transforms.csFilter.missing.or.null.behavior": "fail"
由于这个 Filter SMT 是由 Confluent 提供的,所以我下载了jar 文件并将 (connect-transforms, connect-utils, json-path) jar 文件复制到path-to-kafka/connect/debezium-connector-mysql
目录中。
当我尝试注册 Debezium MySQL 源连接器时,
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json"
localhost:8083/connectors/ -d @source_connector_config.json
我收到了这个错误:
{"error_code":400,
"message":"Connector configuration is invalid and contains the following 1 error(s):\n
Invalid value $.payload.after.source == 2 for configuration filter.condition: Invalid json path defined.
Please refer to https://github.com/json-path/JsonPath README for correct use of json path.\n
You can also find the above list of errors at the endpoint `/{connectorType}/config/validate`"}
我使用本指南中提供的示例检查了 JSON 路径表达式。似乎没问题。
你能指出我正确的方向吗?我错过了什么?谢谢。
解决方案
请尝试使用此条件:$.payload.after[?(@.source == 2)]
推荐阅读
- c - 如何从 C 中的 **argv 派生 argc
- java - 使用 Jenkinsfile、Maven 和 Java 从参数化 Jenkins 项目传递变量
- prolog - 包装值的 Prolog 添加
- javascript - 在 React Native 中发布和订阅事件
- r - 从数据框中创建具有缺失值的 ts 时间序列
- java - Retrofit 中未正确处理 REST 方法中的参数
- c++ - std::filesystem::create_directories Visual Studio 2017
- sparql - 如何使用 SPARQL 加减数字?
- cordova - Ionic 3 闪屏在某些 iPhone 上冻结
- javascript - getElementById 其中 Element 是动态创建并附加到 DOM