apache-spark - 执行合并操作时 databricks 增量表的性能改进 (upsert)
问题描述
我有一个包含 350 亿条记录的增量表“targetTable”。每天我从源获得 1 亿条记录,我必须对 targetTable 执行 upsert 操作。
目标表:
- 25列
- 250 亿条记录
- 1 列 commID,每行唯一
截至目前 - 完成这项工作需要 45 分钟 - 1 小时,而且还在不断增加。
任何建议是否使用 Z 顺序或文件分区以获得更好的性能分开或任何其他
解决方案
您可以使用并行线程概念。请找到我们为 s3 编写处理的示例代码。您可以为 adls 写作调整相同的逻辑。
dbcurl = f"jdbc:teradata://{server}/database={db}, TMODE=TERA"
driver = "com.teradata.jdbc.TeraDriver"
query = query.replace("startdt", "'"+start_date+"'").replace("enddt", "'"+end_date+"'")
print(f"Query - {query}")
data_df = spark.read \
.format('jdbc') \
.options(url= jdbcurl, user= username,password= password, query=query, driver= driver,numPartitions=100) \
.option('customSchema', schema[1:-1]) \
.option('ConnectionRetries', '3') \
.option('ConnectionRetryInterval', '2000') \
.option("fetchSize",1000000) \
.load()
print(data_df.count())
# DBTITLE 1,Multithreaded S3 raw/server write
from datetime import timedelta, date,datetime
from concurrent import futures
from pyspark.sql import functions as F
def daterange(start_date, end_date):
for n in range(int((end_date - start_date).days)):
yield start_date + timedelta(n)
def writeS3(curr_date):
print(f"Starting S3 write for date - {curr_date}")
curr_df = data_df.filter(f"dt_id='{curr_date}'")
print(f"curr_date - {curr_date} and count - {curr_df.count()}")
curr_df.write.format(format).mode("overwrite").option("replaceWhere", f"{partition_column}= '{curr_date}'").partitionBy(partition_column).save(f"{raw_bucket}/{db}/{table}/")
serve_df = curr_df.withColumn('az_ld_ts', F.current_timestamp())
serve_df.write.format(format).mode("overwrite").option("replaceWhere", f"{partition_column}= '{curr_date}'").partitionBy(partition_column).save(f"{serve_bucket}/{db}/{table}/")
print(f"completed for {curr_date}")
start_date = datetime.strptime(start_date, "%Y-%m-%d")
end_date = datetime.strptime(end_date, "%Y-%m-%d")
total_days = abs(end_date-start_date).days
print(f"total days - {total_days}. Creating {total_days} threads..")
jobs = []
results_done = []
with futures.ThreadPoolExecutor(max_workers=total_days+1) as e:
print(f"{raw_bucket}/{db}/{table}/")
for single_date in daterange(start_date, end_date):
curr_date = single_date.strftime("%Y-%m-%d")
jobs.append(e.submit(writeS3, curr_date))
for job in futures.as_completed(jobs):
result_done = job.result()
print(f"Job Completed - {result_done}")
print("Task complete")
推荐阅读
- android - 未解决的参考:kotlin 中的硬币
- c++ - 如何比较catch2中的浮点数
- python - 每晚导入事件数据的过程,其 ID 每 45 天循环一次,每个 ID 具有最大值和最小值
- python - 如何通过两个限制之间的列值索引数据框
- python - 如何导出具有特定列数的文件的行
- node.js - 如何在不接受 Atlassian 设计指南许可的情况下使用 npm 包的替代实现来使用 Atlaskit
- javascript - 如何使用 NodeJS 脚本中的 firebase-tools?
- angular - 在日期末尾用小写字母 h 格式化 Angular 9 中的日期
- python - Python 字典 - 如果 dict 值为 None,则 dict.get() 返回值
- php - 如何将 PHP 变量传递给触发表?