首页 > 解决方案 > 如何在 Flink 中附加到文件接收器

问题描述

我正在使用 Flink 1.12,并且我有以下简单的代码片段。我想在D:/Sql004_ConnectFileReadAndWrite.csv每次运行程序时附加一些数据。

当我运行程序时,我发现只有在第一次不存在文件的情况下才能写入数据。但是我想在再次运行应用程序时附加数据。

即使文件已经存在,我也会问如何将数据附加到文件中。

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema}

object Sql004_ConnectFileReadAndWrite {
  def main(args: Array[String]): Unit = {
    println("Sql004_ConnectFileReadAndWrite")
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val tenv = StreamTableEnvironment.create(env)
    val fmt = new Csv().fieldDelimiter(',').deriveSchema()
    val schema = new Schema()
      .field("a", DataTypes.STRING())
      .field("b", DataTypes.STRING())
      .field("c", DataTypes.STRING())
    val path = "D:/Sql004_ConnectFileReadAndWrite.csv"
    tenv.connect(new FileSystem().path(path)).withSchema(schema).withFormat(fmt).createTemporaryTable("sinkTable")



    val sourceStream = env.fromElements(("a", "b", "c"), ("d", "e", "f"))

    sourceStream.print()

    val table = tenv.fromDataStream(sourceStream).as("c1", "c2", "c3")
    table.executeInsert("sinkTable")
    env.execute("Sql004_ConnectFileReadAndWrite")

  }

}

标签: apache-flink

解决方案


Flink 的文件系统抽象不支持附加到现有文件,或覆盖部分先前写入的数据。这是因为 Flink 希望将一些对象存储(例如,S3)视为文件系统,只为所涉及的操作提供最终一致性。


推荐阅读