微信扫码
添加专属顾问
 
                        我要投稿
具体原理可以参考这篇文章Transformer[2]
因为我们的中文数据是繁体字,因此需将其转换为简体:
import copyimport mathimport matplotlib.pyplot as pltimport numpy as npimport osimport seaborn as snsimport timeimport torchimport torch.nn as nnimport torch.nn.functional as Ffrom collections import Counterfrom langconv import Converterfrom nltk import word_tokenizefrom torch.autograd import Variabledef cht_to_chs(sent):sent = Converter("zh-hans").convert(sent)sent.encode("utf-8")return sent
转换完数据之后,我们需要将每句话按字粒度切分开,并构建词表,然后将单词映射为索引,并按照数据长度划分批次
class PrepareData:def __init__(self, train_file, dev_file):# 读取数据、分词self.train_en, self.train_cn = self.load_data(train_file)self.dev_en, self.dev_cn = self.load_data(dev_file)# 构建词表self.en_word_dict, self.en_total_words, self.en_index_dict = \self.build_dict(self.train_en)self.cn_word_dict, self.cn_total_words, self.cn_index_dict = \self.build_dict(self.train_cn)# 单词映射为索引self.train_en, self.train_cn = self.word2id(self.train_en, self.train_cn, self.en_word_dict, self.cn_word_dict)self.dev_en, self.dev_cn = self.word2id(self.dev_en, self.dev_cn, self.en_word_dict, self.cn_word_dict)# 划分批次、填充、掩码self.train_data = self.split_batch(self.train_en, self.train_cn, BATCH_SIZE)self.dev_data = self.split_batch(self.dev_en, self.dev_cn, BATCH_SIZE)def load_data(self, path):"""读取英文、中文数据对每条样本分词并构建包含起始符和终止符的单词列表形式如:en = [['BOS', 'i', 'love', 'you', 'EOS'], ['BOS', 'me', 'too', 'EOS'], ...]cn = [['BOS', '我', '爱', '你', 'EOS'], ['BOS', '我', '也', '是', 'EOS'], ...]"""en = []cn = []with open(path, mode="r", encoding="utf-8") as f:for line in f.readlines():sent_en, sent_cn = line.strip().split("\t")sent_en = sent_en.lower()sent_cn = cht_to_chs(sent_cn)sent_en = ["BOS"] + word_tokenize(sent_en) + ["EOS"]# 中文按字符切分sent_cn = ["BOS"] + [char for char in sent_cn] + ["EOS"]en.append(sent_en)cn.append(sent_cn)return en, cndef build_dict(self, sentences, max_words=5e4):"""构造分词后的列表数据构建单词-索引映射(key为单词,value为id值)"""# 统计数据集中单词词频word_count = Counter([word for sent in sentences for word in sent])# 按词频保留前max_words个单词构建词典# 添加UNK和PAD两个单词ls = word_count.most_common(int(max_words))total_words = len(ls) + 2word_dict = {w[0]: index + 2 for index, w in enumerate(ls)}word_dict['UNK'] = UNKword_dict['PAD'] = PAD# 构建id2word映射index_dict = {v: k for k, v in word_dict.items()}return word_dict, total_words, index_dictdef word2id(self, en, cn, en_dict, cn_dict, sort=True):"""将英文、中文单词列表转为单词索引列表`sort=True`表示以英文语句长度排序,以便按批次填充时,同批次语句填充尽量少"""length = len(en)# 单词映射为索引out_en_ids = [[en_dict.get(word, UNK) for word in sent] for sent in en]out_cn_ids = [[cn_dict.get(word, UNK) for word in sent] for sent in cn]# 按照语句长度排序def len_argsort(seq):"""传入一系列语句数据(分好词的列表形式),按照语句长度排序后,返回排序后原来各语句在数据中的索引下标"""return sorted(range(len(seq)), key=lambda x: len(seq[x]))# 按相同顺序对中文、英文样本排序if sort:# 以英文语句长度排序sorted_index = len_argsort(out_en_ids)out_en_ids = [out_en_ids[idx] for idx in sorted_index]out_cn_ids = [out_cn_ids[idx] for idx in sorted_index]return out_en_ids, out_cn_idsdef split_batch(self, en, cn, batch_size, shuffle=True):"""划分批次`shuffle=True`表示对各批次顺序随机打乱"""# 每隔batch_size取一个索引作为后续batch的起始索引idx_list = np.arange(0, len(en), batch_size)# 起始索引随机打乱if shuffle:np.random.shuffle(idx_list)# 存放所有批次的语句索引batch_indexs = []for idx in idx_list:"""形如[array([4, 5, 6, 7]),array([0, 1, 2, 3]),array([8, 9, 10, 11]),...]"""# 起始索引最大的批次可能发生越界,要限定其索引batch_indexs.append(np.arange(idx, min(idx + batch_size, len(en))))# 构建批次列表batches = []for batch_index in batch_indexs:# 按当前批次的样本索引采样batch_en = [en[index] for index in batch_index]batch_cn = [cn[index] for index in batch_index]# 对当前批次中所有语句填充、对齐长度# 维度为:batch_size * 当前批次中语句的最大长度batch_cn = seq_padding(batch_cn)batch_en = seq_padding(batch_en)# 将当前批次添加到批次列表# Batch类用于实现注意力掩码batches.append(Batch(batch_en, batch_cn))return batches
好了,接下来正式进入transformer部分
首先我们把输入的单词转为词向量,它包括token embedding和position embedding两层,编码之后的词向量再分别的流向encoder里面的两层网络。
class Embeddings(nn.Module):def __init__(self, d_model, vocab):super(Embeddings, self).__init__()# Embedding层self.lut = nn.Embedding(vocab, d_model)# Embedding维数self.d_model = d_modeldef forward(self, x):# 返回x的词向量(需要乘以math.sqrt(d_model))return self.lut(x) * math.sqrt(self.d_model)
首先一个问题,为啥要进行位置编码呢。原因在于self-attention,将任意两个字之间距离缩小为1,丢失了字的位置信息,故我们需要加上这一信息。我们也可以想到两种方法
Transformer采用了这一方式,通过奇数列cos函数,偶数列sin函数方式,利用三角函数对位置进行固定编码。
固定编码方式简洁,不需要训练。且不受embedding table维度影响,理论上可以支持任意长度文本。(但要尽量避免预测文本很长,但训练集文本较短的case)
BERT采用了这种方式。先随机初始化一个embedding table,然后训练得到table 参数值。predict时通过embedding_lookup找到每个位置的embedding。这种方式和token embedding类似。
动态训练方式,在语料比较大时,准确度比较好。但需要训练,且最致命的是,限制了输入文本长度。当文本长度大于position embedding table维度时,超出的position无法查表得到embedding(可以理解为OOV了)。这也是为什么BERT模型文本长度最大512的原因。
position encoding直接采用了三角函数。对偶数列采用sin,奇数列采用cos。
class PositionalEncoding(nn.Module):def __init__(self, d_model, dropout, max_len=5000):super(PositionalEncoding, self).__init__()self.dropout = nn.Dropout(p=dropout)# 位置编码矩阵,维度[max_len, embedding_dim]pe = torch.zeros(max_len, d_model, device=DEVICE)# 单词位置position = torch.arange(0.0, max_len, device=DEVICE)position.unsqueeze_(1)# 使用exp和log实现幂运算div_term = torch.exp(torch.arange(0.0, d_model, 2, device=DEVICE) * (- math.log(1e4) / d_model))div_term.unsqueeze_(0)# 计算单词位置沿词向量维度的纹理值pe[:, 0 : : 2] = torch.sin(torch.mm(position, div_term))pe[:, 1 : : 2] = torch.cos(torch.mm(position, div_term))# 增加批次维度,[1, max_len, embedding_dim]pe.unsqueeze_(0)# 将位置编码矩阵注册为buffer(不参加训练)self.register_buffer('pe', pe)def forward(self, x):# 将一个批次中语句所有词向量与位置编码相加# 注意,位置编码不参与训练,因此设置requires_grad=Falsex += Variable(self.pe[:, : x.size(1), :], requires_grad=False)return self.dropout(x)
def attention(query, key, value, mask=None, dropout=None):"""Scaled Dot-Product Attention"""# q、k、v向量长度为d_kd_k = query.size(-1)# 矩阵乘法实现q、k点积注意力,sqrt(d_k)归一化scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)# 注意力掩码机制if mask is not None:scores = scores.masked_fill(mask==0, -1e9)# 注意力矩阵softmax归一化p_attn = F.softmax(scores, dim=-1)# dropoutif dropout is not None:p_attn = dropout(p_attn)# 注意力对v加权return torch.matmul(p_attn, value), p_attn
MultiHeadedAttention采用多头self-attention。它先将隐向量切分为h个头,然后每个头内部进行self-attention计算,最后再concat再一起。 这样做是为了获取语义的多层信息,最后再拼接到一起,得到的输出就包含了输入的多层信息。
def clones(module, N):"""克隆基本单元,克隆的单元之间参数不共享"""return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])class MultiHeadedAttention(nn.Module):"""Multi-Head Attention"""def __init__(self, h, d_model, dropout=0.1):super(MultiHeadedAttention, self).__init__()"""`h`:注意力头的数量`d_model`:词向量维数"""# 确保整除assert d_model % h == 0# q、k、v向量维数self.d_k = d_model // h# 头的数量self.h = h# WQ、WK、WV矩阵及多头注意力拼接变换矩阵WOself.linears = clones(nn.Linear(d_model, d_model), 4)self.attn = Noneself.dropout = nn.Dropout(p=dropout)def forward(self, query, key, value, mask=None):if mask is not None:mask = mask.unsqueeze(1)# 批次大小nbatches = query.size(0)# WQ、WK、WV分别对词向量线性变换,并将结果拆成h块query, key, value = [l(x).view(nbatches, -1, self.h, self.d_k).transpose(1, 2)for l, x in zip(self.linears, (query, key, value))]# 注意力加权x, self.attn = attention(query, key, value, mask=mask, dropout=self.dropout)# 多头注意力加权拼接x = x.transpose(1, 2).contiguous().view(nbatches, -1, self.h * self.d_k)# 对多头注意力加权拼接结果线性变换return self.linears[-1](x)
Add & Norm 层由 Add 和 Norm 两部分组成,其计算公式如下:
其中 X表示 Multi-Head Attention 或者 Feed Forward 的输入,MultiHeadAttention(X) 和 FeedForward(X) 表示输出 (输出与输入 X 维度是一样的,所以可以相加)。
Add指 X+MultiHeadAttention(X),是一种残差连接,通常用于解决多层网络训练的问题,可以让网络只关注当前差异的部分,在 ResNet 中经常用到。
Norm指 Layer Normalization,通常用于 RNN 结构,Layer Normalization 会将每一层神经元的输入都转成均值方差都一样的,这样可以加快收敛。
class SublayerConnection(nn.Module):"""通过层归一化和残差连接,连接Multi-Head Attention和Feed Forward"""def __init__(self, size, dropout):super(SublayerConnection, self).__init__()self.norm = LayerNorm(size)self.dropout = nn.Dropout(dropout)def forward(self, x, sublayer):# 层归一化x_ = self.norm(x)x_ = sublayer(x_)x_ = self.dropout(x_)# 残差连接return x + x_
Feed Forward 层比较简单,是一个两层的全连接层,第一层的激活函数为 Relu,第二层不使用激活函数,对应的公式如下:
X是输入,Feed Forward 最终得到的输出矩阵的维度与 X 一致。
class PositionwiseFeedForward(nn.Module):def __init__(self, d_model, d_ff, dropout=0.1):super(PositionwiseFeedForward, self).__init__()self.w_1 = nn.Linear(d_model, d_ff)self.w_2 = nn.Linear(d_ff, d_model)self.dropout = nn.Dropout(dropout)def forward(self, x):x = self.w_1(x)x = F.relu(x)x = self.dropout(x)x = self.w_2(x)return x
通过上面描述的 Multi-Head Attention, Feed Forward, Add & Norm 就可以构造出一个 Encoder block,Encoder block 接收输入矩阵 X(n×d),并输出一个矩阵 O(n×d)。通过多个 Encoder block 叠加就可以组成 Encoder。 第一个 Encoder block 的输入为句子单词的表示向量矩阵,后续 Encoder block 的输入是前一个 Encoder block 的输出,最后一个 Encoder block 输出的矩阵就是 编码信息矩阵 C,这一矩阵后续会用到 Decoder 中。
class EncoderLayer(nn.Module):def __init__(self, size, self_attn, feed_forward, dropout):super(EncoderLayer, self).__init__()self.self_attn = self_attnself.feed_forward = feed_forward# SublayerConnection作用连接multi和ffnself.sublayer = clones(SublayerConnection(size, dropout), 2)# d_modelself.size = sizedef forward(self, x, mask):# 将embedding层进行Multi head Attentionx = self.sublayer[0](x, lambda x: self.self_attn(x, x, x, mask))# attn的结果直接作为下一层输入return self.sublayer[1](x, self.feed_forward)
class Encoder(nn.Module):def __init__(self, layer, N):"""layer = EncoderLayer"""super(Encoder, self).__init__()# 复制N个编码器基本单元self.layers = clones(layer, N)# 层归一化self.norm = LayerNorm(layer.size)def forward(self, x, mask):"""循环编码器基本单元N次"""for layer in self.layers:x = layer(x, mask)return self.norm(x)
上图红色部分为 Transformer 的 Decoder block 结构,与 Encoder block 相似,但是存在一些区别:
•包含两个 Multi-Head Attention 层。•第一个Masked Multi-Head Attention 层采用了 Masked 操作。•第二个 Multi-Head Attention 层的 K, V 矩阵使用 Encoder 的编码信息矩阵 C 进行计算,而 Q 使用上一个 Decoder block 的输出计算。•最后有一个 Softmax 层计算下一个翻译单词的概率。
Decoder block 的第一个 Masked Multi-Head Self-Attention 采用了 Masked 操作,因为在翻译的过程中是顺序翻译的,即翻译完第 i 个单词,才可以翻译第 i+1 个单词。通过 Masked 操作可以防止第 i 个单词知道 i+1 个单词之后的信息。下面以 "我有一只猫" 翻译成 "I have a cat" 为例,了解一下 Masked 操作。
下面的描述中使用了类似 Teacher Forcing 的概念,不熟悉 Teacher Forcing 的童鞋可以参考以下上一篇文章Seq2Seq 模型详解。在 Decoder 的时候,是需要根据之前的翻译,求解当前最有可能的翻译,如下图所示。首先根据输入 "
Decoder 可以在训练的过程中使用 Teacher Forcing 并且并行化训练,即将正确的单词序列 (
第一步:是 Decoder 的输入矩阵和 Mask 矩阵,输入矩阵包含 "
第四步:使用 Mask QKT 与矩阵 V相乘,得到输出 Z,则单词 1 的输出向量 Z1 是只包含单词 1 信息的。
第五步:通过上述步骤就可以得到一个 Mask Multi-Head Self-Attention 的输出矩阵 Zi,然后和 Encoder 类似,通过 Multi-Head Self-Attention 拼接多个输出 Zi 然后计算得到第一个 Mask Multi-Head Self-Attention 的输出 Z,Z与输入 X 维度一样。
def subsequent_mask(size):"Mask out subsequent positions."# 设定subsequent_mask矩阵的shapeattn_shape = (1, size, size)# 生成一个右上角(不含主对角线)为全1,左下角(含主对角线)为全0的subsequent_mask矩阵subsequent_mask = np.triu(np.ones(attn_shape), k=1).astype('uint8')# 返回一个右上角(不含主对角线)为全False,左下角(含主对角线)为全True的subsequent_mask矩阵return torch.from_numpy(subsequent_mask) == 0
Decoder block 第二个 Multi-Head Attention 变化不大, 主要的区别在于其中 Self-Attention 的 K, V矩阵不是使用 上一个 Decoder block 的输出计算的,而是使用 Encoder 的编码信息矩阵 C 计算的。
根据 Encoder 的输出 C计算得到 K, V,根据上一个 Decoder block 的输出 Z 计算 Q (如果是第一个 Decoder block 则使用输入矩阵 X 进行计算),后续的计算方法与之前描述的一致。
这样做的好处是在 Decoder 的时候,每一位单词都可以利用到 Encoder 所有单词的信息 。
class DecoderLayer(nn.Module):def __init__(self, size, self_attn, src_attn, feed_forward, dropout):super(DecoderLayer, self).__init__()self.size = size# 自注意力机制self.self_attn = self_attn# 上下文注意力机制self.src_attn = src_attnself.feed_forward = feed_forwardself.sublayer = clones(SublayerConnection(size, dropout), 3)def forward(self, x, memory, src_mask, tgt_mask):# memory为编码器输出隐表示m = memory# 自注意力机制,q、k、v均来自解码器隐表示x = self.sublayer[0](x, lambda x: self.self_attn(x, x, x, tgt_mask))# 上下文注意力机制:q为来自解码器隐表示,而k、v为编码器隐表示x = self.sublayer[1](x, lambda x: self.self_attn(x, m, m, src_mask))return self.sublayer[2](x, self.feed_forward)
class Decoder(nn.Module):def __init__(self, layer, N):super(Decoder, self).__init__()self.layers = clones(layer, N)self.norm = LayerNorm(layer.size)def forward(self, x, memory, src_mask, tgt_mask):"""循环解码器基本单元N次"""for layer in self.layers:x = layer(x, memory, src_mask, tgt_mask)return self.norm(x)
class Generator(nn.Module):"""解码器输出经线性变换和softmax函数映射为下一时刻预测单词的概率分布"""def __init__(self, d_model, vocab):super(Generator, self).__init__()# decode后的结果,先进入一个全连接层变为词典大小的向量self.proj = nn.Linear(d_model, vocab)def forward(self, x):# 然后再进行log_softmax操作(在softmax结果上再做多一次log运算)return F.log_softmax(self.proj(x), dim=-1)
class Transformer(nn.Module):def __init__(self, encoder, decoder, src_embed, tgt_embed, generator):super(Transformer, self).__init__()self.encoder = encoderself.decoder = decoderself.src_embed = src_embedself.tgt_embed = tgt_embedself.generator = generatordef encode(self, src, src_mask):return self.encoder(self.src_embed(src), src_mask)def decode(self, memory, src_mask, tgt, tgt_mask):return self.decoder(self.tgt_embed(tgt), memory, src_mask, tgt_mask)def forward(self, src, tgt, src_mask, tgt_mask):# encoder的结果作为decoder的memory参数传入,进行decodereturn self.decode(self.encode(src, src_mask), src_mask, tgt, tgt_mask)def make_model(src_vocab, tgt_vocab, N=6, d_model=512, d_ff=2048, h=8, dropout=0.1):c = copy.deepcopy# 实例化Attention对象attn = MultiHeadedAttention(h, d_model).to(DEVICE)# 实例化FeedForward对象ff = PositionwiseFeedForward(d_model, d_ff, dropout).to(DEVICE)# 实例化PositionalEncoding对象position = PositionalEncoding(d_model, dropout).to(DEVICE)# 实例化Transformer模型对象model = Transformer(Encoder(EncoderLayer(d_model, c(attn), c(ff), dropout).to(DEVICE), N).to(DEVICE),Decoder(DecoderLayer(d_model, c(attn), c(attn), c(ff), dropout).to(DEVICE), N).to(DEVICE),nn.Sequential(Embeddings(d_model, src_vocab).to(DEVICE), c(position)),nn.Sequential(Embeddings(d_model, tgt_vocab).to(DEVICE), c(position)),Generator(d_model, tgt_vocab)).to(DEVICE)# This was important from their code.# Initialize parameters with Glorot / fan_avg.for p in model.parameters():if p.dim() > 1:# 这里初始化采用的是nn.init.xavier_uniformnn.init.xavier_uniform_(p)return model.to(DEVICE)
为了防止模型在训练时过于自信地预测标签,改善模型的泛化能力,我们可以增加一个label smoothing的操作
class LabelSmoothing(nn.Module):"""标签平滑"""def __init__(self, size, padding_idx, smoothing=0.0):super(LabelSmoothing, self).__init__()self.criterion = nn.KLDivLoss(reduction='sum')self.padding_idx = padding_idxself.confidence = 1.0 - smoothingself.smoothing = smoothingself.size = sizeself.true_dist = Nonedef forward(self, x, target):assert x.size(1) == self.sizetrue_dist = x.data.clone()true_dist.fill_(self.smoothing / (self.size - 2))true_dist.scatter_(1, target.data.unsqueeze(1), self.confidence)true_dist[:, self.padding_idx] = 0mask = torch.nonzero(target.data == self.padding_idx)if mask.dim() > 0:true_dist.index_fill_(0, mask.squeeze(), 0.0)self.true_dist = true_distreturn self.criterion(x, Variable(true_dist, requires_grad=False))
1.Transformer 与 RNN 不同,可以比较好地并行训练。2.Transformer 中 Multi-Head Attention 中有多个 Self-Attention,可以捕获单词之间多种维度上的相关系数 attention score。3.由于 self-attention 没有循环结构,Transformer 需要一种方式来表示序列中元素的相对或绝对位置关系。Position Embedding (PE) 就是该文提出的方案。但在一些研究中,模型加上 PE 和不加上 PE 并不见得有明显的差异
全部代码和数据都已经上传github transformer-english2Chinese[3]
[1] TOC: 基于Transformer的翻译模型(英->中)[2] Transformer: https://blog.csdn.net/u012526436/article/details/86295971[3] transformer-english2Chinese: https://github.com/seanzhang-zhichen/-transformer-english2chinese-
53AI,企业落地大模型首选服务商
产品:场景落地咨询+大模型应用平台+行业解决方案
承诺:免费POC验证,效果达标后再合作。零风险落地应用大模型,已交付160+中大型企业
 
            2025-08-21
2025-08-20
2025-09-07
2025-08-21
2025-08-19
2025-08-05
2025-09-16
2025-10-02
2025-08-20
2025-09-08
2025-10-31
2025-10-29
2025-10-29
2025-10-29
2025-10-28
2025-10-28
2025-10-28
2025-10-27