自学内容网 自学内容网

昇思25天学习打卡营第17天|LSTM+CRF

BIOE label: 

B marks the start of an entity, I marks the remaining parts of an entity, and O marks background (non-entity) tokens.

We first compute the score of a given tag sequence.

def compute_score(emissions, tags, seq_ends, mask, trans, start_trans, end_trans):
    """Score the given tag paths (the numerator of the CRF likelihood).

    For every sequence in the batch this sums the start transition of the
    first tag, the emission score of each tag, the transition score between
    consecutive tags, and the end transition of the last valid tag.

    Args:
        emissions: Emission scores, indexed as ``[step, batch, tag]``.
        tags: Tag indices of shape ``(seq_length, batch_size)``.
        seq_ends: Index of the last valid step of each sequence, one entry
            per batch element.
        mask: Per-step weights indexed as ``[step, batch]``; steps whose mask
            is 0 contribute nothing to the score.
        trans: Transition scores, indexed as ``[previous_tag, next_tag]``.
        start_trans: Score of starting a sequence with each tag.
        end_trans: Score of ending a sequence with each tag.

    Returns:
        One score per batch element.
    """
    seq_length, batch_size = tags.shape
    mask = mask.astype(emissions.dtype)
    # Loop-invariant batch index used to pick one emission per sequence.
    batch_idx = mnp.arange(batch_size)

    # Start transition plus the emission of the first tag.
    score = start_trans[tags[0]] + emissions[0, batch_idx, tags[0]]

    for i in range(1, seq_length):
        # Transition from the previous tag to the current one (valid steps only).
        score += trans[tags[i - 1], tags[i]] * mask[i]
        # Emission of the current tag (valid steps only).
        score += emissions[i, batch_idx, tags[i]] * mask[i]

    # End transition is taken from the last *valid* tag of each sequence,
    # not from the last (possibly padded) step.
    last_tags = tags[seq_ends, batch_idx]
    score += end_trans[last_tags]
    return score

How should we understand the score?

Just two things:

 1. Consider an input sequence x = {x1, x2, x3, ...} and a corresponding label sequence y = {y1, y2, y3, ...}.

We want a score, Score(x, y), that reflects the probability that x generates y.

2. To build it, we accumulate two probabilities: that of x_i producing y_i, and that of y_(i-1) being followed by y_i, because each label must be a reasonable successor of the label generated before it.

The former is called the emission probability \Psi_{EMIT} and the latter the transition probability \Psi_{TRANS}, so we can define the score as:

Score(x, y) = \sum_i \left[ \log \Psi_{EMIT}(x_i \to y_i) + \log \Psi_{TRANS}(y_{i-1} \to y_i) \right]

Next we define the normalizer, which is the denominator of the formula below:

P(y \mid x) = \frac{\exp(Score(x, y))}{\sum_{y'} \exp(Score(x, y'))}

def compute_normalizer(emissions, mask, trans, start_trans, end_trans):
    """Compute the log of the CRF normalizer (the denominator of the likelihood).

    Runs the forward recursion in log space: at every step the running score
    of each tag is combined with all transitions and the next emissions, and
    the previous-tag axis is collapsed with ``logsumexp``.

    Args:
        emissions: Emission scores; the first axis is the time step.
        mask: Per-step mask; where ``mask[i]`` is false/zero the running score
            is carried over unchanged.
        trans: Transition scores, indexed as ``[previous_tag, next_tag]``.
        start_trans: Score of starting a sequence with each tag.
        end_trans: Score of ending a sequence with each tag.

    Returns:
        ``logsumexp`` over the tag axis of the final running scores, i.e. one
        value per batch element.
    """
    seq_length = emissions.shape[0]
    # Running score per (batch element, tag) after the first step.
    score = start_trans + emissions[0]

    for i in range(1, seq_length):
        # Add a trailing axis so the running score broadcasts over next tags.
        broadcast_score = score.expand_dims(2)
        # Add a middle axis so the emissions broadcast over previous tags.
        broadcast_emissions = emissions[i].expand_dims(1)
        # Entry [b, p, n]: score of being in tag p, then moving to tag n.
        next_score = broadcast_score + trans + broadcast_emissions
        # Sum (in log space) over every possible previous tag.
        next_score = ops.logsumexp(next_score, axis=1)
        # Only advance sequences for which this step is valid.
        score = mnp.where(mask[i].expand_dims(1), next_score, score)

    score += end_trans
    return ops.logsumexp(score, axis=1)

    Viterbi算法

def viterbi_decode(emissions, mask, trans, start_trans, end_trans):
    """Run the Viterbi forward pass and record back-pointers.

    Same recursion as the normalizer, except that the previous-tag axis is
    collapsed with ``max`` instead of ``logsumexp`` and the arg-max is stored
    so the best path can be recovered afterwards.

    Args:
        emissions: Emission scores; the first axis is the time step.
        mask: Per-step mask; its first axis gives the number of steps, and
            where ``mask[i]`` is false/zero the running score is carried over.
        trans: Transition scores, indexed as ``[previous_tag, next_tag]``.
        start_trans: Score of starting a sequence with each tag.
        end_trans: Score of ending a sequence with each tag.

    Returns:
        A ``(score, history)`` pair: the final per-tag scores including the
        end transition, and a tuple with one back-pointer tensor per step
        after the first (best previous tag for every current tag).
    """
    seq_length = mask.shape[0]
    score = start_trans + emissions[0]
    history = ()

    for i in range(1, seq_length):
        # Entry [b, p, n]: score of being in tag p, then moving to tag n.
        broadcast_score = score.expand_dims(2)
        broadcast_emission = emissions[i].expand_dims(1)
        next_score = broadcast_score + trans + broadcast_emission

        # Best previous tag for every current tag; kept for backtracking.
        indices = next_score.argmax(axis=1)
        history += (indices,)

        # Keep only the best-scoring way of reaching each current tag.
        next_score = next_score.max(axis=1)
        # Only advance sequences for which this step is valid.
        score = mnp.where(mask[i].expand_dims(1), next_score, score)

    score += end_trans
    return score, history

reasons:

Here is a decoder that backtracks through the recorded history to recover the best predicted tag sequence for each sample.

def _to_int(value):
    """Convert a scalar tensor or array element to a Python ``int``."""
    # MindSpore tensors expose asnumpy(); NumPy scalars and ints convert directly.
    if hasattr(value, 'asnumpy'):
        value = value.asnumpy()
    return int(value)


def post_decode(score, history, seq_length):
    """Backtrack through Viterbi back-pointers to get the best tag paths.

    Args:
        score: Final per-tag scores, indexed as ``[batch, tag]``.
        history: Sequence of back-pointer arrays, one per step after the
            first, each indexed as ``[batch, current_tag]``.
        seq_length: Valid length of every sequence in the batch.

    Returns:
        A list with one list of tag indices (Python ints) per batch element,
        each as long as that sequence's valid length.
    """
    batch_size = seq_length.shape[0]
    best_tags_list = []

    for idx in range(batch_size):
        # Index of the last valid step of this sequence.
        last_step = _to_int(seq_length[idx]) - 1

        # Start from the best-scoring final tag ...
        best_last_tag = score[idx].argmax(axis=0)
        best_tags = [_to_int(best_last_tag)]

        # ... and walk the back-pointers of the valid steps in reverse.
        for hist in reversed(history[:last_step]):
            best_last_tag = hist[idx][best_tags[-1]]
            best_tags.append(_to_int(best_last_tag))

        # Tags were collected last-to-first.
        best_tags.reverse()
        best_tags_list.append(best_tags)

    return best_tags_list
def sequence_mask(seq_length, max_length, batch_first=False):
    """Build an int64 mask marking the valid positions of each sequence.

    Position ``p`` of a sequence is marked 1 when ``p`` is smaller than that
    sequence's length and 0 otherwise.

    Args:
        seq_length: Lengths of the sequences in the batch.
        max_length: Number of positions to generate for every sequence.
        batch_first: If true the batch axis comes first in the result;
            otherwise the first two axes are swapped so positions come first.

    Returns:
        The mask as an ``ms.int64`` tensor.
    """
    positions = mnp.arange(0, max_length, 1, seq_length.dtype)
    # Trailing singleton axis lets every length broadcast against all positions.
    lengths_column = seq_length.view(seq_length.shape + (1,))
    valid = (positions < lengths_column).astype(ms.int64)
    return valid if batch_first else valid.swapaxes(0, 1)
class CRF(nn.Cell):
    """Linear-chain conditional random field layer.

    Called with ``tags`` it returns the negative log-likelihood of those tags
    (usable as a training loss); called without ``tags`` it runs the Viterbi
    forward pass and returns what ``viterbi_decode`` returns.

    Args:
        num_tags: Number of distinct tags; must be positive.
        batch_first: Whether the first axis of the inputs is the batch axis
            (otherwise it is the sequence axis).
        reduction: How per-sequence losses are combined: ``'none'``,
            ``'sum'``, ``'mean'`` or ``'token_mean'`` (sum divided by the
            number of unmasked tokens).

    Raises:
        ValueError: If ``num_tags`` is not positive or ``reduction`` is not
            one of the supported values.
    """

    def __init__(self, num_tags: int, batch_first: bool = False, reduction: str = 'sum') -> None:
        if num_tags <= 0:
            raise ValueError(f'invalid number of tags:{num_tags}')
        super().__init__()
        if reduction not in ('none', 'sum', 'mean', 'token_mean'):
            raise ValueError(f'invalid reduction:{reduction}')
        self.num_tags = num_tags
        self.batch_first = batch_first
        self.reduction = reduction
        # Learnable scores for starting with, ending with, and moving between tags.
        self.start_transitions = ms.Parameter(initializer(Uniform(0.1), (num_tags,)), name='start_transitions')
        self.end_transitions = ms.Parameter(initializer(Uniform(0.1), (num_tags,)), name='end_transitions')
        self.transitions = ms.Parameter(initializer(Uniform(0.1), (num_tags, num_tags)), name='transitions')

    def construct(self, emissions, tags=None, seq_length=None):
        """Decode when ``tags`` is ``None``; otherwise compute the loss."""
        if tags is None:
            return self._decode(emissions, seq_length)
        return self._forward(emissions, tags, seq_length)

    def _forward(self, emissions, tags=None, seq_length=None):
        """Return the (reduced) negative log-likelihood of ``tags``."""
        if self.batch_first:
            batch_size, max_length = tags.shape
            # The scoring helpers expect the sequence axis first.
            emissions = emissions.swapaxes(0, 1)
            tags = tags.swapaxes(0, 1)
        else:
            max_length, batch_size = tags.shape

        if seq_length is None:
            # No lengths given: treat every sequence as full length.
            seq_length = mnp.full((batch_size,), max_length, ms.int64)

        mask = sequence_mask(seq_length, max_length)

        # Score of the given tag path.
        numerator = compute_score(emissions, tags, seq_length - 1, mask, self.transitions,
                                  self.start_transitions, self.end_transitions)
        # Log-sum of the scores of all possible tag paths.
        denominator = compute_normalizer(emissions, mask, self.transitions,
                                         self.start_transitions, self.end_transitions)
        # log Z - score = negative log-likelihood of the given path.
        nll = denominator - numerator

        if self.reduction == 'none':
            return nll
        if self.reduction == 'sum':
            return nll.sum()
        if self.reduction == 'mean':
            return nll.mean()
        # 'token_mean': average over the number of unmasked tokens.
        return nll.sum() / mask.astype(emissions.dtype).sum()

    def _decode(self, emissions, seq_length=None):
        """Run the Viterbi forward pass and return ``(score, history)``."""
        if self.batch_first:
            batch_size, max_length = emissions.shape[:2]
            # The decoding helper expects the sequence axis first.
            emissions = emissions.swapaxes(0, 1)
        else:
            # Sequence axis is already first, so the order is reversed here.
            max_length, batch_size = emissions.shape[:2]

        if seq_length is None:
            # No lengths given: treat every sequence as full length.
            seq_length = mnp.full((batch_size,), max_length, ms.int64)

        mask = sequence_mask(seq_length, max_length)
        return viterbi_decode(emissions, mask, self.transitions, self.start_transitions, self.end_transitions)

sequence_mask is meant to generate the mask matrix.

Next we construct a BiLSTM CRF model.

The architecture is:

nn.Embedding -> nn.LSTM -> nn.Dense -> CRF

Here the LSTM extracts the sequence features, the Dense layer turns them into the emission matrix, and the result is finally fed into the CRF layer.

class BiLSTM_CRF(nn.Cell):
    """Sequence tagger: Embedding -> bidirectional LSTM -> Dense -> CRF.

    Args:
        vocab_size: Number of entries in the embedding table.
        embbeding_dim: Size of each embedding vector. The name is misspelled
            in the public signature and kept as-is for compatibility.
        hidden_dim: Input size of the Dense layer; each LSTM direction uses
            ``hidden_dim // 2`` hidden units.
        num_tags: Number of tags produced by the Dense layer and used by the CRF.
        padding_idx: Vocabulary index passed to the embedding as padding.
    """

    def __init__(self, vocab_size, embbeding_dim, hidden_dim, num_tags, padding_idx=0):
        super().__init__()
        # Alias the misspelled parameter so the body reads with the intended name.
        embedding_dim = embbeding_dim
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=padding_idx)
        # nn.LSTM's keyword is `bidirectional`.
        self.lstm = nn.LSTM(embedding_dim, hidden_dim // 2, bidirectional=True, batch_first=True)
        self.hidden2tag = nn.Dense(hidden_dim, num_tags, 'he_uniform')
        self.crf = CRF(num_tags, batch_first=True)

    def construct(self, inputs, seq_length, tags=None):
        """Return the CRF output: the loss when ``tags`` is given, else the decoding result."""
        embeds = self.embedding(inputs)
        outputs, _ = self.lstm(embeds, seq_length=seq_length)
        # Per-token emission scores for the CRF.
        feats = self.hidden2tag(outputs)
        crf_outs = self.crf(feats, tags, seq_length)
        return crf_outs

Now an example is given.

# Model sizes used by the example below.
embedding_dim = 16
hidden_dim = 32

# Toy corpus: each sample pairs a character sequence with one tag per character.
# list() is used because str.split() on an unspaced sentence returns the whole
# sentence as a single token, which would not line up with the tags.
training_data = [
    (
        list('清华大学坐落在首都北京'),
        'B I I I O O O O O B I'.split(),
    ),
    (
        list('重庆是一个魔幻城市'),
        'B I O O O O O O O'.split(),
    ),
]

# Vocabulary: index 0 is reserved for padding; characters are numbered in
# order of first appearance.
word_to_idx = {'<pad>': 0}
for sentence, tags in training_data:
    for word in sentence:
        word_to_idx.setdefault(word, len(word_to_idx))

# Tag names must match the tags used in training_data (letter 'O', not digit '0').
tag_to_idx = {'B': 0, 'I': 1, 'O': 2}

We instantiate the model and the optimizer, and wrap them together in a gradient function and a training step.

# Vocabulary size and tag count come straight from the lookup tables.
model = BiLSTM_CRF(
    len(word_to_idx),
    embedding_dim,
    hidden_dim,
    len(tag_to_idx),
)
# Plain SGD with a small weight decay over all trainable parameters.
optimizer = nn.SGD(
    model.trainable_params(),
    learning_rate=0.01,
    weight_decay=1e-4,
)
# Returns the model's output together with gradients for the optimizer's
# parameters only (no gradients with respect to the inputs).
grad_fn = ms.value_and_grad(model, grad_position=None, weights=optimizer.parameters)
def train_step(data, seq_length, label):
    """Run one optimisation step on a batch and return its loss."""
    step_loss, step_grads = grad_fn(data, seq_length, label)
    # Apply the gradients to the parameters tracked by the optimizer.
    optimizer(step_grads)
    return step_loss

Process the data by packing it into a batch and padding the sequences that are shorter than the longest one.

def prepare_sequence(seqs, word_to_idx, tag_to_idx, pad_tag='O'):
    """Turn (tokens, tags) pairs into padded index tensors.

    Every sequence is padded on the right up to the length of the longest
    token sequence: tokens with the ``'<pad>'`` index, labels with the index
    of ``pad_tag``.

    Args:
        seqs: Iterable of ``(tokens, tags)`` pairs.
        word_to_idx: Mapping from token to index; must contain ``'<pad>'``.
        tag_to_idx: Mapping from tag name to index; must contain ``pad_tag``.
        pad_tag: Tag name whose index fills the padded label positions.

    Returns:
        Three ``ms.int64`` tensors: padded token indices, padded label
        indices, and the original length of every token sequence.

    Raises:
        KeyError: If a token, a tag, ``'<pad>'`` or ``pad_tag`` is missing
            from its mapping.
    """
    max_len = max(len(seq) for seq, _ in seqs)
    # Resolve the padding indices once instead of once per padded position.
    pad_word_idx = word_to_idx['<pad>']
    pad_label = tag_to_idx[pad_tag]

    seq_outputs, label_outputs, seq_length = [], [], []
    for seq, tag in seqs:
        padding = max_len - len(seq)
        seq_length.append(len(seq))
        seq_outputs.append([word_to_idx[w] for w in seq] + [pad_word_idx] * padding)
        label_outputs.append([tag_to_idx[t] for t in tag] + [pad_label] * padding)

    return (
        ms.Tensor(seq_outputs, ms.int64),
        ms.Tensor(label_outputs, ms.int64),
        ms.Tensor(seq_length, ms.int64),
    )
       
# Convert the example corpus into padded index tensors plus per-sentence lengths.
data, label, seq_length = prepare_sequence(training_data, word_to_idx, tag_to_idx)

We visualize the training progress.

steps = 500
# Train for a fixed number of steps; the progress bar shows the latest loss.
with tqdm(total=steps) as progress_bar:
    for _ in range(steps):
        loss = train_step(data, seq_length, label)
        progress_bar.set_postfix(loss=loss)
        progress_bar.update(1)
# Without tags the model returns the Viterbi scores and back-pointers.
score, history = model(data, seq_length)
# Backtrack to one list of tag indices per sentence.
predict = post_decode(score, history, seq_length)
# Inverse lookup: tag index -> tag name.
idx_to_tag = dict(zip(tag_to_idx.values(), tag_to_idx.keys()))
def sequence_to_tag(sequences, idx_to_tag):
    """Translate sequences of tag indices into sequences of tag names.

    Args:
        sequences: Iterable of iterables of tag indices.
        idx_to_tag: Mapping from tag index to tag name.

    Returns:
        A list of lists holding the tag name for every index.
    """
    return [[idx_to_tag[index] for index in tag_ids] for tag_ids in sequences]
# Map the predicted tag indices back to tag names; as a bare expression the
# result is only displayed in an interactive session or notebook cell.
sequence_to_tag(predict,idx_to_tag)


原文地址:https://blog.csdn.net/2301_78538042/article/details/140592969

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!