scala - 如何使用 Flink 过滤具有公共字段(但不同模式)的镶木地板文件
问题描述
我有一个文件夹,其中包含具有不同模式的镶木地板文件,它们都有一个保证存在的公共字段。我想根据该字段过滤行并将其写回其他镶木地板文件。
spark中的类似动作将相当简单,看起来像
val filtered = rawsDF.filter(!col("id").isin(idsToDelete: _*))
问题是,如果我要扩展 ParquetInputFormat,我还必须提供可能不同的模式
ParquetInputFmt(path: Path, messageType: MessageType) extends ParquetInputFormat[User](path, messageType)
或使用这样的源函数:
class ParquetSourceFunction extends SourceFunction[String]{
override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
val inputPath = "s3a://foo/day=01/"
val conf = new Configuration()
conf.setBoolean("recursive.file.enumeration", true)
conf.set("fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem")
val hadoopFile = HadoopInputFile.fromPath(new Path(inputPath), conf)
val readFooter = ParquetFileReader.open(hadoopFile)
val metadata = readFooter.getFileMetaData
val schema = metadata.getSchema
val parquetFileReader = new ParquetFileReader(hadoopFile, ParquetReadOptions.builder().build())
parquetFileReader.getFilteredRecordCount
var pages: PageReadStore = null
try {
while ({ pages = parquetFileReader.readNextRowGroup; pages != null }) {
val rows = pages.getRowCount
val columnIO = new ColumnIOFactory().getColumnIO(schema)
val recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(schema))
(0L until rows).foreach { _ =>
val group: Group = recordReader.read()
val ind = group.getType.getFieldIndex("id")
val id = group.getInteger(ind, ind)
if (!listOfIds.contains(id))
ctx.collect(?) // how can I get the original row ?
}
}
}
}
我对后者的问题是我无法获取原始数据
有任何想法吗 ?
解决方案
推荐阅读
- cakephp-4.x - CakePHP 4.0.3 中的路由问题“找不到匹配 X 的路由”
- c# - Google Firestore 设置值失败
- javascript - React JS:从 json 对象填充 rsuite 下拉列表
- python - admin.site.register(Question) doesn't work
- windows - dpinst 无法静默安装驱动程序
- c# - 尝试列出后台进程时,Xamarin.Android 中的 NulReferenceException
- java - HTTP 请求不起作用,使用 setRequestProperty 方法设置授权
- python - 在 Django 中向前端用户显示 Celery 任务活动的最佳实践
- python - 如何计算按钮的点击次数
- python - Django Rss Feed 将图像添加到描述中