首页 > 解决方案 > DataStax Cassandra Sink Connector - 根据条件从 Kafka 主题中摄取数据

问题描述

我正在尝试将 Kafka 主题中的数据实时提取到 Cassandra 表中。为此,我使用 DataStax Cassandra Sink 连接器。主题中事件的格式将是 JSON。我可以将主题中事件的 JSON 字段直接映射到表中,但这并不是我所需要的。

我的场景是这样的:有一个主题需要映射到多个 Cassandra 表。主题中的事件应该可以根据某些条件进入表格。假设有具有不同类型标头 A、B 和 C 的事件。具有标头 A 的事件需要转到 Cassandra 表 A,标头 B 事件需要转到表 B,标头 C 到表 C。在连接器配置 JSON 文件中,我我能做到这一点吗?

我对单个主题和单个表进行了一些尝试,并尝试在“ID”字段上提取具有特定条件的事件。

我当前的连接器配置文件:

{
  "name": "cassandra-json-sink",
  "config": {
    "connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
    "tasks.max": "1",
    "topics": "json_test_topic",
    "contactPoints": "cassandra",
    "loadBalancing.localDc": "datacenter1",
    "port": 9042,
    "auth.username": "cassandra",
    "auth.password": "cassandra",
    "topic.json_test_topic.kconnect_json.customer.mapping": "id=key, name=value.name, lname=value.lname, adress=value.adress",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": false,
    "value.converter.schemas.enable": false
  }
}

Cassandra 表的详细信息:

USE kconnect_json;
CREATE TABLE customer (id TEXT PRIMARY KEY, name TEXT, lname TEXT, adress TEXT);

我发布到我的主题“json_test_topic”的示例消息:

abc:{"name":"john", "lname":"doe", "adress":"WY"}
efg:{"name":"wanda", "lname":"hill", "adress":"CA"}

在这个简单的尝试中,我想将带有 "name" field = "john" 的事件提取到我的 Cassandra 表中。我已将连接器的先前配置文件更改为:

{
  "name": "cassandra-json-sink",
  "config": {
    "connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
    "tasks.max": "1",
    "topics": "json_test_topic",
    "contactPoints": "cassandra",
    "loadBalancing.localDc": "datacenter1",
    "port": 9042,
    "auth.username": "cassandra",
    "auth.password": "cassandra",
    "topic.json_test_topic.kconnect_json.customer.mapping": "id=key, name=value.name, lname=value.lname, adress=value.adress",
    "topic.json_test_topic.kconnect_json.customer.query": "INSERT INTO kconnect_json.customer(id, name, lname, adress) SELECT :id, :name, :lname, :adress FROM topic.json_test_topic WHERE :name = 'john';",
    "topic.json_test_topic.kconnect_json.musteri.deletesEnabled": false,
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": false,
    "value.converter.schemas.enable": false
  }
}

在执行此操作时,我尝试使用此文档并稍微自定义示例:https ://docs.datastax.com/en/kafka/doc/kafka/kafkaCqlQuery.html

但是这种定制不起作用,因为 Cassandra 端的查询无效。我无法在 Cassandra 上找到带有内部 SELECT 语句的 INSERT 语句,所以这可能是一个问题,但我不知道替代方案。

如果有人可以提供帮助,我会很高兴。

标签: apache-kafkacassandraapache-kafka-connect

解决方案


Cassandra的kafka-sink连接器不支持条件映射。

正如我们之前建议您的那样,连接器可以将一个主题映射到多个 CQL 表,但不可能将条件应用于消息的映射方式。干杯!


推荐阅读