自学内容网 自学内容网

RNN循环递归网络讲解与不掉包python实现

1.算法简介

参考论文:Elman J L. Finding structure in time[J]. Cognitive science, 1990, 14(2): 179-211.,谷歌被引次数超16000!

说到循环递归结构就不得不提到其鼻祖RNN网络。首先我们先对RNN有个初步的概念:想象一下,你正在阅读一本非常吸引人的小说。每次你翻开新的一页,你的大脑不仅会处理这一页上的内容,还会结合之前读过的所有内容来理解故事的情节、人物关系和背景设定。这就是一种“记忆”和“上下文理解”的过程,因为你大脑中的信息不是孤立的,而是连续且相互关联的。RNN就是模仿了这种“记忆”功能的神经网络。在传统的神经网络中,假设每个输入都是独立的,没有前后联系。但RNN不同,它专门设计用来处理序列数据——也就是那些按顺序排列,其中每个元素都与前后的元素有关联的数据,比如时间序列数据(股价、温度变化)、自然语言(句子、对话)等。在RNN中,有一个特殊的循环连接,让信息能够从前一个时间点传递到下一个时间点。就像你在读书时,上一页的信息会影响你对下一页的理解。RNN的这个特性让它能够记住前面的信息,这样当处理后续数据时,它就能够利用这些记忆来做出更好的决策或预测。有了一个初步概念之后,我们现在来具体的讲一讲RNN到底是什么以及其是怎么运行的。
在这里插入图片描述

2. RNN算法原理

在这里插入图片描述
RNN的核心思想是利用序列中的时序信息。与传统的前馈神经网络不同,RNN引入了循环连接,使网络能够保留之前时间步的信息。如上面的GIF图所示(引自:RNN):
其中X表示当前时刻的输入,以一个天气预报任务为例,我们将使用过去3天(前天、昨天和今天)的温度(T’)、湿度(H)和风速(W)来预测下一天的温度(T)。对于这个任务而言,我们的输入数据是[ X T t − 2 X_{T_{t-2}} XTt2, X T t − 1 X_{T_{t-1}} XTt1, X T t X_{T_{t}} XTt, X H t − 2 X_{H_{t-2}} XHt2, X H t − 1 X_{H_{t-1}} XHt1, X H t X_{H_{t}} XHt, X W t − 2 X_{W_{t-2}} XWt2, X W t − 1 X_{W_{t-1}} XWt1, X W t X_{W_{t}} XWt]即X是一个1*9的输入。我们的输出是明天的温度 y T t + 1 y_{T_{t+1}} yTt+1。即我们需要建一个RNN模型 f ( x ) f(x) f(x):
y T t + 1 = f ( X T t − 2 , X T t − 1 , X T t , X H t − 2 , X H t − 1 , X H t , X W t − 2 , X W t − 1 , X W t ) y_{T_{t+1}}=f(X_{T_{t-2}},X_{T_{t-1}},X_{T_{t}},X_{H_{t-2}},X_{H_{t-1}},X_{H_{t}},X_{W_{t-2}},X_{W_{t-1}},X_{W_{t}}) yTt+1=f(XTt2,XTt1,XTt,XHt2,XHt1,XHt,XWt2,XWt1,XWt)
那么具体该怎么实现这个模型呢?待我一步步分解:

2.1 RNN基本结构介绍

RNN的核心是其循环结构,它允许信息在序列中传递。对于我们的天气预报任务,RNN的基本结构可以表示为:
h t = t a n h ( W h h ∗ h t − 1 + W x h ∗ x t + b h ) h_t = tanh(W_hh * h_{t-1} + W_xh * x_t + b_h) ht=tanh(Whhht1+Wxhxt+bh)
y t = W h y ∗ h t + b y y_t = W_hy * h_t + b_y yt=Whyht+by
其中:

  • h t h_t ht 是当前时间步的隐藏状态
  • x t x_t xt 是当前时间步的输入
  • W h h W_hh Whh, W x h W_xh Wxh, W h y W_hy Why 是权重矩阵,也就是网络的可学习参数,是未知量。
  • b h b_h bh, b y b_y by 是偏置项
  • tanh 是激活函数

2.2 计算流程

对于我们的天气预报任务,计算流程如下:

a) 初始化隐藏状态 h 0 h_0 h0(通常为零向量)

b) 对于每个时间步 t = 1 to 3:
h t − 2 = h 0 h_{t-2}=h_0 ht2=h0
h t − 1 = t a n h ( W h h ∗ h t − 2 + W x h ∗ [ X T t − 1 , X H t − 1 , X W t − 1 ] + b h ) h_{t-1} = tanh(W_{h}h * h_{t-2}+ W_xh*[X_{T_{t-1}}, X_{H_{t-1}}, X_{W_{t-1}}]+ b_h) ht1=tanh(Whhht2+Wxh[XTt1,XHt1,XWt1]+bh)
h t = t a n h ( W h h ∗ h t − 2 + W x h ∗ [ X T t , X H t , X W t ] + b h ) h_{t} = tanh(W_{h}h * h_{t-2}+ W_xh*[X_{T_t}, X_{H_t}, X_{W_t}]+ b_h) ht=tanh(Whhht2+Wxh[XTt,XHt,XWt]+bh)
c) 最后,我们使用最终的隐藏状态来预测明天的温度:
y T t + 1 = W h y ∗ h t + b y y_{T_{t+1}} = W_hy * h_t + b_y yTt+1=Whyht+by
然后我手搓了一个RNN代码便于各位理解:

class RNN:
    def __init__(self, input_size, hidden_size, output_size):
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size

        # 初始化权重
        self.Wxh = np.random.randn(hidden_size, input_size) * 0.01
        self.Whh = np.random.randn(hidden_size, hidden_size) * 0.01
        self.Why = np.random.randn(output_size, hidden_size) * 0.01

        # 初始化偏置
        self.bh = np.zeros((hidden_size, 1))
        self.by = np.zeros((output_size, 1))

        # 初始化Adam优化器
        self.optimizer = Adam({
            'Wxh': self.Wxh, 'Whh': self.Whh, 'Why': self.Why,
            'bh': self.bh, 'by': self.by
        })

    def forward(self, inputs):
        h = np.zeros((self.hidden_size, 1))
        self.last_inputs = inputs
        self.last_hs = {0: h}

        # 前向传播
        for t, x in enumerate(inputs):
            h = np.tanh(np.dot(self.Wxh, x) + np.dot(self.Whh, h) + self.bh)
            self.last_hs[t + 1] = h

        y = np.dot(self.Why, h) + self.by
        return y, h

    def backward(self, d_y):
        n = len(self.last_inputs)

        # 初始化梯度
        d_Wxh = np.zeros_like(self.Wxh)
        d_Whh = np.zeros_like(self.Whh)
        d_Why = np.zeros_like(self.Why)
        d_bh = np.zeros_like(self.bh)
        d_by = np.zeros_like(self.by)

        d_h = np.dot(self.Why.T, d_y)

        # 反向传播
        for t in reversed(range(n)):
            temp = (1 - self.last_hs[t + 1] ** 2) * d_h
            d_Wxh += np.dot(temp, self.last_inputs[t].T)
            d_Whh += np.dot(temp, self.last_hs[t].T)
            d_bh += temp

            d_h = np.dot(self.Whh.T, temp)

        d_Why = np.dot(d_y, self.last_hs[n].T)
        d_by = d_y

        # 使用Adam优化器更新参数
        self.optimizer.step({
            'Wxh': d_Wxh, 'Whh': d_Whh, 'Why': d_Why,
            'bh': d_bh, 'by': d_by
        })

3.完整训练代码

import numpy as np


class Adam:
    def __init__(self, params, lr=0.001, beta1=0.9, beta2=0.999, epsilon=1e-8):
        self.params = params
        self.lr = lr
        self.beta1 = beta1
        self.beta2 = beta2
        self.epsilon = epsilon
        self.m = {k: np.zeros_like(v) for k, v in params.items()}
        self.v = {k: np.zeros_like(v) for k, v in params.items()}
        self.t = 0

    def step(self, grads):
        self.t += 1
        for k in self.params.keys():
            self.m[k] = self.beta1 * self.m[k] + (1 - self.beta1) * grads[k]
            self.v[k] = self.beta2 * self.v[k] + (1 - self.beta2) * (grads[k] ** 2)
            m_hat = self.m[k] / (1 - self.beta1 ** self.t)
            v_hat = self.v[k] / (1 - self.beta2 ** self.t)
            self.params[k] -= self.lr * m_hat / (np.sqrt(v_hat) + self.epsilon)


class RNN:
    def __init__(self, input_size, hidden_size, output_size):
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size

        # 初始化权重
        self.Wxh = np.random.randn(hidden_size, input_size) * 0.01
        self.Whh = np.random.randn(hidden_size, hidden_size) * 0.01
        self.Why = np.random.randn(output_size, hidden_size) * 0.01

        # 初始化偏置
        self.bh = np.zeros((hidden_size, 1))
        self.by = np.zeros((output_size, 1))

        # 初始化Adam优化器
        self.optimizer = Adam({
            'Wxh': self.Wxh, 'Whh': self.Whh, 'Why': self.Why,
            'bh': self.bh, 'by': self.by
        })

    def forward(self, inputs):
        h = np.zeros((self.hidden_size, 1))
        self.last_inputs = inputs
        self.last_hs = {0: h}

        # 前向传播
        for t, x in enumerate(inputs):
            h = np.tanh(np.dot(self.Wxh, x) + np.dot(self.Whh, h) + self.bh)
            self.last_hs[t + 1] = h

        y = np.dot(self.Why, h) + self.by
        return y, h

    def backward(self, d_y):
        n = len(self.last_inputs)

        # 初始化梯度
        d_Wxh = np.zeros_like(self.Wxh)
        d_Whh = np.zeros_like(self.Whh)
        d_Why = np.zeros_like(self.Why)
        d_bh = np.zeros_like(self.bh)
        d_by = np.zeros_like(self.by)

        d_h = np.dot(self.Why.T, d_y)

        # 反向传播
        for t in reversed(range(n)):
            temp = (1 - self.last_hs[t + 1] ** 2) * d_h
            d_Wxh += np.dot(temp, self.last_inputs[t].T)
            d_Whh += np.dot(temp, self.last_hs[t].T)
            d_bh += temp

            d_h = np.dot(self.Whh.T, temp)

        d_Why = np.dot(d_y, self.last_hs[n].T)
        d_by = d_y

        # 使用Adam优化器更新参数
        self.optimizer.step({
            'Wxh': d_Wxh, 'Whh': d_Whh, 'Why': d_Why,
            'bh': d_bh, 'by': d_by
        })


# 生成模拟数据
def generate_data(num_samples, time_steps):
    X = np.random.rand(num_samples, time_steps, 3)  # 3个特征:温度、湿度、风速
    y = np.sum(X[:, :, 0], axis=1) / 3 + np.random.normal(0, 0.1, num_samples)  # 简单地用平均温度加噪声作为目标
    return X, y.reshape(-1, 1)


# 数据标准化
def normalize(data):
    return (data - np.mean(data)) / np.std(data)


# 生成训练和测试数据
X_train, y_train = generate_data(1000, 3)
X_test, y_test = generate_data(200, 3)

# 标准化数据
X_train_norm = normalize(X_train)
y_train_norm = normalize(y_train)
X_test_norm = normalize(X_test)
y_test_norm = normalize(y_test)

# 初始化RNN
input_size = 3
hidden_size = 64
output_size = 1
rnn = RNN(input_size, hidden_size, output_size)


# 训练函数
def train(rnn, X, y, epochs):
    for epoch in range(epochs):
        total_loss = 0
        for i in range(len(X)):
            inputs = [X[i][t].reshape(-1, 1) for t in range(3)]
            target = y[i]

            # 前向传播
            output, _ = rnn.forward(inputs)

            # 计算损失
            loss = np.sum((output - target) ** 2)
            total_loss += loss

            # 反向传播
            d_y = 2 * (output - target)
            rnn.backward(d_y)

        if epoch % 10 == 0:
            print(f"Epoch {epoch}, Loss: {total_loss / len(X)}")


# 初始化RNN
input_size = 3
hidden_size = 64
output_size = 1
rnn = RNN(input_size, hidden_size, output_size)

# 训练模型
train(rnn, X_train_norm, y_train_norm, epochs=100)


# 评估函数
def evaluate(rnn, X, y):
    total_loss = 0
    for i in range(len(X)):
        inputs = [X[i][t].reshape(-1, 1) for t in range(3)]
        target = y[i]

        output, _ = rnn.forward(inputs)
        loss = np.sum((output - target) ** 2)
        total_loss += loss

    return total_loss / len(X)


# 评估模型
test_loss = evaluate(rnn, X_test_norm, y_test_norm)
print(f"Test Loss: {test_loss}")


# 进行预测
def predict(rnn, X):
    inputs = [X[t].reshape(-1, 1) for t in range(3)]
    output, _ = rnn.forward(inputs)
    return output


# 示例预测
sample_data = X_test_norm[0]
prediction = predict(rnn, sample_data)
print(f"Sample input: {X_test[0]}")
print(f"Normalized prediction: {prediction[0][0]}")

# 反标准化预测结果
mean_y = np.mean(y_train)
std_y = np.std(y_train)
denormalized_prediction = prediction[0][0] * std_y + mean_y
print(f"Denormalized prediction: {denormalized_prediction}")
print(f"Actual value: {y_test[0][0]}")

创作不易,烦请各位观众老爷给个三连,小编在这里跪谢了!
在这里插入图片描述


原文地址:https://blog.csdn.net/m0_59257547/article/details/140498895

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!