Batch size issue when using a custom loss function in Keras

Problem description

I am making a slight modification to a standard neural network by defining a custom loss function. The custom loss function depends not only on y_true and y_pred, but also on the training data. I implemented it using the wrapper solution described here.

Specifically, I want to define a custom loss function that is the standard MSE plus the MSE between the input and the square of y_pred:

def custom_loss(x_true):
    def loss(y_true, y_pred):
        return K.mean(K.square(y_pred - y_true) + K.square(y_true - x_true))
    return loss

Then I compile the model with

model_custom.compile(loss = custom_loss( x_true=training_data ), optimizer='adam')

and fit the model with

model_custom.fit(training_data, training_label, epochs=100, batch_size = training_data.shape[0])

All of the above works fine, because the batch size is in fact the number of all the training samples.

But if I set a different batch_size (e.g. 10) when there are 1000 training samples, I get an error:

Incompatible shapes: [1000] vs. [10].

It seems that Keras can automatically adjust the size of the inputs to its own loss functions according to the batch size, but cannot do so for a custom loss function.

你知道如何解决这个问题吗?

Thanks!

=============================================================================

* Update: the batch size issue is solved, but another problem has come up

Thanks to Ori for the suggestion of concatenating the input and output layers! It "works", in the sense that the code now runs with any batch size. However, the result of training the new model seems to be wrong... Below is a simplified version of the code to demonstrate the problem:

import numpy as np
import scipy.io
import keras
from keras import backend as K
from keras.models import Model
from keras.layers import Input, Dense, Activation
from numpy.random import seed
from tensorflow import set_random_seed

def custom_loss(y_true, y_pred): # this is essentially the mean_square_error
    mse = K.mean( K.square( y_pred[:,2] - y_true ) )
    return mse

# set the seeds so that we get the same initialization across different trials
seed_numpy = 0
seed_tensorflow = 0

# generate data of x = [ y^3 y^2 ]
y = np.random.rand(5000+1000,1) * 2 # generate 5000 training and 1000 testing samples
x = np.concatenate( ( np.power(y, 3) , np.power(y, 2) ) , axis=1 )

training_data  = x[0:5000:1,:]
training_label = y[0:5000:1]
testing_data   = x[5000:6000:1,:]
testing_label  = y[5000:6000:1]

# build the standard neural network with one hidden layer
seed(seed_numpy)
set_random_seed(seed_tensorflow)

input_standard = Input(shape=(2,))                                               # input
hidden_standard = Dense(10, activation='relu', input_shape=(2,))(input_standard) # hidden layer
output_standard = Dense(1, activation='linear')(hidden_standard)                 # output layer

model_standard = Model(inputs=[input_standard], outputs=[output_standard])     # build the model
model_standard.compile(loss='mean_squared_error', optimizer='adam')            # compile the model
model_standard.fit(training_data, training_label, epochs=50, batch_size = 500) # train the model
testing_label_pred_standard = model_standard.predict(testing_data)             # make prediction

# get the mean squared error
mse_standard = np.sum( np.power( testing_label_pred_standard - testing_label , 2 ) ) / 1000

# build the neural network with the custom loss
seed(seed_numpy)
set_random_seed(seed_tensorflow)

input_custom = Input(shape=(2,))                                             # input
hidden_custom = Dense(10, activation='relu', input_shape=(2,))(input_custom) # hidden layer
output_custom_temp = Dense(1, activation='linear')(hidden_custom)            # output layer
output_custom = keras.layers.concatenate([input_custom, output_custom_temp])

model_custom = Model(inputs=[input_custom], outputs=[output_custom])         # build the model
model_custom.compile(loss = custom_loss, optimizer='adam')                   # compile the model
model_custom.fit(training_data, training_label, epochs=50, batch_size = 500) # train the model
testing_label_pred_custom = model_custom.predict(testing_data)               # make prediction

# get the mean squared error
mse_custom = np.sum( np.power( testing_label_pred_custom[:,2:3:1] - testing_label , 2 ) ) / 1000

# compare the result
print( [ mse_standard , mse_custom ] )

Basically, I have a standard one-hidden-layer neural network, and a custom one-hidden-layer neural network whose output layer is concatenated with the input layer (the concatenated output is [x1, x2, y_hat], so column index 2 holds the original prediction). For testing purposes, I do not use the concatenated input columns in the custom loss function, because I want to see whether the custom network can reproduce the standard neural network. Since the custom loss function is equivalent to the standard 'mean_squared_error' loss, both networks should produce the same training results (I also reset the random seeds to make sure they start from the same initialization).

However, the training results are quite different. It seems that the concatenation somehow changes the training process? Any ideas?

Thanks again for all your help!

Final update: Ori's approach of concatenating the input and output layers works, as verified by using a generator. Thanks!!

Tags: python, tensorflow, keras

Solution


The problem is that when compiling the model, you set x_true to be a static tensor with the size of all the samples. The inputs of a Keras loss function, however, are y_true and y_pred, each of which has size [batch_size, :].
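To make the shape clash concrete, here is a small NumPy illustration of the mismatch (my own sketch; the array sizes just mirror the 1000-sample / batch-of-10 scenario from the question):

import numpy as np

x_true = np.random.rand(1000, 1)  # baked into the loss at compile time: all 1000 samples
y_pred = np.random.rand(10, 1)    # what the loss actually receives: one batch of 10

# Combining the fixed full-size tensor with a batch-sized tensor is exactly what
# triggers the "Incompatible shapes: [1000] vs. [10]" error inside the custom loss.
try:
    _ = y_pred - x_true
except ValueError as err:
    print(err)  # operands could not be broadcast together with shapes (10,1) (1000,1)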

As I see it, there are two options to solve this. The first is to use a generator to create the batches; that way you control which indices are evaluated in each step, and inside the loss function you can slice the x_true tensor so that it matches the samples being evaluated:

def custom_loss(x_true):
    def loss(y_true, y_pred):
        # relevant_samples is a placeholder: it should return the rows of x_true
        # that correspond to the samples in the current batch
        x_true_samples = relevant_samples(x_true)
        return K.mean(K.square(y_pred - y_true) + K.square(y_true - x_true_samples))
    return loss

This solution can get complicated, so I suggest a simpler workaround:
concatenate the input layer with the output layer, so that your new output has the form original_output, input.

Now you can use the new, modified loss function:

def loss(y_true, y_pred):
    return K.mean(K.square(y_pred[:,:output_shape] - y_true[:,:output_shape]) +
                  K.square(y_true[:,:output_shape] - y_pred[:,output_shape:]))

Now your new loss function will take both the input data and the prediction into account.
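For completeness, here is a minimal sketch of the model-building side of this workaround (my own illustration, with made-up layer names and output_shape = 1 for a scalar prediction); it concatenates the original output first and the input second, matching the slicing in the loss above. Note that the asker's code later in this post concatenates in the opposite order, [input, output], and therefore indexes the prediction as y_pred[:,2] instead.

from keras.layers import Input, Dense, concatenate
from keras.models import Model

output_shape = 1                                # width of the original prediction

inp = Input(shape=(2,))
hidden = Dense(10, activation='relu')(inp)
pred = Dense(output_shape, activation='linear')(hidden)  # original output
combined = concatenate([pred, inp])             # new output columns: [prediction | input]

model = Model(inputs=inp, outputs=combined)
model.compile(loss=loss, optimizer='adam')      # `loss` is the modified loss defined above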

Edit:
Note that even though you set the seeds, your models are not exactly the same, and since you are not using a generator, you let Keras choose the batches, and it may pick different samples for different models.
Since your model does not converge, different samples can lead to different results.
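As a side note (my addition, not part of the original answer): Keras' fit shuffles the training data every epoch by default (shuffle=True), so one lightweight way to make both runs see the batches in the same, fixed order, without writing a generator, would be to pass shuffle=False; the generator below goes a step further by making the chosen samples explicit.

# Hypothetical alternative: keep Keras' built-in batching but fix the batch order.
model_standard.fit(training_data, training_label, epochs=50, batch_size=500, shuffle=False)
model_custom.fit(training_data, training_label, epochs=50, batch_size=500, shuffle=False)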

I added a generator to your code so that we control which samples are chosen for training; now you can see that both results are the same:

import numpy as np
import keras
from keras.models import Model
from keras.layers import Input, Dense
from numpy.random import seed
from tensorflow import set_random_seed

def custom_loss(y_true, y_pred): # this is essentially the mean_square_error
    mse = keras.losses.mean_squared_error(y_true, y_pred[:,2])
    return mse


def generator(x, y, batch_size):
    curIndex = 0
    batch_x = np.zeros((batch_size,2))
    batch_y = np.zeros((batch_size,1))
    while True:
        for i in range(batch_size):
            batch_x[i] = x[curIndex,:]
            batch_y[i] = y[curIndex,:]
            # step through the training set in order, wrapping around at the end
            curIndex += 1
            if curIndex == x.shape[0]:
                curIndex = 0
        yield batch_x, batch_y

# set the seeds so that we get the same initialization across different trials
seed_numpy = 0
seed_tensorflow = 0

# generate data of x = [ y^3 y^2 ]
y = np.random.rand(5000+1000,1) * 2 # generate 5000 training and 1000 testing samples
x = np.concatenate( ( np.power(y, 3) , np.power(y, 2) ) , axis=1 )

training_data  = x[0:5000:1,:]
training_label = y[0:5000:1]
testing_data   = x[5000:6000:1,:]
testing_label  = y[5000:6000:1]

batch_size = 32



# build the standard neural network with one hidden layer
seed(seed_numpy)
set_random_seed(seed_tensorflow)

input_standard = Input(shape=(2,))                                               # input
hidden_standard = Dense(10, activation='relu', input_shape=(2,))(input_standard) # hidden layer
output_standard = Dense(1, activation='linear')(hidden_standard)                 # output layer

model_standard = Model(inputs=[input_standard], outputs=[output_standard])     # build the model
model_standard.compile(loss='mse', optimizer='adam')            # compile the model
#model_standard.fit(training_data, training_label, epochs=50, batch_size = 10) # train the model
model_standard.fit_generator(generator(training_data,training_label,batch_size),  steps_per_epoch= 32, epochs= 100)
testing_label_pred_standard = model_standard.predict(testing_data)             # make prediction

# get the mean squared error
mse_standard = np.sum( np.power( testing_label_pred_standard - testing_label , 2 ) ) / 1000

# build the neural network with the custom loss
seed(seed_numpy)
set_random_seed(seed_tensorflow)


input_custom = Input(shape=(2,))                                               # input
hidden_custom = Dense(10, activation='relu', input_shape=(2,))(input_custom) # hidden layer
output_custom_temp = Dense(1, activation='linear')(hidden_custom)            # output layer
output_custom = keras.layers.concatenate([input_custom, output_custom_temp])

model_custom = Model(inputs=input_custom, outputs=output_custom)         # build the model
model_custom.compile(loss = custom_loss, optimizer='adam')                   # compile the model
#model_custom.fit(training_data, training_label, epochs=50, batch_size = 10) # train the model
model_custom.fit_generator(generator(training_data,training_label,batch_size),  steps_per_epoch= 32, epochs= 100)
testing_label_pred_custom = model_custom.predict(testing_data)

# get the mean squared error
mse_custom = np.sum( np.power( testing_label_pred_custom[:,2:3:1] - testing_label , 2 ) ) / 1000

# compare the result
print( [ mse_standard , mse_custom ] )
