自学内容网 自学内容网

w04_nlp大模型训练·中文分词

一、基于pytorch的网络编写一个分词模型

#coding:utf8

import torch
import torch.nn as nn
import jieba
import numpy as np
import random
import json
from torch.utils.data import DataLoader

"""
基于pytorch的网络编写一个分词模型
我们使用jieba分词的结果作为训练数据
看看是否可以得到一个效果接近的神经网络模型
"""

# TorchModel 类定义了一个包含嵌入层、RNN 层和线性分类层的神经网络
# 使用 nn.Embedding 将字符映射到高维空间,通过 nn.RNN 处理序列信息,使用 nn.Linear 进行分类。
class TorchModel(nn.Module):
    def __init__(self, input_dim, hidden_size, num_rnn_layers, vocab):
        super(TorchModel, self).__init__()
        self.embedding = nn.Embedding(len(vocab)+1, input_dim, padding_idx=0)
        self.rnn_layer = nn.RNN(input_size=input_dim,
                          hidden_size=hidden_size,
                          batch_first=True,
                          num_layers=num_rnn_layers
                          )
        self.classify = nn.Linear(hidden_size, 2)
        self.loss_func = nn.CrossEntropyLoss(ignore_index=-100)

    def forward(self, x, y=None):
        x = self.embedding(x)  #input shape: (batch_size, sen_len), output shape:(batch_size, sen_len, input_dim)
        x, _ = self.rnn_layer(x)  #output shape:(batch_size, sen_len, hidden_size)
        y_pred = self.classify(x)   #output shape:(batch_size, sen_len, 2) -> y_pred.view(-1, 2) (batch_size*sen_len, 2)
        if y is not None:
            return self.loss_func(y_pred.view(-1, 2), y.view(-1))
        else:
            return y_pred


# Dataset 类用于处理语料库数据,从文件中读取句子
# 使用 sentence_to_sequence 将句子转换为数字序列,使用 sequence_to_label 生成标记序列,使用 padding 方法将序列和标签填充到固定长度。
class Dataset:
    def __init__(self, corpus_path, vocab, max_length):
        self.vocab = vocab
        self.corpus_path = corpus_path
        self.max_length = max_length
        self.load()

    def load(self):
        self.data = []
        with open(self.corpus_path, encoding="utf8") as f:
            for line in f:
                sequence = sentence_to_sequence(line, self.vocab)
                label = sequence_to_label(line)
                sequence, label = self.padding(sequence, label)
                sequence = torch.LongTensor(sequence)
                label = torch.LongTensor(label)
                self.data.append([sequence, label])
                #使用部分数据做展示,使用全部数据训练时间会相应变长
                if len(self.data) > 10000:
                    break

    #将文本截断或补齐到固定长度
    def padding(self, sequence, label):
        sequence = sequence[:self.max_length]
        sequence += [0] * (self.max_length - len(sequence))
        label = label[:self.max_length]
        label += [-100] * (self.max_length - len(label))
        return sequence, label

    def __len__(self):
        return len(self.data)

    def __getitem__(self, item):
        return self.data[item]

#文本转化为数字序列,为embedding做准备
def sentence_to_sequence(sentence, vocab):
    sequence = [vocab.get(char, vocab['unk']) for char in sentence]
    return sequence

#基于结巴生成分级结果的标注
def sequence_to_label(sentence):
    words = jieba.lcut(sentence)
    label = [0] * len(sentence)
    pointer = 0
    for word in words:
        pointer += len(word)
        label[pointer - 1] = 1
    return label

#加载字表
def build_vocab(vocab_path):
    vocab = {}
    with open(vocab_path, "r", encoding="utf8") as f:
        for index, line in enumerate(f):
            char = line.strip()
            vocab[char] = index + 1   #每个字对应一个序号
    vocab['unk'] = len(vocab) + 1
    return vocab

#建立数据集
def build_dataset(corpus_path, vocab, max_length, batch_size):
    dataset = Dataset(corpus_path, vocab, max_length) #diy __len__ __getitem__
    data_loader = DataLoader(dataset, shuffle=True, batch_size=batch_size) #torch
    return data_loader


def main():
    epoch_num = 5        #训练轮数
    batch_size = 20       #每次训练样本个数
    char_dim = 50         #每个字的维度
    hidden_size = 100     #隐含层维度
    num_rnn_layers = 1    #rnn层数
    max_length = 20       #样本最大长度
    learning_rate = 1e-3  #学习率
    vocab_path = "chars.txt"  #字表文件路径
    corpus_path = "../corpus.txt"  #语料文件路径
    vocab = build_vocab(vocab_path)       #建立字表
    data_loader = build_dataset(corpus_path, vocab, max_length, batch_size)  #建立数据集
    model = TorchModel(char_dim, hidden_size, num_rnn_layers, vocab)   #建立模型
    optim = torch.optim.Adam(model.parameters(), lr=learning_rate)     #建立优化器
    #训练开始
    for epoch in range(epoch_num):
        model.train()
        watch_loss = []
        for x, y in data_loader:
            optim.zero_grad()    #梯度归零
            loss = model.forward(x, y)   #计算loss
            loss.backward()      #计算梯度
            optim.step()         #更新权重
            watch_loss.append(loss.item())
        print("=========\n第%d轮平均loss:%f" % (epoch + 1, np.mean(watch_loss)))
    #保存模型
    torch.save(model.state_dict(), "model.pth")
    return

#最终预测
def predict(model_path, vocab_path, input_strings):
    #配置保持和训练时一致
    char_dim = 50  # 每个字的维度
    hidden_size = 100  # 隐含层维度
    num_rnn_layers = 1  # rnn层数
    vocab = build_vocab(vocab_path)       #建立字表
    model = TorchModel(char_dim, hidden_size, num_rnn_layers, vocab)   #建立模型
    model.load_state_dict(torch.load(model_path))   #加载训练好的模型权重
    model.eval()
    for input_string in input_strings:
        #逐条预测
        x = sentence_to_sequence(input_string, vocab)
        with torch.no_grad():
            result = model.forward(torch.LongTensor([x]))[0]
            result = torch.argmax(result, dim=-1)  #预测出的01序列
            #在预测为1的地方切分,将切分后文本打印出来
            for index, p in enumerate(result):
                if p == 1:
                    print(input_string[index], end=" ")
                else:
                    print(input_string[index], end="")
            print()



if __name__ == "__main__":
    # main()
    input_strings = ["同时国内有望出台新汽车刺激方案",
                     "沪胶后市有望延续强势",
                     "经过两个交易日的强势调整后",
                     "昨日上海天然橡胶期货价格再度大幅上扬"]
    
    predict("model.pth", "chars.txt", input_strings)

模型分析

  • 模型定义
    • TorchModel 类定义了一个包含嵌入层、RNN 层和线性分类层的神经网络,使用 nn.Embedding 将字符映射到高维空间,通过 nn.RNN 处理序列信息,使用 nn.Linear 进行分类。
    • 数据集处理
      • Dataset 类用于处理语料库数据,从文件中读取句子,使用 sentence_to_sequence 将句子转换为数字序列,使用 sequence_to_label 生成标记序列,使用 padding 方法将序列和标签填充到固定长度。
      • build_vocab 函数从文件中构建词汇表,build_dataset 函数使用 Dataset 类和 DataLoader 进行批处理。
    • 训练部分
      • main 函数中,设置超参数,构建模型和优化器,进行多轮训练,计算损失、反向传播和更新参数,保存训练好的模型。
    • 预测部分
      • predict 函数加载训练好的模型和词汇表,对输入句子进行分词预测,将预测为 1 的位置进行分词,将结果打印输出。
  • 数据预处理
    • sentence_to_sequence 函数将输入的句子中的字符根据词汇表转换为数字序列,未在词汇表中的字符使用 unk 的索引。
    • sequence_to_label 函数利用结巴分词的结果,将分词结束位置标记为 1,其余为 0,生成标记序列。
    • Dataset 类的 padding 方法确保所有序列和标签具有相同的长度,便于批处理。
  • 模型架构
    • TorchModel 类的 embedding 层将输入的数字序列映射到高维空间,rnn_layer 处理序列信息,classify 层将 RNN 的输出映射到 2 个类别(分词或不分词)。
    • 训练时使用 CrossEntropyLoss 计算损失,预测时使用 torch.argmax 找到最可能的类别。
  • 训练和预测流程
    • main 函数设置训练的超参数,创建数据集和模型,使用 Adam 优化器进行优化,保存训练好的模型。
    • predict 函数加载训练好的模型,对输入句子进行分词预测并输出结果。

二、DAG(有向无环图)法做分词

import jieba

#词典,每个词后方存储的是其词频,仅为示例,也可自行添加
Dict = {"经常":0.1,
        "经":0.05,
        "有":0.1,
        "常":0.001,
        "有意见":0.1,
        "歧":0.001,
        "意见":0.2,
        "分歧":0.2,
        "见":0.05,
        "意":0.05,
        "见分歧":0.05,
        "分":0.1}

#根据上方词典,对于输入文本,构造一个存储有所有切分方式的信息字典
#学术叫法为有向无环图,DAG(Directed Acyclic Graph),不理解也不用纠结,只当是个专属名词就好
#这段代码直接来自于jieba分词
# jieba.cut
def calc_dag(sentence):
    DAG = {}
    n = len(sentence)
    for k in range(n):
        i = k
        tmplist = []
        while i < n:
            frag = sentence[k: i+1]
            if frag in Dict:
                tmplist.append(i)
            i += 1
        if not tmplist:
            tmplist = [k]
        DAG[k] = tmplist
    return DAG

sentence = "经常有意见分歧"
print(calc_dag(sentence))
#结果应该为{0: [0, 1], 1: [1], 2: [2, 4], 3: [3, 4], 4: [4, 6], 5: [5, 6], 6: [6]}
#0:[0,1]代表句子中的第0个字,可以单独成词,或与第1个字一起成词
#2:[2,4]代表句子中的第2个字,可以单独成词,或第2-4个字一起成词
#依次类推
#这个字典中实际上就存储了所有可能的切分方式的信息


#将DAG中的信息解码(还原)出来,用文本展示出所有切分方式
class DAGDecode:
    #通过两个队列来实现
    def __init__(self, sentence):
        self.sentence = sentence
        self.DAG = calc_dag(sentence)  #使用了上方的函数
        self.length = len(sentence)
        self.unfinish_path = [[]]   #保存待解码序列的队列
        self.finish_path = []  #保存解码完成的序列的队列

    #对于每一个序列,检查是否需要继续解码
    #不需要继续解码的,放入解码完成队列
    #需要继续解码的,将生成的新队列,放入待解码队列
    #path形如:["经常", "有", "意见"]
    def decode_next(self, path):
        path_length = len("".join(path))
        if path_length == self.length:  #已完成解码
            self.finish_path.append(path)
            return
        candidates = self.DAG[path_length]
        new_paths = []
        for candidate in candidates:
            new_paths.append(path + [self.sentence[path_length:candidate+1]])
        self.unfinish_path += new_paths  #放入待解码对列
        return

    #递归调用序列解码过程
    def decode(self):
        while self.unfinish_path != []:
            path = self.unfinish_path.pop(0) #从待解码队列中取出一个序列
            self.decode_next(path)     #使用该序列进行解码


sentence = "经常有意见分歧"
dd = DAGDecode(sentence)
dd.decode()
print(dd.finish_path)

代码分析

一、函数和类的功能分析:

  • calc_dag(sentence)函数:

    • 功能:
      • 该函数的主要目的是根据输入的句子和预定义的词典 Dict 构建一个有向无环图(DAG),用于存储句子中所有可能的词切分信息。
    • 实现步骤:
      1. 首先,初始化一个空字典 DAG 用于存储结果。
      2. 获取输入句子的长度 n
      3. 遍历句子中的每个字符,从当前字符开始,通过不断增加子串长度,检查子串是否在 Dict 中。
      4. 若子串在 Dict 中,将该子串结束字符的索引添加到 tmplist 中。
      5. 若 tmplist 为空,说明当前字符没有可切分的词,将当前字符索引添加到 tmplist
      6. 最后将 k 作为键,tmplist 作为值存储在 DAG 中。
  • DAGDecode类:

    • __init__(self, sentence)方法:
      • 功能:
        • 对输入的句子进行初始化操作,为后续的解码操作准备所需的数据结构。
      • 实现步骤:
        1. 存储输入的句子。
        2. 调用 calc_dag(sentence) 函数生成有向无环图,并存储在 self.DAG 中。
        3. 存储句子的长度。
        4. 初始化两个队列:self.unfinish_path 存储待解码的序列,初始化为只包含一个空列表的列表;self.finish_path 存储已完成解码的序列,初始化为空列表。
    • decode_next(self, path)方法:
      • 功能:
        • 对于给定的部分解码路径,判断是否完成解码,若未完成则根据 self.DAG 生成新的待解码路径并添加到 self.unfinish_path 中,若完成则添加到 self.finish_path 中。
      • 实现步骤:
        1. 计算当前 path 所代表的字符串的长度。
        2. 若长度等于句子长度,说明解码完成,将 path 加入 self.finish_path
        3. 若未完成,根据 self.DAG 中存储的信息,找出可能的下一个词的结束位置,生成新的解码路径并添加到 self.unfinish_path 中。
    • decode(self)方法:
      • 功能:
        • 循环从 self.unfinish_path 中取出路径,调用 decode_next 方法进行解码,直到 self.unfinish_path 为空。
      • 实现步骤:
        1. 只要 self.unfinish_path 不为空,就取出其中的一个元素。
        2. 调用 decode_next 方法对该元素进行解码。

二、代码逻辑总结:

  • 首先,使用 calc_dag(sentence) 函数对输入的句子构建一个有向无环图,该图以字典的形式存储了从每个字符开始的所有可能的词切分信息。例如对于输入 "经常有意见分歧",会得到 {0: [0, 1], 1: [1], 2: [2, 4], 3: [3, 4], 4: [4, 6], 5: [5, 6], 6: [6]}
  • 然后,DAGDecode 类利用这个有向无环图进行解码操作:
    • 在 __init__ 阶段,存储句子、有向无环图、句子长度,并初始化待解码和已完成解码的队列。
    • decode_next 方法会根据当前的部分解码结果判断是否继续解码,若继续解码,会根据 DAG 生成新的可能路径添加到待解码队列中,若完成则添加到已完成队列中。
    • decode 方法通过不断调用 decode_next 方法处理待解码队列中的元素,最终将所有可能的句子切分方式存储在 finish_path 中。

三、代码解释示例:

  • 以输入句子 "经常有意见分歧" 为例:
    • 在 calc_dag 函数中:
      • 从 k = 0 开始,"经" 在 Dict 中,"经常" 也在 Dict 中,所以 DAG[0] = [0, 1]
      • 对于 k = 1,只有 "常" 在 Dict 中,所以 DAG[1] = [1]
      • 对于 k = 2"有" 在 Dict 中,"有意见" 也在 Dict 中,所以 DAG[2] = [2, 4]
      • 以此类推,最终得到完整的 DAG
    • 在 DAGDecode 类中:
      • 初始化时,unfinish_path = [[]]finish_path = []
      • 第一次调用 decode_next 对 [] 进行处理,会根据 DAG[0] 生成 ["经"] 和 ["经常"] 等新路径添加到 unfinish_path
      • 不断循环调用 decode_next,直到 unfinish_path 为空,最终得到所有可能的句子切分方式存储在 finish_path 中。

原文地址:https://blog.csdn.net/Captain823Jack/article/details/144696310

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!