apache-spark - 创建 pyspark 的 spark 上下文 py4j java gateway 对象
问题描述
我正在尝试将 java 数据帧转换为 pyspark 数据帧。为此,我在 java 进程中创建一个数据框(或行数据集)并在 java 端启动一个 py4j.GatewayServer 服务器进程。然后在 python 方面,我正在创建一个 py4j.java_gateway.JavaGateway() 客户端对象并将其传递给 pyspark 的 SparkContext 构造函数,以将其链接到已经启动的 jvm 进程。但是我收到了这个错误:-
File: "path_to_virtual_environment/lib/site-packages/pyspark/conf.py", line 120, in __init__
self._jconf = _jvm.SparkConf(loadDefaults)
TypeError: 'JavaPackage' object is not callable
有人可以帮忙吗?以下是我正在使用的代码:-
Java代码:-
import py4j.GatewayServer
public class TestJavaToPythonTransfer{
Dataset<Row> df1;
public TestJavaToPythonTransfer(){
SparkSession spark =
SparkSession.builder().appName("test1").config("spark.master","local").getOrCreate();
df1 = spark.read().json("path/to/local/json_file");
}
public Dataset<Row> getDf(){
return df1;
}
public static void main(String args[]){
GatewayServer gatewayServer = new GatewayServer(new TestJavaToPythonTransfer());
gatewayServer.start();
System.out.println("Gateway server started");
}
}
Python代码:-
from pyspark.sql import SQLContext, DataFrame
from pyspark import SparkContext, SparkConf
from py4j.java_gateway import JavaGateway
gateway = JavaGateway()
conf = SparkConf().set('spark.io.encryption.enabled','true')
py_sc = SparkContext(gateway=gateway,conf=conf)
j_df = gateway.getDf()
py_df = DataFrame(j_df,SQLContext(py_sc))
print('print dataframe content')
print(dpy_df.collect())
运行python代码的命令:-
python path_to_python_file.py
我也试过这样做: -
$SPARK_HOME/bin/spark-submit --master local path_to_python_file.py
但是在这里,虽然代码没有抛出任何错误,但它没有向终端打印任何内容。我需要为此设置一些 spark conf 吗?
PS - 如果代码中存在拼写错误或错误,请提前道歉,因为我无法直接从公司的 IDE 复制代码和错误堆栈。
解决方案
在调用 getDf() 之前缺少对entry_point的调用
所以,试试这个:
app = gateway.entry_point
j_df = app.getDf()
此外,我在下面使用 Python 和 Scala(希望你不介意)创建了工作副本,显示了如何在 Scala 端 py4j 网关上使用 Spark 会话和示例 DataFrame 启动,在 Python 端我已经访问了该 DataFrame 并转换为 Python List [ Tuple] 在 Python 端转换回用于 Spark 会话的 DataFrame 之前:
Python:
from py4j.java_gateway import JavaGateway
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StructField
if __name__ == '__main__':
gateway = JavaGateway()
spark_app = gateway.entry_point
df = spark_app.df()
# Note "apply" method here comes from Scala's companion object to access elements of an array
df_to_list_tuple = [(int(i.apply(0)), int(i.apply(1))) for i in df]
spark = (SparkSession
.builder
.appName("My PySpark App")
.getOrCreate())
schema = StructType([
StructField("a", IntegerType(), True),
StructField("b", IntegerType(), True)])
df = spark.createDataFrame(df_to_list_tuple, schema)
df.show()
斯卡拉:
import java.nio.file.{Path, Paths}
import org.apache.spark.sql.SparkSession
import py4j.GatewayServer
object SparkApp {
val myFile: Path = Paths.get(System.getProperty("user.home") + "/dev/sample_data/games.csv")
val spark = SparkSession.builder()
.master("local[*]")
.appName("My app")
.getOrCreate()
val df = spark
.read
.option("header", "True")
.csv(myFile.toString)
.collect()
}
object Py4JServerApp extends App {
val server = new GatewayServer(SparkApp)
server.start()
print("Started and running...")
}
推荐阅读
- plotly - 在 Plotly 中更改“大小”或“颜色”标签
- docker - 支持 Lando 的 Docker 镜像
- r - 使用 dplyr 重命名除列出的列名之外的所有列名后缀?
- ios - 当我们退出应用程序而不杀死应用程序 Swift 4 时弹出不会再次出现
- python - 使用 numpy.sqrt 可能会产生警告“无效值”的负数以外的输入?
- angular - 没有将“exportAs”设置为“stat.dpndcyDt”的指令
- elasticsearch - 使用serilog直接将日志写入elasticsearch是不是一个好主意
- javascript - 更改宽度后元素消失
- r - 如何获取包含 NA OR 值 < 0 的两列矩阵中的行列表?
- matlab - 在 Matlab 中分配带偏差的有符号整数