pyspark - 在 Apache Hudi 中删除 - 胶水作业
问题描述
我必须构建一个胶水作业来更新和删除 Athena 表中的旧行。当我运行我的工作以删除它时,它会返回一个错误:
AnalysisException: 'Unable to infer schema for Parquet. It must be specified manually.;'
我的胶水工作:
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "test-database", table_name = "test_table", transformation_ctx = "datasource0")
datasource1 = glueContext.create_dynamic_frame.from_catalog(database = "test-database", table_name = "test_table_output", transformation_ctx = "datasource1")
datasource0.toDF().createOrReplaceTempView("view_dyf")
datasource1.toDF().createOrReplaceTempView("view_dyf_output")
ds = spark.sql("SELECT * FROM view_dyf_output where id in (select id from view_dyf where op like 'D')")
hudi_delete_options = {
'hoodie.table.name': 'test_table_output',
'hoodie.datasource.write.recordkey.field': 'id',
'hoodie.datasource.write.table.name': 'test_table_output',
'hoodie.datasource.write.operation': 'delete',
'hoodie.datasource.write.precombine.field': 'name',
'hoodie.upsert.shuffle.parallelism': 1,
'hoodie.insert.shuffle.parallelism': 1
}
from pyspark.sql.functions import lit
deletes = list(map(lambda row: (row[0], row[1]), ds.collect()))
df = spark.sparkContext.parallelize(deletes).toDF(['id']).withColumn('name', lit(0.0))
df.write.format("hudi"). \
options(**hudi_delete_options). \
mode("append"). \
save('s3://data/test-output/')
roAfterDeleteViewDF = spark. \
read. \
format("hudi"). \
load("s3://data/test-output/")
roAfterDeleteViewDF.registerTempTable("test_table_output")
spark.sql("SELECT * FROM view_dyf_output where id in (select distinct id from view_dyf where op like 'D')").count()
我有 2 个数据源;第一个旧的 Athena 表,其中数据必须更新或删除,第二个表是新的更新或删除的数据。
在ds
我选择了旧表中必须删除的所有行。
op
用于操作;'D' 表示删除,'U' 表示更新。
有谁知道我在这里想念什么?
解决方案
hoodie.datasource.write.operation 的值在您的代码中无效,支持的写入操作为:UPSERT/Insert/Bulk_insert。检查Hudi 文档。
另外,您删除记录的意图是什么:硬删除还是软删除?对于硬删除,您必须提供 {'hoodie.datasource.write.payload.class': 'org.apache.hudi.common.model.EmptyHoodieRecordPayload}
推荐阅读
- java - springboot jpa 实体关系映射
- php - PHP - 从 XML 内容中获取值
- php - Laravel 8 的 @include 问题和来自控制器的变量
- node.js - cookie-session 未显示在浏览器 Cookie 上
- c++ - 为什么我们需要 std::initializer_list 的私有构造函数?
- powershell - DataBricks API Powershell
- asp.net-core - 使用 .NET Core 在 Mac OS 上进行 Windows 身份验证
- excel - 将宏从一个 Excel 工作表添加到另一个工作表
- javascript - node.js tmi.js twitch bot 对象返回 [object, Object]
- git - `git branch -a` 的输出中的颜色是什么意思?