apache-kafka - DataStax Cassandra Sink Connector - 根据条件从 Kafka 主题中摄取数据
问题描述
我正在尝试将 Kafka 主题中的数据实时提取到 Cassandra 表中。为此,我使用 DataStax Cassandra Sink 连接器。主题中事件的格式将是 JSON。我可以将主题中事件的 JSON 字段直接映射到表中,但这并不是我所需要的。
我的场景是这样的:有一个主题需要映射到多个 Cassandra 表。主题中的事件应该可以根据某些条件进入表格。假设有具有不同类型标头 A、B 和 C 的事件。具有标头 A 的事件需要转到 Cassandra 表 A,标头 B 事件需要转到表 B,标头 C 到表 C。在连接器配置 JSON 文件中,我我能做到这一点吗?
我对单个主题和单个表进行了一些尝试,并尝试在“ID”字段上提取具有特定条件的事件。
我当前的连接器配置文件:
{
"name": "cassandra-json-sink",
"config": {
"connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
"tasks.max": "1",
"topics": "json_test_topic",
"contactPoints": "cassandra",
"loadBalancing.localDc": "datacenter1",
"port": 9042,
"auth.username": "cassandra",
"auth.password": "cassandra",
"topic.json_test_topic.kconnect_json.customer.mapping": "id=key, name=value.name, lname=value.lname, adress=value.adress",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable": false,
"value.converter.schemas.enable": false
}
}
Cassandra 表的详细信息:
USE kconnect_json;
CREATE TABLE customer (id TEXT PRIMARY KEY, name TEXT, lname TEXT, adress TEXT);
我发布到我的主题“json_test_topic”的示例消息:
abc:{"name":"john", "lname":"doe", "adress":"WY"}
efg:{"name":"wanda", "lname":"hill", "adress":"CA"}
在这个简单的尝试中,我想将带有 "name" field = "john" 的事件提取到我的 Cassandra 表中。我已将连接器的先前配置文件更改为:
{
"name": "cassandra-json-sink",
"config": {
"connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
"tasks.max": "1",
"topics": "json_test_topic",
"contactPoints": "cassandra",
"loadBalancing.localDc": "datacenter1",
"port": 9042,
"auth.username": "cassandra",
"auth.password": "cassandra",
"topic.json_test_topic.kconnect_json.customer.mapping": "id=key, name=value.name, lname=value.lname, adress=value.adress",
"topic.json_test_topic.kconnect_json.customer.query": "INSERT INTO kconnect_json.customer(id, name, lname, adress) SELECT :id, :name, :lname, :adress FROM topic.json_test_topic WHERE :name = 'john';",
"topic.json_test_topic.kconnect_json.musteri.deletesEnabled": false,
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable": false,
"value.converter.schemas.enable": false
}
}
在执行此操作时,我尝试使用此文档并稍微自定义示例:https ://docs.datastax.com/en/kafka/doc/kafka/kafkaCqlQuery.html
但是这种定制不起作用,因为 Cassandra 端的查询无效。我无法在 Cassandra 上找到带有内部 SELECT 语句的 INSERT 语句,所以这可能是一个问题,但我不知道替代方案。
如果有人可以提供帮助,我会很高兴。
解决方案
Cassandra的kafka-sink连接器不支持条件映射。
正如我们之前建议您的那样,连接器可以将一个主题映射到多个 CQL 表,但不可能将条件应用于消息的映射方式。干杯!
推荐阅读
- javascript - 机器人不会在 Direct line 中自行发送欢迎消息
- python - 在 Scrapy 中,如何使用 JSON 加载项来填充新字段?
- spring-boot - Spring Data Rest 不能做集成测试?
- c++ - GCC - 如果定义了函数,如何发出警告或错误
- angular - 带有 selectionModel 的 mat-checkbox - 选中的属性不起作用
- java - 就地交换逻辑在快速排序中不起作用,但使用临时变量进行交换有效。为什么?
- haskell - Haskell 标志模式
- terraform - 切换到新工作区时,Terraform 尝试再次创建 S3 后端
- python - python for in循环n次
- arduino - 是否可以将带有 AND 比较的 if 语句放入语句或 for 循环中