cassandra - Flink 卡桑德拉地图> 未找到编解码器问题
问题描述
我需要map<String, list<String>>
使用 Apache flink cassandra 连接器存储在 cassandra 表中。我尝试了多种方式,但得到以下异常:
Caused by: com.datastax.driver.core.exceptions.CodecNotFoundException: Codec not found for requested operation: [list<varchar> <-> ? extends java.util.List<java.lang.String>]
at com.datastax.driver.core.CodecRegistry.notFound(CodecRegistry.java:679)
at com.datastax.driver.core.CodecRegistry.createCodec(CodecRegistry.java:532)
at com.datastax.driver.core.CodecRegistry.findCodec(CodecRegistry.java:506)
at com.datastax.driver.core.CodecRegistry.maybeCreateCodec(CodecRegistry.java:581)
at com.datastax.driver.core.CodecRegistry.createCodec(CodecRegistry.java:524)
at com.datastax.driver.core.CodecRegistry.findCodec(CodecRegistry.java:506)
at com.datastax.driver.core.CodecRegistry.access$200(CodecRegistry.java:140)
at com.datastax.driver.core.CodecRegistry$TypeCodecCacheLoader.load(CodecRegistry.java:211)
at com.datastax.driver.core.CodecRegistry$TypeCodecCacheLoader.load(CodecRegistry.java:208)
共享示例 POC 代码
import com.datastax.driver.mapping.Mapper
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.cassandra.CassandraSink
fun main() {
val env = StreamExecutionEnvironment.getExecutionEnvironment()
val text: DataStream<WordCount> = env.fromElements(
WordCount("abc", mapOf("a" to listOf("1")))
)
CassandraSink.addSink(text).setHost("localhost")
.setMapperOptions { arrayOf(Mapper.Option.saveNullFields(true)) }
.build()
.name("write to cassandra")
env.execute()
}
import com.datastax.driver.mapping.annotations.Column
import com.datastax.driver.mapping.annotations.Frozen
import com.datastax.driver.mapping.annotations.Table
@Table(keyspace = "test", name = "word_count5")
class WordCount {
@Column(name = "name")
var name: String = ""
@Column(name = "age")
@Frozen
var age:Map<String, List<String>> = emptyMap<String, List<String>>()
constructor() {}
constructor(name: String, age: Map<String, List<String>>) {
this.name = name
this.age = age
}
override fun toString(): String {
return "WordCount(name='$name', age=$age)"
}
}
cassandra 表模式:
CREATE TABLE test.word_count5 (
name text PRIMARY KEY,
age <map<text, frozen<list<text>>>>
)
解决方案
推荐阅读
- twitter-bootstrap - Bootstrap navbar-collapse 折叠成三列
- lagom - 在生产中为 Lagom 部署动态服务网关
- c++ - C++ 嵌套类是封装的正确方法吗?
- swift - SwiftUI:如何使用导航链接呈现弹出窗口
- java - 有没有办法将所有 ObjectOutputStream 连接到一个输入流?
- latex - somene如何在投影仪中并排创建两个表?
- javascript - 如何在循环中应用由 JavaScript 更改的 CSS 代码?
- iphone - 在移动设备上滑动时,猫头鹰旋转木马 2 滑块崩溃(Iphone 11 pro)
- javascript - 如何用数组值重构reduce
- c# - 尝试发送 SMTP 电子邮件 - 连接被拒绝