多轮训练(Multiple Runs)
Section outline
-
重复训练:Epochs
我们可以重复训练多次。每次完整的训练循环被称为一个 “epoch”(或“训练轮次”)。
import pickle

import numpy as np
from scipy.stats import truncnorm

# Geometry of the images and the number of distinct labels.
image_size = 28
image_pixels = image_size * image_size
no_of_different_labels = 10


def sigmoid(x):
    """Return the logistic sigmoid 1 / (1 + e**-x), element-wise for arrays.

    Plain NumPy broadcasting is used instead of ``np.vectorize``, which is
    only a convenience wrapper around a Python-level loop.
    """
    return 1 / (1 + np.e ** -np.asarray(x, dtype=float))


activation_function = sigmoid


def truncated_normal(mean=0, sd=1, low=0, upp=10):
    """Return a frozen normal distribution truncated to [low, upp].

    ``truncnorm`` expects its bounds in units of standard deviations
    relative to the mean, hence the rescaling of ``low`` and ``upp``.
    """
    return truncnorm((low - mean) / sd, (upp - mean) / sd, loc=mean, scale=sd)


class NeuralNetwork:
    """Network with one hidden layer, trained sample by sample."""

    def __init__(self, no_of_in_nodes, no_of_out_nodes, no_of_hidden_nodes,
                 learning_rate):
        self.no_of_in_nodes = no_of_in_nodes
        self.no_of_out_nodes = no_of_out_nodes
        self.no_of_hidden_nodes = no_of_hidden_nodes
        self.learning_rate = learning_rate
        self.create_weight_matrices()

    def create_weight_matrices(self):
        """Draw the initial weight matrices from truncated normal distributions.

        ``wih`` has shape (hidden, in) and ``who`` has shape (out, hidden).
        """
        rad = 1 / np.sqrt(self.no_of_in_nodes)
        X = truncated_normal(mean=0, sd=1, low=-rad, upp=rad)
        self.wih = X.rvs((self.no_of_hidden_nodes, self.no_of_in_nodes))

        rad = 1 / np.sqrt(self.no_of_hidden_nodes)
        X = truncated_normal(mean=0, sd=1, low=-rad, upp=rad)
        self.who = X.rvs((self.no_of_out_nodes, self.no_of_hidden_nodes))

    def train_single(self, input_vector, target_vector):
        """Run one forward and one backward pass for a single sample.

        ``input_vector`` and ``target_vector`` can be tuples, lists or
        ndarrays; both are converted to column vectors.
        """
        input_vector = np.array(input_vector, ndmin=2).T
        target_vector = np.array(target_vector, ndmin=2).T

        # Forward pass.
        output_hidden = activation_function(np.dot(self.wih, input_vector))
        output_network = activation_function(np.dot(self.who, output_hidden))

        output_errors = target_vector - output_network

        # Propagate the error back through the weights that produced this
        # output, i.e. before `who` is modified below.
        hidden_errors = np.dot(self.who.T, output_errors)

        # Update the hidden-to-output weights (who).
        tmp = output_errors * output_network * (1.0 - output_network)
        self.who += self.learning_rate * np.dot(tmp, output_hidden.T)

        # Update the input-to-hidden weights (wih).
        tmp = hidden_errors * output_hidden * (1.0 - output_hidden)
        self.wih += self.learning_rate * np.dot(tmp, input_vector.T)

    def train(self, data_array, labels_one_hot_array, epochs=1,
              intermediate_results=False, eval_sets=None):
        """Train on the whole data set for ``epochs`` passes.

        Args:
            data_array: sequence of input vectors.
            labels_one_hot_array: sequence of target vectors, parallel to
                ``data_array``.
            epochs: number of passes over the data.
            intermediate_results: if True, a copy of ``(wih, who)`` is stored
                after every epoch and the list of copies is returned.
            eval_sets: optional mapping ``name -> (data, labels)``. After
                every epoch the accuracy on each entry is printed under its
                name. ``labels`` must have the layout ``evaluate`` expects.

        Returns:
            The list of per-epoch weight copies if ``intermediate_results``
            is True, otherwise None.
        """
        intermediate_weights = []
        for epoch in range(epochs):
            # "\r" lets the per-epoch report overwrite the progress line.
            print(f"Epoch: {epoch+1}/{epochs}", end="\r")
            for sample, target in zip(data_array, labels_one_hot_array):
                self.train_single(sample, target)

            if eval_sets:
                report = ", ".join(
                    f"{name}: {self._accuracy(data, labels):.4f}"
                    for name, (data, labels) in eval_sets.items()
                )
                print(f"Epoch: {epoch+1}/{epochs} - {report}")

            if intermediate_results:
                intermediate_weights.append((self.wih.copy(), self.who.copy()))

        return intermediate_weights if intermediate_results else None

    def _accuracy(self, data, labels):
        """Return the fraction of correct predictions (NaN for empty data)."""
        corrects, wrongs = self.evaluate(data, labels)
        total = corrects + wrongs
        return corrects / total if total else float("nan")

    def confusion_matrix(self, data_array, labels):
        """Count (true label, predicted label) pairs.

        A dictionary is used, so pairs that never occur take no space.
        """
        cm = {}
        for sample, label in zip(data_array, labels):
            predicted = int(self.run(sample).argmax())
            # Labels may be stored as floats; use integers as dictionary keys.
            key = (int(label[0]), predicted)
            cm[key] = cm.get(key, 0) + 1
        return cm

    def run(self, input_vector):
        """Return the network output (a column vector) for one input."""
        input_vector = np.array(input_vector, ndmin=2).T
        output_vector = activation_function(np.dot(self.wih, input_vector))
        output_vector = activation_function(np.dot(self.who, output_vector))
        return output_vector

    def evaluate(self, data, labels):
        """Return ``(corrects, wrongs)`` for the given samples and labels."""
        corrects, wrongs = 0, 0
        for sample, label in zip(data, labels):
            # Compare as integers: the label may be stored as a float.
            if self.run(sample).argmax() == int(label[0]):
                corrects += 1
            else:
                wrongs += 1
        return corrects, wrongs


if __name__ == "__main__":
    # Load the previously pickled data set.
    data_path = "data/mnist/"  # make sure this path is correct
    # NOTE: unpickling can execute arbitrary code; only load trusted files.
    with open(data_path + "pickled_mnist.pkl", "br") as fh:
        data = pickle.load(fh)

    train_imgs = data[0]
    test_imgs = data[1]
    train_labels = data[2]
    test_labels = data[3]
    train_labels_one_hot = data[4]
    test_labels_one_hot = data[5]

    # --- run the training ---
    epochs_to_run = 3  # number of training epochs

    NN = NeuralNetwork(no_of_in_nodes=image_pixels,
                       no_of_out_nodes=10,
                       no_of_hidden_nodes=100,
                       learning_rate=0.1)

    print("开始多轮训练...")
    # The intermediate weights are not needed here; the accuracy on both
    # sets is printed after every epoch.
    NN.train(train_imgs,
             train_labels_one_hot,
             epochs=epochs_to_run,
             intermediate_results=False,
             eval_sets={"训练准确率": (train_imgs, train_labels),
                        "测试准确率": (test_imgs, test_labels)})
    print("\n多轮训练完成。")

    # No training happens after the last epoch report, so these figures
    # equal the ones printed for the last epoch.
    corrects_train_final, wrongs_train_final = NN.evaluate(train_imgs, train_labels)
    print(f"最终训练准确率: {corrects_train_final / (corrects_train_final + wrongs_train_final):.4f}")
    corrects_test_final, wrongs_test_final = NN.evaluate(test_imgs, test_labels)
    print(f"最终测试准确率: {corrects_test_final / (corrects_test_final + wrongs_test_final):.4f}")
输出示例(每次运行可能略有不同,但趋势应是准确率逐渐提高):
开始多轮训练...
Epoch: 1/3 - 训练准确率: 0.9452, 测试准确率: 0.9459
Epoch: 2/3 - 训练准确率: 0.9627, 测试准确率: 0.9582
Epoch: 3/3 - 训练准确率: 0.9699, 测试准确率: 0.9626

多轮训练完成。
最终训练准确率: 0.9699
最终测试准确率: 0.9626
为了重复训练,我们对 `NeuralNetwork` 类进行了以下修改:
- `train_single` 方法:这个方法基本上就是之前被称为 `train` 的逻辑,它负责对单个输入-目标对执行一次前向传播和反向传播以更新权重。
- 新的 `train` 方法:这个方法现在负责管理训练的“epoch”计数。它会循环执行指定次数的 epoch,在每个 epoch 内遍历整个训练数据集,并调用 `train_single` 方法来更新权重。
- 中间结果存储:为了测试目的,我们增加了 `intermediate_results` 参数。如果设置为 `True`,它会在每个 epoch 结束后,将当前的权重矩阵 `self.wih` 和 `self.who` 的副本保存到 `intermediate_weights` 列表中并返回。这对于分析训练过程中模型性能的变化非常有用。
- 混淆矩阵字典:为了更好地处理可能稀疏的混淆矩阵,`confusion_matrix` 方法现在使用字典来存储 (实际标签, 预测标签) 的计数,而不是固定的 NumPy 数组。这样可以更灵活地处理各种标签组合,并且对于那些从未出现过的错误分类,它不会在内存中占用空间。
- 评估标签类型:在 `evaluate` 和 `confusion_matrix` 方法中,我们确保在与 `res_max`(预测的整数索引)比较或作为字典键使用时,真实的标签也转换为整数类型(例如 `int(labels[i][0])`)。
通过这些改进,我们能够更清晰地组织训练过程,并更好地观察神经网络在多个训练轮次中的性能提升。您可以看到,随着 epoch 的增加,训练集和测试集的准确率都在稳步提升,这表明网络正在有效地从数据中学习。
We can repeat the training multiple times. Each run is called an "epoch".
epochs = 3

NN = NeuralNetwork(no_of_in_nodes=image_pixels,
                   no_of_out_nodes=10,
                   no_of_hidden_nodes=100,
                   learning_rate=0.1)

# One epoch is one full pass over the training set, followed by an
# accuracy report on the training and on the test data.
for epoch in range(epochs):
    print("epoch: ", epoch)
    # NN.train is called once per sample (image, one-hot label) here.
    for i in range(len(train_imgs)):
        NN.train(train_imgs[i],
                 train_labels_one_hot[i])

    corrects, wrongs = NN.evaluate(train_imgs, train_labels)
    print("accuracy train: ", corrects / (corrects + wrongs))
    corrects, wrongs = NN.evaluate(test_imgs, test_labels)
    print("accuracy: test", corrects / (corrects + wrongs))
epoch: 0
accuracy train: 0.94515
accuracy: test 0.9459
epoch: 1
accuracy train: 0.9626833333333333
accuracy: test 0.9582
epoch: 2
accuracy train: 0.96995
accuracy: test 0.9626
We want to perform the repeated training on the training set inside our network class. For this purpose we rewrite the
method `train` and add a method `train_single`. `train_single` is more or less what we called `train` before, whereas
the new `train` method does the epoch counting. For testing purposes, we save the weight matrices after
each epoch in the list `intermediate_weights`. This list is returned as the output of `train`:
import numpy as np
def sigmoid(x):
    """Return the logistic sigmoid 1 / (1 + e**-x), element-wise for arrays.

    Plain NumPy broadcasting is used instead of ``np.vectorize``, which is
    only a convenience wrapper around a Python-level loop.
    """
    return 1 / (1 + np.e ** -np.asarray(x, dtype=float))


# The network calls the activation through this name, so it can be swapped
# for another function in one place.
activation_function = sigmoid
from scipy.stats import truncnorm
def truncated_normal(mean=0, sd=1, low=0, upp=10):
    """Return a frozen normal distribution truncated to [low, upp].

    ``low`` and ``upp`` are given on the same scale as ``mean``.
    ``truncnorm`` expects its bounds in units of standard deviations
    relative to the mean, hence the rescaling below.
    """
    return truncnorm((low - mean) / sd,
                     (upp - mean) / sd,
                     loc=mean,
                     scale=sd)
class NeuralNetwork:
    """Network with one hidden layer, trained sample by sample."""

    def __init__(self,
                 no_of_in_nodes,
                 no_of_out_nodes,
                 no_of_hidden_nodes,
                 learning_rate):
        self.no_of_in_nodes = no_of_in_nodes
        self.no_of_out_nodes = no_of_out_nodes
        self.no_of_hidden_nodes = no_of_hidden_nodes
        self.learning_rate = learning_rate
        self.create_weight_matrices()

    def create_weight_matrices(self):
        """Initialize the weight matrices of the neural network.

        ``wih`` has shape (hidden, in) and ``who`` has shape (out, hidden);
        both are drawn from a truncated normal distribution.
        """
        rad = 1 / np.sqrt(self.no_of_in_nodes)
        X = truncated_normal(mean=0,
                             sd=1,
                             low=-rad,
                             upp=rad)
        self.wih = X.rvs((self.no_of_hidden_nodes,
                          self.no_of_in_nodes))

        rad = 1 / np.sqrt(self.no_of_hidden_nodes)
        X = truncated_normal(mean=0,
                             sd=1,
                             low=-rad,
                             upp=rad)
        self.who = X.rvs((self.no_of_out_nodes,
                          self.no_of_hidden_nodes))

    def train_single(self, input_vector, target_vector):
        """Run one forward and one backward pass for a single sample.

        input_vector and target_vector can be tuple, list or ndarray;
        both are converted to column vectors.
        """
        input_vector = np.array(input_vector, ndmin=2).T
        target_vector = np.array(target_vector, ndmin=2).T

        output_vector1 = np.dot(self.wih,
                                input_vector)
        output_hidden = activation_function(output_vector1)

        output_vector2 = np.dot(self.who,
                                output_hidden)
        output_network = activation_function(output_vector2)

        output_errors = target_vector - output_network

        # calculate hidden errors with the weights that produced this
        # output, i.e. before `who` is modified below:
        hidden_errors = np.dot(self.who.T,
                               output_errors)

        # update the hidden-to-output weights:
        tmp = output_errors * output_network * \
            (1.0 - output_network)
        self.who += self.learning_rate * np.dot(tmp,
                                                output_hidden.T)

        # update the input-to-hidden weights:
        tmp = hidden_errors * output_hidden * (1.0 - output_hidden)
        self.wih += self.learning_rate * np.dot(tmp, input_vector.T)

    def train(self, data_array,
              labels_one_hot_array,
              epochs=1,
              intermediate_results=False):
        """Train on the whole data set for ``epochs`` passes.

        One "*" is printed per epoch. If ``intermediate_results`` is True,
        a copy of ``(wih, who)`` is appended to the returned list after
        every epoch; otherwise the returned list is empty.
        """
        intermediate_weights = []
        for epoch in range(epochs):
            print("*", end="")
            for i in range(len(data_array)):
                self.train_single(data_array[i],
                                  labels_one_hot_array[i])
            if intermediate_results:
                intermediate_weights.append((self.wih.copy(),
                                             self.who.copy()))
        return intermediate_weights

    def confusion_matrix(self, data_array, labels):
        """Count (target, predicted) pairs in a dictionary.

        The target is taken from ``labels[i][0]`` unchanged, so the keys
        carry whatever numeric type the labels are stored in.
        """
        cm = {}
        for i in range(len(data_array)):
            res = self.run(data_array[i])
            res_max = res.argmax()
            target = labels[i][0]
            key = (target, res_max)
            cm[key] = cm.get(key, 0) + 1
        return cm

    def run(self, input_vector):
        """Return the network output (a column vector) for one input.

        input_vector can be tuple, list or ndarray.
        """
        input_vector = np.array(input_vector, ndmin=2).T

        output_vector = np.dot(self.wih,
                               input_vector)
        output_vector = activation_function(output_vector)

        output_vector = np.dot(self.who,
                               output_vector)
        output_vector = activation_function(output_vector)

        return output_vector

    def evaluate(self, data, labels):
        """Return ``(corrects, wrongs)`` for the given samples and labels.

        A sample counts as correct when the index of the largest output
        equals ``labels[i]``.
        """
        corrects, wrongs = 0, 0
        for i in range(len(data)):
            res = self.run(data[i])
            res_max = res.argmax()
            if res_max == labels[i]:
                corrects += 1
            else:
                wrongs += 1
        return corrects, wrongs
213
# Train a fresh network for ten epochs and ask `train` to hand back the
# weight matrices it stored after every epoch.
epochs = 10

ANN = NeuralNetwork(
    no_of_in_nodes=image_pixels,
    no_of_out_nodes=10,
    no_of_hidden_nodes=100,
    learning_rate=0.15,
)

weights = ANN.train(
    train_imgs,
    train_labels_one_hot,
    epochs=epochs,
    intermediate_results=True,
)
**********
cm = ANN.confusion_matrix(train_imgs, train_labels)

# Show the raw network output for one training image. The last image is
# addressed explicitly instead of through an index variable left over from
# an earlier loop.
# NOTE(review): presumably the leftover index pointed at the last training
# image; confirm against the intended sample.
print(ANN.run(train_imgs[-1]))
[[2.60149245e-03]
[2.52542556e-03]
[6.57990628e-03]
[1.32663729e-03]
[1.34985384e-03]
[2.63840265e-04]
[2.18329159e-04]
[1.32693720e-04]
[9.84326084e-01]
[4.34559417e-02]]
# Flatten the dictionary into ((target, predicted), count) pairs and print
# them in sorted order.
cm = [(pair, count) for pair, count in cm.items()]
print(sorted(cm))
214
[((0.0, 0), 5853), ((0.0, 1), 1), ((0.0, 2), 3), ((0.0, 4), 8),
((0.0, 5), 2), ((0.0, 6), 12), ((0.0, 7), 7), ((0.0, 8), 27),
((0.0, 9), 10), ((1.0, 0), 1), ((1.0, 1), 6674), ((1.0, 2), 17),
((1.0, 3), 5), ((1.0, 4), 14), ((1.0, 5), 2), ((1.0, 6), 1),
((1.0, 7), 6), ((1.0, 8), 15), ((1.0, 9), 7), ((2.0, 0), 37),
((2.0, 1), 14), ((2.0, 2), 5791), ((2.0, 3), 17), ((2.0, 4), 11),
((2.0, 5), 2), ((2.0, 6), 10), ((2.0, 7), 15), ((2.0, 8), 51),
((2.0, 9), 10), ((3.0, 0), 16), ((3.0, 1), 5), ((3.0, 2), 34),
((3.0, 3), 5869), ((3.0, 4), 8), ((3.0, 5), 57), ((3.0, 6), 4),
((3.0, 7), 20), ((3.0, 8), 58), ((3.0, 9), 60), ((4.0, 0), 14),
((4.0, 1), 6), ((4.0, 2), 8), ((4.0, 3), 1), ((4.0, 4), 5678),
((4.0, 5), 1), ((4.0, 6), 14), ((4.0, 7), 5), ((4.0, 8), 11),
((4.0, 9), 104), ((5.0, 0), 7), ((5.0, 1), 2), ((5.0, 2), 6),
((5.0, 3), 27), ((5.0, 4), 5), ((5.0, 5), 5312), ((5.0, 6), 12),
((5.0, 7), 5), ((5.0, 8), 20), ((5.0, 9), 25), ((6.0, 0), 32),
((6.0, 1), 5), ((6.0, 2), 1), ((6.0, 4), 10), ((6.0, 5), 52),
((6.0, 6), 5791), ((6.0, 8), 26), ((6.0, 9), 1), ((7.0, 0), 5),
((7.0, 1), 11), ((7.0, 2), 22), ((7.0, 3), 2), ((7.0, 4), 17),
((7.0, 5), 3), ((7.0, 6), 2), ((7.0, 7), 6074), ((7.0, 8), 26),
((7.0, 9), 103), ((8.0, 0), 20), ((8.0, 1), 18), ((8.0, 2), 9),
((8.0, 3), 14), ((8.0, 4), 27), ((8.0, 5), 24), ((8.0, 6), 9),
((8.0, 7), 8), ((8.0, 8), 5668), ((8.0, 9), 54), ((9.0, 0), 26),
((9.0, 1), 2), ((9.0, 2), 2), ((9.0, 3), 16), ((9.0, 4), 69),
((9.0, 5), 14), ((9.0, 6), 7), ((9.0, 7), 19), ((9.0, 8), 15),
((9.0, 9), 5779)]
In [ ]:
# Re-install the weights stored after each epoch and report the accuracy
# the network had at that point of the training.
for i, (wih, who) in enumerate(weights):
    print("epoch: ", i)
    ANN.wih = wih
    ANN.who = who

    corrects, wrongs = ANN.evaluate(train_imgs, train_labels)
    print("accuracy train: ", corrects / (corrects + wrongs))
    corrects, wrongs = ANN.evaluate(test_imgs, test_labels)
    print("accuracy: test", corrects / (corrects + wrongs))