首页 > 解决方案 > Kafka 接收器错误“此连接器要求来自 Kafka 的记录包含 Cassandra 表的密钥”

问题描述

我正在尝试使用 kafka 将从 Sap 读取的所有表同步到 cassandra 这是我的 cassandra 配置

{
    "name": "cassandra",
    "config": {
        "connector.class": "io.confluent.connect.cassandra.CassandraSinkConnector",
        "tasks.max": "5",
        "topics" :"sap_table1,sap_table2",
        "cassandra.keyspace": "sap",
        "cassandra.compression":"SNAPPY",
        "cassandra.consistency.level":"LOCAL_QUORUM",
        "cassandra.write.mode":"Update",
        "transforms":"prune", 
       "transforms.prune.type":"org.apache.kafka.connect.transforms.ReplaceField$Value",
        "transforms.prune.whitelist":"CreatedAt,Id,Text,Source,Truncated",
        "transforms.ValueToKey.fields":"ROWTIME"

    }
}

我收到此错误

Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask:584) org.apache.kafka.connect.errors.DataException: Record with a null key was encountered.  This connector requires that records from Kafka contain the keys for the Cassandra table. Please use a transformation like org.apache.kafka.connect.transforms.ValueToKey to create a key with the proper fields.

从 kafka sap 连接器生成的所有表都没有密钥我不知道如果这是问题

让我知道我是否正在做任何事情

谢谢

标签: cassandraapache-kafkaapache-kafka-connectconfluent-platform

解决方案


"ROWTIME"仅作为 KSQL 概念存在。它实际上不是您的值中的字段,因此键被设置为空。

此外,ValueToKey未在列表中transforms列出,因此甚至没有被应用。您还必须添加"transforms.ValueToKey.type"

您必须使用不同的转换方法将记录时间戳设置为 ConnectRecord 消息键


推荐阅读