自定义Loss、Metric 及 Callback

除了使用飞桨框架内置的 API,有时会需要根据实际场景,自定义 Loss、Metric 及 Callback 来使用,本节介绍在飞桨框架中自定义的方法。

一、自定义损失函数 Loss

1.1 损失函数介绍

损失函数用来评估模型的预测结果与真实结果之间的差距,损失函数越小,模型的鲁棒性就越好。模型训练的过程其实是对损失函数采用梯度下降的方法,使得损失函数不断减小到局部最优值,而得到对任务来说比较合理的模型参数。

一般在深度学习任务中,有许多常用的损失函数,例如在图像分类任务中常用的交叉熵损失函数,在目标检测任务中常用的 Focal loss、L1/L2 损失函数等,在图像识别任务中常用的 Triplet Loss 以及 Center Loss 等。如果框架中提供的损失函数不能满足试图解决的问题,可按照框架的 API 结构要求自定义损失函数。

1.2 自定义 Loss 步骤

飞桨框架中实现自定义 Loss 的方法和使用 paddle.nn.Layer 进行模型组网的方法类似,包括三个步骤:

  1. 创建一个继承自 paddle.nn.Layer 的类;

  2. 在类的构造函数 __init__ 中定义需要的参数;

  3. 在类的前向计算函数 forward 中进行损失函数计算。

import paddle


class SelfDefineLoss(paddle.nn.Layer):
    """
    1. 继承paddle.nn.Layer
    """

    def __init__(self):
        """
        2. 构造函数根据自己的实际算法需求和使用需求进行参数定义即可
        """
        super().__init__()

    def forward(self, x, label):
        """
        3. 实现forward函数,forward在调用时会传递两个参数:x和label
            - x:单个或批次训练数据经过模型前向计算输出结果
            - label:单个或批次训练数据对应的标签数据
            接口返回值是一个Tensor,根据需要将所有x和label计算得到的loss值求和或取均值
        """
        # 返回forword中计算的结果
        # output = xxxxx
        # return output

下面是定义交叉熵损失函数 Loss 的示例:

class CrossEntropy(paddle.nn.Layer):
    def __init__(self):
        super().__init__()

    def forward(self, x, label):
        # 使用paddle内置的cross_entropy算子实现算法
        loss = paddle.nn.functional.cross_entropy(x, label)
        return loss

二、自定义评估指标 Metric

2.1 评估指标介绍

评估指标用来衡量一个模型的实际效果好坏,一般是通过计算模型的预测结果和真实结果之间的某种"距离"得出。

和损失函数类似,一般会在不同的任务场景中选择不同的评估指标来做模型评估,例如在分类任务中常见的评估指标包括了 Accuracy、Recall、Precision 和 AUC 等,在回归任务中常用的有 MAE 和 MSE 等等。这些常见的评估指标在飞桨框架中都有对应的 API 实现,可直接使用。如果不能满足需求,则可按照框架的 API 结构要求自定义评估指标。

2.2 自定义评估指标

通过框架实现自定义评估指标的方法,包括如下几个步骤:

  1. 创建一个继承自 paddle.metric.Metric 的类;

  2. 在类的构造函数 __init__ 中定义需要的参数;

  3. 实现 name 方法,返回定义的评估指标名字;

  4. 实现 compute 方法,这个方法主要用于 update 的加速,可省略;

  5. 实现 update 方法,用于单个 batch 训练时进行评估指标计算;

  6. 实现 accumulate 方法,返回历史 batch 训练积累后计算得到的评价指标值;

  7. 实现 reset 方法,每个 epoch 结束后进行评估指标的重置。

class SelfDefineMetric(paddle.metric.Metric):
    """
    1. 继承paddle.metric.Metric
    """

    def __init__(self):
        """
        2. 构造函数实现,自定义参数即可
        """
        super().__init__()

    def name(self):
        """
        3. 实现name方法,返回定义的评估指标名字
        """
        # return '自定义评价指标的名字'

    def compute(self, **args):
        """
        4. 本步骤可以省略,实现compute方法,这个方法主要用于`update`的加速,可以在这个方法中调用一些飞桨框架实现好的Tensor计算API,编译到模型网络中一起使用底层C++ OP计算。
        """
        # return '自己想要返回的数据,会做为update的参数传入。'

    def update(self, **args):
        """
        5. 实现update方法,用于单个batch训练时进行评估指标计算。
        - 当未实现`compute`类函数时,会将模型的计算输出和标签数据的展平作为`update`的参数传入。
        - 当实现了`compute`类函数时,会将compute的返回结果作为`update`的参数传入。
        """
        # return acc_value

    def accumulate(self):
        """
        6. 实现accumulate方法,返回历史batch训练积累后计算得到的评价指标值。
        每次`update`调用时进行数据积累,`accumulate`对积累的所有数据进行计算并返回。
        计算结果会在`fit`接口的训练日志中呈现。
        """
        # 利用update中积累的成员变量数据进行计算后返回
        # return accumulated_acc_value

    def reset(self):
        """
        7. 实现reset方法,每个Epoch结束后进行评估指标的重置,这样下个Epoch可以重新进行计算。
        """
        # do reset action

接下来看一个框架中的具体例子,Accuracy 评价指标的示例,这里就是按照上述说明中的方法完成了实现。

class Accuracy(paddle.metric.Metric):
    """
    继承paddle.metric.Metric
    """

    def __init__(self, topk=(1,), name=None, *args, **kwargs):
        """
        构造函数实现
        """
        super().__init__(*args, **kwargs)
        self.topk = topk
        self.maxk = max(topk)
        self._init_name(name)
        self.reset()

    def compute(self, pred, label, *args):
        """
        实现compute方法
        """
        pred = paddle.argsort(pred, descending=True)
        pred = paddle.slice(
            pred, axes=[len(pred.shape) - 1], starts=[0], ends=[self.maxk]
        )
        if (len(label.shape) == 1) or (
            len(label.shape) == 2 and label.shape[-1] == 1
        ):
            # In static mode, the real label data shape may be different
            # from shape defined by paddle.static.InputSpec in model
            # building, reshape to the right shape.
            label = paddle.reshape(label, (-1, 1))
        elif label.shape[-1] != 1:
            # one-hot label
            label = paddle.argmax(label, axis=-1, keepdim=True)
        correct = pred == label
        return paddle.cast(correct, dtype="float32")

    def update(self, correct, *args):
        """
        实现update方法,用于单个batch训练时进行评估指标计算。
        - 当未实现`compute`类函数时,会将模型的计算输出和标签数据的展平作为`update`的参数传入。
        - 当实现了`compute`类函数时,会将compute的返回结果作为`update`的参数传入。
        """
        if isinstance(correct, paddle.Tensor):
            correct = correct.numpy()
        num_samples = np.prod(np.array(correct.shape[:-1]))
        accs = []
        for i, k in enumerate(self.topk):
            num_corrects = correct[..., :k].sum()
            accs.append(float(num_corrects) / num_samples)
            self.total[i] += num_corrects
            self.count[i] += num_samples
        accs = accs[0] if len(self.topk) == 1 else accs
        return accs

    def reset(self):
        """
        实现reset方法,每个Epoch结束后进行评估指标的重置,这样下个Epoch可以重新进行计算。
        """
        self.total = [0.0] * len(self.topk)
        self.count = [0] * len(self.topk)

    def accumulate(self):
        """
        实现accumulate方法,返回历史batch训练积累后计算得到的评价指标值。
        每次`update`调用时进行数据积累,`accumulate`对积累的所有数据进行计算并返回。
        计算结果会在`fit`接口的训练日志中呈现。
        """
        res = []
        for t, c in zip(self.total, self.count):
            r = float(t) / c if c > 0 else 0.0
            res.append(r)
        res = res[0] if len(self.topk) == 1 else res
        return res

    def _init_name(self, name):
        name = name or "acc"
        if self.maxk != 1:
            self._name = ["{}_top{}".format(name, k) for k in self.topk]
        else:
            self._name = [name]

    def name(self):
        """
        实现name方法,返回定义的评估指标名字
        """
        return self._name

三、自定义回调函数 Callback

3.1 回调函数介绍

Callback 回调函数常用于对模型训练、评估、推理过程状态和参数的观察,如训练进度、loss 值等信息;也可用于实现一些自定义操作,如设置当 loss 值达到一定阈值时停止训练、按照设定规则定期保存模型等。可方便地掌握模型训练状态,及时做出灵活调整。

Callback 用在 Model.fitModel.evaluateModel.predict 等飞桨高层 API 中,先定义一个继承自 paddle.callbacks.Callback 的类,然后通过高层 API 接口的 callback 参数传入类的实例,用于模型训练、评估或推理过程中调用。

3.2 自定义回调函数

自定义回调函数的实现模板如下所示:

class SelfDefineCallback(paddle.callbacks.Callback):
    """
    1. 继承paddle.callbacks.Callback
    2. 按照自己的需求实现以下类成员方法:
        def on_train_begin(self, logs=None)                 训练开始前,`Model.fit`接口中调用
        def on_train_end(self, logs=None)                   训练结束后,`Model.fit`接口中调用
        def on_eval_begin(self, logs=None)                  评估开始前,`Model.evaluate`接口调用
        def on_eval_end(self, logs=None)                    评估结束后,`Model.evaluate`接口调用
        def on_predict_begin(self, logs=None)               推理开始前,`Model.predict`接口中调用
        def on_predict_end(self, logs=None)                 推理结束后,`Model.predict`接口中调用
        def on_epoch_begin(self, epoch, logs=None)          每轮训练开始前,`Model.fit`接口中调用
        def on_epoch_end(self, epoch, logs=None)            每轮训练结束后,`Model.fit`接口中调用
        def on_train_batch_begin(self, step, logs=None)     单个Batch训练开始前,`Model.fit`和`Model.train_batch`接口中调用
        def on_train_batch_end(self, step, logs=None)       单个Batch训练结束后,`Model.fit`和`Model.train_batch`接口中调用
        def on_eval_batch_begin(self, step, logs=None)      单个Batch评估开始前,`Model.evalute`和`Model.eval_batch`接口中调用
        def on_eval_batch_end(self, step, logs=None)        单个Batch评估结束后,`Model.evalute`和`Model.eval_batch`接口中调用
        def on_predict_batch_begin(self, step, logs=None)   单个Batch推理开始前,`Model.predict`和`Model.test_batch`接口中调用
        def on_predict_batch_end(self, step, logs=None)     单个Batch推理结束后,`Model.predict`和`Model.test_batch`接口中调用
    """

    def __init__(self):
        super().__init__()

    # 按照需求定义自己的类成员方法

飞桨框架在 paddle.callbacks 下内置了一些常用的回调函数相关 API,接下来看两个框架中的实际例子。其中第一个例子时框架自带的 ModelCheckpoint 回调函数,可以在 Model.fit 训练模型时自动存储每轮训练得到的模型;第二个例子是框架自带的 ProgBarLogger 回调函数,用于在 Model.fit 训练时打印损失函数和评估指标。这两个回调函数会在 Model.fit 执行时默认被调用。

import paddle
import paddle.distributed as dist


class ModelCheckpoint(paddle.callbacks.Callback):
    """
    继承自paddle.callbacks.Callback,该类的功能是
    训练模型时自动存储每轮训练得到的模型
    """

    def __init__(self, save_freq=1, save_dir=None):
        """
        构造函数实现
        """
        self.save_freq = save_freq
        self.save_dir = save_dir

    def on_epoch_begin(self, epoch=None, logs=None):
        """
        每轮训练开始前,获取当前轮数
        """
        self.epoch = epoch

    def _is_save(self):
        return (
            self.model and self.save_dir and dist.ParallelEnv().local_rank == 0
        )

    def on_epoch_end(self, epoch, logs=None):
        """
        每轮训练结束后,保存每轮的checkpoint
        """
        if self._is_save() and self.epoch % self.save_freq == 0:
            path = "{}/{}".format(self.save_dir, epoch)
            print("save checkpoint at {}".format(os.path.abspath(path)))
            self.model.save(path)

    def on_train_end(self, logs=None):
        """
        训练结束后,保存最后一轮的checkpoint
        """
        if self._is_save():
            path = "{}/final".format(self.save_dir)
            print("save checkpoint at {}".format(os.path.abspath(path)))
            self.model.save(path)
import time
from paddle.distributed import ParallelEnv
from paddle.utils import try_import
from paddle.hapi.progressbar import ProgressBar


class ProgBarLogger(paddle.callbacks.Callback):
    """
    继承自paddle.callbacks.Callback,该类的功能是
    训练模型时打印损失函数和评估指标
    """

    def __init__(self, log_freq=1, verbose=2):
        """
        构造函数实现
        """
        self.epochs = None
        self.steps = None
        self.progbar = None
        self.verbose = verbose
        self.log_freq = log_freq

    def _is_print(self):
        return self.verbose and ParallelEnv().local_rank == 0

    def on_train_begin(self, logs=None):
        """
        训练开始前,获取总epoch、metric等信息
        """
        self.epochs = self.params["epochs"]
        assert self.epochs
        self.train_metrics = self.params["metrics"]
        assert self.train_metrics

        self._train_timer = {
            "data_time": 0,
            "batch_time": 0,
            "count": 0,
            "samples": 0,
        }
        if self._is_print():
            print(
                "The loss value printed in the log is the current step, and the metric is the average value of previous steps."
            )

    def on_epoch_begin(self, epoch=None, logs=None):
        """
        每轮训练开始前,获取当前轮数、步数,声明进度条与计时器等
        """
        self.steps = self.params["steps"]
        self.epoch = epoch
        self.train_step = 0
        if self.epochs and self._is_print():
            print("Epoch %d/%d" % (epoch + 1, self.epochs))
        self.train_progbar = ProgressBar(num=self.steps, verbose=self.verbose)

        self._train_timer["batch_start_time"] = time.time()

    def _updates(self, logs, mode):
        values = []
        metrics = getattr(self, "%s_metrics" % (mode))
        progbar = getattr(self, "%s_progbar" % (mode))
        steps = getattr(self, "%s_step" % (mode))

        for k in metrics:
            if k in logs:
                values.append((k, logs[k]))

        if self.verbose == 3 and hasattr(self, "_%s_timer" % (mode)):
            timer = getattr(self, "_%s_timer" % (mode))
            cnt = timer["count"] if timer["count"] > 0 else 1.0
            samples = timer["samples"] if timer["samples"] > 0 else 1.0
            values.append(
                ("avg_reader_cost", "%.5f sec" % (timer["data_time"] / cnt))
            )
            values.append(
                ("avg_batch_cost", "%.5f sec" % (timer["batch_time"] / cnt))
            )
            values.append(
                (
                    "ips",
                    "%.5f samples/sec"
                    % (samples / (timer["data_time"] + timer["batch_time"])),
                )
            )
            timer["count"] = 0
            timer["samples"] = 0
            timer["data_time"] = 0.0
            timer["batch_time"] = 0.0

        progbar.update(steps, values)

    def on_train_batch_begin(self, step, logs=None):
        """
        单个Batch训练开始前,进行计时
        """
        self._train_timer["batch_data_end_time"] = time.time()
        self._train_timer["data_time"] += (
            self._train_timer["batch_data_end_time"]
            - self._train_timer["batch_start_time"]
        )

    def on_train_batch_end(self, step, logs=None):
        """
        单个Batch训练结束后,更新参数
        """
        logs = logs or {}
        self.train_step += 1

        self._train_timer["batch_time"] += (
            time.time() - self._train_timer["batch_data_end_time"]
        )
        self._train_timer["count"] += 1
        samples = logs.get("batch_size", 1)
        self._train_timer["samples"] += samples
        if self._is_print() and self.train_step % self.log_freq == 0:
            if self.steps is None or self.train_step < self.steps:
                self._updates(logs, "train")
        self._train_timer["batch_start_time"] = time.time()

    def on_epoch_end(self, epoch, logs=None):
        """
        每轮训练结束后,更新参数
        """
        logs = logs or {}
        if self._is_print() and (self.steps is not None):
            self._updates(logs, "train")

    def on_eval_begin(self, logs=None):
        """
        评估开始前,获取当前步数,声明进度条与计时器等
        """
        self.eval_steps = logs.get("steps", None)
        self.eval_metrics = logs.get("metrics", [])
        self.eval_step = 0
        self.evaled_samples = 0

        self._eval_timer = {
            "data_time": 0,
            "batch_time": 0,
            "count": 0,
            "samples": 0,
        }

        self.eval_progbar = ProgressBar(
            num=self.eval_steps, verbose=self.verbose
        )
        if self._is_print():
            print("Eval begin...")

        self._eval_timer["batch_start_time"] = time.time()

    def on_eval_batch_begin(self, step, logs=None):
        """
        单个Batch评估开始前,进行计时
        """
        self._eval_timer["batch_data_end_time"] = time.time()
        self._eval_timer["data_time"] += (
            self._eval_timer["batch_data_end_time"]
            - self._eval_timer["batch_start_time"]
        )

    def on_eval_batch_end(self, step, logs=None):
        """
        单个Batch评估结束后,更新参数
        """
        logs = logs or {}
        self.eval_step += 1
        samples = logs.get("batch_size", 1)
        self.evaled_samples += samples

        self._eval_timer["batch_time"] += (
            time.time() - self._eval_timer["batch_data_end_time"]
        )
        self._eval_timer["count"] += 1
        samples = logs.get("batch_size", 1)
        self._eval_timer["samples"] += samples

        if self._is_print() and self.eval_step % self.log_freq == 0:
            if self.eval_steps is None or self.eval_step < self.eval_steps:
                self._updates(logs, "eval")

        self._eval_timer["batch_start_time"] = time.time()

    def on_predict_begin(self, logs=None):
        """
        推理开始前,获取当前步数,声明进度条与计时器等
        """
        self.test_steps = logs.get("steps", None)
        self.test_metrics = logs.get("metrics", [])
        self.test_step = 0
        self.tested_samples = 0

        self._test_timer = {
            "data_time": 0,
            "batch_time": 0,
            "count": 0,
            "samples": 0,
        }

        self.test_progbar = ProgressBar(
            num=self.test_steps, verbose=self.verbose
        )
        if self._is_print():
            print("Predict begin...")

        self._test_timer["batch_start_time"] = time.time()

    def on_predict_batch_begin(self, step, logs=None):
        """
        单个Batch推理开始前,进行计时
        """
        self._test_timer["batch_data_end_time"] = time.time()
        self._test_timer["data_time"] += (
            self._test_timer["batch_data_end_time"]
            - self._test_timer["batch_start_time"]
        )

    def on_predict_batch_end(self, step, logs=None):
        """
        单个Batch推理结束后,更新参数
        """
        logs = logs or {}
        self.test_step += 1
        samples = logs.get("batch_size", 1)
        self.tested_samples += samples

        self._test_timer["batch_time"] += (
            time.time() - self._test_timer["batch_data_end_time"]
        )
        self._test_timer["count"] += 1
        samples = logs.get("batch_size", 1)
        self._test_timer["samples"] += samples

        if self.test_step % self.log_freq == 0 and self._is_print():
            if self.test_steps is None or self.test_step < self.test_steps:
                self._updates(logs, "test")

        self._test_timer["batch_start_time"] = time.time()

    def on_eval_end(self, logs=None):
        """
        评估结束后,更新参数,打印信息
        """
        logs = logs or {}
        if self._is_print() and (self.eval_steps is not None):
            self._updates(logs, "eval")
            print("Eval samples: %d" % (self.evaled_samples))

    def on_predict_end(self, logs=None):
        """
        推理结束后,更新参数,打印信息
        """
        logs = logs or {}
        if self._is_print():
            if self.test_step % self.log_freq != 0 or self.verbose == 1:
                self._updates(logs, "test")
            print("Predict samples: %d" % (self.tested_samples))

四、自定义Loss、Metric 及 Callback 的使用

以下代码示例中,介绍了自定义 Loss、Metric 及 Callback 后,如何在模型训练中使用。自定义的 loss、Metric 可传入 paddle.Model.prepare中完成训练准备配置,callback 可传入 paddle.Model.fit 中在模型训练中调用。

import paddle
import numpy as np
from paddle.vision.transforms import Normalize

transform = Normalize(mean=[127.5], std=[127.5], data_format="CHW")
# 加载数据集
train_dataset = paddle.vision.datasets.MNIST(mode="train", transform=transform)
test_dataset = paddle.vision.datasets.MNIST(mode="test", transform=transform)

mnist = paddle.nn.Sequential(
    paddle.nn.Flatten(1, -1),
    paddle.nn.Linear(784, 512),
    paddle.nn.ReLU(),
    paddle.nn.Dropout(0.2),
    paddle.nn.Linear(512, 10),
)

model = paddle.Model(mnist)

# 将paddle.nn.CrossEntropyLoss替换为CrossEntropy
# 将paddle.metric.Accuracy替换为Accuracy
model.prepare(
    optimizer=paddle.optimizer.Adam(parameters=model.parameters()),
    loss=CrossEntropy(),
    metrics=Accuracy(),
)

# 启动模型训练,加入自定义的两个Callbacks
model.fit(
    train_dataset,
    epochs=5,
    batch_size=64,
    verbose=0,
    callbacks=[ProgBarLogger(verbose=1), ModelCheckpoint()],
)
W1223 04:29:17.810079  9910 device_context.cc:447] Please NOTE: device: 0, GPU Compute Capability: 7.0, Driver API Version: 10.2, Runtime API Version: 10.2
W1223 04:29:17.815956  9910 device_context.cc:465] device: 0, cuDNN Version: 7.6.


The loss value printed in the log is the current step, and the metric is the average value of previous steps.
Epoch 1/5
step   2/938 [..............................] - loss: 2.1375 - acc: 0.1562 - ETA: 7:27 - 478ms/step 

/usr/local/python3.7.0/lib/python3.7/site-packages/paddle/fluid/layers/utils.py:77: DeprecationWarning: Using or importing the ABCs from 'collections' instead of from 'collections.abc' is deprecated, and in 3.8 it will stop working
  return (isinstance(seq, collections.Sequence) and


step 938/938 [==============================] - loss: 0.1039 - acc: 0.9303 - 138ms/step          
Epoch 2/5
step 938/938 [==============================] - loss: 0.0887 - acc: 0.9696 - 123ms/step          
Epoch 3/5
step 938/938 [==============================] - loss: 0.0285 - acc: 0.9782 - 158ms/step          
Epoch 4/5
step 938/938 [==============================] - loss: 0.0049 - acc: 0.9833 - 158ms/step          
Epoch 5/5
step 938/938 [==============================] - loss: 0.1041 - acc: 0.9863 - 145ms/step          

五、总结

本节中介绍了飞桨框架中一些高阶自定义用法,包括自定义 Loss、Metric 及 Callback。飞桨框架既内置了丰富的组件,方便用户直接使用提升模型开发效率,也提供开放的接口方便用户根据任务需求自定义组件来使用,以便更灵活地进行模型开发。