首页 > 解决方案 > 如何使用 panda_udf 对系列运行多个函数

问题描述

我有大量从二进制编码的图像文件。我正在尝试对它们进行解码,然后将它们保存到 hdfs。我有代码可以在 python 中成功执行此操作,但是由于需要完成大量文件,因此必须导入所有内容,转换为 panda,然后运行我的代码,即使它都适合单个节点作为熊猫,它不会。我一直在尝试将我的代码从 panda 函数转换为 pyspark panda_udf 但没有成功。下面是我的代码。如果有人有任何想法,我将不胜感激!

def image_save(bin,file):
    start = datetime.now()
    b64_encoded_img_binary = bin
    img_binary = np.fromstring(b64_encoded_img_binary.decode('base64'), np.uint8)
    img = cv2.imdecode(img_binary, cv2.IMREAD_COLOR)
    cv2.imwrite('/dsaa/ml_shared/image_output/'+ str(file).replace(' ','').replace('JPEG','jpg'), img)
    end = datetime.now()
    time = (end - start).total_seconds()
    return time

df['seconds'] = df.apply(lambda x: 
image_save(x['b64_encoded_image_binary'],x['file_name']), axis=1)

该代码适用于熊猫并正确保存文件。但是当我尝试用@pandas_udf 包装代码时,我收到一个错误,它期望返回一个系列而不是一个浮点数。我知道标量 pandas udf 输出一个系列,但我不知道如何更改代码以使其工作。

标签: pythonapache-sparkpyspark

解决方案


这个我想多了。我不认为下一步是熊猫是由系列组成的。所以它可以工作,我可以采用系列输入并将它们作为函数中的数据框。

def FPD(a, b):

    def F2(a, b):
        c = cv2.imdecode(np.fromstring(a.decode('base64'), np.uint8), cv2.IMREAD_COLOR)
        cv2.imwrite('/dsaa/ml_shared/image_output/' + b, c)
        return(b)

    pdf = pd.DataFrame({'a': a, 'b' : b })
    pdf.loc[:, 'c'] = pdf.apply(lambda x: F2(x['a'], x['b']), axis = 1)
    return(b)

推荐阅读