机器学习工作流 -------------- 机器学习系统编程模型的首要设计目标是:对开发者的整个工作流进行完整的编程支持。一个常见的机器学习任务一般包含如 :numref:`img_workflow`\ 所示的工作流。这个工作流完成了训练数据集的读取,模型的训练,测试和调试。通过归纳,我们可以将这一工作流中用户所需要自定义的部分通过定义以下API来支持(我们这里假设用户的高层次API以Python函数的形式提供): - **数据处理:** 首先,用户需要数据处理API来支持将数据集从磁盘读入。进一步,用户需要对读取的数据进行预处理,从而可以将数据输入后续的机器学习模型中。 - **模型定义:** 完成数据的预处理后,用户需要模型定义API来定义机器学习模型。这些模型带有模型参数,可以对给定的数据进行推理。 - **优化器定义:** 模型的输出需要和用户的标记进行对比,这个对比差异一般通过损失函数(Loss function)来进行评估。因此,优化器定义API允许用户定义自己的损失函数,并且根据损失来引入(Import)和定义各种优化算法(Optimisation algorithms)来计算梯度(Gradient),完成对模型参数的更新。 - **训练:** 给定一个数据集,模型,损失函数和优化器,用户需要训练API来定义一个循环(Loop)从而将数据集中的数据按照小批量(mini-batch)的方式读取出来,反复计算梯度来更新模型。这个反复的过程称为训练。 - **测试和调试:** 训练过程中,用户需要测试API来对当前模型的精度进行评估。当精度达到目标后,训练结束。这一过程中,用户往往需要调试API来完成对模型的性能和正确性进行验证。 .. _img_workflow: .. figure:: ../img/ch02/img_workflow.svg :width: 800px 机器学习系统工作流 环境配置 ~~~~~~~~ 下面以MindSpore框架实现多层感知机为例,了解完整的机器学习工作流。代码运行环境为MindSpore1.5.2,Ubuntu16.04,CUDA10.1。 在构建机器学习工作流程前,MindSpore需要通过context.set_context来配置运行需要的信息,如运行模式、后端信息、硬件等信息。 以下代码导入context模块,配置运行需要的信息。 .. code:: python import os import argparse from mindspore import context parser = argparse.ArgumentParser(description='MindSpore MLPNet Example') parser.add_argument('--device_target', type=str, default="CPU", choices=['Ascend', 'GPU', 'CPU']) args = parser.parse_known_args()[0] context.set_context(mode=context.GRAPH_MODE, device_target=args.device_target) 上述配置样例运行使用图模式。根据实际情况配置硬件信息,譬如代码运行在Ascend AI处理器上,则–device_target选择Ascend,代码运行在CPU、GPU同理。 数据处理 ~~~~~~~~ 配置好运行信息后,首先讨论数据处理API的设计。这些API提供了大量Python函数支持用户用一行命令即可读入常见的训练数据集(如MNIST,CIFAR,COCO等)。 在加载之前需要将下载的数据集存放在./datasets/MNIST_Data路径中;MindSpore提供了用于数据处理的API模块 mindspore.dataset,用于存储样本和标签。在加载数据集前,通常会对数据集进行一些处理,mindspore.dataset也集成了常见的数据处理方法。 以下代码读取了MNIST的数据是大小为\ :math:`28 \times 28`\ 的图片,返回DataSet对象。 .. code:: python import mindspore.dataset as ds DATA_DIR = './datasets/MNIST_Data/train' mnist_dataset = ds.MnistDataset(DATA_DIR) 有了DataSet对象后,通常需要对数据进行增强,常用的数据增强包括翻转、旋转、剪裁、缩放等;在MindSpore中是使用map将数据增强的操作映射到数据集中的,之后进行打乱(Shuffle)和批处理(Batch)。 .. code:: python # 导入需要用到的模块 import mindspore.dataset as ds import mindspore.dataset.transforms.c_transforms as C import mindspore.dataset.vision.c_transforms as CV from mindspore.dataset.vision import Inter from mindspore import dtype as mstype # 数据处理过程 def create_dataset(data_path, batch_size=32, repeat_size=1, num_parallel_workers=1): # 定义数据集 mnist_ds = ds.MnistDataset(data_path) resize_height, resize_width = 32, 32 rescale = 1.0 / 255.0 rescale_nml = 1 / 0.3081 shift_nml = -1 * 0.1307 / 0.3081 # 定义所需要操作的map映射 resize_op = CV.Resize((resize_height, resize_width), interpolation=Inter.LINEAR) rescale_nml_op = CV.Rescale(rescale_nml * rescale, shift_nml) hwc2chw_op = CV.HWC2CHW() type_cast_op = C.TypeCast(mstype.int32) # 使用map映射函数,将数据操作应用到数据集 mnist_ds = mnist_ds.map(operations=type_cast_op, input_columns="label", num_parallel_workers=num_parallel_workers) mnist_ds = mnist_ds.map(operations=[resize_op, rescale_nml_op,hwc2chw_op], input_columns="image",num_parallel_workers=num_parallel_workers) # 进行shuffle、batch操作 buffer_size = 10000 mnist_ds = mnist_ds.shuffle(buffer_size=buffer_size) mnist_ds = mnist_ds.batch(batch_size, drop_remainder=True) return mnist_ds 模型定义 ~~~~~~~~ 使用MindSpore定义神经网络需要继承mindspore.nn.Cell,神经网络的各层需要预先在__init__方法中定义,然后重载__construct__方法实现神经网络的前向传播过程。 因为输入大小被处理成\ :math:`32 \times 32`\ 的图片,所以需要用Flatten将数据压平为一维向量后给全连接层。 全连接层的输入大小为\ :math:`32 \times 32`\ ,输出是预测属于\ :math:`0 \sim 9`\ 中的哪个数字,因此输出大小为10,下面定义了一个三层的全连接层。 .. code:: python # 导入需要用到的模块 import mindspore.nn as nn # 定义线性模型 class MLPNet(nn.Cell): def __init__(self): super(MLPNet, self).__init__() self.flatten = nn.Flatten() self.dense1 = nn.Dense(32*32, 128) self.dense2 = nn.Dense(128, 64) self.dense3 = nn.Dense(64, 10) def construct(self, inputs): x = self.flatten(inputs) x = self.dense1(x) x = self.dense2(x) logits = self.dense3(x) return logits # 实例化网络 net = MLPNet() 损失函数和优化器 ~~~~~~~~~~~~~~~~ 有了神经网络组件构建的模型我们还需要定义\ **损失函数**\ 来计算训练过程中输出和真实值的误差。\ **均方误差**\ (Mean Squared Error,MSE)是线性回归中常用的,是计算估算值与真实值差值的平方和的平均数。 **平均绝对误差**\ (Mean Absolute Error,MAE)是计算估算值与真实值差值的绝对值求和再求平均。 **交叉熵**\ (Cross Entropy,CE)是分类问题中常用的,衡量已知数据分布情况下,计算输出分布和已知分布的差值。 有了损失函数,我们就可以通过损失值利用\ **优化器**\ 对参数进行训练更新。对于优化的目标函数\ :math:`f(x)`\ ;先求解其梯度\ :math:`\nabla`\ :math:`f(x)`\ ,然后将训练参数\ :math:`W`\ 沿着梯度的负方向更新,更新公式为:\ :math:`W_t = W_{t-1} - \alpha\nabla(W_{t-1})`\ ,其中\ :math:`\alpha`\ 是学习率,\ :math:`W`\ 是训练参数,\ :math:`\nabla(W_{t-1})`\ 是方向。 神经网络的优化器种类很多,一类是学习率不受梯度影响的随机梯度下降(Stochastic Gradient Descent)及SGD的一些改进方法,如带有Momentum的SGD;另一类是自适应学习率如AdaGrad、RMSProp、Adam等。 **SGD**\ 的更新是对每个样本进行梯度下降,因此计算速度很快,但是单样本更新频繁,会造成震荡;为了解决震荡问题,提出了带有Momentum的SGD,该方法的参数更新不仅仅由梯度决定,也和累计的梯度下降方向有关,使得增加更新梯度下降方向不变的维度,减少更新梯度下降方向改变的维度,从而速度更快也减少震荡。 自适应学习率\ **AdaGrad**\ 是通过以往的梯度自适应更新学习率,不同的参数\ :math:`W_i`\ 具有不同的学习率。AdaGrad对频繁变化的参数以更小的步长更新,而稀疏的参数以更大的步长更新。因此对稀疏的数据表现比较好。\ **Adadelta**\ 是对AdaGrad的改进,解决了AdaGrad优化过程中学习率\ :math:`\alpha`\ 单调减少问题;Adadelta不对过去的梯度平方进行累加,用指数平均的方法计算二阶动量,避免了二阶动量持续累积,导致训练提前结束。\ **Adam**\ 可以理解为Adadelta和Momentum的结合,对一阶二阶动量均采用指数平均的方法计算。 MindSpore提供了丰富的API来让用户导入损失函数和优化器。在下面的例子中,计算了输入和真实值之间的softmax交叉熵损失,导入Momentum优化器。 .. code:: python # 定义损失函数 net_loss = nn.SoftmaxCrossEntropyWithLogits(sparse=True, reduction='mean') # 定义优化器 net_opt = nn.Momentum(net.trainable_params(), learning_rate=0.01, momentum=0.9) 训练及保存模型 ~~~~~~~~~~~~~~ MindSpore提供了回调Callback机制,可以在训练过程中执行自定义逻辑,使用框架提供的ModelCheckpoint为例。ModelCheckpoint可以保存网络模型和参数,以便进行后续的Fine-tuning(微调)操作。 .. code:: python # 导入模型保存模块 from mindspore.train.callback import ModelCheckpoint, CheckpointConfig # 设置模型保存参数 config_ck = CheckpointConfig(save_checkpoint_steps=1875, keep_checkpoint_max=10) # 应用模型保存参数 ckpoint = ModelCheckpoint(prefix="checkpoint_lenet", config=config_ck) 通过MindSpore提供的model.train接口可以方便地进行网络的训练,LossMonitor可以监控训练过程中loss值的变化。 .. code:: python # 导入模型训练需要的库 from mindspore.nn import Accuracy from mindspore.train.callback import LossMonitor from mindspore import Model def train_net(args, model, epoch_size, data_path, repeat_size, ckpoint_cb, sink_mode): """定义训练的方法""" # 加载训练数据集 ds_train = create_dataset(os.path.join(data_path, "train"), 32, repeat_size) model.train(epoch_size, ds_train, callbacks=[ckpoint_cb, LossMonitor(125)], dataset_sink_mode=sink_mode) 其中,dataset_sink_mode用于控制数据是否下沉,数据下沉是指数据通过通道直接传送到Device上,可以加快训练速度,dataset_sink_mode为True表示数据下沉,否则为非下沉。 有了数据集、模型、损失函数、优化器后就可以进行训练了,这里把train_epoch设置为1,对数据集进行1个迭代的训练。在train_net和 test_net方法中,我们加载了之前下载的训练数据集,mnist_path是MNIST数据集路径。 .. code:: python train_epoch = 1 mnist_path = "./datasets/MNIST_Data" dataset_size = 1 model = Model(net, net_loss, net_opt, metrics={"Accuracy": Accuracy()}) train_net(args, model, train_epoch, mnist_path, dataset_size, ckpoint, False) 测试和验证 ~~~~~~~~~~ 测试是将测试数据集输入到模型,运行得到输出的过程。通常在训练过程中,每训练一定的数据量后就会测试一次,以验证模型的泛化能力。MindSpore使用model.eval接口读入测试数据集。 .. code:: python def test_net(model, data_path): """定义验证的方法""" ds_eval = create_dataset(os.path.join(data_path, "test")) acc = model.eval(ds_eval, dataset_sink_mode=False) print("{}".format(acc)) # 验证模型精度 test_net(model, mnist_path) 在训练完毕后,参数保存在checkpoint中,可以将训练好的参数加载到模型中进行验证。 .. code:: python import numpy as np from mindspore import Tensor from mindspore import load_checkpoint, load_param_into_net # 定义测试数据集,batch_size设置为1,则取出一张图片 ds_test = create_dataset(os.path.join(mnist_path, "test"), batch_size=1).create_dict_iterator() data = next(ds_test) # images为测试图片,labels为测试图片的实际分类 images = data["image"].asnumpy() labels = data["label"].asnumpy() # 加载已经保存的用于测试的模型 param_dict = load_checkpoint("checkpoint_lenet-1_1875.ckpt") # 加载参数到网络中 load_param_into_net(net, param_dict) # 使用函数model.predict预测image对应分类 output = model.predict(Tensor(data['image'])) # 输出预测分类与实际分类 print(f'Predicted: "{predicted[0]}", Actual: "{labels[0]}"')