首页 > 解决方案 > 如何使用 Flink 过滤具有公共字段(但不同模式)的镶木地板文件

问题描述

我有一个文件夹,其中包含具有不同模式的镶木地板文件,它们都有一个保证存在的公共字段。我想根据该字段过滤行并将其写回其他镶木地板文件。

spark中的类似动作将相当简单,看起来像

val filtered = rawsDF.filter(!col("id").isin(idsToDelete: _*))

问题是,如果我要扩展 ParquetInputFormat,我还必须提供可能不同的模式

ParquetInputFmt(path: Path,  messageType: MessageType) extends ParquetInputFormat[User](path, messageType)

或使用这样的源函数:

class ParquetSourceFunction extends SourceFunction[String]{
  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
     val inputPath = "s3a://foo/day=01/"
    val conf = new Configuration()
    conf.setBoolean("recursive.file.enumeration", true)
    conf.set("fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem")

    val hadoopFile = HadoopInputFile.fromPath(new Path(inputPath), conf)
    val readFooter = ParquetFileReader.open(hadoopFile)
    val metadata = readFooter.getFileMetaData
    val schema = metadata.getSchema
    val parquetFileReader = new ParquetFileReader(hadoopFile, ParquetReadOptions.builder().build())
    parquetFileReader.getFilteredRecordCount
    var pages: PageReadStore = null
    try {
      while ({ pages = parquetFileReader.readNextRowGroup; pages != null }) {
        val rows = pages.getRowCount
        val columnIO = new ColumnIOFactory().getColumnIO(schema)
        val recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(schema))
        (0L until rows).foreach { _ =>
          val group: Group = recordReader.read()
          val ind = group.getType.getFieldIndex("id")
          val id = group.getInteger(ind, ind)
         if (!listOfIds.contains(id))
              ctx.collect(?) // how can I get the original row ?

        }
      }
    }
  }
    

我对后者的问题是我无法获取原始数据

有任何想法吗 ?

标签: scalaapache-flinkparquetflink-streamingflink-batch

解决方案


推荐阅读