scala - Flink StreamingFileSink RowFormatBuilder with BucketAssigner 返回 Any?
问题描述
为什么这个配置会导致 Any 类型?我不能调用 .build()!我的 flink 版本是 1.10.0,scala 版本是 2.11 Link to screenshot
val sink = StreamingFileSink
.forRowFormat(new Path("s3a://123"), csvEncoder)
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(TimeUnit.MINUTES.toMinutes(5))
.withInactivityInterval(TimeUnit.MINUTES.toMinutes(5))
.withMaxPartSize(128 * 1024 * 1024)
.build()
)
.withBucketAssigner(
new BucketAssigner[UserEvent, String] {
override def getBucketId(element: UserEvent, context: BucketAssigner.Context): String = element.getType.name
override def getSerializer: SimpleVersionedSerializer[String] = new SimpleVersionedStringSerializer
}
) // this returns Any!!!
.build() // can't call .build()
解决方案
问题在于 Scala 的类型推断与构建者使用的自我类型习语相结合StreamingFileSink
。
作为快速修复,您可以插入演员表:
val sink = StreamingFileSink
.forRowFormat(new Path("s3a://123"), csvEncoder)
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(TimeUnit.MINUTES.toMinutes(5))
.withInactivityInterval(TimeUnit.MINUTES.toMinutes(5))
.withMaxPartSize(128 * 1024 * 1024)
.build()
)
.withBucketAssigner(
new BucketAssigner[UserEvent, String] {
override def getBucketId(element: UserEvent, context: BucketAssigner.Context): String = element.getType.name
override def getSerializer: SimpleVersionedSerializer[String] = new SimpleVersionedStringSerializer
}
).asInstanceOf[StreamingFileSink.RowFormatBuilder[UserEvent, String, _]]
.build()
正确的修复需要对 Flink 进行更改。您可以跟踪FLINK-16684以在问题得到正确解决时得到通知。
更新
Flink 1.10.1 和 1.11.0 已修复该问题。
推荐阅读
- html - 如何使图片适合半圆
- javascript - express js内部服务器错误500,代码中没有错误
- html - 在 RStudio 中从 HTML 中检索数据
- python - JupyterLab 中的散景滑块未更新
- html - 如何对静态服务内容使用 jwt 身份验证?
- django - 通过无服务器运行 Django 时出现 Psycopg2 错误
- python - 通过部分字符串在熊猫数据框列表中查找字符串
- python - TypeError:预期的字符串或类似字节的对象“
- ms-word - Rmd to word with flex table - 只需要增加单词输出中表格的边距
- asp.net - 关于 ibm.data.db2 nuget 包