Flink Cassandra map: Codec not found issue

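Problem description

I need to store a map<String, list<String>> in a Cassandra table using the Apache Flink Cassandra connector. I have tried several approaches, but I keep getting the following exception: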

Caused by: com.datastax.driver.core.exceptions.CodecNotFoundException: Codec not found for requested operation: [list<varchar> <-> ? extends java.util.List<java.lang.String>]
    at com.datastax.driver.core.CodecRegistry.notFound(CodecRegistry.java:679)
    at com.datastax.driver.core.CodecRegistry.createCodec(CodecRegistry.java:532)
    at com.datastax.driver.core.CodecRegistry.findCodec(CodecRegistry.java:506)
    at com.datastax.driver.core.CodecRegistry.maybeCreateCodec(CodecRegistry.java:581)
    at com.datastax.driver.core.CodecRegistry.createCodec(CodecRegistry.java:524)
    at com.datastax.driver.core.CodecRegistry.findCodec(CodecRegistry.java:506)
    at com.datastax.driver.core.CodecRegistry.access$200(CodecRegistry.java:140)
    at com.datastax.driver.core.CodecRegistry$TypeCodecCacheLoader.load(CodecRegistry.java:211)
    at com.datastax.driver.core.CodecRegistry$TypeCodecCacheLoader.load(CodecRegistry.java:208)

Sample POC code:

import com.datastax.driver.mapping.Mapper

import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.cassandra.CassandraSink


fun main() {
  val env = StreamExecutionEnvironment.getExecutionEnvironment()
  val text: DataStream<WordCount> = env.fromElements(
    WordCount("abc", mapOf("a" to listOf("1")))
  )
  CassandraSink.addSink(text).setHost("localhost")
    .setMapperOptions { arrayOf(Mapper.Option.saveNullFields(true)) }
    .build()
    .name("write to cassandra")
  env.execute()
}


import com.datastax.driver.mapping.annotations.Column
import com.datastax.driver.mapping.annotations.Frozen
import com.datastax.driver.mapping.annotations.Table

@Table(keyspace = "test", name = "word_count5")
class WordCount {
  @Column(name = "name")
  var name: String = ""

  @Column(name = "age")
  @Frozen
  var age:Map<String, List<String>> = emptyMap<String, List<String>>()
  constructor() {}

  constructor(name: String, age: Map<String, List<String>>) {
    this.name = name
    this.age = age
  }

  override fun toString(): String {
    return "WordCount(name='$name', age=$age)"
  }
  
}

Cassandra table schema:

CREATE TABLE test.word_count5 (
    name text PRIMARY KEY,
    age map<text, frozen<list<text>>>
)

Tags: cassandra, apache-flink, cassandra-3.0

Solution
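
A possible explanation, offered as an assumption rather than a confirmed fix: the `? extends java.util.List<java.lang.String>` in the exception suggests that the Java type the driver sees for the map value carries a wildcard. Kotlin's `Map<K, out V>` is covariant in `V`, and for a `var` property the compiler may emit `Map<String, ? extends List<String>>` in the generated Java signature. The sketch below uses plain reflection, with no Cassandra or Flink dependency, to compare the setter signature of a property declared as in the question with one whose value type is annotated with `@JvmSuppressWildcards`. The classes `WithWildcard` and `WithoutWildcard` and the helper `valueTypeOfAgeSetter` are hypothetical names introduced only for this illustration.

```kotlin
import java.lang.reflect.ParameterizedType

// Hypothetical stand-in for the WordCount entity, declared as in the question.
class WithWildcard {
    var age: Map<String, List<String>> = emptyMap()
}

// Same property, but asking the Kotlin compiler not to emit a wildcard
// for the map's value type in the Java generic signature.
class WithoutWildcard {
    var age: Map<String, @JvmSuppressWildcards List<String>> = emptyMap()
}

// Returns the Java generic type of the map's value argument,
// as reported by reflection on the generated setAge(Map) method.
fun valueTypeOfAgeSetter(cls: Class<*>): String {
    val setter = cls.getMethod("setAge", Map::class.java)
    val mapType = setter.genericParameterTypes[0] as ParameterizedType
    return mapType.actualTypeArguments[1].typeName
}

fun main() {
    // The two lines are expected to differ in whether a wildcard is present.
    println(valueTypeOfAgeSetter(WithWildcard::class.java))
    println(valueTypeOfAgeSetter(WithoutWildcard::class.java))
}
```

If the first line printed contains a wildcard and the second does not, declaring `age` in `WordCount` as `Map<String, @JvmSuppressWildcards List<String>>` may let the driver resolve its built-in list codec. Whether the mapper then accepts the entity has not been verified against the setup in the question.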
