自学内容网 自学内容网

从0开始transformer代码理解(附带调试和个人原理理解)

代码来源

本次代码来源自github https://github.com/graykode/nlp-tutorial
里面的5.1 transformer代码

第一步 数据准备(从main函数开始)

首先这里是自定义了三句话,分别是给到encoder的输入和decoder的输入还有测试的输入

sentences = ['ich mochte ein bier P', 'S i want a beer', 'i want a beer E']

然后把这些通过词向量编号变成,对应的向量。

#对应的词向量编号
    src_vocab = {'P': 0, 'ich': 1, 'mochte': 2, 'ein': 3, 'bier': 4}
    src_vocab_size = len(src_vocab)

    tgt_vocab = {'P': 0, 'i': 1, 'want': 2, 'a': 3, 'beer': 4, 'S': 5, 'E': 6}
    number_dict = {i: w for i, w in enumerate(tgt_vocab)}
    tgt_vocab_size = len(tgt_vocab)
    src_len = 5 # 我们训练的时候的源序列长度
    tgt_len = 5 # 测试集语句的长度

    d_model = 512  # Embedding Size  词嵌入的维度 可以理解为一个词有512个维度 它被分解成了具有一个512维度特征的向量 更浅显易懂的说就是把“我”这个词和512种不同的词作比较从而得到向量
    d_ff = 2048  # 这个先不管 是前馈神经网络所需要提高的维度
    d_k = d_v = 64  # 特征k v 的特征维度
    n_layers = 6  # Encoder 和 Decoder 的数量 也就是大N
    n_heads = 8  # 把qkv所需要分的头数  但是要注意 n_heads * d_k = d_model 为什么?因为输入的d_model被划分成8个子空间(也就是头数)这意味着每个头输出的维度是64计算完成后拼接起来

经过make_batch函数我们得到encoder和decoder的向量

enc_inputs, dec_inputs, target_batch = make_batch(sentences)

那他们是什么形状的呢?如下图 看到一句话是[1,5]也就是
在这里插入图片描述

make_batch函数

其实很简单 这个函数跑就是把词对应的编号一 一和我们的句子对应起来 比如S对应5. 以此类推

#用于将句子转换为批量数据
def make_batch(sentences):
    # 将输入句子转换为词汇表的索引
    input_batch = [[src_vocab[n] for n in sentences[0].split()]]
    # 将输出句子转换为词汇表的索引
    output_batch = [[tgt_vocab[n] for n in sentences[1].split()]]
    # 将目标句子转换为词汇表的索引
    target_batch = [[tgt_vocab[n] for n in sentences[2].split()]]
    # 返回批量数据
    return torch.LongTensor(input_batch), torch.LongTensor(output_batch), torch.LongTensor(target_batch)

那么接下来就来到重要的模型定义和训练

    model = Transformer()#  Transformer模型

    criterion = nn.CrossEntropyLoss()#交叉熵损失函数
    optimizer = optim.Adam(model.parameters(), lr=0.001)#    优化器

    for epoch in range(20):
        optimizer.zero_grad()
        outputs, enc_self_attns, dec_self_attns, dec_enc_attns = model(enc_inputs, dec_inputs)
        loss = criterion(outputs, target_batch.contiguous().view(-1))
        print('Epoch:', '%04d' % (epoch + 1), 'cost =', '{:.6f}'.format(loss))
        loss.backward()
        optimizer.step()

Transformer 主体函数定义代码

怎么说呢?主体代码就是根据transformer上面的图一样的,不会的可以看我上一篇博客。开始是encoder层然后是decoder层,最后用一个线性层降维得到我们所需要的结果

class Transformer(nn.Module):
    # 初始化函数
    def __init__(self):
        # 调用父类的初始化函数
        super(Transformer, self).__init__()
        # 定义编码器
        self.encoder = Encoder()
        # 定义解码器
        self.decoder = Decoder()
        # 定义一个线性层,将d_model个神经元的编码器输出转化为目标词汇表大小个神经元的输出,不添加偏置
        self.projection = nn.Linear(d_model, tgt_vocab_size, bias=False)
    # 前向传播函数
    def forward(self, enc_inputs, dec_inputs):
        # 调用编码器
        enc_outputs, enc_self_attns = self.encoder(enc_inputs)
        # 调用解码器
        dec_outputs, dec_self_attns, dec_enc_attns = self.decoder(dec_inputs, enc_inputs, enc_outputs)#enc_inputs是告诉解码端那些是pad符号
        # 调用线性层 调整输出形状  就是把我们的decoder输出的d_model维度映射到词汇表维度  在本例子种是512->5
        dec_logits = self.projection(dec_outputs) # dec_logits : [batch_size x src_vocab_size x tgt_vocab_size]
        # 返回解码  这里是方便我们调用损失函数  因为我们的形状是[batch_size x src_vocab_size x tgt_vocab_size]  所以batch * src_size是我们的总样本数 方便进行交叉熵计算
        return dec_logits.view(-1, dec_logits.size(-1)), enc_self_attns, dec_self_attns, dec_enc_attns

Encoder层

词向量维度嵌入

ok 进入我们transformer的第一步解释我们的encodr层,因为transformer里面的第一步是输入到encoder里面
**这里解释一下整个原理 ,我们的encoder必须是先加上位置编码 - - > 然后进行多头注意力机制(其中有用norm操作进行残差连接) --> 前馈神经网络层 ** 简单的三步却让人一生无法释怀!

我将分开讲,首先是定义,很简单,我想说的是这个词嵌入层,可能新手不太懂(比如我),那么你可以简单的理解为维度转换,这里就是把维度为[1,5,5]的数据变成[1,5,512] 我添加了打印代码,调试结果在下面。然后就是pos_emb 这个就是相当与把我们需要训练的数据加上下标,具体就是根据我们的编码表(这里是直接把我们的编码表输入了)获得对应的嵌入。

    def __init__(self):
        super(Encoder, self).__init__()
        # 定义编码器中的词嵌入层
        self.src_emb = nn.Embedding(src_vocab_size, d_model)
        # 定义编码器中的位置编码层,从预训练的编码表中获取
        self.pos_emb = nn.Embedding.from_pretrained(get_sinusoid_encoding_table(src_len+1, d_model),freeze=True)
        # 定义编码器中的层
        self.layers = nn.ModuleList([EncoderLayer() for _ in range(n_layers)])

在这里插入图片描述

掩码部分实现

OK进行下一步
这一步很重要!因为我们在训练的时候要要求我们的序列是等长的,但是很多时候我们的词汇并不是 这样,有长有短,所以我们必须用一些特定符号进行标记,这里我们用PAD来说明,但是填充的时候肯定要标记呀,不然把PAD也拿去训练那不是搞笑了吗。

        # 获取编码器自注意力时的mask  注意我们的输入都是enc_inputs
        enc_self_attn_mask = get_attn_pad_mask(enc_inputs, enc_inputs)

在这里插入图片描述

所以这个函数是这样的
我们的输入都是enc_inputs 为什么?** 这个问题就要从注意力点积开始了。注意力计算的核心操作是对查询(Q)和键(K)之间的点积。假设查询的维度是 [batch_size, len_q, d_k],键的维度是 [batch_size, len_k, d_k],点积结果的维度将会是 [batch_size, len_q, len_k]。
掩码矩阵的维度需要与这个点积结果的维度匹配,以便正确地应用掩码。因此,掩码矩阵的维度必须是 [batch_size, len_q, len_k]。**

def get_attn_pad_mask(seq_q, seq_k):#enc_inputs, enc_inputs 告诉后面的句子后面的层 那些是被pad符号填充的  pad的目的是让batch里面的每一行长度一致
    # 获取seq_q和seq_k的batch_size和长度
    batch_size, len_q = seq_q.size()
    batch_size, len_k = seq_k.size()
    # 创建一个全为0的mask矩阵,维度为batch_size x len_k
    pad_attn_mask = seq_k.data.eq(0).unsqueeze(1)  # batch_size x 1 x len_k(=len_q), one is maskingtensor([[[False, False, False, False,  True]]])  True代表为pad符号
    # 扩展mask矩阵,使其维度为batch_size x len_q x len_k
    a=pad_attn_mask.expand(batch_size, len_q, len_k)
    #tensor([[[False, False, False, False,  True],
        #  [False, False, False, False,  True],
        #  [False, False, False, False,  True],
        #  [False, False, False, False,  True],
        #  [False, False, False, False,  True]]])
    return pad_attn_mask.expand(batch_size, len_q, len_k)  # batch_size x len_q x len_k  重复len_q次
    

做完这些,最后就到了encoder的叠加,也就是N个encoder的叠加,把上一层的输出传给下一层
把上一个encoder的输出变为为下一个encoder的输入,还有encoder的掩码也要传递给下一个encoder,因为它们的掩码是通用的。

        # 遍历编码器中的每一层
        for layer in self.layers:
            # 将输出传递给下一层 
            enc_outputs, enc_self_attn = layer(enc_outputs, enc_self_attn_mask)
            # 将编码器自注意力添加到自注意力列表中 这个列表主要是最后的展示show 没什么用
            enc_self_attns.append(enc_self_attn)
        # 返回编码器的输出和自注意力列表

多层encoder叠加

那有人会问 ,我们的encoderlayer是怎么样的
以下代码解释了怎么样的
我们可以看到,里面很简单和原文的模型结构一样,一个自注意力层和前馈神经网络。

class EncoderLayer(nn.Module):
    def __init__(self):
        super(EncoderLayer, self).__init__()
        self.enc_self_attn = MultiHeadAttention()  # 编码器自注意力层
        self.pos_ffn = PoswiseFeedForwardNet()  # 位置前馈神经网络

    def forward(self, enc_inputs, enc_self_attn_mask):
        enc_outputs, attn = self.enc_self_attn(enc_inputs, enc_inputs, enc_inputs, enc_self_attn_mask)  # encoder的输入的qkv特征都是来自一个input所以一样
        enc_outputs = self.pos_ffn(enc_outputs)  # enc_outputs: [batch_size x len_q x d_model]
        return enc_outputs, attn

多头自注意力层

``这个实现的代码就是通过给的input然后乘以权重矩阵Wq Wk Wv来获得 我们的qkv特征,当然我们再这里需要分头,所谓分头就是把我们的d_model特征分成n_head*d_q 或d_v 或d_k个头,然后进行注意力计算达到我们原文描述的效果。在下面注解种有。

class MultiHeadAttention(nn.Module):
    def __init__(self):#通过线形层映射一个qkv的特征矩阵
        super(MultiHeadAttention, self).__init__()
        self.W_Q = nn.Linear(d_model, d_k * n_heads)
        self.W_K = nn.Linear(d_model, d_k * n_heads)
        self.W_V = nn.Linear(d_model, d_v * n_heads)
        self.linear = nn.Linear(n_heads * d_v, d_model)
        self.layer_norm = nn.LayerNorm(d_model)

    def forward(self, Q, K, V, attn_mask):
        # q: [batch_size x len_q x d_model], k: [batch_size x len_k x d_model], v: [batch_size x len_k x d_model]
        residual, batch_size = Q, Q.size(0)
        # (B, S, D) -proj-> (B, S, D) -split-> (B, S, H, W) -trans-> (B, H, S, W)这里是分头  也就是把q分成q_1 q_2.。。。
        q_s = self.W_Q(Q).view(batch_size, -1, n_heads, d_k).transpose(1,2)  # q_s: [batch_size x n_heads x len_q x d_k]
        k_s = self.W_K(K).view(batch_size, -1, n_heads, d_k).transpose(1,2)  # k_s: [batch_size x n_heads x len_k x d_k]
        v_s = self.W_V(V).view(batch_size, -1, n_heads, d_v).transpose(1,2)  # v_s: [batch_size x n_heads x len_k x d_v]
        print("W_Q", self.W_Q.weight)
        print("W_K", self.W_K.weight)
        print("W_V", self.W_V.weight)

当然对于分的每个头,我们都需要知道他们的掩码(也就是pad符号)所以我们对我们的掩码也进行分头(效果如下图)

 attn_mask = attn_mask.unsqueeze(1).repeat(1, n_heads, 1, 1) # attn_mask : [batch_size x n_heads x len_q x len_k]  对每个头我们都要知道他们的qkv特征的pad填充在哪所以也要分头

在这里插入图片描述
然后是注意力点积计算部分

context, attn = ScaledDotProductAttention()(q_s, k_s, v_s, attn_mask)

函数ScaledDotProductAttention根据原文给的公式计算,我的上一篇博客有
这里注意一个问题,这里可能会问为什么不能先和v乘再进行softmax 因为qk乘以得到的结果矩阵和v维度不一样 再者我们假如乘完再softmax最后会失去其注意力机制的意义 首先因为乘以完v后我们会维度不匹配,scores 的形状是 [batch_size, seq_len, seq_len],V 的形状是 [batch_size, seq_len, d_v]。直接相乘在数学上是行不通的,因为矩阵乘法要求维度匹配。再者原文中只有这qk的公式。

class ScaledDotProductAttention(nn.Module):#这里是注意力机制的计算部分 
    def __init__(self):
        super(ScaledDotProductAttention, self).__init__()

    def forward(self, Q, K, V, attn_mask):
        scores = torch.matmul(Q, K.transpose(-1, -2)) / np.sqrt(d_k) # scores : [batch_size x n_heads x len_q(=len_k) x len_k(=len_q)]  做qk乘法后除以维度
        scores.masked_fill_(attn_mask, -1e9) # Fills elements of self tensor with value where mask is one. 把pad符号的值设置为负无穷  因为设置负无穷可以让softmax的值都为0
        attn = nn.Softmax(dim=-1)(scores) # 归一化 这里可能会问为什么不能先和v乘再进行softmax  因为qk乘以得到的结果矩阵和v维度不一样 再者我们假如乘完再softmax最后会失去其注意力机制的意义 因为我们要
        context = torch.matmul(attn, V)#[1,8,5,5]x[1,8,5,64]=[1,8,5,64] 计算矩阵乘法
        return context, attn

在这里插入图片描述
在这里插入图片描述

然后是把得到的context进行合并用于前馈神经网络降维后再进行残差链接


        context = context.transpose(1, 2).contiguous().view(batch_size, -1, n_heads * d_v) # context: [batch_size x len_q x n_heads * d_v]这里d_v是64
        output = self.linear(context)#线性层把512->512便于残差链接
        return self.layer_norm(output + residual), attn # output: [batch_size x len_q x d_model]残差链接

前馈神经网络、

  self.pos_ffn = PoswiseFeedForwardNet()  # 位置前馈神经网络

这里主要是对我们的通道进行缩放注解中有

 class PoswiseFeedForwardNet(nn.Module):#对应图的feedforward层  前馈神经网络增强非线性表达能力 具体就是将其低维度卷积->高纬度 用ReLU激活函数->还原低纬度
 def __init__(self):
     super(PoswiseFeedForwardNet, self).__init__()
     self.conv1 = nn.Conv1d(in_channels=d_model, out_channels=d_ff, kernel_size=1)#512->2048
     self.conv2 = nn.Conv1d(in_channels=d_ff, out_channels=d_model, kernel_size=1)#2048->512
     self.layer_norm = nn.LayerNorm(d_model)#顺手进行norm操作
#一维卷积 (nn.Conv1d) 需要输入的数据形状为 [batch_size, in_channels, length]。具体来说:

# batch_size:批次大小,表示输入样本的数量。
# in_channels:输入通道数,表示输入特征的维度。
# length:输入序列的长度。
 def forward(self, inputs):
     residual = inputs # inputs : [batch_size, len_q, d_model]
     output = nn.ReLU()(self.conv1(inputs.transpose(1, 2)))#这里因为一维卷积需要维度再前所以要交换特征位置
     output = self.conv2(output).transpose(1, 2)
     return self.layer_norm(output + residual)

Decodr部分

说到Dcoder 其实Dcoder和Encoder的代码差不多 也就是多了交叉注意力机制。但是Dcoder的输入我们需要注意,他们分别是encoder的输出和输入还有我们额decoder的输入。这个时候疑惑,这里的代码为什么还需要encoder的输入?其实这里的输入就是为了再交叉注意力计算的时候给出encoder 的掩码位置便于计算而已。

     # 调用解码器
     dec_outputs, dec_self_attns, dec_enc_attns = self.decoder(dec_inputs, enc_inputs, enc_outputs)#enc_inputs是告诉解码端那些是pad符号

初始化代码就是词嵌入和位置编码和encoder一样

class Decoder(nn.Module):
 def __init__(self):
     super(Decoder, self).__init__()
     self.tgt_emb = nn.Embedding(tgt_vocab_size, d_model)
     self.pos_emb = nn.Embedding.from_pretrained(get_sinusoid_encoding_table(tgt_len+1, d_model),freeze=True)
     self.layers = nn.ModuleList([DecoderLayer() for _ in range(n_layers)])

mask操作

因为在我们训练的时候需要蒙住操作,具体原理如下图
比如输入S的时候只能看到S看不到卷起来 输入卷的时候只能看到S和卷看不到起来
所以形成一个上三角矩阵。

请添加图片描述
代码如下 核心代码就是最后一句获得我们的mask矩阵

 def forward(self, dec_inputs, enc_inputs, enc_outputs): # dec_inputs : [batch_size x target_len]
     dec_outputs = self.tgt_emb(dec_inputs) + self.pos_emb(torch.LongTensor([[5,1,2,3,4]]))#和encoder的输入一样,需要加上位置编码
     dec_self_attn_pad_mask = get_attn_pad_mask(dec_inputs, dec_inputs)#解码端的自注意力mask 也就是告诉输入的pad符号不参与自注意力计算 并且告诉哪里是pad符号
     dec_self_attn_subsequent_mask = get_attn_subsequent_mask(dec_inputs)#得到decoder输入的上三角mask矩阵

在这里插入图片描述

当和pad矩阵两者相加就会得到我们要的矩阵大于0的部分置为1小于等于0 的部分置为0
代码如下

     dec_self_attn_mask = torch.gt((dec_self_attn_pad_mask + dec_self_attn_subsequent_mask), 0)#当和pad矩阵两者相加就会得到我们要的矩阵大于0的部分置为1  小于等于0 的部分置为0

交叉注意力层实现

我们知道交叉注意力 层我们需要encoder的qk和我们decoder的v来进行qkv多头注意力矩阵计算矩阵计算

交叉注意力层的mask矩阵 它的输入Quer来自于Masked Multi-Head Attention的输出,Keys和Values来自于Encoder中最后一层的输出。所以dec_input输入的是q enc_inputs输入的是k v

dec_enc_attn_mask = get_attn_pad_mask(dec_inputs, enc_inputs)#交叉注意力层的mask矩阵  它的输入Quer来自于Masked Multi-Head Attention的输出,Keys和Values来自于Encoder中最后一层的输出。所以dec_input输入的是q enc_inputs输入的是k v

然后我们就是decoder的层进行叠加6次,其实只要注意交叉注意力层就行了,但是我们要注意我们的输入要输入encoder的输出enc_outputs和decoder的输出dec_outputs,还有交叉注意力的掩码dec_enc_attn_mask和,自注意力掩码dec_self_attn_mask

     dec_self_attns, dec_enc_attns = [], []
     for layer in self.layers:
         dec_outputs, dec_self_attn, dec_enc_attn = layer(dec_outputs, enc_outputs, dec_self_attn_mask, dec_enc_attn_mask)
         dec_self_attns.append(dec_self_attn)
         dec_enc_attns.append(dec_enc_attn)
     return dec_outputs, dec_self_attns, dec_enc_attns

class DecoderLayer(nn.Module):
  def __init__(self):
      super(DecoderLayer, self).__init__()
      self.dec_self_attn = MultiHeadAttention()
      self.dec_enc_attn = MultiHeadAttention()
      self.pos_ffn = PoswiseFeedForwardNet()

  def forward(self, dec_inputs, enc_outputs, dec_self_attn_mask, dec_enc_attn_mask):
      dec_outputs, dec_self_attn = self.dec_self_attn(dec_inputs, dec_inputs, dec_inputs, dec_self_attn_mask)#自注意力层
      dec_outputs, dec_enc_attn = self.dec_enc_attn(dec_outputs, enc_outputs, enc_outputs, dec_enc_attn_mask)#交互注意力层
      dec_outputs = self.pos_ffn(dec_outputs)#前馈神经网络
      return dec_outputs, dec_self_attn, dec_enc_attn

前馈神经网络

其实encoder和decoder的前馈神经网络是一样的,都是维度变换压缩然后512->2048 ->512然后进行残差链接。具体可以看我的代码注释

最后一部

就是计算损失…,用测试集测试,将返回的掩码进行展示
为什么返回掩码集合就是以下这样

   print('first head of last state enc_self_attns')
   showgraph(enc_self_attns)

   print('first head of last state dec_self_attns')
   showgraph(dec_self_attns)

   print('first head of last state dec_enc_attns')
   showgraph(dec_enc_attns)

全部代码

# %%
# code by Tae Hwan Jung(Jeff Jung) @graykode, Derek Miller @dmmiller612
# Reference : https://github.com/jadore801120/attention-is-all-you-need-pytorch
#           https://github.com/JayParks/transformer
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt

# S: Symbol that shows starting of decoding input
# E: Symbol that shows starting of decoding output
# P: Symbol that will fill in blank sequence if current batch data size is short than time steps

# 定义一个函数,用于将句子转换为批量数据
def make_batch(sentences):
   # 将输入句子转换为词汇表的索引
   input_batch = [[src_vocab[n] for n in sentences[0].split()]]
   # 将输出句子转换为词汇表的索引
   output_batch = [[tgt_vocab[n] for n in sentences[1].split()]]
   # 将目标句子转换为词汇表的索引
   target_batch = [[tgt_vocab[n] for n in sentences[2].split()]]
   # 返回批量数据
   return torch.LongTensor(input_batch), torch.LongTensor(output_batch), torch.LongTensor(target_batch)

# 定义一个函数,用于获取正弦编码表
def get_sinusoid_encoding_table(n_position, d_model):
   # 定义一个计算角度的函数
   def cal_angle(position, hid_idx):
       # 计算位置除以10000的幂次方,再乘以2乘以隐藏层索引除以d_model
       return position / np.power(10000, 2 * (hid_idx // 2) / d_model)
   # 定义一个获取位置角度向量的函数
   def get_posi_angle_vec(position):
       # 返回一个列表,列表的元素为计算得到的的角度
       return [cal_angle(position, hid_j) for hid_j in range(d_model)]

   # 创建一个n_position位置的正弦编码表
   sinusoid_table = np.array([get_posi_angle_vec(pos_i) for pos_i in range(n_position)])
   # 将正弦编码表的偶数列(dim 2i)设置为sin
   sinusoid_table[:, 0::2] = np.sin(sinusoid_table[:, 0::2])  # dim 2i
   # 将正弦编码表的奇数列(dim 2i+1)设置为cos
   sinusoid_table[:, 1::2] = np.cos(sinusoid_table[:, 1::2])  # dim 2i+1
   # 返回FloatTensor类型的正弦编码表
   return torch.FloatTensor(sinusoid_table)

def get_attn_pad_mask(seq_q, seq_k):#enc_inputs, enc_inputs 告诉后面的句子后面的层 那些是被pad符号填充的  pad的目的是让batch里面的每一行长度一致
   # 获取seq_q和seq_k的batch_size和长度
   batch_size, len_q = seq_q.size()
   batch_size, len_k = seq_k.size()
   # 创建一个全为0的mask矩阵,维度为batch_size x len_k  增加第一个维度,len_k个元素都是0
   pad_attn_mask = seq_k.data.eq(0).unsqueeze(1)  # batch_size x 1 x len_k(=len_q), one is maskingtensor([[[False, False, False, False,  True]]])  True代表为pad符号
   # 扩展mask矩阵,使其维度为batch_size x len_q x len_k
   a=pad_attn_mask.expand(batch_size, len_q, len_k)#tensor([[[False, False, False, False,  True],
       #  [False, False, False, False,  True],
       #  [False, False, False, False,  True],
       #  [False, False, False, False,  True],
       #  [False, False, False, False,  True]]])
   return pad_attn_mask.expand(batch_size, len_q, len_k)  # batch_size x len_q x len_k  重复len_q次
   

def get_attn_subsequent_mask(seq):
   attn_shape = [seq.size(0), seq.size(1), seq.size(1)]#创建三角矩阵维度
   subsequent_mask = np.triu(np.ones(attn_shape), k=1)#创建一个上三角为1的矩阵,为什么要这样 因为我们的mask是 比如[S我爱我家] 从S开始看就是说看S的时候只能看S 看到我的时候只能看过S和我。。。就是单词掩码
   subsequent_mask = torch.from_numpy(subsequent_mask).byte()
   return subsequent_mask

class ScaledDotProductAttention(nn.Module):#这里是注意力机制的计算部分 
   def __init__(self):
       super(ScaledDotProductAttention, self).__init__()

   def forward(self, Q, K, V, attn_mask):
       scores = torch.matmul(Q, K.transpose(-1, -2)) / np.sqrt(d_k) # scores : [batch_size x n_heads x len_q(=len_k) x len_k(=len_q)]  做qk乘法后除以维度
       scores.masked_fill_(attn_mask, -1e9) # Fills elements of self tensor with value where mask is one. 把pad符号的值设置为负无穷  因为设置负无穷可以让softmax的值都为0
       attn = nn.Softmax(dim=-1)(scores) # 归一化 这里可能会问为什么不能先和v乘再进行softmax  因为qk乘以得到的结果矩阵和v维度不一样 再者我们假如乘完再softmax最后会失去其注意力机制的意义 因为我们要
       context = torch.matmul(attn, V)#[1,8,5,5]x[1,8,5,64]=[1,8,5,64] 计算点积
       return context, attn

class MultiHeadAttention(nn.Module):
   def __init__(self):#通过线形层映射一个qkv的特征矩阵
       super(MultiHeadAttention, self).__init__()
       self.W_Q = nn.Linear(d_model, d_k * n_heads)
       self.W_K = nn.Linear(d_model, d_k * n_heads)
       self.W_V = nn.Linear(d_model, d_v * n_heads)
       self.linear = nn.Linear(n_heads * d_v, d_model)
       self.layer_norm = nn.LayerNorm(d_model)

   def forward(self, Q, K, V, attn_mask):
       # q: [batch_size x len_q x d_model], k: [batch_size x len_k x d_model], v: [batch_size x len_k x d_model]
       residual, batch_size = Q, Q.size(0)
       # (B, S, D) -proj-> (B, S, D) -split-> (B, S, H, W) -trans-> (B, H, S, W)这里是分头  也就是把q分成q_1 q_2.。。。
       q_s = self.W_Q(Q).view(batch_size, -1, n_heads, d_k).transpose(1,2)  # q_s: [batch_size x n_heads x len_q x d_k]
       k_s = self.W_K(K).view(batch_size, -1, n_heads, d_k).transpose(1,2)  # k_s: [batch_size x n_heads x len_k x d_k]
       v_s = self.W_V(V).view(batch_size, -1, n_heads, d_v).transpose(1,2)  # v_s: [batch_size x n_heads x len_k x d_v]
       print("W_Q", self.W_Q.weight)
       print("W_K", self.W_K.weight)
       print("W_V", self.W_V.weight)
        
       

       attn_mask = attn_mask.unsqueeze(1).repeat(1, n_heads, 1, 1) # attn_mask : [batch_size x n_heads x len_q x len_k]  对每个头我们都要知道他们的qkv特征的pad填充在哪所以也要分头

       # context: [batch_size x n_heads x len_q x d_v], attn: [batch_size x n_heads x len_q(=len_k) x len_k(=len_q)]
       context, attn = ScaledDotProductAttention()(q_s, k_s, v_s, attn_mask)
       context = context.transpose(1, 2).contiguous().view(batch_size, -1, n_heads * d_v) # context: [batch_size x len_q x n_heads * d_v]这里d_v是64
       output = self.linear(context)#线性层把512->512便于残差链接
       return self.layer_norm(output + residual), attn # output: [batch_size x len_q x d_model]残差链接

class PoswiseFeedForwardNet(nn.Module):#对应图的feedforward层  前馈神经网络增强非线性表达能力 具体就是将其低维度卷积->高纬度 用ReLU激活函数->还原低纬度
   def __init__(self):
       super(PoswiseFeedForwardNet, self).__init__()
       self.conv1 = nn.Conv1d(in_channels=d_model, out_channels=d_ff, kernel_size=1)#512->2048
       self.conv2 = nn.Conv1d(in_channels=d_ff, out_channels=d_model, kernel_size=1)#2048->512
       self.layer_norm = nn.LayerNorm(d_model)#顺手进行norm操作
#一维卷积 (nn.Conv1d) 需要输入的数据形状为 [batch_size, in_channels, length]。具体来说:

# batch_size:批次大小,表示输入样本的数量。
# in_channels:输入通道数,表示输入特征的维度。
# length:输入序列的长度。
   def forward(self, inputs):
       residual = inputs # inputs : [batch_size, len_q, d_model]
       output = nn.ReLU()(self.conv1(inputs.transpose(1, 2)))#这里因为一维卷积需要维度再前所以要交换特征位置
       output = self.conv2(output).transpose(1, 2)
       return self.layer_norm(output + residual)

class EncoderLayer(nn.Module):
   def __init__(self):
       super(EncoderLayer, self).__init__()
       self.enc_self_attn = MultiHeadAttention()  # 编码器自注意力层
       self.pos_ffn = PoswiseFeedForwardNet()  # 位置前馈神经网络

   def forward(self, enc_inputs, enc_self_attn_mask):
       enc_outputs, attn = self.enc_self_attn(enc_inputs, enc_inputs, enc_inputs, enc_self_attn_mask)  # enc_inputs to same Q,K,V
       enc_outputs = self.pos_ffn(enc_outputs)  # enc_outputs: [batch_size x len_q x d_model]
       return enc_outputs, attn

class DecoderLayer(nn.Module):
   def __init__(self):
       super(DecoderLayer, self).__init__()
       self.dec_self_attn = MultiHeadAttention()
       self.dec_enc_attn = MultiHeadAttention()
       self.pos_ffn = PoswiseFeedForwardNet()

   def forward(self, dec_inputs, enc_outputs, dec_self_attn_mask, dec_enc_attn_mask):
       dec_outputs, dec_self_attn = self.dec_self_attn(dec_inputs, dec_inputs, dec_inputs, dec_self_attn_mask)#自注意力层
       dec_outputs, dec_enc_attn = self.dec_enc_attn(dec_outputs, enc_outputs, enc_outputs, dec_enc_attn_mask)#交互注意力层
       dec_outputs = self.pos_ffn(dec_outputs)#前馈神经网络
       return dec_outputs, dec_self_attn, dec_enc_attn

class Encoder(nn.Module):
   def __init__(self):
       super(Encoder, self).__init__()
       # 定义编码器中的词嵌入层
       self.src_emb = nn.Embedding(src_vocab_size, d_model)
       # 定义编码器中的位置编码层,从预训练的sinusoid编码表中获取
       self.pos_emb = nn.Embedding.from_pretrained(get_sinusoid_encoding_table(src_len+1, d_model),freeze=True)
       # 定义编码器中的层
       self.layers = nn.ModuleList([EncoderLayer() for _ in range(n_layers)])

   def forward(self, enc_inputs): # enc_inputs : [batch_size x source_len]
       # 将输入的词转换为词嵌入
       a=self.src_emb(enc_inputs)
       print(a)
       print(a.shape)
       enc_outputs = self.src_emb(enc_inputs) + self.pos_emb(torch.LongTensor([[1,2,3,4,0]]))
       print("enc_outputs:",enc_outputs)

       # 获取编码器自注意力时的mask
       enc_self_attn_mask = get_attn_pad_mask(enc_inputs, enc_inputs)
       enc_self_attns = []
       # 初始化编码器自注意力列表
       print("enc_self_attn_mask:",enc_self_attn_mask.shape)

       # 遍历编码器中的每一层
       for layer in self.layers:
           # 将输出传递给下一层
           enc_outputs, enc_self_attn = layer(enc_outputs, enc_self_attn_mask)
           # 将编码器自注意力添加到自注意力列表中
           enc_self_attns.append(enc_self_attn)
       # 返回编码器的输出和自注意力列表


       return enc_outputs, enc_self_attns

class Decoder(nn.Module):
   def __init__(self):
       super(Decoder, self).__init__()
       self.tgt_emb = nn.Embedding(tgt_vocab_size, d_model)
       self.pos_emb = nn.Embedding.from_pretrained(get_sinusoid_encoding_table(tgt_len+1, d_model),freeze=True)
       self.layers = nn.ModuleList([DecoderLayer() for _ in range(n_layers)])

   def forward(self, dec_inputs, enc_inputs, enc_outputs): # dec_inputs : [batch_size x target_len]
       dec_outputs = self.tgt_emb(dec_inputs) + self.pos_emb(torch.LongTensor([[5,1,2,3,4]]))#和encoder的输入一样,需要加上位置编码
       dec_self_attn_pad_mask = get_attn_pad_mask(dec_inputs, dec_inputs)#解码端的自注意力mask 也就是告诉输入的pad符号不参与自注意力计算 并且告诉哪里是pad符号
       dec_self_attn_subsequent_mask = get_attn_subsequent_mask(dec_inputs)#得到decoder输入的上三角mask矩阵
       dec_self_attn_mask = torch.gt((dec_self_attn_pad_mask + dec_self_attn_subsequent_mask), 0)#当和pad矩阵两者相加就会得到我们要的矩阵大于0的部分置为1  小于等于0 的部分置为0

       dec_enc_attn_mask = get_attn_pad_mask(dec_inputs, enc_inputs)#交叉注意力层的mask矩阵  它的输入Quer来自于Masked Multi-Head Attention的输出,Keys和Values来自于Encoder中最后一层的输出。所以dec_input输入的是q enc_inputs输入的是k v

       dec_self_attns, dec_enc_attns = [], []
       for layer in self.layers:
           dec_outputs, dec_self_attn, dec_enc_attn = layer(dec_outputs, enc_outputs, dec_self_attn_mask, dec_enc_attn_mask)
           dec_self_attns.append(dec_self_attn)
           dec_enc_attns.append(dec_enc_attn)
       return dec_outputs, dec_self_attns, dec_enc_attns
# 定义Transformer类,继承自nn.Module
class Transformer(nn.Module):
   # 初始化函数
   def __init__(self):
       # 调用父类的初始化函数
       super(Transformer, self).__init__()
       # 定义编码器
       self.encoder = Encoder()
       # 定义解码器
       self.decoder = Decoder()
       # 定义一个线性层,将d_model个神经元的编码器输出转化为目标词汇表大小个神经元的输出,不添加偏置
       self.projection = nn.Linear(d_model, tgt_vocab_size, bias=False)
   # 前向传播函数
   def forward(self, enc_inputs, dec_inputs):
       # 调用编码器
       enc_outputs, enc_self_attns = self.encoder(enc_inputs)
       # 调用解码器
       dec_outputs, dec_self_attns, dec_enc_attns = self.decoder(dec_inputs, enc_inputs, enc_outputs)#enc_inputs是告诉解码端那些是pad符号
       # 调用线性层
       dec_logits = self.projection(dec_outputs) # dec_logits : [batch_size x src_vocab_size x tgt_vocab_size]
       # 返回解码 logits
       return dec_logits.view(-1, dec_logits.size(-1)), enc_self_attns, dec_self_attns, dec_enc_attns

def showgraph(attn):
   attn = attn[-1].squeeze(0)[0]
   attn = attn.squeeze(0).data.numpy()
   fig = plt.figure(figsize=(n_heads, n_heads)) # [n_heads, n_heads]
   ax = fig.add_subplot(1, 1, 1)
   ax.matshow(attn, cmap='viridis')
   ax.set_xticklabels(['']+sentences[0].split(), fontdict={'fontsize': 14}, rotation=90)
   ax.set_yticklabels(['']+sentences[2].split(), fontdict={'fontsize': 14})
   plt.show()

if __name__ == '__main__':
   sentences = ['ich mochte ein bier P', 'S i want a beer', 'i want a beer E']

   # Transformer Parameters
   # Padding Should be Zero
   src_vocab = {'P': 0, 'ich': 1, 'mochte': 2, 'ein': 3, 'bier': 4}
   src_vocab_size = len(src_vocab)

   tgt_vocab = {'P': 0, 'i': 1, 'want': 2, 'a': 3, 'beer': 4, 'S': 5, 'E': 6}
   number_dict = {i: w for i, w in enumerate(tgt_vocab)}
   tgt_vocab_size = len(tgt_vocab)

   src_len = 5 # length of source
   tgt_len = 5 # length of target

   d_model = 512  # Embedding Size  词嵌入的维度 可以理解为一个词有512个维度 它被分解成了具有一个512维度特征的向量 更浅显易懂的说就是把“我”这个词和512种不同的词作比较从而得到向量
   d_ff = 2048  # 这个先不管 是前馈神经网络所需要提高的维度
   d_k = d_v = 64  # 特征k v 的特征维度
   n_layers = 6  # number of Encoder of Decoder Layer
   n_heads = 8  # 把qkv所需要分的头数  但是要注意 n_heads * d_k = d_model 为什么?因为输入的d_model被划分成8个子空间(也就是头数)这意味着每个头输出的维度是64计算完成后拼接起来

   model = Transformer()#  Transformer模型

   criterion = nn.CrossEntropyLoss()#交叉熵损失函数
   optimizer = optim.Adam(model.parameters(), lr=0.001)#    优化器

   enc_inputs, dec_inputs, target_batch = make_batch(sentences)
   print("sentences"+str(sentences))

   print("make_batch:"+str(enc_inputs.shape))
   print("make_batch:"+str(enc_inputs))
   print("make_batch:"+str(dec_inputs.shape))
   print("make_batch:"+str(dec_inputs))
   for epoch in range(20):
       optimizer.zero_grad()
       outputs, enc_self_attns, dec_self_attns, dec_enc_attns = model(enc_inputs, dec_inputs)
       loss = criterion(outputs, target_batch.contiguous().view(-1))
       print('Epoch:', '%04d' % (epoch + 1), 'cost =', '{:.6f}'.format(loss))
       loss.backward()
       optimizer.step()

   # Test
   predict, _, _, _ = model(enc_inputs, dec_inputs)
   predict = predict.data.max(1, keepdim=True)[1]
   print(sentences[0], '->', [number_dict[n.item()] for n in predict.squeeze()])

   print('first head of last state enc_self_attns')
   showgraph(enc_self_attns)

   print('first head of last state dec_self_attns')
   showgraph(dec_self_attns)

   print('first head of last state dec_enc_attns')
   showgraph(dec_enc_attns)

原文地址:https://blog.csdn.net/wq2571931803/article/details/140082839

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!