apache-flink - 如何在 Flink 中附加到文件接收器
问题描述
我正在使用 Flink 1.12,并且我有以下简单的代码片段。我想在D:/Sql004_ConnectFileReadAndWrite.csv
每次运行程序时附加一些数据。
当我运行程序时,我发现只有在第一次不存在文件的情况下才能写入数据。但是我想在再次运行应用程序时附加数据。
即使文件已经存在,我也会问如何将数据附加到文件中。
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema}
object Sql004_ConnectFileReadAndWrite {
def main(args: Array[String]): Unit = {
println("Sql004_ConnectFileReadAndWrite")
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val tenv = StreamTableEnvironment.create(env)
val fmt = new Csv().fieldDelimiter(',').deriveSchema()
val schema = new Schema()
.field("a", DataTypes.STRING())
.field("b", DataTypes.STRING())
.field("c", DataTypes.STRING())
val path = "D:/Sql004_ConnectFileReadAndWrite.csv"
tenv.connect(new FileSystem().path(path)).withSchema(schema).withFormat(fmt).createTemporaryTable("sinkTable")
val sourceStream = env.fromElements(("a", "b", "c"), ("d", "e", "f"))
sourceStream.print()
val table = tenv.fromDataStream(sourceStream).as("c1", "c2", "c3")
table.executeInsert("sinkTable")
env.execute("Sql004_ConnectFileReadAndWrite")
}
}
解决方案
Flink 的文件系统抽象不支持附加到现有文件,或覆盖部分先前写入的数据。这是因为 Flink 希望将一些对象存储(例如,S3)视为文件系统,只为所涉及的操作提供最终一致性。
推荐阅读
- powershell - 用于将所有文件从 pc dir 发送到 ftp 的 Power shell 脚本
- python - Numpy - 生成随机数组,限制数组中的单个值 - 投资组合优化
- android - 在 webview (RNCWebViewManager) 中创建 AlertDialg 的问题
- git - git diff 中的 --oneline 有什么作用?
- salesforce - 未找到模块:错误:无法解析“lightning/platformResourceLoader”
- python - 将法国数字格式化为英文数字 - Python
- java - 我可以创建自定义注释以使用其他字段名称初始化静态变量吗?
- c - 使用 Pipes 创建 N 个子节点并向父节点发送消息
- python - 在 pyspark 中的 StringIndexer/OneHotEncoder/VectorAssembler 之后使用 StandardScaler 时出错
- c# - 使用 MaxDegreeOfParallism 进行并行 foreach,有时会在两者之间停止进程