首页 > 技术文章 > 用PyTorch重新创建Keras API

gyshht 2020-08-27 15:48 原文

  Francois Chollet写的《Deep Learning with Python》一书让我进入了深度学习的世界。从那时起我就爱上了Keras的风格。

  Keras是我的第一个框架,然后是Tensorflow,接着进入PyTorch。

  老实说,在Keras的模型训练中,我很兴奋这个进度条,真是太棒了。

  那么,为什么不尝试把Keras训练模型的经验带到PyTorch呢?

  这个问题让我开始了工作,最后我用所有那些花哨的进度条重现了Keras的Dense层、卷积层和平坦层。

  模型可以通过堆叠一层到另一层来创建,并通过简单地调用fit方法进行训练,该方法类似于Keras的工作方式。

  Keras的工作方式如下:

  #一层一层叠起来#采用输入数据的形状inputs = keras.Input(shape=(784,))l1 = layers.Dense(64, activation="relu")(inputs)l2 = layers.Dense(64, activation="relu")(l1)outputs = layers.Dense(10)(l2)model = keras.Model(inputs=inputs, outputs=outputs)#输出模型摘要model.summary()#模型训练和评估model.fit(x_train, y_train, epochs=2)model.evaluate(x_test, y_test)1.导入所需的库

  你可能不熟悉库pkbar,它用于显示类似Keras的进度条。

  !pip install pkbarimport torchfrom torch import nnfrom torch import optimfrom torch.autograd import Variablefrom torchsummary import summary as summary_import pkbarimport warningswarnings.filterwarnings('ignore')2.输入层和dense层

  输入层只是以数据的单一实例的形式被传递到神经网络并返回它,对于全连接的网络,它将类似于(1,784),对于卷积神经网络,它将是图像的尺寸(高度×宽度×通道)。

  使用大写字母来命名python函数是违反规则的,但是我们暂时忽略它(Keras源代码的某些部分使用相同的约定)。

  def Input(shape): Input.shape = shape return Input.shapedef get_conv_output(shape, inputs): bs = 1 data = Variable(torch.rand(bs, *shape)) output_feat = inputs(data) return output_feat.size(1)def same_pad(h_in, kernal, stride, dilation): return (stride*(h_in-1)-h_in+(dilation*(kernal-1))+1) / 2.0Dense类通过传递该层的输出神经元数量和激活函数来初始化。调用Dense层时,前一层作为输入传递。

  现在我们有了关于前一层的信息。如果前一层是输入层,则创建一个PyTorch线性层,其中输入层返回的形状和输出神经元的数量作为Dense类初始化期间的参数。

  如果前一层是Dense层,我们通过在Dense类中增加一个PyTorch线性层和一个激活层来扩展神经网络。

  如果前一层是卷积层或平坦层,我们将创建一个名为get_conv_output()的实用函数,通过卷积层和平坦层得到图像的输出形状。此维度是必需的,因为如果不向in_features参数传递值,则无法在PyTorch中创建线性层。

  函数的作用是将图像形状和卷积神经网络模型作为输入。然后,它创建一个与图像形状相同的虚拟张量,并将其传递给卷积网络(具有平坦层),并返回从中输出的数据的大小,该大小作为值传递给PyTorch线性层中的in_features参数。

  class Dense(nn.Module): def __init__(self, outputs, activation): super().__init__() self.outputs = outputs self.activation = activation def __call__(self, inputs): self.inputs_size = 1 if type(inputs) == tuple: for i in range(len(inputs)): self.inputs_size *= inputs[i] self.layers = nn.Sequential( nn.Linear(self.inputs_size, self.outputs), self.activation ) return self.layers elif isinstance(inputs[-2], nn.Linear): self.inputs = inputs self.layers = list(self.inputs) self.layers.extend([nn.Linear(self.layers[-2].out_features, self.outputs), self.activation]) self.layers = nn.Sequential(*self.layers) return self.layers else: self.inputs = inputs self.layers = list(self.inputs) self.layers.extend([nn.Linear(get_conv_output(Input.shape, self.inputs), self.outputs), self.activation]) self.layers = nn.Sequential(*self.layers) return self.layers3.平坦层

  为了创建一个平坦层,我们将创建一个名为FlattenedLayer的自定义层类,它接受张量作为输入,并在前向传播期间返回张量的平坦版本。

  我们将创建另一个名为flatten的类,当调用这个层时,前面的层作为输入传递,然后flatten类通过在前面的层上添加我们自定义创建的FlattenedLayer类来扩展网络。

  因此,所有到达平坦层的数据都是使用我们自定义创建的平坦层进行平坦的。

  class FlattenedLayer(nn.Module): def __init__(self): super().__init__() pass def forward(self, input): self.inputs = input.view(input.size(0), -1) return self.inputsclass Flatten(): def __init__(self): pass def __call__(self, inputs): self.inputs = inputs self.layers = list(self.inputs) self.layers.extend([FlattenedLayer()]) self.layers = nn.Sequential(*self.layers) return self.layers4.卷积层

  我们将通过传入滤波器数量、内核大小、步长、填充、膨胀和激活函数来初始化Conv2d层。

  现在,当调用Conv2d层时,前面的层被传递给它,如果前一层是Input layer,则是一个Pytorch conv2d层,其中提供了滤波器数量、内核大小、步长、填充,扩张和激活函数被创建,其中in_channels的值取自输入形状中的通道数。

  如果前一层是卷积层,则通过添加一个PyTorch Conv2d层和激活函数来扩展前一层,激活函数的值取自前一层的out_channels 。

  在填充的情况下,如果用户需要保留从该层传出的数据的维度,则可以将padding的值指定为“same”,而不是整数。

  如果padding的值被指定为“same”,那么将使用一个名为same_pad()的实用函数来获取padding的值,以保留给定输入大小、内核大小、步长和膨胀的维度。

  可以使用前面讨论的get_conv_output()实用程序函数获得输入大小。

  class Conv2d(nn.Module): def __init__(self, filters, kernel_size, strides, padding, dilation, activation): super().__init__() self.filters = filters self.kernel = kernel_size self.strides = strides self.padding = padding self.dilation = dilation self.activation = activation def __call__(self, inputs): if type(inputs) == tuple: self.inputs_size = inputs if self.padding == 'same': self.padding = int(same_pad(self.inputs_size[-2], self.kernel, self.strides, self.dilation)) else: self.padding = self.padding self.layers = nn.Sequential( nn.Conv2d(self.inputs_size[-3], self.filters, self.kernel, self.strides, self.padding, self.dilation), self.activation ) return self.layers else: if self.padding == 'same': self.padding = int(same_pad(get_conv_output(Input.shape, inputs), self.kernel, self.strides, self.dilation)) else: self.padding = self.padding self.inputs = inputs self.layers = list(self.inputs) self.layers.extend( [nn.Conv2d(self.layers[-2].out_channels, self.filters, self.kernel, self.strides, self.padding, self.dilation), self.activation] ) self.layers = nn.Sequential(*self.layers) return self.layers5.模型类

  在构建了模型的体系结构之后,通过传入输入层和输出层来初始化模型类。但是我已经给出了一个额外的参数,名为device,它在Keras中不存在,这个参数接受值为'CPU'或'CUDA',它将把整个模型移动到指定的设备。

  model类的parameters方法用于返回要给PyTorch优化器的模型参数。

  model类有一个名为compile的方法,它接受训练模型所需的优化器和丢失函数。模型类的摘要方法是借助torch的summary库显示所创建模型的摘要。

  采用拟合方法对模型进行训练,该方法以输入特征集、目标数据集和epoch数为参数。它显示由损失函数计算的损失和使用pkbar库的训练进度。

  评估会计算验证数据集的损失和精度。

  当使用PyTorch数据加载程序加载数据时,将使用fit_generator、evaluate_generator 和predict_generator 。fit_generator 以训练集数据加载器和epoch作为参数。evaluate_generator和predict_generator分别使用验证集数据加载器和测试数据加载器来衡量模型对未查看数据的执行情况。

  class Model(): def __init__(self, inputs, outputs, device): self.input_size = inputs self.device = device self.model = outputs.to(self.device) def parameters(self): return self.model.parameters() def compile(self, optimizer, loss): self.opt = optimizer self.criterion = loss def summary(self): summary_(self.model, self.input_size, device=self.device) print("Device Type:", self.device) def fit(self, data_x, data_y, epochs): self.model.train() for epoch in range(epochs): print("Epoch {}/{}".format(epoch+1, epochs)) progress = pkbar.Kbar(target=len(data_x), width=25) for i, (data, target) in enumerate(zip(data_x, data_y)): self.opt.zero_grad() train_out = self.model(data.to(self.device)) loss = self.criterion(train_out, target.to(self.device)) loss.backward() self.opt.step() progress.update(i, values=[("loss: ", loss.item())]) progress.add(1) def evaluate(self, test_x, test_y): self.model.eval() correct, loss = 0.0, 0.0 progress = pkbar.Kbar(target=len(test_x), width=25) for i, (data, target) in enumerate(zip(test_x, test_y)): out = self.model(data.to(self.device)) loss += self.criterion(out, target.to(self.device)) correct += ((torch.max(out, 1)[1]) == target.to(self.device)).sum() progress.update(i, values=[("loss", loss.item()/len(test_x)), ("acc", (correct/len(test_x)).item())]) progress.add(1) def fit_generator(self, generator, epochs): self.model.train() for epoch in range(epochs): print("Epoch {}/{}".format(epoch+1, epochs)) progress = pkbar.Kbar(target=len(generator), width=25) for i, (data, target) in enumerate(generator): self.opt.zero_grad() train_out = self.model(data.to(self.device)) loss = self.criterion(train_out.squeeze(), target.to(self.device)) loss.backward() self.opt.step() progress.update(i, values=[("loss: ", loss.item())]) progress.add(1) def evaluate_generator(self, generator): self.model.eval() correct, loss = 0.0, 0.0 progress = pkbar.Kbar(target=len(generator), width=25) for i, (data, target) in enumerate(generator): out = self.model(data.to(self.device)) loss += self.criterion(out.squeeze(), target.to(self.device)) correct += (torch.max(out.squeeze(), 1)[1] == target.to(self.device)).sum() progress.update(i, values=[("test_acc", (correct/len(generator)).item()), ("test_loss", loss.item()/len(generator))]) progress.add(1) def predict_generator(self, generator): self.model.train() out = [] for i, (data, labels) in enumerate(generator): out.append(self.model(data.to(self.device))) return out结尾郑州人流医院哪家好http://www.zztjyiyuan.com/

  我用Dense层和卷积神经网络在CIFAR100、CIFAR10和MNIST数据集上测试了代码。它工作得很好,但还有很大的改进空间。

推荐阅读