cassandra - Kafka 接收器错误“此连接器要求来自 Kafka 的记录包含 Cassandra 表的密钥”
问题描述
我正在尝试使用 kafka 将从 Sap 读取的所有表同步到 cassandra 这是我的 cassandra 配置
{
"name": "cassandra",
"config": {
"connector.class": "io.confluent.connect.cassandra.CassandraSinkConnector",
"tasks.max": "5",
"topics" :"sap_table1,sap_table2",
"cassandra.keyspace": "sap",
"cassandra.compression":"SNAPPY",
"cassandra.consistency.level":"LOCAL_QUORUM",
"cassandra.write.mode":"Update",
"transforms":"prune",
"transforms.prune.type":"org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.prune.whitelist":"CreatedAt,Id,Text,Source,Truncated",
"transforms.ValueToKey.fields":"ROWTIME"
}
}
我收到此错误
Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask:584) org.apache.kafka.connect.errors.DataException: Record with a null key was encountered. This connector requires that records from Kafka contain the keys for the Cassandra table. Please use a transformation like org.apache.kafka.connect.transforms.ValueToKey to create a key with the proper fields.
从 kafka sap 连接器生成的所有表都没有密钥我不知道如果这是问题
让我知道我是否正在做任何事情
谢谢
解决方案
"ROWTIME"
仅作为 KSQL 概念存在。它实际上不是您的值中的字段,因此键被设置为空。
此外,ValueToKey
未在列表中transforms
列出,因此甚至没有被应用。您还必须添加"transforms.ValueToKey.type"
。
您必须使用不同的转换方法将记录时间戳设置为 ConnectRecord 消息键
推荐阅读
- java - 在 TextureView 上录制来自 rtsp 的视频
- sql-server - DB2 中的查询问题
- actions-on-google - 我的谷歌操作已部署,但在谷歌助手中使用时仍显示测试版本
- node.js - 如何在某些应用程序上实现基于用户的权限?
- python-3.x - 读取文件时跳过带有奇怪字符的行
- c# - 无法使用c#从Excel文件中删除空行
- java - 错误未从被调用服务传播到调用者服务
- javascript - 我如何编辑 Li 项目以使用 javascript 向其中添加文本
- optimization - 是否可以在 Google Apps 脚本中批量处理范围保护?
- java - 如何使用 Reactive Kafka 将字符串数据发送到 kafka 生产者?