章节大纲


  • 重复训练:Epochs

    我们可以重复训练多次。每次完整的训练循环被称为一个 “epoch”(或“训练轮次”)。

    Python
    import numpy as np
    
    # 假设之前的 NeuralNetwork 类、sigmoid 和 truncated_normal 函数已定义并可用
    # 导入之前保存的数据
    import pickle
    data_path = "data/mnist/"  # adjust if the pickled data set lives elsewhere
    # NOTE: pickle.load can execute arbitrary code while unpickling; only load
    # files you created yourself or otherwise trust.
    with open(data_path + "pickled_mnist.pkl", "br") as fh:
        data = pickle.load(fh)
    # The pickle holds six entries, read here by position 0..5.
    (train_imgs, test_imgs,
     train_labels, test_labels,
     train_labels_one_hot, test_labels_one_hot) = (data[pos] for pos in range(6))

    image_size = 28
    image_pixels = image_size ** 2
    no_of_different_labels = 10
    
    # 激活函数和权重初始化辅助函数
    def sigmoid(x):
        """Return the logistic sigmoid ``1 / (1 + e**-x)`` element-wise.

        Args:
            x: scalar or array-like of real numbers (any shape, may be empty).

        Returns:
            A NumPy float (scalar input) or an ndarray with the shape of ``x``.

        The value is computed as ``exp(-log(1 + exp(-x)))`` via
        ``np.logaddexp`` so that large negative inputs do not overflow, and
        the whole array is processed in one vectorised call instead of one
        Python call per element.
        """
        return np.exp(-np.logaddexp(0.0, np.negative(x)))
    
    activation_function = sigmoid
    
    from scipy.stats import truncnorm
    def truncated_normal(mean=0, sd=1, low=0, upp=10):
        """Build a frozen normal distribution truncated to ``[low, upp]``.

        ``truncnorm`` expects its bounds in units of standard deviations
        relative to the mean, so both limits are standardised first.

        Args:
            mean: location of the underlying normal distribution.
            sd: standard deviation of the underlying normal distribution.
            low: lower truncation bound, in the same units as ``mean``.
            upp: upper truncation bound, in the same units as ``mean``.

        Returns:
            The frozen ``scipy.stats.truncnorm`` distribution.
        """
        lower_std = (low - mean) / sd
        upper_std = (upp - mean) / sd
        return truncnorm(lower_std, upper_std, loc=mean, scale=sd)
    
    # 定义 NeuralNetwork 类
    class NeuralNetwork:
        """Fully connected network with one hidden layer, trained sample by sample.

        The class relies on two module-level names: ``truncated_normal`` (used
        to draw the initial weights) and ``activation_function`` (applied after
        both layers). The weight updates use ``y * (1 - y)`` as the derivative
        of the activation, which is only correct for the logistic sigmoid.

        Attributes:
            wih: input -> hidden weights, shape (no_of_hidden_nodes, no_of_in_nodes).
            who: hidden -> output weights, shape (no_of_out_nodes, no_of_hidden_nodes).
        """

        def __init__(self,
                     no_of_in_nodes,
                     no_of_out_nodes,
                     no_of_hidden_nodes,
                     learning_rate):
            """Store the layer sizes and learning rate, then draw random weights."""
            self.no_of_in_nodes = no_of_in_nodes
            self.no_of_out_nodes = no_of_out_nodes
            self.no_of_hidden_nodes = no_of_hidden_nodes
            self.learning_rate = learning_rate
            self.create_weight_matrices()

        @staticmethod
        def _random_weights(rows, cols):
            """Sample a (rows, cols) matrix within +/- 1/sqrt(cols)."""
            rad = 1 / np.sqrt(cols)
            return truncated_normal(mean=0, sd=1, low=-rad, upp=rad).rvs((rows, cols))

        @staticmethod
        def _accuracy(corrects, wrongs):
            """Return corrects / (corrects + wrongs), or NaN for an empty set."""
            total = corrects + wrongs
            return corrects / total if total else float("nan")

        @staticmethod
        def _label_index(label):
            """Turn a label given as a scalar or a one-element sequence into an int."""
            return int(np.ravel(label)[0])

        def create_weight_matrices(self):
            """Initialise ``wih`` and ``who`` from a truncated normal distribution."""
            self.wih = self._random_weights(self.no_of_hidden_nodes, self.no_of_in_nodes)
            self.who = self._random_weights(self.no_of_out_nodes, self.no_of_hidden_nodes)

        def train_single(self, input_vector, target_vector):
            """Run one forward and one backward pass for a single sample.

            Args:
                input_vector: tuple, list or ndarray with the input values.
                target_vector: tuple, list or ndarray with the desired outputs.

            Both weight matrices are updated in place.
            """
            input_vector = np.array(input_vector, ndmin=2).T
            target_vector = np.array(target_vector, ndmin=2).T

            # Forward pass.
            output_hidden = activation_function(np.dot(self.wih, input_vector))
            output_network = activation_function(np.dot(self.who, output_hidden))

            output_errors = target_vector - output_network
            # Propagate the error through the weights that produced this
            # output; this has to happen before self.who is modified below.
            hidden_errors = np.dot(self.who.T, output_errors)

            # Update the hidden -> output weights (who).
            tmp = output_errors * output_network * (1.0 - output_network)
            self.who += self.learning_rate * np.dot(tmp, output_hidden.T)

            # Update the input -> hidden weights (wih).
            tmp = hidden_errors * output_hidden * (1.0 - output_hidden)
            self.wih += self.learning_rate * np.dot(tmp, input_vector.T)

        def train(self, data_array, labels_one_hot_array, epochs=1,
                  intermediate_results=False, test_data=None, test_labels=None):
            """Train on the whole data set for ``epochs`` passes.

            After every epoch the accuracy on the training data is printed; it
            is measured on ``data_array`` itself, taking the position of the
            largest entry of each one-hot row as the true class. If a test set
            is available, its accuracy is printed as well.

            Args:
                data_array: sequence of input samples.
                labels_one_hot_array: sequence of one-hot target vectors,
                    aligned with ``data_array``.
                epochs: number of passes over the data.
                intermediate_results: if True, collect a copy of
                    ``(wih, who)`` after every epoch.
                test_data: optional samples for the per-epoch test accuracy.
                test_labels: optional labels belonging to ``test_data``.

            Returns:
                The list of per-epoch ``(wih, who)`` copies when
                ``intermediate_results`` is True, otherwise None.
            """
            if test_data is None and test_labels is None:
                # Backward compatibility: earlier versions always evaluated the
                # module-level test set, so fall back to it when it exists.
                test_data = globals().get("test_imgs")
                test_labels = globals().get("test_labels")
            has_test_set = test_data is not None and test_labels is not None

            # Class indices in the one-element-per-label layout evaluate() reads.
            train_targets = [[np.argmax(row)] for row in labels_one_hot_array]

            intermediate_weights = []
            for epoch in range(epochs):
                print(f"Epoch: {epoch+1}/{epochs}", end="\r")  # progress indicator
                for sample, target in zip(data_array, labels_one_hot_array):
                    self.train_single(sample, target)

                train_accuracy = self._accuracy(*self.evaluate(data_array, train_targets))
                report = f"Epoch: {epoch+1}/{epochs} - 训练准确率: {train_accuracy:.4f}"
                if has_test_set:
                    test_accuracy = self._accuracy(*self.evaluate(test_data, test_labels))
                    report += f", 测试准确率: {test_accuracy:.4f}"
                print(report)

                if intermediate_results:
                    intermediate_weights.append((self.wih.copy(), self.who.copy()))
            return intermediate_weights if intermediate_results else None

        def confusion_matrix(self, data_array, labels):
            """Count predictions per ``(true label, predicted label)`` pair.

            Returns:
                A dict mapping ``(int, int)`` pairs to counts; pairs that never
                occur are absent.
            """
            cm = {}
            for sample, label in zip(data_array, labels):
                key = (self._label_index(label), int(self.run(sample).argmax()))
                cm[key] = cm.get(key, 0) + 1
            return cm

        def run(self, input_vector):
            """Return the network output (a column vector) for one input sample."""
            input_vector = np.array(input_vector, ndmin=2).T
            output_vector = np.dot(self.wih, input_vector)
            output_vector = activation_function(output_vector)
            output_vector = np.dot(self.who, output_vector)
            output_vector = activation_function(output_vector)
            return output_vector

        def evaluate(self, data, labels):
            """Count correct and wrong predictions.

            Args:
                data: sequence of input samples.
                labels: true labels, each a scalar or a one-element sequence.

            Returns:
                Tuple ``(corrects, wrongs)``; ``(0, 0)`` for empty input.
            """
            corrects, wrongs = 0, 0
            for sample, label in zip(data, labels):
                if self.run(sample).argmax() == self._label_index(label):
                    corrects += 1
                else:
                    wrongs += 1
            return corrects, wrongs
    
    # --- Run the training ---
    epochs_to_run = 3  # number of passes over the training set
    NN = NeuralNetwork(
        no_of_in_nodes=image_pixels,
        no_of_out_nodes=10,
        no_of_hidden_nodes=100,
        learning_rate=0.1,
    )

    print("开始多轮训练...")
    # train() prints the accuracies after every epoch itself; the per-epoch
    # weight snapshots are not needed here.
    NN.train(train_imgs, train_labels_one_hot,
             epochs=epochs_to_run, intermediate_results=False)

    print("\n多轮训练完成。")

    # Evaluate once more with the final weights. No training happens between
    # the last epoch and this point, so the figures equal the ones printed for
    # the last epoch.
    corrects_train_final, wrongs_train_final = NN.evaluate(train_imgs, train_labels)
    train_total_final = corrects_train_final + wrongs_train_final
    print(f"最终训练准确率: {corrects_train_final / train_total_final:.4f}")
    corrects_test_final, wrongs_test_final = NN.evaluate(test_imgs, test_labels)
    test_total_final = corrects_test_final + wrongs_test_final
    print(f"最终测试准确率: {corrects_test_final / test_total_final:.4f}")
    
    

    输出示例(每次运行可能略有不同,但趋势应是准确率逐渐提高):

    开始多轮训练...
    Epoch: 1/3 - 训练准确率: 0.9452, 测试准确率: 0.9459
    Epoch: 2/3 - 训练准确率: 0.9627, 测试准确率: 0.9582
    Epoch: 3/3 - 训练准确率: 0.9699, 测试准确率: 0.9626
    
    多轮训练完成。
    最终训练准确率: 0.9699
    最终测试准确率: 0.9626
    

    为了重复训练,我们对 NeuralNetwork 类进行了以下修改:

    • train_single 方法:这个方法基本上就是之前被称为 train 的逻辑,它负责对单个输入-目标对执行一次前向传播和反向传播以更新权重。

    • 新的 train 方法:这个方法现在负责管理训练的“epoch”计数。它会循环执行指定次数的 epoch,在每个 epoch 内遍历整个训练数据集,并调用 train_single 方法来更新权重。

    • 中间结果存储:为了测试目的,我们增加了 intermediate_results 参数。如果设置为 True,它会在每个 epoch 结束后,将当前的权重矩阵 self.wih 和 self.who 的副本保存到 intermediate_weights 列表中并返回。这对于分析训练过程中模型性能的变化非常有用。

    • 混淆矩阵字典:为了更好地处理可能稀疏的混淆矩阵,confusion_matrix 方法现在使用字典来存储 (实际标签, 预测标签) 的计数,而不是固定的 NumPy 数组。这样可以更灵活地处理各种标签组合,并且对于那些从未出现过的错误分类,它不会在内存中占用空间。

    • 评估标签类型:在 evaluate 和 confusion_matrix 方法中,我们确保在与 res_max(预测的整数索引)比较或作为字典键使用时,真实的标签也转换为整数类型(例如 int(labels[i][0]))。

    通过这些改进,我们能够更清晰地组织训练过程,并更好地观察神经网络在多个训练轮次中的性能提升。您可以看到,随着 epoch 的增加,训练集和测试集的准确率都在稳步提升,这表明网络正在有效地从数据中学习。


    We can repeat the training multiple times. Each run is called an "epoch".
    epochs = 3
    NN = NeuralNetwork(no_of_in_nodes=image_pixels,
                       no_of_out_nodes=10,
                       no_of_hidden_nodes=100,
                       learning_rate=0.1)
    for epoch in range(epochs):
        print("epoch: ", epoch)
        # NN.train receives one sample per call here, i.e. this loop uses the
        # per-sample train method that the text below renames to train_single.
        for i in range(len(train_imgs)):
            NN.train(train_imgs[i],
                     train_labels_one_hot[i])
        # Report the accuracy after every full pass over the training set.
        corrects, wrongs = NN.evaluate(train_imgs, train_labels)
        print("accuracy train: ", corrects / (corrects + wrongs))
        corrects, wrongs = NN.evaluate(test_imgs, test_labels)
        print("accuracy: test", corrects / (corrects + wrongs))
    epoch: 0
    accuracy train: 0.94515
    accuracy: test 0.9459
    epoch: 1
    accuracy train: 0.9626833333333333
    accuracy: test 0.9582
    epoch: 2
    accuracy train: 0.96995
    accuracy: test 0.9626
    We want to do the repeated training on the training set inside our network. For this purpose we rewrite the
    method train and add a method train_single. train_single is more or less what we called 'train' before, whereas
    the new 'train' method does the epoch counting. For testing purposes, we save the weight matrices after
    each epoch in the list intermediate_weights. This list is returned as the output of train:
    import numpy as np
    def sigmoid(x):
        """Return the logistic sigmoid ``1 / (1 + e**-x)`` element-wise.

        Args:
            x: scalar or array-like of real numbers (any shape, may be empty).

        Returns:
            A NumPy float (scalar input) or an ndarray with the shape of ``x``.

        The value is computed as ``exp(-log(1 + exp(-x)))`` via
        ``np.logaddexp`` so that large negative inputs do not overflow, and
        the whole array is processed in one vectorised call instead of one
        Python call per element.
        """
        return np.exp(-np.logaddexp(0.0, np.negative(x)))
    activation_function = sigmoid
    from scipy.stats import truncnorm
    def truncated_normal(mean=0, sd=1, low=0, upp=10):
        """Return a frozen normal distribution truncated to ``[low, upp]``.

        ``truncnorm`` takes its bounds in standard deviations relative to the
        mean, hence the ``(bound - mean) / sd`` conversion.

        Args:
            mean: location of the underlying normal distribution.
            sd: standard deviation of the underlying normal distribution.
            low: lower truncation bound, in the same units as ``mean``.
            upp: upper truncation bound, in the same units as ``mean``.
        """
        return truncnorm((low - mean) / sd,
                         (upp - mean) / sd,
                         loc=mean,
                         scale=sd)
    class NeuralNetwork:
        """Fully connected network with one hidden layer, trained sample by sample.

        Relies on the module-level names ``truncated_normal`` (initial weights)
        and ``activation_function`` (applied after both layers). The weight
        updates use ``y * (1 - y)`` as the derivative of the activation, which
        is only correct for the logistic sigmoid.
        """

        def __init__(self,
                     no_of_in_nodes,
                     no_of_out_nodes,
                     no_of_hidden_nodes,
                     learning_rate):
            """Store the layer sizes and learning rate, then draw random weights."""
            self.no_of_in_nodes = no_of_in_nodes
            self.no_of_out_nodes = no_of_out_nodes
            self.no_of_hidden_nodes = no_of_hidden_nodes
            self.learning_rate = learning_rate
            self.create_weight_matrices()

        def create_weight_matrices(self):
            """A method to initialize the weight matrices of the neural network."""
            rad = 1 / np.sqrt(self.no_of_in_nodes)
            X = truncated_normal(mean=0,
                                 sd=1,
                                 low=-rad,
                                 upp=rad)
            self.wih = X.rvs((self.no_of_hidden_nodes,
                              self.no_of_in_nodes))
            rad = 1 / np.sqrt(self.no_of_hidden_nodes)
            X = truncated_normal(mean=0,
                                 sd=1,
                                 low=-rad,
                                 upp=rad)
            self.who = X.rvs((self.no_of_out_nodes,
                              self.no_of_hidden_nodes))

        def train_single(self, input_vector, target_vector):
            """Run one forward and one backward pass for a single sample.

            input_vector and target_vector can be tuple, list or ndarray.
            Both weight matrices are updated in place.
            """
            input_vector = np.array(input_vector, ndmin=2).T
            target_vector = np.array(target_vector, ndmin=2).T

            output_vector1 = np.dot(self.wih,
                                    input_vector)
            output_hidden = activation_function(output_vector1)

            output_vector2 = np.dot(self.who,
                                    output_hidden)
            output_network = activation_function(output_vector2)

            output_errors = target_vector - output_network
            # calculate hidden errors with the weights that produced this
            # output, i.e. before self.who is modified below:
            hidden_errors = np.dot(self.who.T,
                                   output_errors)

            # update the hidden -> output weights:
            tmp = output_errors * output_network * \
                (1.0 - output_network)
            tmp = self.learning_rate * np.dot(tmp,
                                              output_hidden.T)
            self.who += tmp

            # update the input -> hidden weights:
            tmp = hidden_errors * output_hidden * (1.0 - output_hidden)
            self.wih += self.learning_rate * np.dot(tmp, input_vector.T)

        def train(self, data_array,
                  labels_one_hot_array,
                  epochs=1,
                  intermediate_results=False):
            """Train on the whole data set for ``epochs`` passes.

            Prints one ``*`` per epoch. If intermediate_results is True, a copy
            of ``(wih, who)`` is stored after every epoch.

            Returns:
                The list of stored weight copies (empty when
                intermediate_results is False).
            """
            intermediate_weights = []
            for epoch in range(epochs):
                print("*", end="")
                for i in range(len(data_array)):
                    self.train_single(data_array[i],
                                      labels_one_hot_array[i])
                if intermediate_results:
                    intermediate_weights.append((self.wih.copy(),
                                                 self.who.copy()))
            return intermediate_weights

        def confusion_matrix(self, data_array, labels):
            """Count predictions per ``(target, predicted index)`` pair.

            The target is used as stored in ``labels[i][0]``; pairs that never
            occur are absent from the returned dict.
            """
            cm = {}
            for i in range(len(data_array)):
                res = self.run(data_array[i])
                res_max = res.argmax()
                target = labels[i][0]
                cm[(target, res_max)] = cm.get((target, res_max), 0) + 1
            return cm

        def run(self, input_vector):
            """ input_vector can be tuple, list or ndarray """
            input_vector = np.array(input_vector, ndmin=2).T
            output_vector = np.dot(self.wih,
                                   input_vector)
            output_vector = activation_function(output_vector)
            output_vector = np.dot(self.who,
                                   output_vector)
            output_vector = activation_function(output_vector)
            return output_vector

        def evaluate(self, data, labels):
            """Return ``(corrects, wrongs)`` for the predictions on ``data``."""
            corrects, wrongs = 0, 0
            for i in range(len(data)):
                res = self.run(data[i])
                res_max = res.argmax()
                if res_max == labels[i]:
                    corrects += 1
                else:
                    wrongs += 1
            return corrects, wrongs
    213
    epochs = 10
    ANN = NeuralNetwork(
        no_of_in_nodes=image_pixels,
        no_of_out_nodes=10,
        no_of_hidden_nodes=100,
        learning_rate=0.15,
    )
    # Request the intermediate results so the returned value can be inspected
    # per epoch afterwards.
    weights = ANN.train(
        train_imgs,
        train_labels_one_hot,
        epochs=epochs,
        intermediate_results=True,
    )
    **********
    # Count the (target, prediction) pairs over the training set.
    cm = ANN.confusion_matrix(train_imgs, train_labels)
    # NOTE(review): `i` is not assigned in this snippet; presumably it is left
    # over from an earlier loop or notebook cell - confirm before running.
    print(ANN.run(train_imgs[i]))
    [[2.60149245e-03]
    [2.52542556e-03]
    [6.57990628e-03]
    [1.32663729e-03]
    [1.34985384e-03]
    [2.63840265e-04]
    [2.18329159e-04]
    [1.32693720e-04]
    [9.84326084e-01]
    [4.34559417e-02]]
    # Turn the dictionary into (key, count) pairs and print them ordered by key.
    cm = [*cm.items()]
    print(sorted(cm))
    214
    [((0.0, 0), 5853), ((0.0, 1), 1), ((0.0, 2), 3), ((0.0, 4), 8),
    ((0.0, 5), 2), ((0.0, 6), 12), ((0.0, 7), 7), ((0.0, 8), 27),
    ((0.0, 9), 10), ((1.0, 0), 1), ((1.0, 1), 6674), ((1.0, 2), 17),
    ((1.0, 3), 5), ((1.0, 4), 14), ((1.0, 5), 2), ((1.0, 6), 1),
    ((1.0, 7), 6), ((1.0, 8), 15), ((1.0, 9), 7), ((2.0, 0), 37),
    ((2.0, 1), 14), ((2.0, 2), 5791), ((2.0, 3), 17), ((2.0, 4), 11),
    ((2.0, 5), 2), ((2.0, 6), 10), ((2.0, 7), 15), ((2.0, 8), 51),
    ((2.0, 9), 10), ((3.0, 0), 16), ((3.0, 1), 5), ((3.0, 2), 34),
    ((3.0, 3), 5869), ((3.0, 4), 8), ((3.0, 5), 57), ((3.0, 6), 4),
    ((3.0, 7), 20), ((3.0, 8), 58), ((3.0, 9), 60), ((4.0, 0), 14),
    ((4.0, 1), 6), ((4.0, 2), 8), ((4.0, 3), 1), ((4.0, 4), 5678),
    ((4.0, 5), 1), ((4.0, 6), 14), ((4.0, 7), 5), ((4.0, 8), 11),
    ((4.0, 9), 104), ((5.0, 0), 7), ((5.0, 1), 2), ((5.0, 2), 6),
    ((5.0, 3), 27), ((5.0, 4), 5), ((5.0, 5), 5312), ((5.0, 6), 12),
    ((5.0, 7), 5), ((5.0, 8), 20), ((5.0, 9), 25), ((6.0, 0), 32),
    ((6.0, 1), 5), ((6.0, 2), 1), ((6.0, 4), 10), ((6.0, 5), 52),
    ((6.0, 6), 5791), ((6.0, 8), 26), ((6.0, 9), 1), ((7.0, 0), 5),
    ((7.0, 1), 11), ((7.0, 2), 22), ((7.0, 3), 2), ((7.0, 4), 17),
    ((7.0, 5), 3), ((7.0, 6), 2), ((7.0, 7), 6074), ((7.0, 8), 26),
    ((7.0, 9), 103), ((8.0, 0), 20), ((8.0, 1), 18), ((8.0, 2), 9),
    ((8.0, 3), 14), ((8.0, 4), 27), ((8.0, 5), 24), ((8.0, 6), 9),
    ((8.0, 7), 8), ((8.0, 8), 5668), ((8.0, 9), 54), ((9.0, 0), 26),
    ((9.0, 1), 2), ((9.0, 2), 2), ((9.0, 3), 16), ((9.0, 4), 69),
    ((9.0, 5), 14), ((9.0, 6), 7), ((9.0, 7), 19), ((9.0, 8), 15),
    ((9.0, 9), 5779)]
    In [ ]:
    # Re-evaluate the network with the weights stored after each epoch.
    for i in range(epochs):
        print("epoch: ", i)
        # weights[i] is the (wih, who) pair saved at the end of epoch i.
        ANN.wih = weights[i][0]
        ANN.who = weights[i][1]
        corrects, wrongs = ANN.evaluate(train_imgs, train_labels)
        print("accuracy train: ", corrects / (corrects + wrongs))
        corrects, wrongs = ANN.evaluate(test_imgs, test_labels)
        print("accuracy: test", corrects / (corrects + wrongs))