首页 > 解决方案 > 创建 pyspark 的 spark 上下文 py4j java gateway 对象

问题描述

我正在尝试将 java 数据帧转换为 pyspark 数据帧。为此,我在 java 进程中创建一个数据框(或行数据集)并在 java 端启动一个 py4j.GatewayServer 服务器进程。然后在 python 方面,我正在创建一个 py4j.java_gateway.JavaGateway() 客户端对象并将其传递给 pyspark 的 SparkContext 构造函数,以将其链接到已经启动的 jvm 进程。但是我收到了这个错误:-

File: "path_to_virtual_environment/lib/site-packages/pyspark/conf.py", line 120, in __init__
    self._jconf = _jvm.SparkConf(loadDefaults)
TypeError: 'JavaPackage' object is not callable

有人可以帮忙吗?以下是我正在使用的代码:-

Java代码:-

import py4j.GatewayServer
public class TestJavaToPythonTransfer{
    Dataset<Row> df1;
    public TestJavaToPythonTransfer(){
        SparkSession spark = 
              SparkSession.builder().appName("test1").config("spark.master","local").getOrCreate();
        df1 = spark.read().json("path/to/local/json_file");
    }
    public Dataset<Row> getDf(){
        return df1;  
    }
    public static void main(String args[]){
       GatewayServer gatewayServer = new GatewayServer(new TestJavaToPythonTransfer());
       gatewayServer.start();
       System.out.println("Gateway server started");
    }
}

Python代码:-

from pyspark.sql import SQLContext, DataFrame
from pyspark import SparkContext, SparkConf
from py4j.java_gateway import JavaGateway
gateway = JavaGateway()
conf = SparkConf().set('spark.io.encryption.enabled','true')
py_sc = SparkContext(gateway=gateway,conf=conf)
j_df = gateway.getDf()
py_df = DataFrame(j_df,SQLContext(py_sc))
print('print dataframe content')
print(dpy_df.collect())

运行python代码的命令:-

python path_to_python_file.py

我也试过这样做: -

$SPARK_HOME/bin/spark-submit --master local path_to_python_file.py

但是在这里,虽然代码没有抛出任何错误,但它没有向终端打印任何内容。我需要为此设置一些 spark conf 吗?

PS - 如果代码中存在拼写错误或错误,请提前道歉,因为我无法直接从公司的 IDE 复制代码和错误堆栈。

标签: apache-sparkpysparkapache-spark-sqlpy4j

解决方案


在调用 getDf() 之前缺少对entry_point的调用

所以,试试这个:

app = gateway.entry_point
j_df = app.getDf()

此外,我在下面使用 Python 和 Scala(希望你不介意)创建了工作副本,显示了如何在 Scala 端 py4j 网关上使用 Spark 会话和示例 DataFrame 启动,在 Python 端我已经访问了该 DataFrame 并转换为 Python List [ Tuple] 在 Python 端转换回用于 Spark 会话的 DataFrame 之前:

Python:

from py4j.java_gateway import JavaGateway
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StructField

if __name__ == '__main__':
    gateway = JavaGateway()

    spark_app = gateway.entry_point
    df = spark_app.df()

    # Note "apply" method here comes from Scala's companion object to access elements of an array
    df_to_list_tuple = [(int(i.apply(0)), int(i.apply(1))) for i in df]

    spark = (SparkSession
             .builder
             .appName("My PySpark App")
             .getOrCreate())

    schema = StructType([
        StructField("a", IntegerType(), True),
        StructField("b", IntegerType(), True)])

    df = spark.createDataFrame(df_to_list_tuple, schema)

    df.show()

斯卡拉:

import java.nio.file.{Path, Paths}

import org.apache.spark.sql.SparkSession
import py4j.GatewayServer

object SparkApp {
  val myFile: Path = Paths.get(System.getProperty("user.home") + "/dev/sample_data/games.csv")
  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("My app")
    .getOrCreate()

  val df = spark
      .read
      .option("header", "True")
      .csv(myFile.toString)
      .collect()

}

object Py4JServerApp extends App {


  val server = new GatewayServer(SparkApp)
  server.start()

  print("Started and running...")
}


推荐阅读