Experiment 15: Verifying the Long-Range Dependencies of the LSTM Model
This experiment builds on the previous experiment (the SRN digit-sum experiment): the same data and training pipeline are reused, with the recurrent layer replaced by an LSTM.
1. Model Construction
Build an LSTM module containing an initializer __init__, a weight-initialization function _init_weights, a state-initialization function init_state, and a forward function forward.
__init__: although the input and hidden dimensions are the same at every time step, the cell uses four separate sets of weights, which produce the input gate, forget gate, output gate, and the candidate cell state. Four sets of W, U and b are therefore initialized; for example, each of the four W matrices is a random matrix of shape input_size x hidden_size.
init_state: initialize the hidden state and the cell state as all-zero tensors of shape batch_size x hidden_size.
forward: first read the input shape (batch_size, seq_len, input_size), then initialize the hidden state and cell state if none are provided. Define the lists Is, Fs, Os, Cs for the gate and cell-state vectors. For each time step, take the input at that step, compute the input gate, forget gate, output gate and candidate cell state, use them to update the cell state and then the hidden state, and finally append the gate vectors and the cell state to the lists.
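The per-step update that forward implements can be written as follows, with X_t the input at step t, H_{t-1} and C_{t-1} the previous hidden and cell states, and \odot elementwise multiplication:
I_t = \sigma(X_t W_i + H_{t-1} U_i + b_i)
F_t = \sigma(X_t W_f + H_{t-1} U_f + b_f)
O_t = \sigma(X_t W_o + H_{t-1} U_o + b_o)
\tilde{C}_t = \tanh(X_t W_a + H_{t-1} U_a + b_a)
C_t = F_t \odot C_{t-1} + I_t \odot \tilde{C}_t
H_t = O_t \odot \tanh(C_t)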
Code:
import torch
import torch.nn as nn
from torch.nn import init

class LSTM(nn.Module):
    def __init__(self, input_size, hidden_size):
        super(LSTM, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        # Model parameters: four sets of W, U and b for the input gate, forget gate,
        # output gate and candidate cell state
        self.W_i = nn.Parameter(torch.randn(input_size, hidden_size))
        self.W_f = nn.Parameter(torch.randn(input_size, hidden_size))
        self.W_o = nn.Parameter(torch.randn(input_size, hidden_size))
        self.W_a = nn.Parameter(torch.randn(input_size, hidden_size))
        self.U_i = nn.Parameter(torch.randn(hidden_size, hidden_size))
        self.U_f = nn.Parameter(torch.randn(hidden_size, hidden_size))
        self.U_o = nn.Parameter(torch.randn(hidden_size, hidden_size))
        self.U_a = nn.Parameter(torch.randn(hidden_size, hidden_size))
        self.b_i = nn.Parameter(torch.zeros(1, hidden_size))
        self.b_f = nn.Parameter(torch.zeros(1, hidden_size))
        self.b_o = nn.Parameter(torch.zeros(1, hidden_size))
        self.b_a = nn.Parameter(torch.zeros(1, hidden_size))
        # Parameter initialization
        self.apply(self._init_weights)
    def _init_weights(self, m):
        # Note: this hook only Xavier-initializes nn.Linear sub-modules; the gate weights
        # above are raw nn.Parameter tensors, so they keep their randn initialization.
        if isinstance(m, nn.Linear):
            init.xavier_uniform_(m.weight)
    def init_state(self, batch_size):
        # Initialize the hidden state and cell state to zeros
        hidden_state = torch.zeros(batch_size, self.hidden_size)
        cell_state = torch.zeros(batch_size, self.hidden_size)
        return hidden_state, cell_state
    def forward(self, inputs, states=None):
        batch_size, seq_len, input_size = inputs.shape  # inputs shape: (batch_size, seq_len, input_size)
        if states is None:
            states = self.init_state(batch_size)
        hidden_state, cell_state = states
        # Lists for recording the gate and cell-state vectors
        Is, Fs, Os, Cs = [], [], [], []
        # LSTM computation: input gate, forget gate, output gate, candidate state,
        # cell state and hidden state at every time step
        for step in range(seq_len):
            input_step = inputs[:, step, :]
            I_gate = torch.sigmoid(torch.matmul(input_step, self.W_i) + torch.matmul(hidden_state, self.U_i) + self.b_i)
            F_gate = torch.sigmoid(torch.matmul(input_step, self.W_f) + torch.matmul(hidden_state, self.U_f) + self.b_f)
            O_gate = torch.sigmoid(torch.matmul(input_step, self.W_o) + torch.matmul(hidden_state, self.U_o) + self.b_o)
            C_tilde = torch.tanh(torch.matmul(input_step, self.W_a) + torch.matmul(hidden_state, self.U_a) + self.b_a)
            cell_state = F_gate * cell_state + I_gate * C_tilde
            hidden_state = O_gate * torch.tanh(cell_state)
            # Store the gate vectors and the cell state
            Is.append(I_gate.detach().cpu().numpy())
            Fs.append(F_gate.detach().cpu().numpy())
            Os.append(O_gate.detach().cpu().numpy())
            Cs.append(cell_state.detach().cpu().numpy())
        return hidden_state  # final hidden state, shape (batch_size, hidden_size)
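A quick shape check of the module defined above (illustrative values only, not part of the experiment):

lstm = LSTM(input_size=32, hidden_size=32)
x = torch.randn(8, 10, 32)   # (batch_size, seq_len, input_size)
h = lstm(x)
print(h.shape)               # torch.Size([8, 32]): one final hidden state per sequence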
2. Model Training
Everything else is identical to the previous experiment; the only differences are that an LSTM model is instantiated here, the instantiated base_model is passed as an argument when constructing Model_RNN4SeqClass, and the train function is then called to start training.
Code:
import os
import random
import numpy as np
import torch
from matplotlib import pyplot as plt
from torch import optim
from torch.utils.data import Dataset, DataLoader
# Dataset construction
# Fix the random seeds
random.seed(0)
np.random.seed(0)
class DigitSumDataset(Dataset):
def __init__(self, data_path=None, length=None, k=None, mode='train'):
"""
初始化数据集
如果传入了data_path,则从文件加载数据;
如果传入了length和k,则生成数据集。
参数:
data_path: 存放数据集的目录,用于加载数据
length: 数据序列的长度(仅用于生成数据集时)
k: 数据增强的数量(仅用于生成数据集时)
mode: 'train'/'dev'/'test',决定生成训练集、验证集还是测试集(仅用于生成数据集时)
"""
self.data_path = data_path
self.length = length
self.k = k
self.mode = mode
        if data_path:  # load data from files
            self.examples = self.load_data(data_path)
        else:  # otherwise generate the data
if length < 3 or k <= 0:
raise ValueError("The length of data should be greater than 2 and k should be greater than 0.")
self.examples = self.generate_data()
def generate_data(self):
"""
生成数据:生成指定长度的数字序列,并进行数据增强
"""
base_examples = []
for n1 in range(0, 10):
for n2 in range(0, 10):
seq = [n1, n2] + [0] * (self.length - 2)
label = n1 + n2
base_examples.append((seq, label))
examples = []
for base_example in base_examples:
for _ in range(self.k):
idx = np.random.randint(2, self.length)
val = np.random.randint(0, 10)
seq = base_example[0].copy()
label = base_example[1]
seq[idx] = val
examples.append((seq, label))
return examples
def load_data(self, data_path):
"""
从文件加载数据
"""
def _load_file(file_path):
examples = []
with open(file_path, "r", encoding="utf-8") as f:
for line in f.readlines():
items = line.strip().split("\t")
seq = [int(i) for i in items[0].split(" ")]
label = int(items[1])
examples.append((seq, label))
return examples
        # Load the training, validation and test splits
train_examples = _load_file(os.path.join(data_path, "train.txt"))
dev_examples = _load_file(os.path.join(data_path, "dev.txt"))
test_examples = _load_file(os.path.join(data_path, "test.txt"))
return train_examples if self.mode == 'train' else dev_examples if self.mode == 'dev' else test_examples
def __len__(self):
return len(self.examples)
def __getitem__(self, idx):
seq, label = self.examples[idx]
seq_tensor = torch.tensor(seq, dtype=torch.long)
label_tensor = torch.tensor(label, dtype=torch.long)
return seq_tensor, label_tensor
# Dataset path and generation parameters
lengths = [5, 10, 15, 20, 25, 30, 35]
# lengths = [5]
k_train = 3  # number of augmented copies per base example for the training set
k_test_val = 1  # number of augmented copies for the validation and test sets
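# For intuition (illustrative values): with length=5 and k=1, the base example built from
# the digit pair (3, 7) is ([3, 7, 0, 0, 0], 10); augmentation overwrites one of the
# trailing positions with a random digit, e.g. ([3, 7, 0, 9, 0], 10). The label is always
# the sum of the first two digits, so predicting it requires carrying those two digits
# across the whole sequence -- this is what makes the task a long-range dependency test.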
# The full model
import torch
import torch.nn as nn
import torch.nn.functional as F
# Embedding layer
class Embedding(nn.Module):
    def __init__(self, num_embeddings, embedding_dim):
        super(Embedding, self).__init__()
        # Register W as a trainable parameter (Xavier-initialized) so the embedding is learned
        self.W = nn.Parameter(nn.init.xavier_uniform_(torch.empty(num_embeddings, embedding_dim), gain=1.0))
    def forward(self, inputs):
        inputs = inputs.long()  # make sure the indices are a LongTensor
        embs = self.W[inputs]   # look up the embedding vector for each index
        return embs
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn import init
class LSTM(nn.Module):
def __init__(self, input_size, hidden_size):
super(LSTM, self).__init__()
self.input_size = input_size
self.hidden_size = hidden_size
        # Model parameters: four sets of W, U and b
self.W_i = nn.Parameter(torch.randn(input_size, hidden_size))
self.W_f = nn.Parameter(torch.randn(input_size, hidden_size))
self.W_o = nn.Parameter(torch.randn(input_size, hidden_size))
self.W_a = nn.Parameter(torch.randn(input_size, hidden_size))
self.U_i = nn.Parameter(torch.randn(hidden_size, hidden_size))
self.U_f = nn.Parameter(torch.randn(hidden_size, hidden_size))
self.U_o = nn.Parameter(torch.randn(hidden_size, hidden_size))
self.U_a = nn.Parameter(torch.randn(hidden_size, hidden_size))
self.b_i = nn.Parameter(torch.zeros(1, hidden_size))
self.b_f = nn.Parameter(torch.zeros(1, hidden_size))
self.b_o = nn.Parameter(torch.zeros(1, hidden_size))
self.b_a = nn.Parameter(torch.zeros(1, hidden_size))
        # Parameter initialization
self.apply(self._init_weights)
def _init_weights(self, m):
if isinstance(m, nn.Linear):
init.xavier_uniform_(m.weight)
def init_state(self, batch_size):
        # Initialize the hidden state and cell state to zeros
hidden_state = torch.zeros(batch_size, self.hidden_size)
cell_state = torch.zeros(batch_size, self.hidden_size)
return hidden_state, cell_state
def forward(self, inputs, states=None):
batch_size, seq_len, input_size = inputs.shape # inputs shape: (batch_size, seq_len, input_size)
if states is None:
states = self.init_state(batch_size)
hidden_state, cell_state = states
        # Lists for recording the gate and cell-state vectors
Is, Fs, Os, Cs = [], [], [], []
        # LSTM computation: input/forget/output gates, candidate state, cell state and hidden state
for step in range(seq_len):
input_step = inputs[:, step, :]
I_gate = torch.sigmoid(torch.matmul(input_step, self.W_i) + torch.matmul(hidden_state, self.U_i) + self.b_i)
F_gate = torch.sigmoid(torch.matmul(input_step, self.W_f) + torch.matmul(hidden_state, self.U_f) + self.b_f)
O_gate = torch.sigmoid(torch.matmul(input_step, self.W_o) + torch.matmul(hidden_state, self.U_o) + self.b_o)
C_tilde = torch.tanh(torch.matmul(input_step, self.W_a) + torch.matmul(hidden_state, self.U_a) + self.b_a)
cell_state = F_gate * cell_state + I_gate * C_tilde
hidden_state = O_gate * torch.tanh(cell_state)
            # Store the gate vectors and the cell state
Is.append(I_gate.detach().cpu().numpy())
Fs.append(F_gate.detach().cpu().numpy())
Os.append(O_gate.detach().cpu().numpy())
Cs.append(cell_state.detach().cpu().numpy())
return hidden_state
# Digit-sum prediction model built on top of a recurrent layer
class Model_RNN4SeqClass(nn.Module):
    def __init__(self, model, num_digits, input_size, hidden_size, num_classes):
        super(Model_RNN4SeqClass, self).__init__()
        # the instantiated recurrent layer passed in, e.g. an SRN or the LSTM above
        self.rnn_model = model
        # vocabulary size (number of distinct digits)
        self.num_digits = num_digits
        # dimension of the embedding vectors
        self.input_size = input_size
        # embedding layer
        self.embedding = Embedding(num_digits, input_size)
        # output linear layer
        self.linear = nn.Linear(hidden_size, num_classes)
    def forward(self, inputs):
        inputs_emb = self.embedding(inputs)
        hidden_state = self.rnn_model(inputs_emb)  # final hidden state of the recurrent layer
        logits = self.linear(hidden_state)
        return logits
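# Data flow through Model_RNN4SeqClass (shapes for this experiment):
#   inputs (batch, seq_len) LongTensor of digits 0-9
#   -> embedding : (batch, seq_len, input_size)
#   -> rnn_model : (batch, hidden_size), the final hidden state of the LSTM
#   -> linear    : (batch, num_classes) logits over the 19 possible sums 0..18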
# Model training
class Runner:
def __init__(self, model, train_loader, val_loader, test_loader, criterion, optimizer):
self.model = model
self.train_loader = train_loader
self.val_loader = val_loader
self.test_loader = test_loader
self.criterion = criterion
self.optimizer = optimizer
self.best_model = None
self.best_val_loss = float('inf')
        self.train_losses = []  # training loss per epoch
        self.val_losses = []  # validation loss per epoch
def train(self, epochs):
for epoch in range(epochs):
self.model.train()
running_loss = 0.0
for inputs, labels in self.train_loader:
self.optimizer.zero_grad()
outputs = self.model(inputs)
loss = self.criterion(outputs, labels)
loss.backward()
self.optimizer.step()
running_loss += loss.item()
            # average training loss for this epoch
train_loss = running_loss / len(self.train_loader)
self.train_losses.append(train_loss)
            # loss on the validation set
val_loss = self.evaluate()
self.val_losses.append(val_loss)
print(f'Epoch [{epoch + 1}/{epochs}], Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}')
            # save the model weights whenever the validation loss reaches a new minimum
if val_loss < self.best_val_loss:
self.best_val_loss = val_loss
self.best_model = self.model.state_dict()
plt.figure(figsize=(10, 6))
plt.plot(self.train_losses, label='Train Loss')
plt.plot(self.val_losses, label='Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Loss Curve')
plt.legend()
plt.grid()
plt.show()
def evaluate(self):
self.model.eval()
val_loss = 0.0
with torch.no_grad():
for inputs, labels in self.val_loader:
outputs = self.model(inputs)
loss = self.criterion(outputs, labels)
val_loss += loss.item()
return val_loss / len(self.val_loader)
def test(self):
self.model.load_state_dict(self.best_model)
self.model.eval()
correct = 0
total = 0
with torch.no_grad():
for inputs, labels in self.test_loader:
outputs = self.model(inputs)
_, predicted = torch.max(outputs, 1)
total += labels.size(0)
correct += (predicted == labels).sum().item()
test_accuracy = correct / total
print(f'Test Accuracy: {test_accuracy:.4f}')
def predict(self, image):
self.model.eval()
with torch.no_grad():
output = self.model(image)
_, predicted = torch.max(output, 1)
return predicted.item()
# Loop over the different sequence lengths
for length in lengths:
    # training set
    train_dataset = DigitSumDataset(length=length, k=k_train, mode='train')
    train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
    # validation set
    dev_dataset = DigitSumDataset(length=length, k=k_test_val, mode='dev')
    dev_loader = DataLoader(dev_dataset, batch_size=64, shuffle=False)
    # test set
    test_dataset = DigitSumDataset(length=length, k=k_test_val, mode='test')
    test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)
    # number of input digit classes
    num_digits = 10
    # dimension of the vector each digit is mapped to
    input_size = 32
    # dimension of the hidden state vector
    hidden_size = 32
    # number of output classes (possible sums 0..18)
    num_classes = 19
    base_model = LSTM(input_size, hidden_size)
    model = Model_RNN4SeqClass(base_model, num_digits, input_size, hidden_size, num_classes)
    # loss function and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    # create the Runner and start training
    runner = Runner(model, train_loader, dev_loader, test_loader, criterion, optimizer)
    print(f"Training model for sequence length {length}...")
    # train the model (testing happens in the next section)
    runner.train(epochs=600)
3. Model Evaluation
Call the test function to compute the model's accuracy on the test set.
Code:
runner.test()  # test the model
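Besides the aggregate accuracy, the trained Runner can also be queried on a single sequence through its predict method. A small illustrative check, assuming a model trained with length=5 (the input must be a (1, length) LongTensor whose length matches the trained model):

sample = torch.tensor([[3, 7, 0, 0, 2]], dtype=torch.long)  # the first two digits sum to 10
print(runner.predict(sample))  # a well-trained model should output 10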
4. Comparing the Custom LSTM with PyTorch's Built-in LSTM
Create a PyTorchLSTM model whose __init__ defines an lstm layer by calling nn.LSTM directly (with batch_first=True so that the input layout matches the custom implementation).
Then call compare_models, which compares the two models on the time of a single forward pass, the average running time over 60 runs, the total number of parameters of the custom LSTM and the PyTorch LSTM, and the model outputs.
Code:
import torch
import torch.nn as nn
import time
import numpy as np
from torch.nn import init
class LSTM(nn.Module):
def __init__(self, input_size, hidden_size):
super(LSTM, self).__init__()
self.input_size = input_size
self.hidden_size = hidden_size
        # Model parameters: four sets of W, U and b
self.W_i = nn.Parameter(torch.randn(input_size, hidden_size))
self.W_f = nn.Parameter(torch.randn(input_size, hidden_size))
self.W_o = nn.Parameter(torch.randn(input_size, hidden_size))
self.W_a = nn.Parameter(torch.randn(input_size, hidden_size))
self.U_i = nn.Parameter(torch.randn(hidden_size, hidden_size))
self.U_f = nn.Parameter(torch.randn(hidden_size, hidden_size))
self.U_o = nn.Parameter(torch.randn(hidden_size, hidden_size))
self.U_a = nn.Parameter(torch.randn(hidden_size, hidden_size))
self.b_i = nn.Parameter(torch.zeros(1, hidden_size))
self.b_f = nn.Parameter(torch.zeros(1, hidden_size))
self.b_o = nn.Parameter(torch.zeros(1, hidden_size))
self.b_a = nn.Parameter(torch.zeros(1, hidden_size))
        # Parameter initialization
self.apply(self._init_weights)
def _init_weights(self, m):
if isinstance(m, nn.Linear):
init.xavier_uniform_(m.weight)
def init_state(self, batch_size):
        # Initialize the hidden state and cell state to zeros
hidden_state = torch.zeros(batch_size, self.hidden_size)
cell_state = torch.zeros(batch_size, self.hidden_size)
return hidden_state, cell_state
def forward(self, inputs, states=None):
batch_size, seq_len, input_size = inputs.shape # inputs shape: (batch_size, seq_len, input_size)
if states is None:
states = self.init_state(batch_size)
hidden_state, cell_state = states
        # LSTM computation: input/forget/output gates, candidate state, cell state and hidden state
for step in range(seq_len):
input_step = inputs[:, step, :]
I_gate = torch.sigmoid(torch.matmul(input_step, self.W_i) + torch.matmul(hidden_state, self.U_i) + self.b_i)
F_gate = torch.sigmoid(torch.matmul(input_step, self.W_f) + torch.matmul(hidden_state, self.U_f) + self.b_f)
O_gate = torch.sigmoid(torch.matmul(input_step, self.W_o) + torch.matmul(hidden_state, self.U_o) + self.b_o)
C_tilde = torch.tanh(torch.matmul(input_step, self.W_a) + torch.matmul(hidden_state, self.U_a) + self.b_a)
cell_state = F_gate * cell_state + I_gate * C_tilde
hidden_state = O_gate * torch.tanh(cell_state)
return hidden_state
# LSTM model using PyTorch's built-in nn.LSTM
class PyTorchLSTM(nn.Module):
    def __init__(self, input_size, hidden_size):
        super(PyTorchLSTM, self).__init__()
        # batch_first=True so the (batch, seq_len, input_size) layout matches the custom LSTM
        self.lstm = nn.LSTM(input_size, hidden_size, batch_first=True)
    def forward(self, inputs, states=None):
        output, (hn, cn) = self.lstm(inputs, states)
        return hn[-1]  # final hidden state, shape (batch, hidden_size)
# Input settings
input_size = 10
hidden_size = 20
batch_size = 32
seq_len = 50
# Random input data of shape (batch_size, seq_len, input_size)
inputs = torch.randn(batch_size, seq_len, input_size)
# Instantiate the custom LSTM and the PyTorch LSTM
custom_lstm = LSTM(input_size, hidden_size)
pytorch_lstm = PyTorchLSTM(input_size, hidden_size)
# Compute the outputs of both models and record timing and parameter counts
def compare_models(inputs, custom_lstm, pytorch_lstm):
    # 1. One timed forward pass of each model
    start_time = time.time()
    custom_lstm_output = custom_lstm(inputs)  # custom LSTM output
    custom_lstm_time = (time.time() - start_time) * 1000  # milliseconds
    start_time = time.time()
    pytorch_lstm_output = pytorch_lstm(inputs)  # PyTorch LSTM output
    pytorch_lstm_time = (time.time() - start_time) * 1000  # milliseconds
    # 2. Total number of parameters
    custom_lstm_params = sum(p.numel() for p in custom_lstm.parameters())
    pytorch_lstm_params = sum(p.numel() for p in pytorch_lstm.parameters())
    # 3. Average running time over 60 runs
    custom_lstm_times = []
    pytorch_lstm_times = []
    for _ in range(60):
        start_time = time.time()
        custom_lstm(inputs)
        custom_lstm_times.append(time.time() - start_time)
        start_time = time.time()
        pytorch_lstm(inputs)
        pytorch_lstm_times.append(time.time() - start_time)
    custom_lstm_avg_time = np.mean(custom_lstm_times) * 1000  # milliseconds
    pytorch_lstm_avg_time = np.mean(pytorch_lstm_times) * 1000  # milliseconds
return {
'custom_lstm_time': custom_lstm_time,
'pytorch_lstm_time': pytorch_lstm_time,
'custom_lstm_avg_time': custom_lstm_avg_time,
'pytorch_lstm_avg_time': pytorch_lstm_avg_time,
'custom_lstm_params': custom_lstm_params,
'pytorch_lstm_params': pytorch_lstm_params,
'custom_lstm_output': custom_lstm_output,
'pytorch_lstm_output': pytorch_lstm_output
}
# Compare the two models
comparison_results = compare_models(inputs, custom_lstm, pytorch_lstm)
# Print the results
# Time for a single forward pass
print(f"Custom LSTM (1 run) Time: {comparison_results['custom_lstm_time']} ms")
print(f"PyTorch LSTM (1 run) Time: {comparison_results['pytorch_lstm_time']} ms")
# Average time over 60 runs
print(f"Custom LSTM Average Time (60 runs): {comparison_results['custom_lstm_avg_time']} ms")
print(f"PyTorch LSTM Average Time (60 runs): {comparison_results['pytorch_lstm_avg_time']} ms")
# Total number of parameters in each model
print(f"Custom LSTM Parameters: {comparison_results['custom_lstm_params']}")
print(f"PyTorch LSTM Parameters: {comparison_results['pytorch_lstm_params']}")
# Model outputs
print("\nCustom LSTM Output:")
print(comparison_results['custom_lstm_output'])
print("\nPyTorch LSTM Output:")
print(comparison_results['pytorch_lstm_output'])
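One caveat about the timing numbers: both models are benchmarked with autograd enabled and without a warm-up run, so the first call can absorb one-off setup cost. A slightly fairer harness (a sketch under those assumptions, reusing the models and inputs defined above) could look like this:

def timed_runs(model, inputs, n_runs=60, warmup=5):
    model.eval()
    with torch.no_grad():            # no gradient bookkeeping while benchmarking
        for _ in range(warmup):      # discard warm-up iterations
            model(inputs)
        times = []
        for _ in range(n_runs):
            start = time.time()
            model(inputs)
            times.append(time.time() - start)
    return np.mean(times) * 1000     # average time in milliseconds

print(f"Custom LSTM:  {timed_runs(custom_lstm, inputs):.3f} ms")
print(f"PyTorch LSTM: {timed_runs(pytorch_lstm, inputs):.3f} ms")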
5. Experimental Results
(1) Loss curves and accuracy for sequence lengths 5, 10, 15, 20, 25, 30 and 35
As with the SRN model, the training loss becomes less stable and the validation loss tends to grow as the sequence length increases, which shows that the ability to keep long-range dependencies still weakens for longer sequences. Compared with the SRN, however, the LSTM converges better as the sequence length grows. The LSTM's test accuracy also tends to drop overall, but it stays markedly higher than the SRN's, indicating that the LSTM preserves long-range dependencies better than the SRN.
(2) Comparing the custom LSTM with PyTorch's built-in LSTM
The two models take almost exactly the same time for a single run. Over repeated runs, however, the custom LSTM's average time is clearly longer, which shows that the hand-written implementation is less efficient when called many times in a row. The custom LSTM also has slightly fewer parameters (2480) than the PyTorch LSTM (2560); the gap comes from nn.LSTM keeping two bias vectors per gate block (bias_ih and bias_hh) where the custom model keeps one.
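The parameter counts can be checked by hand for input_size = 10 and hidden_size = 20:

# Custom LSTM: four W (10x20), four U (20x20), four bias vectors (20)
custom_params = 4 * (10 * 20) + 4 * (20 * 20) + 4 * 20        # = 2480
# nn.LSTM: same weights, but two bias vectors (bias_ih and bias_hh) per gate block
pytorch_params = 4 * (10 * 20) + 4 * (20 * 20) + 2 * 4 * 20   # = 2560
print(custom_params, pytorch_params)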
Original article: https://blog.csdn.net/qq_74062041/article/details/144375935