Performance improvement for a Databricks Delta table merge operation (upsert)

Problem description

I have a Delta table "targetTable" containing 35 billion records. Every day I receive 100 million records from the source and have to upsert them into targetTable.

Target table:

As of now the job takes 45 minutes to 1 hour to complete, and the runtime keeps increasing.

Any suggestions on whether to use Z-ordering or file partitioning (or any other approach) to get better performance?
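
For reference, a daily upsert of this kind is usually expressed as a Delta Lake MERGE. The sketch below is illustrative only: daily_df, the key column id, and the date partition column dt_id are assumed names, and the literal date filter in the match condition is the partition-pruning idea the question refers to.

from delta.tables import DeltaTable

# daily_df: the day's ~100M incoming records (placeholder name)
target = DeltaTable.forName(spark, "targetTable")

# Listing the affected dates as literals lets Delta skip files from
# untouched partitions instead of scanning the full 35B-row table.
load_dates = [r["dt_id"] for r in daily_df.select("dt_id").distinct().collect()]
date_list = ", ".join(f"'{d}'" for d in load_dates)

(target.alias("t")
    .merge(daily_df.alias("s"), f"t.dt_id IN ({date_list}) AND t.id = s.id")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute())

# OPTIMIZE targetTable ZORDER BY (id) would additionally co-locate the
# merge key within each partition's files.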

Tags: apache-spark, merge, databricks, azure-databricks, delta

Solution


You can use the concept of parallel threads. Below is the sample code we wrote to handle the write to S3; you can adapt the same logic for writing to ADLS (see the sketch after the code).

# server, db, username, password, query, schema, start_date and end_date are
# assumed to come from notebook widgets / secrets (not shown in this snippet).
jdbcurl = f"jdbc:teradata://{server}/database={db},TMODE=TERA"
driver = "com.teradata.jdbc.TeraDriver"

# substitute the date placeholders in the source query
query = query.replace("startdt", "'" + start_date + "'").replace("enddt", "'" + end_date + "'")
print(f"Query - {query}")

# note: with the `query` option and no partitionColumn/lowerBound/upperBound,
# Spark reads over a single JDBC connection; numPartitions only caps parallelism
data_df = spark.read \
          .format('jdbc') \
          .options(url=jdbcurl, user=username, password=password, query=query, driver=driver, numPartitions=100) \
          .option('customSchema', schema[1:-1]) \
          .option('ConnectionRetries', '3') \
          .option('ConnectionRetryInterval', '2000') \
          .option("fetchSize", 1000000) \
          .load()

print(data_df.count())




# DBTITLE 1,Multithreaded S3 raw/server write
from datetime import timedelta, date, datetime
from concurrent import futures
from pyspark.sql import functions as F

def daterange(start_date, end_date):
  # yield every date in [start_date, end_date)
  for n in range(int((end_date - start_date).days)):
    yield start_date + timedelta(n)

def writeS3(curr_date):
  # format, partition_column, raw_bucket, serve_bucket, db and table are
  # job-level settings (e.g. format = "delta", partition_column = "dt_id")
  print(f"Starting S3 write for date - {curr_date}")
  curr_df = data_df.filter(f"dt_id='{curr_date}'")
  print(f"curr_date - {curr_date} and count - {curr_df.count()}")
  curr_df.write.format(format).mode("overwrite") \
    .option("replaceWhere", f"{partition_column} = '{curr_date}'") \
    .partitionBy(partition_column) \
    .save(f"{raw_bucket}/{db}/{table}/")
  serve_df = curr_df.withColumn('az_ld_ts', F.current_timestamp())
  serve_df.write.format(format).mode("overwrite") \
    .option("replaceWhere", f"{partition_column} = '{curr_date}'") \
    .partitionBy(partition_column) \
    .save(f"{serve_bucket}/{db}/{table}/")
  print(f"completed for {curr_date}")
  # return the date so the main loop can report which job finished
  return curr_date

# start_date / end_date arrive as "YYYY-MM-DD" strings (e.g. from widgets)
start_date = datetime.strptime(start_date, "%Y-%m-%d")
end_date = datetime.strptime(end_date, "%Y-%m-%d")
total_days = abs(end_date-start_date).days
print(f"total days - {total_days}. Creating {total_days} threads..")

jobs = []
results_done = []

with futures.ThreadPoolExecutor(max_workers=total_days+1) as e:
  print(f"{raw_bucket}/{db}/{table}/")
  for single_date in daterange(start_date, end_date):
    curr_date = single_date.strftime("%Y-%m-%d")
    jobs.append(e.submit(writeS3, curr_date))

  for job in futures.as_completed(jobs):
    result_done = job.result()
    print(f"Job Completed - {result_done}")

print("Task complete")
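
To adapt the same logic for an ADLS write, only the destination paths and the storage authentication change; the thread pool and replaceWhere pattern stay the same. A minimal sketch, assuming an ADLS Gen2 account authenticated with an account key pulled from a secret scope (account, container and secret names below are placeholders):

# Hypothetical ADLS Gen2 settings - replace with your own account/container.
storage_account = "mystorageaccount"
container = "datalake"
spark.conf.set(
    f"fs.azure.account.key.{storage_account}.dfs.core.windows.net",
    dbutils.secrets.get(scope="adls-scope", key="storage-account-key"))

# Pointing the existing paths at abfss:// locations is the only change the
# writeS3 function needs; it can then be submitted to the same ThreadPoolExecutor.
raw_bucket = f"abfss://{container}@{storage_account}.dfs.core.windows.net/raw"
serve_bucket = f"abfss://{container}@{storage_account}.dfs.core.windows.net/serve"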
