|
网站内容均来自网络,本站只提供信息平台,如有侵权请联系删除,谢谢!
本文为转载,原文链接:https://wmathor.com/index.php/archives/1455/
本文主要介绍一下如何使用PyTorch复现Transformer,实现简单的机器翻译任务。关于Transformer的详细介绍可以参考这篇文章Transformer详解。
Transformer结构
数据预处理
这里我并没有用什么大型的数据集,而是手动输入了两对德语→英语的句子,还有每个字的索引也是我手动硬编码上去的,主要是为了降低代码阅读难度,我希望读者能更关注模型实现的部分
import mathimport torchimport numpy as npimport torch.nn as nnimport torch.optim as optimimport torch.utils.data as Datadevice = 'cuda' if torch.cuda.is_available() else 'cpu'# S: Symbol that shows starting of decoding input# E: Symbol that shows starting of decoding output# P: Symbol that will fill in blank sequence if current batch data size is short than time stepssentences = [ # enc_input dec_input dec_output ['ich mochte ein bier P', 'S i want a beer .', 'i want a beer . E'], ['ich mochte ein cola P', 'S i want a coke .', 'i want a coke . E']]# Padding Should be Zerosrc_vocab = {'P': 0, 'ich': 1, 'mochte': 2, 'ein': 3, 'bier': 4, 'cola': 5}src_vocab_size = len(src_vocab)tgt_vocab = {'P': 0, 'i': 1, 'want': 2, 'a': 3, 'beer': 4, 'coke': 5, 'S': 6, 'E': 7, '.': 8}idx2word = {i: w for i, w in enumerate(tgt_vocab)}tgt_vocab_size = len(tgt_vocab)src_len = 5 # enc_input max sequence lengthtgt_len = 6 # dec_input(=dec_output) max sequence lengthdef make_data(sentences): enc_inputs, dec_inputs, dec_outputs = [], [], [] for i in range(len(sentences)): enc_input = [[src_vocab[n] for n in sentences[0].split()]] # [[1, 2, 3, 4, 0], [1, 2, 3, 5, 0]] dec_input = [[tgt_vocab[n] for n in sentences[1].split()]] # [[6, 1, 2, 3, 4, 8], [6, 1, 2, 3, 5, 8]] dec_output = [[tgt_vocab[n] for n in sentences[2].split()]] # [[1, 2, 3, 4, 8, 7], [1, 2, 3, 5, 8, 7]] enc_inputs.extend(enc_input) dec_inputs.extend(dec_input) dec_outputs.extend(dec_output) return torch.LongTensor(enc_inputs), torch.LongTensor(dec_inputs), torch.LongTensor(dec_outputs)enc_inputs, dec_inputs, dec_outputs = make_data(sentences)class MyDataSet(Data.Dataset): def __init__(self, enc_inputs, dec_inputs, dec_outputs): super(MyDataSet, self).__init__() self.enc_inputs = enc_inputs self.dec_inputs = dec_inputs self.dec_outputs = dec_outputs def __len__(self): return self.enc_inputs.shape[0] def __getitem__(self, idx): return self.enc_inputs[idx], self.dec_inputs[idx], self.dec_outputs[idx]loader = Data.DataLoader(MyDataSet(enc_inputs, dec_inputs, dec_outputs), 2, True)模型参数
下面变量代表的含义依次是
- 字嵌入 & 位置嵌入的维度,这俩值是相同的,因此用一个变量就行了
- FeedForward 层隐藏神经元个数
- Q、K、V 向量的维度,其中 Q 与 K 的维度必须相等,V 的维度没有限制,不过为了方便起见,我都设为 64
- Encoder 和 Decoder 的个数
- 多头注意力中 head 的数量
# Transformer Parametersd_model = 512 # Embedding sized_ff = 2048 # FeedForward dimensiond_k = d_v = 64 # dimension of K(=Q), Vn_layers = 6 # number of Encoder and Decoder Layern_heads = 8 # number of heads in Multi-Head Attention上面都比较简单,下面开始涉及到模型就比较复杂了,因此我会将模型拆分成以下几个部分进行讲解
- Positional Encoding
- Pad Mask(针对句子不够长,加了 pad,因此需要对 pad 进行 mask)
- Subsequence Mask(Decoder input 不能看到未来时刻单词信息,因此需要 mask)
- ScaledDotProductAttention(计算 context vector)
- Multi-Head Attention
- FeedForward Layer
- Encoder Layer
- Encoder
- Decoder Layer
- Decoder
- Transformer
关于代码中的注释,如果值为 src_len 或者 tgt_len 的,我一定会写清楚,但是有些函数或者类,Encoder 和 Decoder 都有可能调用,因此就不能确定究竟是 src_len 还是 tgt_len,对于不确定的,我会记作 seq_len
Positional Encoding
根据公式给出:
(B,S,D_new) -split -> (B, S, H, W) -> trans -> (B,H,S,W) # 分解为MultiHead Attention Q = self.W_Q(input_Q).view(batch_size,-1, n_heads, d_k).transpose(1,2) # Q:[batch_size, n_heads, len_q, d_k] K = self.W_K(input_K).view(batch_size,-1, n_heads, d_k).transpose(1,2) # K:[batch_size, n_heads, len_k, d_k] V = self.W_V(input_V).view(batch_size,-1, n_heads, d_v).transpose(1,2) # V:[batch_size, n_heads, len_v(=len_k, d_v] attn_mask = attn_mask.unsqueeze(1).repeat(1, n_heads, 1, 1) # attn_mask: [batch_size,n_heads, seq_len, seq_len] # [batch_size, n_heads, len_q, d_v], attn: [batch_size, n_heads, len_q, len_k] context, attn = ScaledDotProductAttention()(Q,K,V, attn_mask) context = context.transpose(1,2).reshape(batch_size, -1, n_heads * d_v) # context: [batch_size, len_q, n_heads * d_v] output = self.fc(context) return nn.LayerNorm(d_model).to(device)(output+residual),attn # Layer Normalization完整代码中一定会有三处地方调用 MultiHeadAttention(),Encoder Layer 调用一次,传入的 input_Q、input_K、input_V 全部都是 enc_inputs;Decoder Layer 中两次调用,第一次传入的全是 dec_inputs,第二次传入的分别是 dec_outputs,enc_outputs,enc_outputs
FeedForward Layer
class PoswiseFeedForwardNet(nn.Module): def __init__(self): super(PoswiseFeedForwardNet, self).__init__() self.fc = nn.Sequential( nn.Linear(d_model,d_ff,bias=False), nn.ReLU(), nn.Linear(d_ff, d_model, bias=False) ) def forward(self, inputs): ''' :param inputs: [batch_size, seq_len, d_model] :return: ''' residual = inputs output = self.fc(inputs) return nn.LayerNorm(d_model).to(device)(output+residual) #[batch_size, seq_len, d_model]这段代码非常简单,就是做两次线性变换,残差连接后再跟一个 Layer Norm
Encoder Layer
class EncoderLayer(nn.Module): def __init__(self): super(EncoderLayer, self).__init__() self.enc_self_attn = MultiHeadAttention() self.pos_ffn = PoswiseFeedForwardNet() def forward(self,enc_inputs, enc_self_attn_mask): ''' :param enc_inputs: [batch_size, src_len, d_model] :param enc_self_attn_mask: [batch_size, src_len, src_len] :return: ''' # enc_outputs: [batch_size, src_len, d_model], attn: [batch_size, n_heads, src_len, src_len] enc_outputs, attn = self.enc_self_attn(enc_inputs,enc_inputs,enc_inputs,enc_self_attn_mask) # enc_inputs to same Q,K,V enc_outputs = self.pos_ffn(enc_outputs) # enc_outputs: [batch_size, src_len, d_model] return enc_outputs, attn将上述组件拼起来,就是一个完整的 Encoder Layer
Encoder
使用nn.ModuleList(),里面的参数是列表,列表里面存了n_layers个Encoder Layer
由于我们控制好了Encoder Layer的输入和输出维度相同,
所以可以直接用个for循环以嵌套的方式,将上一次Encoder Layer的输出作为下一次Encoder Layer的输入
class Encoder(nn.Module): def __init__(self): super(Encoder, self).__init__() self.src_emb = nn.Embedding(src_vocab_size, d_model) self.pos_emb = PositionalEncoding(d_model) self.layers = nn.ModuleList([EncoderLayer() for _ in range(n_layers)]) def forward(self, enc_inputs): ''' :param enc_inputs: [batch_size, src_len] :return: ''' enc_outputs = self.src_emb(enc_inputs) # [batch_size, src_len, d_model] enc_outputs = self.pos_emb(enc_outputs.transpose(0,1)).transpose(0,1) # [batch_size, src_len, src_len] enc_self_attn_mask = get_attn_pad_mask(enc_inputs,enc_inputs) # [batch_size, src_len, src_len] enc_self_attns = [] for layer in self.layers: # enc_outputs: [batch_size, src_len, d_model], enc_self_attn: [batch_size, n_heads, src_len, src_len] enc_outputs, enc_self_attn = layer(enc_outputs, enc_self_attn_mask) enc_self_attns.append(enc_self_attn) return enc_outputs, enc_self_attnsDecoder Layer
class DecoderLayer(nn.Module): def __init__(self): super(DecoderLayer, self).__init__() self.dec_self_attn = MultiHeadAttention() self.dec_enc_attn = MultiHeadAttention() self.pos_ffn = PoswiseFeedForwardNet() def forward(self, dec_inputs, enc_outputs, dec_self_attn_mask, dec_enc_attn_mask): ''' :param dec_inputs: [batch_size, tgt_len, d_model] :param enc_outputs: [batch_size, src_len, d_model] :param dec_self_attn_mask: [batch_size, tgt_len, tgt_len] :param dec_enc_attn_mask: [batch_size, tgt_len, src_len] :return: ''' # dec_outputs: [batch_size, tgt_len, d_model], dec_self_attn: [batch_size, n_heads, tgt_len, tgt_len] dec_outputs, dec_self_attn = self.dec_self_attn(dec_inputs, dec_inputs, dec_inputs, dec_self_attn_mask) # dec_outputs: [batch_size, tgt_len, d_model], dec_enc_attn: [batch_size, h_heads, tgt_len, src_len] dec_outputs, dec_enc_attn = self.dec_enc_attn(dec_outputs, enc_outputs, enc_outputs, dec_enc_attn_mask) dec_outputs = self.pos_ffn(dec_outputs) # [batch_size, tgt_len, d_model] return dec_outputs, dec_self_attn, dec_enc_attn在 Decoder Layer 中会调用两次 MultiHeadAttention,第一次是计算 Decoder Input 的 self-attention,得到输出 dec_outputs。然后将 dec_outputs 作为生成 Q 的元素,enc_outputs 作为生成 K 和 V 的元素,再调用一次 MultiHeadAttention,得到的是 Encoder 和 Decoder Layer 之间的 context vector。最后将 dec_outptus 做一次维度变换,然后返回
Decoder
class Decoder(nn.Module): def __init__(self): super(Decoder, self).__init__() self.tgt_emb = nn.Embedding(tgt_vocab_size, d_model) self.pos_emb = PositionalEncoding(d_model) self.layers = nn.ModuleList([DecoderLayer() for _ in range(n_layers)]) def forward(self, dec_inputs, enc_inputs, enc_outputs): ''' :param dec_inputs: [batch_size, tgt_len] :param enc_inputs: [batch_size, src_len] :param enc_outputs: [batch_size, src_len, d_model] :return: ''' dec_outputs = self.tgt_emb(dec_inputs) # [batch_size, tgt_len, d_model] dec_outputs = self.pos_emb(dec_outputs.transpose(0,1)).transpose(0,1).to(device) # [batch_size, tgt_len, d_model] dec_self_attn_pad_mask = get_attn_pad_mask(dec_inputs,dec_inputs).to(device) # [batch_size, tgt_len, tgt_len] dec_self_attn_subsequence_mask = get_attn_subsequence_mask(dec_inputs).to(device) #[batch_size, tgt_len, tgt_len] # torch.gt(a,value) :将a中各个位置上的元素和value进行比较,若大于value则该位置取1,否则取0 dec_self_attn_mask = torch.gt((dec_self_attn_pad_mask+dec_self_attn_subsequence_mask),0).to(device) # [batch_size, tgt_len, tgt_len] dec_enc_attn_mask = get_attn_pad_mask(dec_inputs,enc_inputs) #[batch_size, tgt_len, src_len] dec_self_attns, dec_enc_attns = [], [] for layer in self.layers: # dec_outputs: [batch_size, tgt_len, d_model] # dec_self_attn: [batch_size, n_heads, tgt_len, tgt_len] # dec_enc_attn: [batch_size, n_heads, tgt_len, src_len] dec_outputs, dec_self_attn, dec_enc_attn = layer(dec_outputs,enc_outputs,dec_self_attn_mask,dec_enc_attn_mask) dec_self_attns.append(dec_self_attn) dec_enc_attns.append(dec_enc_attn) return dec_outputs,dec_self_attns,dec_enc_attnsDecoder 中不仅要把 "pad"mask 掉,还要 mask 未来时刻的信息,因此就有了下面这三行代码,其中 torch.gt(a, value) 的意思是,将 a 中各个位置上的元素和 value 比较,若大于 value,则该位置取 1,否则取 0。
dec_self_attn_pad_mask = get_attn_pad_mask(dec_inputs, dec_inputs) # [batch_size, tgt_len, tgt_len] dec_self_attn_subsequence_mask = get_attn_subsequence_mask(dec_inputs) # [batch_size, tgt_len, tgt_len] dec_self_attn_mask = torch.gt((dec_self_attn_pad_mask + dec_self_attn_subsequence_mask), 0) # [batch_size, tgt_len, tgt_len]Transformer
class Transformer(nn.Module): def __init__(self): super(Transformer,self).__init__() self.encoder = Encoder().to(device) self.decoder = Decoder().to(device) self.projection = nn.Linear(d_model, tgt_vocab_size, bias=False).to(device) def forward(self,enc_inputs, dec_inputs): ''' :param enc_inputs: [batch_size, src_len] :param dec_inputs: [batch_size, tgt_len] :return: ''' # enc_outputs: [batch_size, src_len, d_model], # enc_self_attns: [n_layers, batch_size, n_heads, src_len, src_len] enc_outputs,enc_self_attns = self.encoder(enc_inputs) # dec_outputs: [batch_size, tgt_len, d_model], # dec_self_attns: [n_layers, batch_size, n_heads, tgt_len, tgt_len], # dec_enc_attn: [n_layers, batch_size, n_heads,tgt_len, src_len] dec_outputs, dec_self_attns, dec_enc_attns = self.decoder(dec_inputs, enc_inputs, enc_outputs) dec_logits = self.projection(dec_outputs) return dec_logits.view(-1,dec_logits.size(-1)), enc_self_attns, dec_self_attns, dec_enc_attnsTransformer 主要就是调用 Encoder 和 Decoder。最后返回 dec_logits 的维度是 [batch_size * tgt_len, tgt_vocab_size],可以理解为,一个句子,这个句子有 batch_size*tgt_len 个单词,每个单词有 tgt_vocab_size 种情况,取概率最大者
模型&损失函数&优化器
model = Transformer().to(device)criterion = nn.CrossEntropyLoss(ignore_index=0)optimizer = optim.SGD(model.parameters(),lr=1e-3,momentum=0.99)这里的损失函数里面我设置了一个参数 ignore_index=0,因为 "pad" 这个单词的索引为 0,这样设置以后,就不会计算 "pad" 的损失(因为本来 "pad" 也没有意义,不需要计算),关于这个参数更详细的说明,可以看这篇文章的最下面,稍微提了一下.
训练
for epoch in range(30): for enc_inputs, dec_inputs, dec_outputs in loader: ''' enc_inputs: [batch_size, src_len] dec_inputs: [batch_size, tgt_len] dec_outputs: [batch_size, tgt_len] ''' enc_inputs, dec_inputs, dec_outputs = enc_inputs.to(device),dec_inputs.to(device),dec_outputs.to(device) # outputs: [batch_size * tgt_len, tgt_vocab_size] outputs, enc_self_attns, dec_self_attns, dec_enc_attns = model(enc_inputs, dec_inputs) loss = criterion(outputs, dec_outputs.view(-1)) print('Epoch:','%04d' % (epoch+1), 'loss =','{:.6f}'.format(loss)) optimizer.zero_grad() loss.backward() optimizer.step()测试
def greedy_decoder(model, enc_input, start_symbol): """ For simplicity, a Greedy Decoder is Beam search when K=1. This is necessary for inference as we don't know the target sequence input. Therefore we try to generate the target input word by word, then feed it into the transformer. Starting Reference: http://nlp.seas.harvard.edu/2018/04/03/attention.html#greedy-decoding :param model: Transformer Model :param enc_input: The encoder input :param start_symbol: The start symbol. In this example it is 'S' which corresponds to index 4 :return: The target input """ enc_outputs, enc_self_attns = model.encoder(enc_input) dec_input = torch.zeros(1, 0).type_as(enc_input.data).to(device) terminal = False next_symbol = start_symbol while not terminal: dec_input=torch.cat([dec_input.detach(),torch.tensor([[next_symbol]],dtype=enc_input.dtype,device=device)],-1) dec_outputs, _, _ = model.decoder(dec_input, enc_input, enc_outputs) projected = model.projection(dec_outputs) prob = projected.squeeze(0).max(dim=-1, keepdim=False)[1] next_word = prob.data[-1] next_symbol = next_word if next_symbol == tgt_vocab["."]: terminal = True print(next_word) return dec_input# Testenc_inputs, _, _ = next(iter(loader))enc_inputs = enc_inputs.to(device)for i in range(len(enc_inputs)): greedy_dec_input = greedy_decoder(model, enc_inputs.view(1, -1), start_symbol=tgt_vocab["S"]) predict, _, _, _ = model(enc_inputs.view(1, -1), greedy_dec_input) predict = predict.data.max(1, keepdim=True)[1] print(enc_inputs, '->', [idx2word[n.item()] for n in predict.squeeze()])
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |
本帖子中包含更多资源
您需要 登录 才可以下载或查看,没有账号?立即注册
x
免责声明
1. 本论坛所提供的信息均来自网络,本网站只提供平台服务,所有账号发表的言论与本网站无关。
2. 其他单位或个人在使用、转载或引用本文时,必须事先获得该帖子作者和本人的同意。
3. 本帖部分内容转载自其他媒体,但并不代表本人赞同其观点和对其真实性负责。
4. 如有侵权,请立即联系,本网站将及时删除相关内容。
|