apache-spark - 使用 Databricks 提高 PySpark 或 Delta Table 中的连接和模糊处理性能
问题描述
我正在使用 7 个不同的数据框,其中 3 个来自存储在容器/blob 存储中的 CSV,另外 4 个来自 sqldw 中的查询。这些数据帧来自不同的来源,但具有相同的结构
我在加入后执行加入并消除空值,添加一列来注册它来自哪个源。这一切都相当简单,但对于我的最后一个,我执行了状态连接,然后与名称进行模糊匹配。
我的数据框有大约 500k 行,但在加入后它会上升到 3000 万行,执行加入和模糊匹配需要 30 多分钟。
我阅读了有关 Delta Tables 以及它如何真正快速地处理 PB 的信息。我还没有真正理解它是如何工作的。我已经尝试从教程中创建增量文件和增量表,但是当我运行代码时,它花了一个多小时才创建增量表。
谁能帮我优化这个?使用镶木地板或增量表,不确定什么是最好的方法。
我的笔记本连接配置如下:
### STORAGE ACCOUNT ###
storage_account_name = "[storageAccount]"
container_name = "[containerName]"
storage_account_access_key = "[key1]"
temp_blob_name = "[blobPath]"
storage_account_account_key = "fs.azure.account.key.{}.blob.core.windows.net".format(storage_account_name)
inputSource = "wasbs://{}@{}.blob.core.windows.net".format(container_name, storage_account_name)
spark.conf.set(
storage_account_account_key,
storage_account_access_key)
spark._jsc.hadoopConfiguration().set(
storage_account_account_key,
storage_account_account_key)
### DW DATABASE ###
dwDatabase = "[database]"
dwServer = "[server]"
dwJdbcPort = "[port]"
dwUser = "[user]"
dwPass = "[password]"
dwJdbcExtraOptions = "encrypt=true;trustServerCertificate=true;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"
sqlDwUrl = "jdbc:sqlserver://" + dwServer + ":" + dwJdbcPort + ";database=" + dwDatabase + ";user=" + dwUser+";password=" + dwPass + ";$dwJdbcExtraOptions"
sqlDwUrlSmall = "jdbc:sqlserver://" + dwServer + ":" + dwJdbcPort + ";database=" + dwDatabase + ";user=" + dwUser+";password=" + dwPass
spark.conf.set(
"spark.sql.parquet.writeLegacyFormat",
"true")
并阅读我的数据框:
### CSVs ###
df_csv = spark.read \
.format('csv') \
.option("inferSchema", "true") \
.option("delimiter", ";") \
.option("header", "true") \
.load(inputSource + "/sftp_files/{}/{}".format(dateFolder, fileName))
### SQL ###
df_sql = spark.read \
.format("com.databricks.spark.sqldw") \
.option("url", sqlDwUrlSmall) \
.option("tempDir", inputSource + temp_blob_name) \
.option("forwardSparkAzureStorageCredentials", "true") \
.option("query", queryText) \
.load()
模糊匹配后我有这个:
df = df\
.orderBy(['Name','State','bestScore'], ascending=[True,True,False], na_position='last')\
.drop_duplicates(subset=['ID'])
threshold = 85
df = df.filter('bestScore >= {}'.format(threshold))
解决方案
推荐阅读
- flutter - 为什么在 Fluttter 中调用加载状态后调用 loadInitTopStory 方法?
- sockets - 处理来自 Socket 的异构信息
- c# - SignalR 客户端未收到服务器消息
- docker - 如何更改 Zabbix 代理以监控 tls 安全的 Docker 守护进程套接字
- json - Ionic3 - 使用 Sqlite 同步大 JSON 文件数据的最快方法
- javascript - 如何在 ExpressJS 中添加内容安全策略以从 CDN 加载外部 JS 文件?
- python - ValueError:无法挤压暗淡 [1],预期尺寸为 1,输入形状为 '{{node ctc/Squeeze}} 得到 3:[?,3]
- javascript - 如何读取txt文件并将其保存在html中javascript中的数组中
- javascript - 如何在地图中获取 X 项?
- c++ - 错误 LNK2001:Visual Studio 2019 中未解析的外部符号 CWbemProviderGlue::FrameworkLogoffDLL(wchar_t const *)"