首页 > 解决方案 > 如何在 Pyspark 中使用 Scala 函数?

问题描述

如果有任何方法可以在 Pyspark 中使用 Scala 函数,我一直在寻找一段时间,但我没有找到任何关于这个主题的文档或指南。

我的目标是使用之前人们定义的scala函数appendPrvlngFields隐式函数。然后我想在python环境中使用这个函数而不重新定义它,但是通过一些类型的方式,比如注册scala函数

假设我在 Scala 中创建了一个使用用户定义库的简单对象,例如:

%scala 
package com.Example
import org.library.DataFrameImplicits._
import org.apache.spark.sql.DataFrame


object ScalaFunction {

    def appendPrvlngFields(df: DataFrame,
                          otherDF: DataFrame,
                          colsToAppend: List[(String)] = List[(String)](),
                          mapColName: Option[String] = None,
                          partitionByCols: List[(String)],
                          sort: List[(String)],
                          sortBfirst: Boolean = false,
                          subsequent: Boolean = false,
                          existingPartitionsOnly: Boolean = false,
                          otherDFPrefix: String = "prvlg",
                          enforceLowerCase: Boolean = false
                          ): DataFrame = {
      
                          return df.appendPrvlngFields(otherDF,
                                                      colsToAppend,
                                                      mapColName,
                                                      partitionByCols,
                                                      sort,
                                                      sortBfirst,
                                                      subsequent,
                                                      existingPartitionsOnly,
                                                      otherDFPrefix,
                                                      enforceLowerCase
                                                      )
                          }
                       }

然后在python环境中,我通过定义这个函数来调用函数appendPrvlngFields:

def pyAppendPrvlngFields(df: DataFrame, 
                         otherDF: DataFrame, 
                         colsToAppend: list,  
                         partitionByCols: list, 
                         sort: list, 
                         mapColName = None,
                         sortBfirst = False, 
                         subsequent = False, 
                         existingPartitionsOnly = False,
                         otherDFPrefix = "prvlg",
                         enforceLowerCase = False) -> DataFrame:
  
  return(DataFrame(sc._jvm.com.SRMG.ScalaPySpark.appendPrvlngFields(df._jdf,
                                                     otherDF._jdf,
                                                     colsToAppend,  
                                                     mapColName,
                                                     partitionByCols, 
                                                     sort,sortBfirst,
                                                     subsequent),
    sqlContext))

我知道我需要将 df 转换为 df._jdf,但是如何将列表、字符串、Option、Boolean 转换为 java 类型?

标签: javapythonscalapyspark

解决方案


首先,您将 ScalaUDF 代码分离到单独的项目中并创建 jar 文件。
接下来,将其传递给 python。
这是怎么做的。

# create the jar using SBT
sbt clean assembly

# Pass the jar to the PySpark session
pyspark --jars [path/to/jar/x.jar]

参考:https ://medium.com/wbaa/using-scala-udfs-in-pyspark-b70033dd69b9


推荐阅读