首页 > 解决方案 > 使用 Databricks 提高 PySpark 或 Delta Table 中的连接和模糊处理性能

问题描述

我正在使用 7 个不同的数据框,其中 3 个来自存储在容器/blob 存储中的 CSV,另外 4 个来自 sqldw 中的查询。这些数据帧来自不同的来源,但具有相同的结构

在此处输入图像描述

我在加入后执行加入并消除空值,添加一列来注册它来自哪个源。这一切都相当简单,但对于我的最后一个,我执行了状态连接,然后与名称进行模糊匹配。

在此处输入图像描述

我的数据框有大约 500k 行,但在加入后它会上升到 3000 万行,执行加入和模糊匹配需要 30 多分钟。

我阅读了有关 Delta Tables 以及它如何真正快速地处理 PB 的信息。我还没有真正理解它是如何工作的。我已经尝试从教程中创建增量文件和增量表,但是当我运行代码时,它花了一个多小时才创建增量表。

谁能帮我优化这个?使用镶木地板或增量表,不确定什么是最好的方法。

我的笔记本连接配置如下:

### STORAGE ACCOUNT ###
storage_account_name = "[storageAccount]"
container_name = "[containerName]"
storage_account_access_key = "[key1]"
temp_blob_name = "[blobPath]"

storage_account_account_key = "fs.azure.account.key.{}.blob.core.windows.net".format(storage_account_name)
inputSource = "wasbs://{}@{}.blob.core.windows.net".format(container_name, storage_account_name)

spark.conf.set(
    storage_account_account_key,
    storage_account_access_key)

spark._jsc.hadoopConfiguration().set(
    storage_account_account_key,
    storage_account_account_key)

### DW DATABASE ###
dwDatabase = "[database]"
dwServer = "[server]"
dwJdbcPort = "[port]"

dwUser = "[user]"
dwPass = "[password]"

dwJdbcExtraOptions = "encrypt=true;trustServerCertificate=true;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"
sqlDwUrl = "jdbc:sqlserver://" + dwServer + ":" + dwJdbcPort + ";database=" + dwDatabase + ";user=" + dwUser+";password=" + dwPass + ";$dwJdbcExtraOptions"
sqlDwUrlSmall = "jdbc:sqlserver://" + dwServer + ":" + dwJdbcPort + ";database=" + dwDatabase + ";user=" + dwUser+";password=" + dwPass

spark.conf.set(
    "spark.sql.parquet.writeLegacyFormat",
    "true")

并阅读我的数据框:

### CSVs ###
df_csv = spark.read \
    .format('csv') \
    .option("inferSchema", "true") \
    .option("delimiter", ";") \
    .option("header", "true") \
    .load(inputSource + "/sftp_files/{}/{}".format(dateFolder, fileName))

### SQL ###
df_sql = spark.read \
    .format("com.databricks.spark.sqldw") \
    .option("url", sqlDwUrlSmall) \
    .option("tempDir", inputSource + temp_blob_name) \
    .option("forwardSparkAzureStorageCredentials", "true") \
    .option("query", queryText) \
    .load()

模糊匹配后我有这个:

df = df\
    .orderBy(['Name','State','bestScore'], ascending=[True,True,False], na_position='last')\
    .drop_duplicates(subset=['ID'])
threshold = 85
df = df.filter('bestScore >= {}'.format(threshold))

标签: apache-sparkpysparkdatabricksazure-databricks

解决方案


推荐阅读