
Experiment 15: Verifying the Long-Range Dependencies of the LSTM Model

This experiment builds on the previous one.

1. Model Construction

Create the LSTM model, which includes the initializer (init), the weight-initialization function _init_weights, the state-initialization function init_state, and the forward-propagation function forward.

init: although every time step receives the same kind of input and hidden state, the model keeps four separate groups of weights, which produce the input gate, output gate, forget gate, and the candidate cell state. So four groups of W, U, and b must be initialized; for example, each of the four W matrices is a random matrix of size input_size × hidden_size.

init_state: initialize the hidden state and the cell state as all-zero tensors of size batch_size × hidden_size.

forward: first read the input shape (batch_size, seq_len, input_size), then initialize the hidden state and cell state if none are given. Define lists Is, Fs, Os, Cs for the gate-state and cell-state vectors. For each step of the sequence, run the LSTM computation: take that step's input, compute the input gate, forget gate, output gate, and candidate cell state, use them to update the cell state and then obtain the hidden state, and finally store the gate-state and cell-state vectors in the lists.
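
For reference, the per-step computation described above is the standard LSTM recurrence, with X_t the input at step t and H_{t-1} the previous hidden state:

I_t = σ(X_t W_i + H_{t-1} U_i + b_i)
F_t = σ(X_t W_f + H_{t-1} U_f + b_f)
O_t = σ(X_t W_o + H_{t-1} U_o + b_o)
C̃_t = tanh(X_t W_a + H_{t-1} U_a + b_a)
C_t = F_t ⊙ C_{t-1} + I_t ⊙ C̃_t
H_t = O_t ⊙ tanh(C_t)

where σ is the sigmoid function and ⊙ denotes element-wise multiplication.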

Code:

import torch
import torch.nn as nn
from torch.nn import init


class LSTM(nn.Module):
    def __init__(self, input_size, hidden_size):
        super(LSTM, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size

        # Model parameters: one W (input weights), U (recurrent weights) and b (bias)
        # for the input gate (i), forget gate (f), output gate (o) and candidate state (a)
        self.W_i = nn.Parameter(torch.randn(input_size, hidden_size))
        self.W_f = nn.Parameter(torch.randn(input_size, hidden_size))
        self.W_o = nn.Parameter(torch.randn(input_size, hidden_size))
        self.W_a = nn.Parameter(torch.randn(input_size, hidden_size))

        self.U_i = nn.Parameter(torch.randn(hidden_size, hidden_size))
        self.U_f = nn.Parameter(torch.randn(hidden_size, hidden_size))
        self.U_o = nn.Parameter(torch.randn(hidden_size, hidden_size))
        self.U_a = nn.Parameter(torch.randn(hidden_size, hidden_size))

        self.b_i = nn.Parameter(torch.zeros(1, hidden_size))
        self.b_f = nn.Parameter(torch.zeros(1, hidden_size))
        self.b_o = nn.Parameter(torch.zeros(1, hidden_size))
        self.b_a = nn.Parameter(torch.zeros(1, hidden_size))

        # Parameter initialization
        self.apply(self._init_weights)

    def _init_weights(self, m):
        # self.apply() visits this module itself; there are no nn.Linear submodules,
        # so Xavier-initialize the weight matrices directly
        if isinstance(m, LSTM):
            for name, param in m.named_parameters():
                if name.startswith(("W_", "U_")):
                    init.xavier_uniform_(param)

    def init_state(self, batch_size):
        # Initialize the hidden state and the cell state as zeros
        hidden_state = torch.zeros(batch_size, self.hidden_size)
        cell_state = torch.zeros(batch_size, self.hidden_size)
        return hidden_state, cell_state

    def forward(self, inputs, states=None):
        batch_size, seq_len, input_size = inputs.shape  # inputs shape: (batch_size, seq_len, input_size)

        if states is None:
            states = self.init_state(batch_size)
        hidden_state, cell_state = states

        # Lists for the gate-state and cell-state vectors at every step
        Is, Fs, Os, Cs = [], [], [], []

        # LSTM computation: input gate, forget gate, output gate, candidate state,
        # cell state and hidden state
        for step in range(seq_len):
            input_step = inputs[:, step, :]
            I_gate = torch.sigmoid(torch.matmul(input_step, self.W_i) + torch.matmul(hidden_state, self.U_i) + self.b_i)
            F_gate = torch.sigmoid(torch.matmul(input_step, self.W_f) + torch.matmul(hidden_state, self.U_f) + self.b_f)
            O_gate = torch.sigmoid(torch.matmul(input_step, self.W_o) + torch.matmul(hidden_state, self.U_o) + self.b_o)
            C_tilde = torch.tanh(torch.matmul(input_step, self.W_a) + torch.matmul(hidden_state, self.U_a) + self.b_a)

            cell_state = F_gate * cell_state + I_gate * C_tilde
            hidden_state = O_gate * torch.tanh(cell_state)

            # Record the gate-state and cell-state vectors
            Is.append(I_gate.detach().cpu().numpy())
            Fs.append(F_gate.detach().cpu().numpy())
            Os.append(O_gate.detach().cpu().numpy())
            Cs.append(cell_state.detach().cpu().numpy())

        # Keep the recorded trajectories so they can be inspected after a forward pass
        self.Is, self.Fs, self.Os, self.Cs = Is, Fs, Os, Cs

        return hidden_state
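
A quick shape check (an illustrative sketch, not part of the experiment): the final hidden state returned by forward has shape (batch_size, hidden_size) regardless of sequence length.

lstm = LSTM(input_size=32, hidden_size=32)
x = torch.randn(8, 20, 32)  # (batch_size=8, seq_len=20, input_size=32)
h = lstm(x)
print(h.shape)  # torch.Size([8, 32])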

2. Model Training

Everything is the same as in the previous experiment, except that this experiment instantiates the LSTM model and passes the resulting base_model into Model_RNN4SeqClass. Then call the train function to start training.
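
In outline, the wiring looks like this (mirroring what the full script below does):

base_model = LSTM(input_size, hidden_size)
model = Model_RNN4SeqClass(base_model, num_digits, input_size, hidden_size, num_classes)
runner = Runner(model, train_loader, dev_loader, test_loader, criterion, optimizer)
runner.train(epochs=600)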

Code:

import os
import random
import numpy as np
import torch
from matplotlib import pyplot as plt
from torch import optim
from torch.utils.data import Dataset, DataLoader

# Dataset construction
# Fix the random seeds for reproducibility
random.seed(0)
np.random.seed(0)
class DigitSumDataset(Dataset):
    def __init__(self, data_path=None, length=None, k=None, mode='train'):
        """
        初始化数据集
        如果传入了data_path,则从文件加载数据;
        如果传入了length和k,则生成数据集。

        参数:
        data_path: 存放数据集的目录,用于加载数据
        length: 数据序列的长度(仅用于生成数据集时)
        k: 数据增强的数量(仅用于生成数据集时)
        mode: 'train'/'dev'/'test',决定生成训练集、验证集还是测试集(仅用于生成数据集时)
        """
        self.data_path = data_path
        self.length = length
        self.k = k
        self.mode = mode

        if data_path:  # load data from files
            self.examples = self.load_data(data_path)
        else:  # generate data
            if length < 3 or k <= 0:
                raise ValueError("The length of data should be greater than 2 and k should be greater than 0.")
            self.examples = self.generate_data()

    def generate_data(self):
        """
        生成数据:生成指定长度的数字序列,并进行数据增强
        """
        base_examples = []
        for n1 in range(0, 10):
            for n2 in range(0, 10):
                seq = [n1, n2] + [0] * (self.length - 2)
                label = n1 + n2
                base_examples.append((seq, label))

        examples = []
        for base_example in base_examples:
            for _ in range(self.k):
                idx = np.random.randint(2, self.length)
                val = np.random.randint(0, 10)
                seq = base_example[0].copy()
                label = base_example[1]
                seq[idx] = val
                examples.append((seq, label))

        return examples

    def load_data(self, data_path):
        """
        从文件加载数据
        """

        def _load_file(file_path):
            examples = []
            with open(file_path, "r", encoding="utf-8") as f:
                for line in f.readlines():
                    items = line.strip().split("\t")
                    seq = [int(i) for i in items[0].split(" ")]
                    label = int(items[1])
                    examples.append((seq, label))
            return examples

        # Load the training, validation and test sets
        train_examples = _load_file(os.path.join(data_path, "train.txt"))
        dev_examples = _load_file(os.path.join(data_path, "dev.txt"))
        test_examples = _load_file(os.path.join(data_path, "test.txt"))

        return train_examples if self.mode == 'train' else dev_examples if self.mode == 'dev' else test_examples

    def __len__(self):
        return len(self.examples)

    def __getitem__(self, idx):
        seq, label = self.examples[idx]
        seq_tensor = torch.tensor(seq, dtype=torch.long)
        label_tensor = torch.tensor(label, dtype=torch.long)
        return seq_tensor, label_tensor
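
# Illustrative note (not part of training): with length=5 and k=1, a generated
# sample looks like ([n1, n2, ?, ?, ?], n1 + n2), where exactly one position
# after index 1 has been overwritten with a random digit. The label stays
# n1 + n2, so the model must learn to depend only on the first two positions.
#   demo = DigitSumDataset(length=5, k=1)
#   print(demo[0])  # e.g. (tensor([0, 0, 0, 7, 0]), tensor(0))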

# Dataset paths and generation parameters
lengths = [5, 10, 15, 20, 25, 30, 35]
# lengths = [5]
k_train = 3  # augmentation factor for the training set
k_test_val = 1  # augmentation factor for the validation and test sets

# Full model
import torch
import torch.nn as nn
import torch.nn.functional as F

# Embedding layer
class Embedding(nn.Module):
    def __init__(self, num_embeddings, embedding_dim):
        super(Embedding, self).__init__()
        # Wrap the table in nn.Parameter so it is registered and trained with the model
        self.W = nn.Parameter(nn.init.xavier_uniform_(torch.empty(num_embeddings, embedding_dim), gain=1.0))

    def forward(self, inputs):
        inputs = inputs.long()  # make sure the indices are a LongTensor
        embs = self.W[inputs]   # lookup: (batch_size, seq_len) -> (batch_size, seq_len, embedding_dim)
        return embs



import torch
import torch.nn as nn
import torch.nn.functional as F

from torch.nn import init


class LSTM(nn.Module):
    def __init__(self, input_size, hidden_size):
        super(LSTM, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size

        # Model parameters: one W (input weights), U (recurrent weights) and b (bias)
        # for the input gate (i), forget gate (f), output gate (o) and candidate state (a)
        self.W_i = nn.Parameter(torch.randn(input_size, hidden_size))
        self.W_f = nn.Parameter(torch.randn(input_size, hidden_size))
        self.W_o = nn.Parameter(torch.randn(input_size, hidden_size))
        self.W_a = nn.Parameter(torch.randn(input_size, hidden_size))

        self.U_i = nn.Parameter(torch.randn(hidden_size, hidden_size))
        self.U_f = nn.Parameter(torch.randn(hidden_size, hidden_size))
        self.U_o = nn.Parameter(torch.randn(hidden_size, hidden_size))
        self.U_a = nn.Parameter(torch.randn(hidden_size, hidden_size))

        self.b_i = nn.Parameter(torch.zeros(1, hidden_size))
        self.b_f = nn.Parameter(torch.zeros(1, hidden_size))
        self.b_o = nn.Parameter(torch.zeros(1, hidden_size))
        self.b_a = nn.Parameter(torch.zeros(1, hidden_size))

        # Parameter initialization
        self.apply(self._init_weights)

    def _init_weights(self, m):
        # self.apply() visits this module itself; there are no nn.Linear submodules,
        # so Xavier-initialize the weight matrices directly
        if isinstance(m, LSTM):
            for name, param in m.named_parameters():
                if name.startswith(("W_", "U_")):
                    init.xavier_uniform_(param)

    def init_state(self, batch_size):
        # Initialize the hidden state and the cell state as zeros
        hidden_state = torch.zeros(batch_size, self.hidden_size)
        cell_state = torch.zeros(batch_size, self.hidden_size)
        return hidden_state, cell_state

    def forward(self, inputs, states=None):
        batch_size, seq_len, input_size = inputs.shape  # inputs shape: (batch_size, seq_len, input_size)

        if states is None:
            states = self.init_state(batch_size)
        hidden_state, cell_state = states

        # Lists for the gate-state and cell-state vectors at every step
        Is, Fs, Os, Cs = [], [], [], []

        # LSTM computation: input gate, forget gate, output gate, candidate state,
        # cell state and hidden state
        for step in range(seq_len):
            input_step = inputs[:, step, :]
            I_gate = torch.sigmoid(torch.matmul(input_step, self.W_i) + torch.matmul(hidden_state, self.U_i) + self.b_i)
            F_gate = torch.sigmoid(torch.matmul(input_step, self.W_f) + torch.matmul(hidden_state, self.U_f) + self.b_f)
            O_gate = torch.sigmoid(torch.matmul(input_step, self.W_o) + torch.matmul(hidden_state, self.U_o) + self.b_o)
            C_tilde = torch.tanh(torch.matmul(input_step, self.W_a) + torch.matmul(hidden_state, self.U_a) + self.b_a)

            cell_state = F_gate * cell_state + I_gate * C_tilde
            hidden_state = O_gate * torch.tanh(cell_state)

            # Record the gate-state and cell-state vectors
            Is.append(I_gate.detach().cpu().numpy())
            Fs.append(F_gate.detach().cpu().numpy())
            Os.append(O_gate.detach().cpu().numpy())
            Cs.append(cell_state.detach().cpu().numpy())

        # Keep the recorded trajectories so they can be inspected after a forward pass
        self.Is, self.Fs, self.Os, self.Cs = Is, Fs, Os, Cs

        return hidden_state


# Model for digit-sum prediction built on an RNN
class Model_RNN4SeqClass(nn.Module):
    def __init__(self, model, num_digits, input_size, hidden_size, num_classes):
        super(Model_RNN4SeqClass, self).__init__()
        # The instantiated RNN layer passed in, e.g. an SRN or LSTM
        self.rnn_model = model
        # Vocabulary size
        self.num_digits = num_digits
        # Dimension of the embedding vectors
        self.input_size = input_size
        # Embedding layer
        self.embedding = Embedding(num_digits, input_size)
        # Linear output layer
        self.linear = nn.Linear(hidden_size, num_classes)

    def forward(self, inputs):
        inputs_emb = self.embedding(inputs)
        hidden_state = self.rnn_model(inputs_emb)  # final hidden state
        logits = self.linear(hidden_state)
        return logits
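
# Shape flow (for reference): inputs (batch_size, seq_len) --Embedding-->
# (batch_size, seq_len, input_size) --rnn_model--> final hidden state
# (batch_size, hidden_size) --Linear--> logits (batch_size, num_classes)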

# Model training
class Runner:
    def __init__(self, model, train_loader, val_loader, test_loader, criterion, optimizer):
        self.model = model
        self.train_loader = train_loader
        self.val_loader = val_loader
        self.test_loader = test_loader
        self.criterion = criterion
        self.optimizer = optimizer
        self.best_model = None
        self.best_val_loss = float('inf')
        self.train_losses = []  # training losses per epoch
        self.val_losses = []    # validation losses per epoch

    def train(self, epochs):
        for epoch in range(epochs):
            self.model.train()
            running_loss = 0.0
            for inputs, labels in self.train_loader:
                self.optimizer.zero_grad()
                outputs = self.model(inputs)
                loss = self.criterion(outputs, labels)
                loss.backward()
                self.optimizer.step()

                running_loss += loss.item()

            # Average training loss for this epoch
            train_loss = running_loss / len(self.train_loader)
            self.train_losses.append(train_loss)

            # Loss on the validation set
            val_loss = self.evaluate()
            self.val_losses.append(val_loss)

            print(f'Epoch [{epoch + 1}/{epochs}], Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}')

            # Keep the model with the lowest validation loss
            if val_loss < self.best_val_loss:
                self.best_val_loss = val_loss
                self.best_model = self.model.state_dict()
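        # Plot the training and validation loss curves after training finishes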
        plt.figure(figsize=(10, 6))
        plt.plot(self.train_losses, label='Train Loss')
        plt.plot(self.val_losses, label='Validation Loss')
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.title('Loss Curve')
        plt.legend()
        plt.grid()
        plt.show()

    def evaluate(self):
        self.model.eval()
        val_loss = 0.0
        with torch.no_grad():
            for inputs, labels in self.val_loader:
                outputs = self.model(inputs)
                loss = self.criterion(outputs, labels)
                val_loss += loss.item()
        return val_loss / len(self.val_loader)

    def test(self):
        self.model.load_state_dict(self.best_model)
        self.model.eval()
        correct = 0
        total = 0
        with torch.no_grad():
            for inputs, labels in self.test_loader:
                outputs = self.model(inputs)
                _, predicted = torch.max(outputs, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

        test_accuracy = correct / total
        print(f'Test Accuracy: {test_accuracy:.4f}')

    def predict(self, image):
        self.model.eval()
        with torch.no_grad():
            output = self.model(image)
            _, predicted = torch.max(output, 1)
            return predicted.item()

# Loop over the different sequence lengths
for length in lengths:
    # Build the training set
    train_dataset = DigitSumDataset(length=length, k=k_train, mode='train')
    train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
    # Build the validation set
    dev_dataset = DigitSumDataset(length=length, k=k_test_val, mode='dev')
    dev_loader = DataLoader(dev_dataset, batch_size=64, shuffle=False)
    # Build the test set
    test_dataset = DigitSumDataset(length=length, k=k_test_val, mode='test')
    test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)
    # Number of input digit classes
    num_digits = 10
    # Dimension of the digit embedding vectors
    input_size = 32
    # Dimension of the hidden state vector
    hidden_size = 32
    # Number of output classes (sums 0..18)
    num_classes = 19
    base_model = LSTM(input_size, hidden_size)
    model = Model_RNN4SeqClass(base_model, num_digits, input_size, hidden_size, num_classes)
    # Loss function and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    # Create a Runner instance and train
    runner = Runner(model, train_loader, dev_loader, test_loader, criterion, optimizer)
    print(f"Training model for sequence length {length}...")

    # Train the model
    runner.train(epochs=600)







3. Model Evaluation

Call the test function to compute the model's accuracy on the test set.

Code:

runner.test()  # test the model

4. Comparing the Custom LSTM with PyTorch's Built-in LSTM

Create a PyTorchLSTM model whose init function defines an lstm layer by calling nn.LSTM directly.

Then call the compare_models function, which compares the two models on four points: the computation time of a single forward pass, the average running time over 60 runs, the total parameter counts of the custom LSTM and the PyTorch LSTM, and the model outputs.
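
As a quick sanity check on the parameter counts (with input_size = 10 and hidden_size = 20 as set below): the custom LSTM stores, per gate, one input matrix (10 × 20), one recurrent matrix (20 × 20) and one bias (20), for 4 × (200 + 400 + 20) = 2480 parameters in total. nn.LSTM keeps two bias vectors per gate (bias_ih and bias_hh), so it has 4 × (200 + 400 + 20 + 20) = 2560 parameters, 80 more than the custom model. This matches the counts reported in the results section below.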

Code:

import torch
import torch.nn as nn
import time
import numpy as np
from torch.nn import init

class LSTM(nn.Module):
    def __init__(self, input_size, hidden_size):
        super(LSTM, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size

        # Model parameters: one W (input weights), U (recurrent weights) and b (bias)
        # for the input gate (i), forget gate (f), output gate (o) and candidate state (a)
        self.W_i = nn.Parameter(torch.randn(input_size, hidden_size))
        self.W_f = nn.Parameter(torch.randn(input_size, hidden_size))
        self.W_o = nn.Parameter(torch.randn(input_size, hidden_size))
        self.W_a = nn.Parameter(torch.randn(input_size, hidden_size))

        self.U_i = nn.Parameter(torch.randn(hidden_size, hidden_size))
        self.U_f = nn.Parameter(torch.randn(hidden_size, hidden_size))
        self.U_o = nn.Parameter(torch.randn(hidden_size, hidden_size))
        self.U_a = nn.Parameter(torch.randn(hidden_size, hidden_size))

        self.b_i = nn.Parameter(torch.zeros(1, hidden_size))
        self.b_f = nn.Parameter(torch.zeros(1, hidden_size))
        self.b_o = nn.Parameter(torch.zeros(1, hidden_size))
        self.b_a = nn.Parameter(torch.zeros(1, hidden_size))

        # Parameter initialization
        self.apply(self._init_weights)

    def _init_weights(self, m):
        # self.apply() visits this module itself; there are no nn.Linear submodules,
        # so Xavier-initialize the weight matrices directly
        if isinstance(m, LSTM):
            for name, param in m.named_parameters():
                if name.startswith(("W_", "U_")):
                    init.xavier_uniform_(param)

    def init_state(self, batch_size):
        # Initialize the hidden state and the cell state as zeros
        hidden_state = torch.zeros(batch_size, self.hidden_size)
        cell_state = torch.zeros(batch_size, self.hidden_size)
        return hidden_state, cell_state

    def forward(self, inputs, states=None):
        batch_size, seq_len, input_size = inputs.shape  # inputs shape: (batch_size, seq_len, input_size)

        if states is None:
            states = self.init_state(batch_size)
        hidden_state, cell_state = states

        # LSTM computation: input gate, forget gate, output gate, candidate state,
        # cell state and hidden state
        for step in range(seq_len):
            input_step = inputs[:, step, :]
            I_gate = torch.sigmoid(torch.matmul(input_step, self.W_i) + torch.matmul(hidden_state, self.U_i) + self.b_i)
            F_gate = torch.sigmoid(torch.matmul(input_step, self.W_f) + torch.matmul(hidden_state, self.U_f) + self.b_f)
            O_gate = torch.sigmoid(torch.matmul(input_step, self.W_o) + torch.matmul(hidden_state, self.U_o) + self.b_o)
            C_tilde = torch.tanh(torch.matmul(input_step, self.W_a) + torch.matmul(hidden_state, self.U_a) + self.b_a)

            cell_state = F_gate * cell_state + I_gate * C_tilde
            hidden_state = O_gate * torch.tanh(cell_state)

        return hidden_state


# PyTorch built-in LSTM model
class PyTorchLSTM(nn.Module):
    def __init__(self, input_size, hidden_size):
        super(PyTorchLSTM, self).__init__()
        # batch_first=True so inputs are (batch_size, seq_len, input_size),
        # matching the custom LSTM above
        self.lstm = nn.LSTM(input_size, hidden_size, batch_first=True)

    def forward(self, inputs, states=None):
        output, (hn, cn) = self.lstm(inputs, states)
        return hn[-1]  # final hidden state of the last layer: (batch_size, hidden_size)


# Input settings
input_size = 10
hidden_size = 20
batch_size = 32
seq_len = 50

# Random input data
inputs = torch.randn(batch_size, seq_len, input_size)

# Instantiate the custom LSTM and the PyTorch LSTM
custom_lstm = LSTM(input_size, hidden_size)
pytorch_lstm = PyTorchLSTM(input_size, hidden_size)


# Compute both models' outputs and record timing and parameter counts
def compare_models(inputs, custom_lstm, pytorch_lstm):
    # 1. Single forward pass
    start_time = time.time()
    custom_lstm_output = custom_lstm(inputs)  # custom LSTM output
    custom_lstm_time = (time.time() - start_time) * 1000  # milliseconds

    start_time = time.time()
    pytorch_lstm_output = pytorch_lstm(inputs)  # PyTorch LSTM output
    pytorch_lstm_time = (time.time() - start_time) * 1000  # milliseconds

    # 2. Parameter counts
    custom_lstm_params = sum(p.numel() for p in custom_lstm.parameters())
    pytorch_lstm_params = sum(p.numel() for p in pytorch_lstm.parameters())

    # 3. Average running time over 60 runs
    custom_lstm_times = []
    pytorch_lstm_times = []
    for _ in range(60):
        start_time = time.time()
        custom_lstm(inputs)
        custom_lstm_times.append(time.time() - start_time)

        start_time = time.time()
        pytorch_lstm(inputs)
        pytorch_lstm_times.append(time.time() - start_time)

    custom_lstm_avg_time = np.mean(custom_lstm_times) * 1000  # milliseconds
    pytorch_lstm_avg_time = np.mean(pytorch_lstm_times) * 1000  # milliseconds

    return {
        'custom_lstm_time': custom_lstm_time,
        'pytorch_lstm_time': pytorch_lstm_time,
        'custom_lstm_avg_time': custom_lstm_avg_time,
        'pytorch_lstm_avg_time': pytorch_lstm_avg_time,
        'custom_lstm_params': custom_lstm_params,
        'pytorch_lstm_params': pytorch_lstm_params,
        'custom_lstm_output': custom_lstm_output,
        'pytorch_lstm_output': pytorch_lstm_output
    }


# Compare the two models
comparison_results = compare_models(inputs, custom_lstm, pytorch_lstm)

# Print the results
# Single forward-pass time
print(f"Custom LSTM (1 run) Time: {comparison_results['custom_lstm_time']} ms")
print(f"PyTorch LSTM (1 run) Time: {comparison_results['pytorch_lstm_time']} ms")
# Average running time over 60 runs
print(f"Custom LSTM Average Time (60 runs): {comparison_results['custom_lstm_avg_time']} ms")
print(f"PyTorch LSTM Average Time (60 runs): {comparison_results['pytorch_lstm_avg_time']} ms")
# Total parameter counts of the custom LSTM and the PyTorch LSTM
print(f"Custom LSTM Parameters: {comparison_results['custom_lstm_params']}")
print(f"PyTorch LSTM Parameters: {comparison_results['pytorch_lstm_params']}")

# Print the model outputs
print("\nCustom LSTM Output:")
print(comparison_results['custom_lstm_output'])

print("\nPyTorch LSTM Output:")
print(comparison_results['pytorch_lstm_output'])
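
A note on methodology: timing a single run includes one-off warm-up costs, and timing with autograd enabled also measures graph construction. A fairer micro-benchmark (a minimal sketch, not what the script above measures) would warm up first and disable gradients:

def time_model(model, inputs, runs=60, warmup=5):
    with torch.no_grad():  # exclude autograd overhead from the measurement
        for _ in range(warmup):  # warm-up passes are not timed
            model(inputs)
        start = time.time()
        for _ in range(runs):
            model(inputs)
    return (time.time() - start) / runs * 1000  # average milliseconds per forward pass

# Usage: time_model(custom_lstm, inputs) vs. time_model(pytorch_lstm, inputs)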

5. Experimental Results

1. Loss curves and accuracy of the model for sequence lengths 5, 10, 15, 20, 25, 30 and 35

As with the SRN model, as the sequence length increases the training loss becomes increasingly unstable and the validation loss tends to grow overall, which shows that the ability to maintain long-range dependencies weakens as sequences get longer. Compared with the SRN, however, the LSTM converges noticeably better at longer sequence lengths. The LSTM's accuracy on the test set also tends to decline overall, but it remains significantly higher than the SRN's, indicating that the LSTM preserves long-range dependencies better than the SRN.

2. Comparing the custom LSTM with PyTorch's built-in LSTM

The two models' single-run execution times are very close. Over repeated runs, however, the custom LSTM's average time is clearly longer, which shows that the hand-written per-step Python loop is less efficient than PyTorch's optimized implementation. The custom LSTM also has slightly fewer parameters (2480) than the PyTorch LSTM (2560); the difference comes from nn.LSTM keeping two bias vectors per gate (bias_ih and bias_hh), whose extra set accounts for the additional 4 × 20 = 80 parameters.



Original article: https://blog.csdn.net/qq_74062041/article/details/144375935
