python - Py4JJavaError:调用 None.org.apache.spark.api.java.JavaSparkContext 时出错
问题描述
有人知道为什么我在 Jupyter Notebook 中会遇到这个错误吗?我一直在尝试通过 SparkFlow 将我的 TensorFlow 模型加载到 Apache Spark 中,但始终不知道该如何解决这个错误。任何帮助都将不胜感激。
第一个 Jupyter 单元格:
from sparkflow.graph_utils import build_graph
from sparkflow.tensorflow_async import SparkAsyncDL
import tensorflow as tf
from pyspark.ml.feature import VectorAssembler, OneHotEncoder
from pyspark.ml.pipeline import Pipeline
from pyspark.sql import SparkSession
from tensorflow.keras import layers
from tensorflow.keras import losses
第二个 Jupyter 单元格:
def lstm_model(X_train, y_train):
    """Build a stacked-LSTM regressor and return its mean-squared-error loss.

    Args:
        X_train: training windows. Only ``X_train.shape[1]`` (the number of
            timesteps) is read here; presumably an array shaped
            (samples, timesteps) or (samples, timesteps, 1) — TODO confirm.
        y_train: regression targets compared against the model output.

    Returns:
        The MSE between ``y_train`` and the model's single-unit output
        (a symbolic Keras/TensorFlow value, since ``output`` is built from
        ``keras.Input``).
    """
    # `keras` was referenced but never imported in the original cell.
    from tensorflow import keras

    # One feature per timestep.
    inputs = keras.Input(shape=(X_train.shape[1], 1))

    # Four LSTM(50) + Dropout(0.2) stages. return_sequences=True keeps the
    # per-timestep outputs so the layers can be stacked and later flattened.
    # (input_shape on the first LSTM is unnecessary: keras.Input fixes it.)
    x_1 = inputs
    for _ in range(4):
        x_1 = layers.LSTM(units=50, return_sequences=True)(x_1)
        x_1 = layers.Dropout(0.2)(x_1)
    x_1 = layers.Flatten()(x_1)

    # 1 output neuron for the predicted value (`Dense` alone was undefined).
    output = layers.Dense(units=1)(x_1)

    # MeanSquaredError is a loss *class*: instantiate it, then call it on
    # (y_true, y_pred). Passing the tensors to the constructor was a bug.
    return losses.MeanSquaredError()(y_train, output)
第三个 Jupyter 单元格:
def dataframe_input(pandas_dataframe, column_name=None, timesteps=30):
    """Turn one DataFrame column into sliding-window LSTM training data.

    The original body read ``self.column_name`` / ``self.timesteps`` although
    this is a plain function (NameError), so those values are now optional
    parameters; ``dataframe_input(df)`` still works.

    Args:
        pandas_dataframe: source pandas DataFrame.
        column_name: column to model. Defaults to the first column — an
            assumption; pass the intended column name explicitly.
        timesteps: window length. The default of 30 matches the
            ``(970, 30, 1)`` shape mentioned in the original comment.

    Returns:
        ``(X_train, y_train)``. ``X_train`` is a float array shaped
        ``(n - timesteps, timesteps, 1)``. ``y_train`` is shaped
        ``(n - timesteps,)``. Both are min-max scaled to [0, 1]. When the
        column has ``timesteps`` rows or fewer, both are empty.

    Raises:
        ValueError: if ``timesteps`` < 1 or the column is empty.
    """
    import numpy as np

    if timesteps < 1:
        raise ValueError(f"timesteps must be >= 1, got {timesteps}")
    if column_name is None:
        column_name = pandas_dataframe.columns[0]

    values = np.asarray(pandas_dataframe[column_name].values, dtype=float)
    if values.size == 0:
        raise ValueError(f"column {column_name!r} is empty")

    # Feature scaling to [0, 1], like MinMaxScaler(feature_range=(0, 1)).
    # A constant column (zero range) maps to 0 instead of dividing by zero.
    lo = np.nanmin(values)
    span = np.nanmax(values) - lo
    scaled = (values - lo) / (span if span != 0 else 1.0)

    # Each sample is the previous `timesteps` values; its target is the next.
    windows = [scaled[i - timesteps:i] for i in range(timesteps, len(scaled))]
    # Keep float dtype: the original dtype=int truncated every scaled value
    # in [0, 1) to 0. Reshape to the 3-D (samples, timesteps, 1) input Keras
    # LSTMs expect.
    X_train = np.array(windows, dtype=float).reshape(-1, timesteps, 1)
    y_train = scaled[timesteps:]
    return X_train, y_train
第四个 Jupyter 单元格(出错的地方):
# Spark Session
# A SparkSession bundles the SQL, Hive and Streaming APIs, so no separate
# contexts need to be created.
# If creating the session fails with
#   java.net.BindException: Can't assign requested address ... Consider
#   explicitly setting ... spark.driver.bindAddress
# the driver could not bind to the address its hostname resolved to. For a
# local, single-machine session, binding and advertising the loopback
# address avoids that.
spark = (
    SparkSession.builder
    .appName("Python Spark SQL basic example")
    .config("spark.driver.bindAddress", "127.0.0.1")
    .config("spark.driver.host", "127.0.0.1")
    .getOrCreate()
)

# Read the CSV into a Spark DataFrame. The path literal previously contained
# embedded double quotes ('"../csv_test_files/stats.csv"'), which would make
# the quotes part of the file name.
df = spark.read.option("inferSchema", "true").csv("../csv_test_files/stats.csv")

# Convert the Spark DataFrame into a pandas DataFrame (brings every row to
# the driver process).
pandas_dataframe = df.toPandas()

# Get the input and output data for passing to the model
X_train, y_train = dataframe_input(pandas_dataframe)
错误输出:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-25-5143cc437b69> in <module>
3 spark = SparkSession \
4 .builder \
----> 5 .appName("Python Spark SQL basic example") \
6 .getOrCreate()
7
~/anaconda3/lib/python3.7/site-packages/pyspark/sql/session.py in getOrCreate(self)
171 for key, value in self._options.items():
172 sparkConf.set(key, value)
--> 173 sc = SparkContext.getOrCreate(sparkConf)
174 # This SparkContext may be an existing one.
175 for key, value in self._options.items():
~/anaconda3/lib/python3.7/site-packages/pyspark/context.py in getOrCreate(cls, conf)
365 with SparkContext._lock:
366 if SparkContext._active_spark_context is None:
--> 367 SparkContext(conf=conf or SparkConf())
368 return SparkContext._active_spark_context
369
~/anaconda3/lib/python3.7/site-packages/pyspark/context.py in __init__(self, master, appName, sparkHome, pyFiles, environment, batchSize, serializer, conf, gateway, jsc, profiler_cls)
134 try:
135 self._do_init(master, appName, sparkHome, pyFiles, environment, batchSize, serializer,
--> 136 conf, jsc, profiler_cls)
137 except:
138 # If an error occurs, clean up in order to allow future SparkContext creation:
~/anaconda3/lib/python3.7/site-packages/pyspark/context.py in _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize, serializer, conf, jsc, profiler_cls)
196
197 # Create the Java SparkContext through Py4J
--> 198 self._jsc = jsc or self._initialize_context(self._conf._jconf)
199 # Reset the SparkConf to the one actually used by the SparkContext in JVM.
200 self._conf = SparkConf(_jconf=self._jsc.sc().conf())
~/anaconda3/lib/python3.7/site-packages/pyspark/context.py in _initialize_context(self, jconf)
304 Initialize SparkContext in function to allow subclass specific initialization
305 """
--> 306 return self._jvm.JavaSparkContext(jconf)
307
308 @classmethod
~/anaconda3/lib/python3.7/site-packages/py4j/java_gateway.py in __call__(self, *args)
1523 answer = self._gateway_client.send_command(command)
1524 return_value = get_return_value(
-> 1525 answer, self._gateway_client, None, self._fqn)
1526
1527 for temp_arg in temp_args:
~/anaconda3/lib/python3.7/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: java.net.BindException: Can't assign requested address: Service 'sparkDriver' failed after 16 retries (on a random free port)! Consider explicitly setting the appropriate binding address for the service 'sparkDriver' (for example spark.driver.bindAddress for SparkDriver) to the correct binding address.
at java.base/sun.nio.ch.Net.bind0(Native Method)
at java.base/sun.nio.ch.Net.bind(Net.java:461)
at java.base/sun.nio.ch.Net.bind(Net.java:453)
at java.base/sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:227)
at io.netty.channel.socket.nio.NioServerSocketChannel.doBind(NioServerSocketChannel.java:128)
at io.netty.channel.AbstractChannel$AbstractUnsafe.bind(AbstractChannel.java:558)
at io.netty.channel.DefaultChannelPipeline$HeadContext.bind(DefaultChannelPipeline.java:1283)
at io.netty.channel.AbstractChannelHandlerContext.invokeBind(AbstractChannelHandlerContext.java:501)
at io.netty.channel.AbstractChannelHandlerContext.bind(AbstractChannelHandlerContext.java:486)
at io.netty.channel.DefaultChannelPipeline.bind(DefaultChannelPipeline.java:989)
at io.netty.channel.AbstractChannel.bind(AbstractChannel.java:254)
at io.netty.bootstrap.AbstractBootstrap$2.run(AbstractBootstrap.java:364)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
at java.base/java.lang.Thread.run(Thread.java:834)
解决方案
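看起来您同时运行了太多 SparkSession。在默认配置下,最多只能同时运行 16 个。这是因为 Spark 为其服务(包括此处报错的 'sparkDriver' 服务以及作业概览 Web UI)寻找可用端口时,默认只重试 16 次。另请注意,报错信息本身是 "Can't assign requested address",并建议显式设置 spark.driver.bindAddress。在本机运行时,如果驱动程序无法绑定到主机名解析出的地址,也会出现此错误。这种情况下,将 spark.driver.bindAddress(以及 spark.driver.host)设为 127.0.0.1 通常即可解决。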
这可能是因为您在一个繁忙的集群上工作,有许多用户同时在运行作业;也可能是因为,例如,您打开了很多各自运行着 SparkSession 的 Jupyter 笔记本。
根据您所使用的资源管理器不同,检查当前打开了多少个 SparkSession 的方法也各不相同。
为了避免该问题,您也可以增加 Spark 在创建 SparkSession 时查找未使用端口的重试次数。为此,需要将配置参数 spark.port.maxRetries 设置为更大的值(另请参见:https://spark.apache.org/docs/latest/configuration.html):
# Raise the number of port-binding retries (default 16) before the session
# gives up; see spark.port.maxRetries in the Spark configuration docs.
spark = (
    SparkSession.builder
    .config('spark.port.maxRetries', 100)
    .getOrCreate()
)
推荐阅读
- angular - 添加到图表中的新点在图表 highstock 的最后一个不可见
- python - 即使检测到 GPU,Pytorch 也没有使用 GPU
- php - 添加出现在 Woocommerce 中选定变体上的额外文本
- postgresql - 如何将 docker 容器连接到远程 postgresql
- protractor - 出现 Toastr 通知时应触发事件
- ios - Firebase 5.0.0+ 和 iOS 7.0
- python - Python - 平均网格角的颜色图?
- python - Pyplot 上的多个 x 标签
- javascript - 在 Google 数据工作室社区连接器中建模嵌套数据
- javascript - HTML 输入上的 addEventListener 在 Android WebView 中不起作用