python - LSTM 时间序列预测 - 验证损失和测试损失低于训练损失
问题描述
我试图根据前面步骤的速度预测下一个车速。我目前实现这一目标的方法是使用 LSTM 神经网络进行时间序列预测。我已经阅读了很多关于这个问题的教程,现在我已经建立了自己的预测车速的程序。在我当前的设置中,我尝试根据之前的 20 来预测下一个车速。
数据: 我有一个包含约 1000 个不同 .csv 文件的数据集。每个 .csv 文件都包含真实汽车行驶的车速,每秒测量一次。路线不同,但在同一个地区,来自同一个司机(我)。因此,每个 .csv 文件都有不同的长度。
数据检索和拆分:
我得到了 .csv 文件的文件名,并将其拆分为训练集、验证集和测试集。我这样做是因为应该尽可能快地进行拆分以防止泄漏。
full_files = [f for f in os.listdir(search_path) if os.path.isfile(f) and f.endswith(".csv")]
random.shuffle(full_files)
args = [cells, files_batched, None, history_range, future_range, steps_skipped, shifting]
files_len = len(full_files)
train_count = round(files_len * 0.7)
val_count = round(files_len * 0.2)
test_count = round(files_len * 0.1)
train_files = full_files[:train_count]
val_files = full_files[train_count:train_count + val_count]
test_files = full_files[train_count + val_count:train_count + val_count + test_count]
train_data = tuple(map(np.array, zip(*list(data_generator(train_files, *args)))))
val_data = tuple(map(np.array, zip(*list(data_generator(val_files, *args)))))
test_data = tuple(map(np.array, zip(*list(data_generator(test_files, *args)))))
我写了一个生成器“data_generator”,它基本上遍历每个文件,读取它并直接将其拆分为我想要的 LSTM 输入形状。Future_range 是我用作标签的时间步长,历史范围是以前的时间步长,单元格是我想从每个 .csv 文件中提取的特征。我知道这可能不是最好的方法,但它确保我在不同的 .csv 文件之间没有切片重叠(如果我读入所有内容并随后将其拆分,就会发生这种情况)。
def multivariate_data(data_set, target_vector, start_index, end_index, history_size, target_size, skip_data=1, index_step=1):
data = []
labels = []
start_index = start_index + history_size
if end_index is None:
end_index = len(data_set) - target_size
for i in range(start_index, end_index, index_step):
indices = range(i - history_size, i, skip_data)
data.append(data_set[indices])
labels.append(target_vector[i:i + target_size])
return np.array(data), np.array(labels)
def data_generator(file_list, feature_cells, file_batches, sample_batches, history_step, prediction_step, skip_data=1, shift=True):
i = 0
while True:
if i >= len(file_list):
break
else:
printProgressBar(i, len(file_list) - 1)
file_chunk = file_list[i * file_batches:(i + 1) * file_batches]
for file in file_chunk:
temp = pd.read_csv(open(file, 'r'), usecols=feature_cells, sep=";", header=None)
norm_values = temp.values
index_step = 1
if shift is False:
index_step = history_step + 1
train, label = multivariate_data(norm_values, norm_values[:, 0], 0, None, history_step, prediction_step, skip_data, index_step)
if sample_batches is not None:
for index in range(0, len(train), sample_batches):
batch = train[index: index + sample_batches], label[index: index + sample_batches]
if batch[0].shape != (sample_batches, history_step, len(feature_cells)):
continue
yield batch
else:
for index in range(0, len(train)):
yield train[index], label[index]
i += 1
缩放和洗牌:
现在我将我的 MinMaxScaler 安装到训练集(以防止泄漏)并将转换应用于训练集、验证集和测试集。然后我创建张量切片并打乱数据。
scaler = MinMaxScaler(feature_range=scaling)
scaler.fit(train_data[0].reshape(-1, train_data[0].shape[-1]))
train_x = scaler.transform(train_data[0].reshape(-1, train_data[0].shape[-1])).reshape(train_data[0].shape)
train_y = scaler.transform(train_data[1].reshape(-1, train_data[1].shape[-1])).reshape(train_data[1].shape)
val_x = scaler.transform(val_data[0].reshape(-1, val_data[0].shape[-1])).reshape(val_data[0].shape)
val_y = scaler.transform(val_data[1].reshape(-1, val_data[1].shape[-1])).reshape(val_data[1].shape)
test_x = scaler.transform(test_data[0].reshape(-1, test_data[0].shape[-1])).reshape(test_data[0].shape)
test_y = scaler.transform(test_data[1].reshape(-1, test_data[1].shape[-1])).reshape(test_data[1].shape)
train_len = len(train_x)
val_len = len(val_x)
test_len = len(test_x)
train_set = tf.data.Dataset.from_tensor_slices((train_x, train_y))
train_set = train_set.cache().shuffle(train_len).batch(batch_size).repeat()
val_set = tf.data.Dataset.from_tensor_slices((val_x, val_y))
val_set = val_set.batch(batch_size).repeat()
test_set = tf.data.Dataset.from_tensor_slices((test_x, test_y))
test_set = test_set.batch(batch_size).repeat()
训练:
最后,我创建了我的模型并将其拟合到我的数据中。
train_steps = train_len // batch_size
val_steps = val_len // batch_size
test_steps = test_len // batch_size
model = Sequential()
model.add(LSTM(128, return_sequences=True))
model.add(Dropout(dropout))
model.add(LSTM(64))
model.add(Dropout(dropout))
model.add(Dense(future_range))
early_stopping = EarlyStopping(monitor='val_loss', patience=2, mode='min')
checkpoint = ModelCheckpoint(search_path + "\\model", monitor='loss', verbose=0, save_best_only=True, mode='min')
optimizer = tf.optimizers.Adam(learning_rate=learning_rate)
model.compile(loss=tf.losses.MeanSquaredError(), optimizer=optimizer, metrics=[root_mean_squared_error])
history = model.fit(train_set, validation_data=val_set, epochs=epochs, steps_per_epoch=train_steps, validation_steps=val_steps, callbacks=[early_stopping])
当我开始训练时,损失从 ~0.2 开始,在第一次之后下降到 0.05 以下!时代。验证损失总是低于训练损失。测试集上的损失也非常低。
我认为很明显,对于这样的 NN,这些结果并不“合理”,因为车速通常是一个非常复杂的函数。我已经在互联网上搜索了可能的错误,唯一对我来说似乎合法的是数据泄漏。但在我看来,信息不可能从训练泄露到验证集。我直接拆分文件并仅在训练数据上使用缩放。
我也检查了这篇文章,但我认为其中一个问题不适合: https ://www.kdnuggets.com/2017/08/37-reasons-neural-network-not-working.html
如果我犯了一个愚蠢的错误,我很抱歉,但我是深度学习的新手,不太了解我的方法。这里有什么问题?
编辑:我试图将模型从单步预测更改为多步预测。“未来”预测的形状总是相同的(总是线性的并且不遵循正确的形状)。测试集(也用于下面的预测)MSE 损失很低,但 RMSE 非常高。怎么会这样?
解决方案
这更像是一个讨论问题。你使用重度辍学吗?因为他们可能会直接解释这里发生了什么。尝试使用不同的dropouts值进行实验。您还可以使用不同的正则化技术。
在dropout的例子中:由于禁用了神经元,每个样本的一些信息丢失了,随后的层试图基于不完整的表示来构建答案。训练损失更高,因为你人为地让网络更难给出正确的答案。然而,在验证期间,所有单元都可用,因此网络具有完整的计算能力——因此它可能比训练时表现更好。
我总结一下可能的原因:
在训练期间应用正则化,但在验证/测试期间不应用。
在每个 epoch 期间测量训练损失,而在每个 epoch 之后测量验证损失。
验证集可能比训练集更容易(或者可能存在泄漏)。如果可能,请尝试交叉验证。
您可以在此处找到有关该主题的更多详细信息
推荐阅读
- javascript - 当前日期的日期范围自定义功能
- javascript - json对象到字符串数组
- uiwebview - UIWebView 不再被接受。相反,使用 WKWebView
- python - 熊猫在列中删除结尾尾零,按另一列分组
- roblox - Roblox ScreenGUI 问题
- python - 子图未正确填充
- rest - Power bi 报表服务器(本地)中共享仪表板的方式之间的区别,例如嵌入或 api 或 url 访问或 wmi 提供程序或报表查看器
- wso2 - 通过 WSO2 IS 使用联合身份提供者配置单点登录
- php - Laravel 在运行时更改数据库
- ruby-on-rails - 有人能告诉我这条线的含义吗?