首页 > 解决方案 > 在 Apache Spark 中并行训练 Keras 模型

问题描述

\我正在尝试使用 Apache Spark 和 Elephas 并行训练多个 Keras 模型。这是我正在尝试做的代码:

train_data = pd.read_csv("csv_files/stats.csv")
timesteps = 30

model_1, rdd1 = train_LSTM_model(spark_context = sc, dataframe= train_data, column_number=1 ,timesteps = 30)
model_2, rdd2 = train_LSTM_model(spark_context = sc, dataframe = train_data, column_number=2,timesteps = 30)
model_3, rdd3 = train_LSTM_model(spark_context = sc, dataframe = train_data, column_number=3,timesteps = 30)
model_4, rdd4 = train_LSTM_model(spark_context = sc, dataframe = train_data, column_number=4,timesteps = 30)
model_5, rdd5 = train_LSTM_model(spark_context = sc, dataframe = train_data, column_number=5,timesteps = 30)
model_6, rdd6 = train_LSTM_model(spark_context = sc, dataframe = train_data, column_number=6,timesteps = 30)
# Fitting model to RDD
spark_model_1 = fit_to_spark_model(keras_model=model_no_programs_run, rdd = rdd1)
spark_model_2 = fit_to_spark_model(keras_model=model_duration_time, rdd = rdd2)
spark_model_3 = fit_to_spark_model(keras_model=model_avg_duration_time, rdd = rdd3)
spark_model_4 = fit_to_spark_model(keras_model=model_no_processed_records, rdd = rdd4)
spark_model_5 = fit_to_spark_model(keras_model=model_status_states, rdd = rdd5)
spark_model_6 = fit_to_spark_model(keras_model=model_failure_within_period, rdd = rdd6)


# HOW TO RUN THE SCRIPT
#spark-submit --driver-memory 1G ./stats_app.py

很多代码都被抽象出来了,但基本上这个train_LSTM_model()函数是返回一个编译好的 Keras 模型,以及一个 RDD。rdd_1、rdd_2、rdd_# 等来自 : rdd = to_simple_rdd(spark_context, X_train, y_train)函数内部。X_train 和 y_train 已从传入的数据帧中提取到每个train_LSTM_model()函数中。

此外,这是该fit_to_spark_model()函数正在执行的操作:

def fit_to_spark_model(keras_model, rdd):
   spark_model = SparkModel(keras_model, frequency='epoch', mode='asynchronous')
   spark_model.fit(rdd, epochs=20, batch_size=32, verbose=0, validation_split=0.25)
   return spark_model

这对于在 Spark Worker 节点上同时训练多个 Keras 模型有意义吗?还是我必须采取不同的方法?不确定我是否只需要一个 RDD 或多个 RDD,就像我目前正在做的那样。万分感谢。

标签: pythonapache-sparkpysparkdeep-learning

解决方案


我希望您在实施之前充分了解 Spark 和 Elephas 的工作原理。如果您所做的这些步骤是通过 Spark 创建并行化,那么让我澄清一下,这绝对没有必要。并行性是在内部 spark 实现的,无需来自用户端的任何显式编码。

train_LSTM_model() - 这将返回一个 Keras 模型并使用to_simple_rddElephas 的方法创建一个 rdd。

从 Elephas 的基本文档中,我怀疑从train_LSTM_model函数返回的所有 rdd 和模型都是相同的(如果您没有对 train_LSTM_model 代码块中的数据维度进行任何更改)。因此,您最终将在同一个数据帧上多次构建类似的模型。

即使我的模型和数据相同的前提是错误的,我建议您单独运行每个模型,因为这可能会阻塞 Spark 服务器,可能不会为每个模型释放足够的工作节点,最终每次运行模型都会花费很多时间时间。


推荐阅读