cassandra - 使用 kafka connect cassandra sink 在 Cassandra 中自动生成表
问题描述
我是using confluent.connect.cassandra.CassandraSinkConnector
,对于 kafka 连接 cassandra sink。
我想知道是否可以使用io.confluent.connect.cassandra.CassandraSinkConnector
连接器从 kafka 主题自动生成 cassandra 表。
如果可能的话,您能否建议设置什么配置来启用此功能。我已经尝试了文档中提到的所有配置,但我没有成功创建表。
这是我正在使用的配置:
{
"name": "cassandra-test4",
"config": {
"connector.class": "io.confluent.connect.cassandra.CassandraSinkConnector",
"tasks.max": "3",
"topics": "orders-topic2",
"cassandra.contact.points": "my_ip",
"cassandra.keyspace": "test_cas",
"cassandra.write.mode": "Insert",
"cassandra.table.manage.enabled": "true",
"cassandra.sink.route": "test_cas.orders",
"key.converter.schema.registry.url": "http://localhost:8081",
"value.converter.schema.registry.url": "http://localhost:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"flush.size": "1",
"cassandra.keyspace.create.enabled": "true",
"name": "cassandra-test4"
},
"tasks": [
{
"connector": "cassandra-test4",
"task": 0
},
{
"connector": "cassandra-test4",
"task": 1
},
{
"connector": "cassandra-test4",
"task": 2
}
],
"type": null
}
解决方案
这应该通过将cassandra.keyspace.create.enabled
&cassandra.table.manage.enabled
属性设置为 来完成true
。请参阅文档。
但要非常小心 - 在集群中很容易出现架构分歧,然后您需要执行额外的步骤才能从中恢复。最好在启动连接器之前预先创建表...
推荐阅读
- antlr - 在 ANTLR Lexer 规则中包含某些擒纵符号
- java - 空手道输入匹配
- common-lisp - Allegro common lisp in-package 不起作用
- node.js - Swagger 示例正文实现
- python - builtins.AttributeError: 'NoneType' 对象没有属性 'seek'
- kotlin - 除了 String() 之外,如何在 Kotlin 上将字节数组转换为字符串?
- javascript - 如何使用 patchValue 将值数组传递到一个多选列表视图中
- python - setattr 什么时候起作用?
- sql - 在子查询的外部查询中使用别名与不使用别名的区别
- sql - 动态列名 postgresql 触发器