首页 > 解决方案 > Flink:RichSinkFunction 的实现不可序列化

问题描述

我正在尝试将流下沉到 s3 存储桶并实现Encoder接口。

case class Info(vecId: Long, bkCode: String, state: String)

class S3Encoder extends Encoder[Info] {

    private val gson = new Gson()

    override def encode(element: Info, stream: OutputStream): Unit = {

      val json = new util.HashMap[String, Any]()

      json.put("vecId", element.vecId)
      json.put("bk", element.bkCode)
      json.put("state", element.state)

//      println(gson.toJson(json))
      stream.write(gson.toJson(json).getBytes("UTF-8"))
      stream.write('\n')

    }

我把水槽加到

val sink: StreamingFileSink[Info] = StreamingFileSink
            .forRowFormat(new Path(s3_path), new S3Encoder)
            .build()

但是遇到了错误non-serializable,任何人都可以阐明这一点吗?非常感谢!

标签: scalaapache-flink

解决方案


推荐阅读