Sequence to Sequence Models (Seq2Seq Models)

Photo by Léonard Cotte on Unsplash
A Sequence to Sequence (Seq2Seq) model is a neural network model that maps one sequence to another sequence. It revolutionized the field of natural language processing (NLP), dramatically improving tasks such as translation, text summarization, and chatbots. This article takes an in-depth look at how Seq2Seq models work.

The complete code can be downloaded at .

The Seq2Seq Model

A Seq2Seq model is essentially a neural network designed to convert an input sequence into an output sequence, for example, turning an English sentence into its French translation. Moreover, the length of the input sequence and the length of the output sequence can differ, which corresponds to the many-to-many type of RNN.

The architecture of a Seq2Seq model consists of two main parts:

  • Encoder: processes the input sequence; its final hidden state is called the encoder state. We can think of this encoder state as the encoder compressing the information of the input sequence into a single vector. For this reason, the encoder state is also called the context vector or thought vector.
  • Decoder: uses the context vector to generate the target sequence.

As the following equation shows, the encoder computes the context vector v, while the decoder computes the conditional probability p(y_1,\cdots,y_{T^\prime}|x_1,\cdots,x_{T}):

p(y_1,\cdots,y_{T^\prime}|x_1,\cdots,x_{T})=\displaystyle\prod_{t=1}^{T^\prime}p(y_t|v,y_1,\cdots,y_{t-1}) \\ (x_1,\cdots,x_{T}):\text{input sequence} \\ (y_1,\cdots,y_{T^\prime}):\text{output sequence} \\ T\text{ may differ from }T^\prime \\ v:\text{context vector}
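
As a concrete illustration, take the pair "i love you" → "te amo" from the toy dataset used later in this post. Ignoring the special start and end tokens, the factorization expands to:

p(\text{te},\text{amo}\,|\,\text{i},\text{love},\text{you}) = p(\text{te}\,|\,v)\cdot p(\text{amo}\,|\,v,\text{te})

where v is the context vector the encoder produces after reading the entire English sentence.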

The figure below shows the workflow between the encoder and the decoder. Both the encoder and the decoder contain single-layer or multi-layer LSTMs. We could replace the LSTMs in the figure with vanilla RNNs, but those suffer from the vanishing gradient problem, so using GRUs or LSTMs instead mitigates this issue.

In addition, extra <SOS> and <EOS> tokens are added at the beginning and end of each sequence to mark where the sequence starts and ends.
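
For example, after adding these special tokens, a training pair from the toy dataset used below looks like the following sketch (the actual index conversion is implemented in the training section):

# A hypothetical English-Spanish pair after adding the special tokens.
# Note that the source and target lengths do not have to match.
src_tokens = ["<sos>", "i", "love", "you", "<eos>"]
tgt_tokens = ["<sos>", "te", "amo", "<eos>"]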

Seq2Seq Model - Encoder and Decoder.

Implementation

Below is the implementation of the encoder. As we have seen, the encoder is mainly a single-layer or multi-layer RNN; here, we use an LSTM. In addition, we need an Embedding layer to convert tokens into word embeddings. The encoder's input_dim is the vocabulary size of the input sequence.

import torch
import torch.nn as nn

class Encoder(nn.Module):
    def __init__(self, input_dim, embedding_dim, hidden_dim, num_layers=1):
        super(Encoder, self).__init__()
        self.embedding = nn.Embedding(input_dim, embedding_dim, padding_idx=PAD_INDEX)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers=num_layers, batch_first=True)
    def forward(self, input):
        """
        Args:
            input: (batch_size, seq_len)
        Returns:
            output: (batch_size, seq_len, hidden_dim)
            hidden: (num_layers, batch_size, hidden_dim)
            cell: (num_layers, batch_size, hidden_dim)
        """
        embedding = self.embedding(input)
        output, (hidden, cell) = self.lstm(embedding)
        return output, hidden, cell
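
As a quick sanity check, here is a minimal sketch of the encoder's input and output shapes. The dimensions are arbitrary, and PAD_INDEX is simply assumed to be the padding index that the training section below defines:

# Hypothetical dimensions, just to inspect the tensor shapes.
PAD_INDEX = 2  # the padding index used throughout this post (defined in the training section)
encoder = Encoder(input_dim=10, embedding_dim=16, hidden_dim=32, num_layers=1)
src = torch.randint(0, 10, (4, 7))   # (batch_size=4, seq_len=7)
output, hidden, cell = encoder(src)
print(output.shape)                  # torch.Size([4, 7, 32])
print(hidden.shape, cell.shape)      # torch.Size([1, 4, 32]) each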

Below is the implementation of the decoder. The decoder's output_dim is the vocabulary size of the output sequence. Like the encoder, the decoder also has an Embedding layer and an LSTM. In addition, it needs a fully connected layer to project the LSTM output from hidden_dim to output_dim.

class Decoder(nn.Module):
    def __init__(self, output_dim, embedding_dim, hidden_dim, num_layers=1):
        super(Decoder, self).__init__()
        self.embedding = nn.Embedding(output_dim, embedding_dim, padding_idx=PAD_INDEX)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers=num_layers, batch_first=True)
        self.fc_out = nn.Linear(hidden_dim, output_dim)
    def forward(self, input, hidden, cell):
        """
        Args
            input: (batch_size,)
            hidden: (num_layers, batch_size, hidden_dim)
            cell: (num_layers, batch_size, hidden_dim)
        """
        embedding = self.embedding(input)
        output, (hidden, cell) = self.lstm(embedding, (hidden, cell))
        prediction = self.fc_out(output.squeeze(1))
        return prediction, hidden, cell
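
Continuing the sketch above, a single decoding step takes the previous target token together with the encoder's hidden and cell states and returns scores over the target vocabulary (again, the dimensions are arbitrary):

# One hypothetical decoding step, reusing hidden and cell from the encoder sketch.
decoder = Decoder(output_dim=12, embedding_dim=16, hidden_dim=32, num_layers=1)
prev_token = torch.full((4, 1), 0)   # (batch_size=4, 1), all <sos> (index 0, see the training section)
prediction, hidden, cell = decoder(prev_token, hidden, cell)
print(prediction.shape)              # torch.Size([4, 12]) - one score per target-vocabulary token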

Below is the implementation of the Seq2Seq model. In the forward pass, it first feeds the input sequence into the encoder to obtain hidden and cell; these two values are the so-called context vector or thought vector. The decoder then starts predicting the output sequence token by token.

class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder):
        super(Seq2Seq, self).__init__()
        self.encoder = encoder
        self.decoder = decoder
    def forward(self, src, tgt, teacher_forcing_ratio=0.5):
        """
        Args:
            src: (batch_size, src_len)
            tgt: (batch_size, tgt_len)
            teacher_forcing_ratio: float - probability to use teacher forcing
        """
        batch_size = src.shape[0]
        tgt_len = tgt.shape[1]
        tgt_vocab_size = self.decoder.fc_out.out_features
        outputs = torch.zeros(batch_size, tgt_len, tgt_vocab_size)
        _, hidden, cell = self.encoder(src)
        input = tgt[:, 0]
        for t in range(1, tgt_len):
            input = input.unsqueeze(1)
            output, hidden, cell = self.decoder(input, hidden, cell)
            outputs[:, t, :] = output
            teacher_force = torch.rand(1).item() < teacher_forcing_ratio
            input = tgt[:, t] if teacher_force else output.argmax(1)
        return outputs
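
Note that the loop above uses teacher forcing: with probability teacher_forcing_ratio, the ground-truth token tgt[:, t] is fed to the decoder as the next input; otherwise, the decoder's own prediction is used. A minimal end-to-end shape check, again with arbitrary dimensions, might look like this:

# Hypothetical vocabulary sizes and sequence lengths, just to inspect the output shape.
model = Seq2Seq(Encoder(10, 16, 32), Decoder(12, 16, 32))
src = torch.randint(0, 10, (4, 7))   # (batch_size, src_len)
tgt = torch.randint(0, 12, (4, 5))   # (batch_size, tgt_len)
outputs = model(src, tgt, teacher_forcing_ratio=0.5)
print(outputs.shape)                 # torch.Size([4, 5, 12]) - (batch_size, tgt_len, tgt_vocab_size)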

Below is the training implementation. For the input and output sequences of each training example, we convert every word into its corresponding index and add <SOS> and <EOS> at the beginning and end of each sentence. In addition, we append <PAD> tokens so that all input sequences have the same length, and likewise for all output sequences.

SOS_TOKEN = "<sos>"
EOS_TOKEN = "<eos>"
PAD_TOKEN = "<pad>"
SOS_INDEX = 0
EOS_INDEX = 1
PAD_INDEX = 2
english_sentences = [
    "hello world",
    "good morning",
    "i love you",
    "cat",
    "dog",
    "go home",
]
spanish_sentences = [
    "hola mundo",
    "buenos dias",
    "te amo",
    "gato",
    "perro",
    "ve a casa",
]
def build_vocab(sentences):
    vocab = list(set([word for sentence in sentences for word in sentence.split(" ")]))
    vocab = [SOS_TOKEN, EOS_TOKEN, PAD_TOKEN] + vocab
    tkn2idx = {tkn: i for i, tkn in enumerate(vocab)}
    idx2tkn = {i: tkn for tkn, i in tkn2idx.items()}
    return tkn2idx, idx2tkn
def convert_sentences_to_idx(sentences, tkn2idx):
    sentences_idx = [[tkn2idx[tkn] for tkn in sentence.split(" ")] for sentence in sentences]
    for sentence_idx in sentences_idx:
        sentence_idx.insert(0, tkn2idx[SOS_TOKEN])
        sentence_idx.append(tkn2idx[EOS_TOKEN])
    return sentences_idx
src_tkn2idx, src_idx2tkn = build_vocab(english_sentences)
tgt_tkn2idx, tgt_idx2tkn = build_vocab(spanish_sentences)
src_data = convert_sentences_to_idx(english_sentences, src_tkn2idx)
tgt_data = convert_sentences_to_idx(spanish_sentences, tgt_tkn2idx)
max_src_len = max([len(sentence) for sentence in src_data])
max_tgt_len = max([len(sentence) for sentence in tgt_data])
pair = []
for src, tgt in zip(src_data, tgt_data):
    src += [src_tkn2idx[PAD_TOKEN]] * (max_src_len - len(src))
    src_tensor = torch.tensor(src, dtype=torch.long)
    tgt += [tgt_tkn2idx[PAD_TOKEN]] * (max_tgt_len - len(tgt))
    tgt_tensor = torch.tensor(tgt, dtype=torch.long)
    pair.append((src_tensor, tgt_tensor))
EMBEDDING_DIM = 16
HIDDEN_DIM = 32
NUM_LAYERS = 4
LEARNING_RATE = 0.01
EPOCHS = 50
encoder = Encoder(len(src_idx2tkn), EMBEDDING_DIM, HIDDEN_DIM, NUM_LAYERS)
decoder = Decoder(len(tgt_idx2tkn), EMBEDDING_DIM, HIDDEN_DIM, NUM_LAYERS)
model = Seq2Seq(encoder, decoder)
optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)
criterion = nn.CrossEntropyLoss(ignore_index=PAD_INDEX)
def train():
    model.train()
    for epoch in range(EPOCHS):
        total_loss = 0
        for src_tensor, tgt_tensor in pair:
            src_tensor = src_tensor.unsqueeze(0)
            tgt_tensor = tgt_tensor.unsqueeze(0)
            optimizer.zero_grad()
            output = model(src_tensor, tgt_tensor, teacher_forcing_ratio=0.5)
            output_dim = output.shape[-1]
            output = output[:, 1:, :].reshape(-1, output_dim)
            tgt = tgt_tensor[:, 1:].reshape(-1)
            loss = criterion(output, tgt)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        if epoch % 10 == 0:
            print(f"epoch {epoch}, loss {total_loss / len(pair)}")

After training, we can use the model to translate English into Spanish. In the following code, we use beam search to generate the translation.

def translate_beam_search(sentence, beam_width=3, max_length=10):
    model.eval()
    src_idx = convert_sentences_to_idx([sentence], src_tkn2idx)[0]
    src_idx += [src_tkn2idx[PAD_TOKEN]] * (max_src_len - len(src_idx))
    src_tensor = torch.tensor(src_idx, dtype=torch.long)
    with torch.no_grad():
        _, hidden, cell = model.encoder(src_tensor)
    beam = [([SOS_INDEX], hidden, cell, 0.0)]
    completed_sentences = []
    for _ in range(max_length):
        new_beam = []
        for tokens, hidden, cell, score in beam:
            if tokens[-1] == EOS_INDEX:
                completed_sentences.append((tokens, score))
                new_beam.append((tokens, hidden, cell, score))
                continue
            input_index = torch.tensor([tokens[-1]], dtype=torch.long)
            with torch.no_grad():
                output, hidden, cell = model.decoder(input_index, hidden, cell)
                log_probs = torch.log_softmax(output, dim=1).squeeze(0)
            topk = torch.topk(log_probs, beam_width)
            for tkn_idx, tkn_score in zip(topk.indices.tolist(), topk.values.tolist()):
                new_tokens = tokens + [tkn_idx]
                new_score = score + tkn_score
                new_beam.append((new_tokens, hidden, cell, new_score))
        new_beam.sort(key=lambda x: x[3], reverse=True)
        beam = new_beam[:beam_width]
    for tokens, hidden, cell, score in beam:
        if tokens[-1] != EOS_INDEX:
            completed_sentences.append((tokens, score))
    completed_sentences.sort(key=lambda x: x[1], reverse=True)
    best_tokens, best_score = completed_sentences[0]
    if best_tokens[0] == SOS_INDEX:
        best_tokens = best_tokens[1:]
    if EOS_INDEX in best_tokens:
        best_tokens = best_tokens[:best_tokens.index(EOS_INDEX)]
    return " ".join([tgt_idx2tkn[idx] for idx in best_tokens])
def test():
    test_sentences = [
        "hello world",
        "i love you",
        "cat",
        "go home",
    ]
    for sentence in test_sentences:
        translation = translate_beam_search(sentence)
        print(f"src: {sentence}, tgt: {translation}")

Conclusion

By now, you should have a solid understanding of Seq2Seq models. Although they perform well, Seq2Seq models still face challenges in handling long sequences, maintaining context effectively, and achieving computational efficiency. Later techniques such as attention models and Transformer networks were invented to address these problems.
