PySpark with a Delta table - loop optimization with union

Problem description

I'm currently working in Databricks and have a Delta table with 20+ columns. I basically need to take a value from one column of each row, send it to an API that returns two values/columns, and then create the other 26 so I can merge the values back into the original Delta table. So the input is 28 columns and the output is 28 columns. Currently my code looks like this:

from pyspark.sql.types import *
from pyspark.sql import functions as F
import requests, uuid, json
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
from pyspark.sql.functions import col,lit
from functools import reduce

spark.conf.set("spark.sql.adaptive.enabled","true")
spark.conf.set("spark.databricks.adaptive.autoOptimizeShuffle.enabled", "true")
spark.sql('set spark.sql.execution.arrow.pyspark.enabled = true')
spark.conf.set("spark.databricks.optimizer.dynamicPartitionPruning","true")
spark.conf.set("spark.sql.parquet.compression.codec","gzip")
spark.conf.set("spark.sql.inMemorycolumnarStorage.compressed","true")
spark.conf.set("spark.databricks.optimizer.dynamicFilePruning","true");

output=spark.sql("select * from delta.`table`").cache()

SeriesAppend=[]

for i in output.collect():
    #small mapping fix
    if i['col1']=='val1':
      var0='a'
    elif i['col1']=='val2':
      var0='b'
    elif i['col1']=='val3':
      var0='c'
    elif i['col1']=='val4':
      var0='d'

    var0=set([var0])
    req_var = set(['a','b','c','d'])
    var_list=list(req_var-var0)

    #subscription info

    headers = {header}

    body = [{
      'text': i['col2']
    }]
    
    if len(i['col2'])<500:
      request = requests.post(constructed_url, params=params, headers=headers, json=body)
      response = request.json()
      dumps=json.dumps(response[0])
      loads = json.loads(dumps)
      json_rdd = sc.parallelize(loads)
      json_df = spark.read.json(json_rdd)
      json_df = json_df.withColumn('col1',lit(i['col1']))
      json_df = json_df.withColumn('col2',lit(i['col2']))
      json_df = json_df.withColumn('col3',lit(i['col3']))
      ...
      SeriesAppend.append(json_df)
    
    else:
      pass

Series_output=reduce(DataFrame.unionAll, SeriesAppend)

SAMPLE DF with only 3 columns:

df = spark.createDataFrame(
    [
        ("a", "cat","owner1"),  # create your data here, be consistent in the types.
        ("b", "dog","owner2"),
        ("c", "fish","owner3"),
        ("d", "fox","owner4"),
        ("e", "rat","owner5"),
    ],
    ["col1", "col2", "col3"])  # add your column names here

I really only need to write the responses plus the other column values to the Delta table, so I don't strictly need a dataframe, but I haven't found a faster way than the above (the write itself is sketched after the sample output below). Right now I can run 5 inputs and it returns 15 in 25.3 seconds without the unionAll. With the union it becomes 3 minutes.

The final output looks like this:

df = spark.createDataFrame(
    [
        ("a", "cat","owner1","MI", 48003),  # create your data here, be consistent in the types.
        ("b", "dog","owner2", "MI", 48003),
        ("c", "fish","owner3","MI", 48003),
        ("d", "fox","owner4","MI", 48003),
        ("e", "rat","owner5","MI", 48003),
    ],
    ["col1", "col2", "col3", "col4", "col5"])  # add your column names here

How can I do this faster in Spark?

Tags: pyspark, databricks, union-all, delta-lake

Solution


As I mentioned in the comments, you should use a UDF to distribute more of the workload to the workers, instead of using collect and letting a single machine (the driver) do all of it. That approach is simply wrong and doesn't scale.

from pyspark.sql import functions as F, types as T

# This is your main function, pure Python, and you can unittest it in any way you want.
# The most important thing about this function is:
# - everything must be encapsulated inside the function, no global variable works here
def req(col1, col2):
    import requests  # imported inside the function so it resolves on the workers
    if col1 == 'val1':
        var0 = 'a'
    elif col1 == 'val2':
        var0 = 'b'
    elif col1 == 'val3':
        var0 = 'c'
    elif col1 == 'val4':
        var0 = 'd'
    
    var0=set([var0])
    req_var = set(['a','b','c','d'])
    var_list = list(req_var - var0)
    
    #subscription info

    headers = {header} # !!! `header` must be available **inside** this function, global won't work

    body = [{
      'text': col2
    }]
    
    if len(col2) < 500:
        # !!! same as `header`, `constructed_url` (and `params`) must be available **inside** this function, global won't work
        request = requests.post(constructed_url, params=params, headers=headers, json=body)
        response = request.json()
        # pull the two values out of the response; adjust the keys/shape to your actual API payload
        return (response[0]['col4'], response[0]['col5'])
    else:
        return None

# Now you wrap the function above into a Spark UDF.
# I'm using only 2 columns here as input, but you can use as many columns as you wish.
# Same for the output: I'm returning a tuple with only 2 elements, but you can return as many items as you wish.
df.withColumn('temp', F.udf(req, T.ArrayType(T.StringType()))('col1', 'col2')).show()

# Output
# +----+----+------+------------------+
# |col1|col2|  col3|              temp|
# +----+----+------+------------------+
# |   a| cat|owner1|[foo_cat, bar_cat]|
# |   b| dog|owner2|[foo_dog, bar_dog]|
# |   c|fish|owner3|              null|
# |   d| fox|owner4|              null|
# |   e| rat|owner5|              null|
# +----+----+------+------------------+

# Now all you have to do is extract the tuple and assign to separate columns
# (and delete temp column to cleanup)
(df
    .withColumn('col4', F.col('temp')[0])
    .withColumn('col5', F.col('temp')[1])
    .drop('temp')
    .show()
)

# Output
# +----+----+------+-------+-------+
# |col1|col2|  col3|   col4|   col5|
# +----+----+------+-------+-------+
# |   a| cat|owner1|foo_cat|bar_cat|
# |   b| dog|owner2|foo_dog|bar_dog|
# |   c|fish|owner3|   null|   null|
# |   d| fox|owner4|   null|   null|
# |   e| rat|owner5|   null|   null|
# +----+----+------+-------+-------+
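
A small variation on the answer above (an assumption on my part, not something the original answer shows): if you give the UDF a struct return type instead of an array of strings, col4/col5 come back as named, typed fields that can be selected directly, which avoids indexing by position. A sketch:

from pyspark.sql import functions as F, types as T

# Hypothetical alternative: declare the UDF's return type as a struct with named fields.
resp_schema = T.StructType([
    T.StructField("col4", T.StringType()),
    T.StructField("col5", T.StringType()),
])

req_udf = F.udf(req, resp_schema)

(df
    .withColumn("temp", req_udf("col1", "col2"))
    .select("*", "temp.col4", "temp.col5")  # expands the struct into col4 / col5 columns
    .drop("temp")
    .show()
)

From there, writing the enriched rows back to the original Delta table is an ordinary df.write.format("delta") call, or a MERGE through the delta.tables.DeltaTable API if you need to update existing rows in place.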

