scala - Flink:RichSinkFunction 的实现不可序列化
问题描述
我正在尝试将流下沉到 s3 存储桶并实现Encoder
接口。
case class Info(vecId: Long, bkCode: String, state: String)
class S3Encoder extends Encoder[Info] {
private val gson = new Gson()
override def encode(element: Info, stream: OutputStream): Unit = {
val json = new util.HashMap[String, Any]()
json.put("vecId", element.vecId)
json.put("bk", element.bkCode)
json.put("state", element.state)
// println(gson.toJson(json))
stream.write(gson.toJson(json).getBytes("UTF-8"))
stream.write('\n')
}
我把水槽加到
val sink: StreamingFileSink[Info] = StreamingFileSink
.forRowFormat(new Path(s3_path), new S3Encoder)
.build()
但是遇到了错误non-serializable
,任何人都可以阐明这一点吗?非常感谢!
解决方案
推荐阅读
- php - 子文件夹中路径wordpress的问题
- powershell - 从每行的输出中删除字符
- reactjs - 带有 react-virtuoso 的 Material UI 自动完成虚拟化
- c++ - 警告:删除数组“.......” munmap_chunk():无效指针 命令由信号 6 终止
- c# - 如何根据类型 ID 在 C# 中增量分配整数(如最后一列)
- android - android应用程序在电池电量高于一定百分比时显示通知
- python - 在继承的类数据结构中更新 self
- javascript - 如何在单独的 html 页面上显示来自提交表单的计算值?
- gitlab - 如何获取 GitLab 上所有存储库的用户总提交数?
- java - Play中的请求之前和之后