python - 使用 pyspark 插入数据块
问题描述
我正在尝试创建一个 df 并将其存储为增量表并尝试执行 upsert。我在网上找到了此功能,但只是对其进行了修改以适合我尝试使用的路径。
delta_store='s3://raw_data/ETL_test/Delta/'
我创建的 df
Employee = Row("id", "FirstName", "LastName", "Email")
employee1 = Employee('1', 'Basher', 'armbrust', 'bash@gmail.com')
employee2 = Employee('2', 'Daniel', 'meng', 'daniel@stanford.edu')
employee3 = Employee('3', 'Muriel', None, 'muriel@waterloo.edu')
employee4 = Employee('4', 'Rachel', 'wendell', 'rach_3@imaginea.com')
employee5 = Employee('5', 'Zach', 'galifianakis', 'zach_g@pramati.co')
employee6 = Employee('6', 'Ramesh', 'Babu', 'ramesh@pramati.co')
employee7 = Employee('7', 'Bipul', 'Kumar', 'bipul@pramati.co')
employee8 = Employee('8', 'Sampath', 'Kumar', 'sam@pramati.co')
employee9 = Employee('9', 'Anil', 'Reddy', 'anil@pramati.co')
employee10 = Employee('10', 'Mageswaran', 'Dhandapani', 'mageswaran@pramati.co')
compacted_df = spark.createDataFrame([employee1, employee2, employee3, employee4, employee5, employee6, employee7, employee8, employee9, employee10])
display(compacted_df)
插入函数:
def upsert(df, path=DELTA_STORE, is_delete=False):
"""
Stores the Dataframe as Delta table if the path is empty or tries to merge the data if found
df : Dataframe
path : Delta table store path
is_delete: Delete the path directory
"""
if is_delete:
dbutils.fs.rm(path, True)
if os.path.exists(path):
print("Modifying existing table...")
delta_table = DeltaTable.forPath(spark,delta_store)
match_expr = "delta.{} = updates.{}".format("id", "id") and "delta.{} = updates.{}".format("FirstName", "FirstName")
delta_table.alias("delta").merge(
df.alias("updates"), match_expr) \
.whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()
else:
print("Creating new Delta table")
df.write.format("delta").save(delta_store)
然后我运行以下代码来修改数据并遇到如下错误:
employee14 = Employee('2', 'Daniel', 'Dang', 'ddang@stanford.edu')
employee15 = Employee('15', 'Anitha', 'Ramasamy', 'anitha@pramati.co')
ingestion_updates_df = spark.createDataFrame([employee14, employee15])
upsert(df=ingestion_updates_df, is_delete=False)
错误:
AnalysisException: s3://raw_data/ETL_test/Delta already exists.
有人可以解释我在这里做错了什么吗?
解决方案
这可能只是一个 python - S3 逻辑错误。
这os.path.exists(path)
可能总是返回 false,因为它只理解 posix 文件系统而不是 S3 blob 存储路径。
在第二次进入您的函数时,您的代码将沿着ELSE
分支向下并最终尝试(再次)保存到相同的路径而不使用.mode("OVERWRITE")
选项。
推荐阅读
- android - 在android flow + room中,如何监控一个Flow列表
- javascript - 使用 ajax 调用从 JSON 获取数据到 HTML 表
- haskell - 比较记录的字段
- java - 有什么方法可以避免在 android embed realmobject 中出现重复?
- asp.net-web-api - 谁能告诉我如何在主键类型为 uniqueidentifier 的 sql server 中编写更新存储过程?
- sql - 阻止插入操作并在 oracle 中记录对另一个审计表的尝试
- python - 如何在pygame中改变蛇头方向?
- python - Pygame TypeError: rect 参数无效 - python 速成课程
- sql - SELECT DISTINCT 不正确的名称
- javascript - 请解释javascript函数代码行的逻辑