
PyTorch 學習筆記: 古詩 Seq2seq with Attention

前排重要提示: 並不是標準的Seq2seq,純粹練手用, 效果一般。Pytorch下改成標準的模型用GPU運行的話慢大概10倍左右。




用Google的P-100, 全部數據算一次要7分鐘,K-80則需要半小時。每個batch大小為 256x100,需要12G的GPU顯存


Mini batch (1 step) 是 256 (古詩) x 100 (字元數,不夠的用0補足)。



  • 關鍵詞全部為單字, 部分抽出來的字意義不明。用jieba分詞出來的效果也不太理想,比如:「千山/ 萬山/ 如火發」千山和萬山都是虛數,應該是同類,「如 火 發」則各有含義。
  • LSTM 和 GRU 都當成黑盒用,Pytorch只能輸出最後的Hidden state 和各階段的 output。 如果改用循環來獲取各階段的hidden state, 要慢10倍左右 ><。 因此Attention 部分用GRU的output代替 (輸出的激活函數是 tanh)。


數據全為唐宋詩(json格式), 只保留逗號和句號。標題後加上<SOP>; 句尾加上<EOP>. 輸出一個大 list [[古詩1], [古詩2], [古詩3], ...]. 然後用來訓練一個 Word2vec 模型,用其字向量加Textrank找出每句2個關鍵字. 格式如下。 最後導入DataLoader

[ [[關鍵字1, 2],[古詩1]], [[關鍵字1, 2],[古詩2]], ... ]

Github 古詩?


# preprocess_data.py

# Regular-expression patterns; clean_sentence applies re.sub with each entry in turn.
# NOTE(review): this literal appears to have lost its quote characters (and probably
# some entries) during extraction, so it is not valid Python as shown. Restore it
# from the original preprocess_data.py before running.
patterns = [(.*), "{.*}", "《.*》", "[.*]", "<.*>", ")", "』", ":", "「.*」",
[, , , , , , /, `, , ,
, *, -, =, {, }]

def clean_sentence(sentence):
    """Remove every match of the module-level ``patterns`` from *sentence*.

    Each entry of ``patterns`` is used as a regular expression and all of its
    matches are deleted. Leading and trailing whitespace is stripped from the
    result.

    :param sentence: the raw text to clean
    :return: the cleaned text
    """
    for pattern in patterns:
        # The replacement is the empty string: matches are deleted, not substituted.
        sentence = re.sub(pattern, "", sentence)
    return sentence.strip()

def split_poetry_2_list(poetry):
    """Flatten one poem entry into a list of single-character tokens.

    The result is laid out as
    ``<title chars> + ["<SOP>"] + <body chars>`` with the last body character
    replaced by ``"<EOP>"``.

    :param poetry: one entry of the JSON file; read through the keys
        ``"paragraphs"`` (list of sentences, may be empty) and ``"title"``
    :return: the flat token list, or the falsy ``paragraphs`` value itself
        (``[]`` / ``None``) when the poem has no body
    """
    text = poetry.get("paragraphs")  # may be []
    if not text:
        # NOTE(review): the title is only prepended for poems that have a body;
        # this nesting is inferred because the source lost its indentation.
        return text

    sentences = [clean_sentence(x.strip()) for x in text]
    body = list(chain.from_iterable(sentences))  # flatten list of sentences
    body = ["<SOP>"] + body
    body[-1] = "<EOP>"  # the final character is overwritten by the end marker

    # A missing title is treated as empty instead of crashing on None.split().
    title = poetry.get("title") or ""
    title = "".join(title.split())  # drop all whitespace inside the title
    title = clean_sentence(title)
    return list(title) + body

def process_data(json_file):
    """Load a JSON file of poems and tokenise each poem.

    :param json_file: path of a JSON file containing a list of poem entries
    :return: ``(poetry_text, word_set)`` where ``poetry_text`` is a nested list
        (one flat token list per poem) and ``word_set`` is the set of tokens
        collected from those poems
    """
    with open(json_file, "rb") as f:
        data = json.load(f)

    poetry_text = []  # nested list
    word_set = set()
    for poetry in data:
        text = split_poetry_2_list(poetry)  # flatten list
        if text:
            # NOTE(review): the body of this branch was missing from the source.
            # It is reconstructed from the two accumulators returned below;
            # confirm against the original preprocess_data.py.
            word_set.update(text)
            poetry_text.append(text)
    return poetry_text, word_set


按每首古詩處理,最長為100字元,不足夠的補0。 關鍵字最多為20個,同樣不夠的用0補

# model_rnn.py

class PoetryRNNData(Dataset):
    """Zero-padded tensors of (topic words, poem) pairs for next-token training.

    For each poem the input row is ``poem[:L]`` and the target row is the same
    sequence shifted by one, ``poem[1:L + 1]``, where
    ``L = min(len(poem), max_poetry_length) - 1``.
    """

    def __init__(self, data, max_topic_counts=20, max_poetry_length=100):
        """Build the padded tensors.

        :param data: nested list ``[[topic_ids, poem_ids], ...]`` of word indices
        :param max_topic_counts: topic rows are padded / truncated to this width
        :param max_poetry_length: poems longer than this are truncated
        """
        # most poetries have length around 40 - 80
        # data is nested list of word idx
        assert any(isinstance(i, list) for i in data)

        # topic words
        topics = [i[0] for i in data]
        self.topic_lens = torch.LongTensor([min(len(x), max_topic_counts) for x in topics])

        # poetry text; "- 1" because input and target are offset by one token
        data = [i[1] for i in data]
        self.lengths = torch.LongTensor([min(len(x), max_poetry_length) - 1 for x in data])
        self.lens = len(self.lengths)

        # pad data: width of the poem tensors is the longest (clipped) poem.
        # Converted to a plain int so it is a valid tensor size.
        max_len = min(int(self.lengths.max()), max_poetry_length)

        self.topics = torch.zeros((self.lens, max_topic_counts)).long()
        self.data = torch.zeros((self.lens, max_len)).long()
        self.target = torch.zeros((self.lens, max_len)).long()
        for i in range(self.lens):
            topic_len = min(int(self.topic_lens[i]), max_topic_counts)
            self.topics[i, :topic_len] = torch.LongTensor(topics[i][:topic_len])

            length = min(int(self.lengths[i]), max_poetry_length)
            self.data[i, :length] = torch.LongTensor(data[i][:length])
            self.target[i, :length] = torch.LongTensor(data[i][1:(length + 1)])

        # use_cuda is a module-level flag defined outside this class.
        if use_cuda:
            self.topics = self.topics.cuda()
            self.topic_lens = self.topic_lens.cuda()
            self.data = self.data.cuda()
            self.target = self.target.cuda()
            self.lengths = self.lengths.cuda()

    def __len__(self):
        """Return the number of poems."""
        return self.lens

    def __getitem__(self, index):
        """Return ``(topics, topic_len, data, target, length)`` for one poem.

        The five-element layout matches what ``sort_batches`` unpacks; the
        trailing ``length`` element was cut off in the source and is restored.
        """
        out = (
            self.topics[index, :], self.topic_lens[index], self.data[index, :], self.target[index, :],
            self.lengths[index])
        return out

def prepare_data_loader(data, max_topic_len, max_length, batch_size, shuffle):
    """Wrap *data* in a ``PoetryRNNData`` dataset and return a ``DataLoader`` over it.

    :param data: nested list passed straight to ``PoetryRNNData``
    :param max_topic_len: forwarded as ``max_topic_counts``
    :param max_length: forwarded as ``max_poetry_length``
    :param batch_size: forwarded to ``DataLoader``
    :param shuffle: forwarded to ``DataLoader``
    """
    return DataLoader(
        PoetryRNNData(data, max_topic_counts=max_topic_len, max_poetry_length=max_length),
        batch_size=batch_size,
        shuffle=shuffle,
    )

Pytorch 的 RNN都接受不同長度的數據, 處理順序如下

pack sequence->recurrent network->unpack sequence

將古詩組合成矩陣(用0補足長度),記錄每行實際長度, 輸入RNN,分解矩陣。

# model_rnn.py

def sort_batches(batches):
    """Sort one mini-batch by length (descending) for ``pack_padded_sequence``.

    :param batches: ``(topics, topics_len, x, y, lengths)`` as produced by the
        data loader
    :return: ``(sorted_topics, topic_lens, sorted_x, unpad_y, lens)`` where
        ``unpad_y`` is the concatenation of every target row cut to its own
        length, and both length tensors are 1-D

    Lengths are flattened with ``reshape(-1)`` rather than ``squeeze()``:
    ``squeeze()`` collapses a batch of one to a 0-d tensor, on which ``len()``
    fails.

    NOTE(review): topics and poems are sorted independently, each by its own
    length, so row ``i`` of ``sorted_topics`` is not necessarily the topic of
    row ``i`` of ``sorted_x``. Confirm this is intended.
    """
    topics, topics_len, x, y, lengths = batches

    # topic key words
    pad_topic_lens, topic_order = topics_len.reshape(-1).sort(dim=0, descending=True)
    sorted_topics = topics.index_select(dim=0, index=topic_order).contiguous()

    # poetry text
    pad_len, order = lengths.reshape(-1).sort(dim=0, descending=True)
    sorted_x = x.index_select(dim=0, index=order).contiguous()  # x[order, :]
    sorted_y = y.index_select(dim=0, index=order)  # y[order, :]

    # Drop the padding of every target row and lay the rows end to end.
    unpad_y = torch.cat([sorted_y[i, :int(pad_len[i])] for i in range(len(pad_len))])
    unpad_y = unpad_y.contiguous()

    return sorted_topics, pad_topic_lens, sorted_x, unpad_y, pad_len


用Pytorch 寫了 skip-gram 和 negative sampling,用了2個word embedding。 理論上是可以用2個 全連接層(Dense Layer), 未測試過速度,但估計會更慢: 1) embedding 層是直接選取字的向量,2)torch.bmm有優化

Google的 K-80下全部數據運行一次要約11小時, 只用CPU的話要超過24小時

# model_negative_sampling.py

class SkipGramNegaSampling(nn.Module):
    """Skip-gram with negative sampling, using separate input and output embeddings."""

    def __init__(self, vocab_size, embed_dim):
        """
        :param vocab_size: number of rows of both embedding tables
        :param embed_dim: embedding dimension D
        """
        super(SkipGramNegaSampling, self).__init__()
        self.embed_hidden = nn.Embedding(vocab_size, embed_dim, sparse=True)
        self.embed_output = nn.Embedding(vocab_size, embed_dim, sparse=True)
        self.log_sigmoid = nn.LogSigmoid()

    def forward(self, input_batch, negative_batch):
        """Return the negative-sampling loss averaged over the batch.

        :param input_batch: pair ``(x, y)`` of index tensors, each N x 1
            (centre word, context word)
        :param negative_batch: N x k indices of sampled negative words
        :return: scalar tensor,
            ``-mean(log s(v_y . h_x) + sum_k log s(-v_neg_k . h_x))``

        The log-sigmoid is applied to every negative sample separately and the
        results are summed. Applying it once to the summed scores, as the
        earlier version did, is a different objective.
        """
        x, y = input_batch
        embed_hidden = self.embed_hidden(x)  # N x 1 x D
        embed_target = self.embed_output(y)  # N x 1 x D
        embed_neg = -self.embed_output(negative_batch)  # N x k x D

        hidden_t = embed_hidden.transpose(1, 2)  # N x D x 1
        positive_score = embed_target.bmm(hidden_t).squeeze(2)  # N x 1
        negative_score = embed_neg.bmm(hidden_t).squeeze(2)  # N x k

        negative_term = self.log_sigmoid(negative_score).sum(dim=1, keepdim=True)  # N x 1
        loss = self.log_sigmoid(positive_score) + negative_term
        return -torch.mean(loss)

實際上是換成 Gensim的Word2vec模塊出來的結果 。 2332

# word2vec_gensim.py

# Keyword arguments passed unchanged to the gensim Word2Vec constructor.
word2vec_params = {
    "sg": 1,  # 0 : CBOW; 1 : skip-gram
    "size": 300,
    "alpha": 0.01,
    "min_alpha": 0.0005,
    "window": 10,
    "min_count": 1,
    "seed": 1,
    "workers": 6,
    "negative": 0,
    "hs": 1,  # 0: negative sampling, 1:Hierarchical softmax
    "compute_loss": True,
    "iter": 50,
    "cbow_mean": 0,
}

with open("./data/poetry.json", "rb") as f:
    sentences = json.load(f)

model = Word2Vec(**word2vec_params)
# NOTE(review): the source was cut off after "compute_loss=True,". The model is
# created without a corpus, so the vocabulary has to be built before train()
# can run; total_examples / epochs are the arguments train() requires. Confirm
# against the original word2vec_gensim.py.
model.build_vocab(sentences)
trained_word_count, raw_word_count = model.train(
    sentences,
    compute_loss=True,
    total_examples=model.corpus_count,
    epochs=word2vec_params["iter"],
)

Text Rank

model 來自上面的 gensim.Word2vec。按每首古詩, 每次處理一小句。權重換成了 Word2vec 的 cosine similarity

# add_keywords_poetry.py

def create_word_similar_board(sentence, model):
    """Build the pairwise similarity matrix of the words in *sentence*.

    :param sentence: list of words
    :param model: passed through to ``compute_similarity``
    :return: ``len(sentence) x len(sentence)`` array whose ``[i][j]`` entry is
        ``compute_similarity(sentence[i], sentence[j], model)``; the diagonal
        is left at 0
    """
    size = len(sentence)
    board = np.zeros((size, size))
    for row, left in enumerate(sentence):
        for col, right in enumerate(sentence):
            if row == col:
                continue  # a word is not compared with itself
            board[row][col] = compute_similarity(left, right, model)
    return board

def word_score_diff(score_old, score_new, min_diff=1e-3):
    """Tell whether two score vectors still differ noticeably.

    :param score_old: array of scores from the previous iteration
    :param score_new: array of scores from the current iteration
    :param min_diff: tolerance; defaults to the previously hard-coded 1e-3
    :return: ``True`` if any element changed by more than *min_diff* (keep
        iterating), ``False`` otherwise (including for empty arrays)
    """
    return bool(np.any(np.abs(score_old - score_new) > min_diff))

def compute_word_scores(weight_board, score_old, d=0.85):
    """Run one TextRank update step.

    ``new[i] = (1 - d) + d * sum_j weight_board[i][j] * score_old[j] / colsum[j]``
    where ``colsum[j]`` is the sum of column ``j`` of *weight_board*. Columns
    whose sum is not positive are skipped to avoid dividing by zero.

    :param weight_board: square matrix of edge weights (zero diagonal)
    :param score_old: current score of every word
    :param d: damping factor; defaults to the previously hard-coded 0.85
    :return: array of updated scores, same length as *score_old*
    """
    weight_board = np.asarray(weight_board, dtype=float)
    score_old = np.asarray(score_old, dtype=float)

    # The column sums do not depend on the row being updated, so they are
    # computed once instead of once per row.
    col_sum = weight_board.sum(axis=0)
    usable = col_sum > 0  # avoid denominator = 0
    share = score_old[usable] / col_sum[usable]
    return (1 - d) + d * weight_board[:, usable].dot(share)

def word_rank(sentence, model, max_iter=100):
    """Iterate the word scores until they settle or *max_iter* updates were made.

    :param sentence: list of words
    :param model: passed through to ``create_word_similar_board``
    :param max_iter: upper bound on the number of update steps
    :return: array with one score per word of *sentence*
    """
    previous = np.zeros(len(sentence))
    current = np.ones_like(previous)
    weights = create_word_similar_board(sentence, model)

    for _ in range(max_iter):
        if not word_score_diff(previous, current):
            break  # scores stopped moving
        previous, current = current, compute_word_scores(weights, current)
    return current

def get_n_highest_rank(sentence, n, model, with_value=False):
    """Return the *n* best-ranked distinct words of *sentence*.

    :param sentence: list of words (duplicates are ranked once)
    :param n: maximum number of words to return
    :param model: passed through to ``word_rank``
    :param with_value: if true return ``[word, score]`` pairs, otherwise only
        the words
    :return: list ordered by descending score

    Duplicates are removed with ``dict.fromkeys`` so the words keep their
    first-occurrence order. A ``set`` iterates in hash-randomised order, which
    made the tie-breaking of equal scores change from run to run.
    """
    unique_words = list(dict.fromkeys(sentence))
    score = word_rank(unique_words, model)
    word_score = [[word, value] for word, value in zip(unique_words, score)]
    word_score.sort(key=itemgetter(1), reverse=True)  # stable: ties keep sentence order
    if not with_value:
        word_score = [pair[0] for pair in word_score]
    return word_score[:min(len(sentence), n)]


class PoetryRNN(nn.Module):
    """Keyword encoder + keyword attention + poem decoder, wired together.

    NOTE(review): the end of the ``__init__`` signature and the ends of three
    calls were cut off in the source. They are restored from the sibling
    classes: ``PoetryEncoder`` / ``PoetryDecoder`` read ``freeze_embed``, and
    ``PoetryDecoder.forward`` takes the attention module as its last
    argument. The default ``freeze_embed=True`` is an assumption - confirm.
    """

    def __init__(self, encoder_embed, encoder_topic_len,
                 decoder_embed, rnn_hidden_dim, rnn_layers, rnn_bidre, rnn_dropout,
                 dense_dim, dense_h_dropout,
                 freeze_embed=True):
        """
        :param encoder_embed: pre-trained embedding matrix for the encoder
        :param encoder_topic_len: number of topic slots fed to the attention
        :param decoder_embed: pre-trained embedding matrix for the decoder
        :param rnn_hidden_dim: hidden size shared by encoder and decoder
        :param rnn_layers: number of recurrent layers
        :param rnn_bidre: whether the decoder RNN is bidirectional
        :param rnn_dropout: dropout between decoder RNN layers
        :param dense_dim: width of the decoder's dense hidden layer
        :param dense_h_dropout: dropout of the decoder's dense hidden layer
        :param freeze_embed: forwarded to ``nn.Embedding.from_pretrained``
        """
        super(PoetryRNN, self).__init__()

        self.encoder = PoetryEncoder(encoder_embed,
                                     rnn_hidden_dim, rnn_layers,
                                     freeze_embed=freeze_embed)
        self.attention = KeywordAttention(encoder_topic_len, rnn_hidden_dim, encoder_embed.size(1))
        self.decoder = PoetryDecoder(decoder_embed,
                                     rnn_hidden_dim, rnn_layers, rnn_bidre, rnn_dropout,
                                     dense_dim, dense_h_dropout,
                                     freeze_embed=freeze_embed)

    def forward(self, batch_topic, topic_lens, batch_input, sorted_lens):
        """Encode the topics, then decode the poem conditioned on them.

        :return: ``(target_score, hidden)`` exactly as returned by the decoder
        """
        encoded_output, encoded_hidden = self.encoder.forward(batch_topic, topic_lens)
        target_score, hidden = self.decoder.forward(batch_input, sorted_lens, encoded_output, encoded_hidden,
                                                    self.attention)
        return target_score, hidden

GRU 編碼器 (Encoder)

# model_rnn.py

class PoetryEncoder(nn.Module):
    """Bidirectional GRU over the topic keywords.

    NOTE(review): the end of the ``__init__`` signature was cut off in the
    source. ``freeze_embed`` is restored because the body reads it; its
    default (``True``) is an assumption - confirm.
    """

    def __init__(self, encoder_embed,
                 rnn_hidden_dim, rnn_layers,
                 freeze_embed=True):
        """
        :param encoder_embed: pre-trained embedding matrix (vocab x embed_dim)
        :param rnn_hidden_dim: size of the concatenated (forward + backward)
            hidden state; each direction gets half of it
        :param rnn_layers: number of GRU layers
        :param freeze_embed: forwarded to ``nn.Embedding.from_pretrained``
        """
        super(PoetryEncoder, self).__init__()
        self.embed = nn.Embedding.from_pretrained(encoder_embed, freeze=freeze_embed)
        self.vocab_dim, self.embed_hidden = encoder_embed.size()

        # GRU: the output looks similar to hidden state of standard RNN
        self.rnn_hidden_dim = rnn_hidden_dim // 2
        self.rnn_layers = rnn_layers
        self.rnn = nn.GRU(self.embed_hidden, self.rnn_hidden_dim, batch_first=True,
                          num_layers=self.rnn_layers, bidirectional=True)

    def forward(self, batch_input, sorted_lens):
        """Encode a batch of topic rows.

        :param batch_input: (batch, seq_len) word indices, sorted by length
        :param sorted_lens: (batch,) true lengths, descending
        :return: ``(rnn_out, (h, h))``; ``rnn_out`` is (batch, seq_len,
            2 * rnn_hidden_dim) because every step uses ``batch_first=True``,
            and ``h`` is (layers, batch, 2 * rnn_hidden_dim)
        """
        word_vec = self.embed(batch_input)  # batch, seq_len, embed_dim
        word_vec = pack_padded_sequence(word_vec, sorted_lens.tolist(), batch_first=True)
        rnn_out, hidden = self.rnn(word_vec)  # hidden : layer*direction, batch, hidden dim
        rnn_out, _ = pad_packed_sequence(rnn_out, batch_first=True, total_length=batch_input.size(1))
        hidden = self.reshape_bidirec_hidden(hidden)
        return rnn_out, hidden

    def merge_bidirec_hidden(self, hidden_state):
        """Average the two directions of every layer; returns ``(h, h)``."""
        # in bi-directions layers, each layer contains 2 directions of hidden states, so
        # take their average for each layer
        h = hidden_state
        h = torch.cat(list(((h[i] + h[i + 1]) / 2).unsqueeze(0) for i in range(0, self.rnn_layers * 2, 2)))
        return (h, h)

    def reshape_bidirec_hidden(self, hidden_state):
        """Concatenate the forward and backward state of every layer.

        :param hidden_state: (layers * 2, batch, hidden), rows ordered
            layer0-forward, layer0-backward, layer1-forward, ...
        :return: ``(h, h)`` with ``h`` of shape (layers, batch, 2 * hidden)

        The two directions are joined along the feature axis. A plain
        ``reshape(layers, batch, -1)`` reinterprets the memory and pairs states
        of different batch rows whenever batch > 1.
        """
        h = hidden_state
        h = torch.cat([h[0::2], h[1::2]], dim=2)
        return (h, h)


# model_rnn.py

class KeywordAttention(nn.Module):
    """Mix keyword-attention weights into the decoder's input embeddings.

    The softmax weights over the topic slots are concatenated to the decoder
    input and projected back to the embedding size. The encoder outputs
    themselves are not averaged by these weights.
    """

    def __init__(self, encoder_topic_len, encoder_hidden_dim, decoder_embed_hidden, attention_dropout=0.1):
        """
        :param encoder_topic_len: number of topic slots (topic_len)
        :param encoder_hidden_dim: feature size of the encoder output
        :param decoder_embed_hidden: feature size of the decoder input
        :param attention_dropout: dropout applied to the encoder output
        """
        super(KeywordAttention, self).__init__()
        self.atten_weights = nn.Linear(encoder_hidden_dim, decoder_embed_hidden)
        self.softmax = nn.Softmax(dim=2)
        self.context_out = nn.Linear(encoder_topic_len + decoder_embed_hidden, decoder_embed_hidden)
        self.dropout = nn.Dropout(attention_dropout)
        self.activation_out = nn.SELU()

    def forward(self, decoder_input, encoder_output):
        """
        :param decoder_input: (batch, seq_len, embedding_hidden)
        :param encoder_output: (batch, topic_len, rnn_hidden)
        :return: (batch, seq_len, embedding_hidden)
        """
        context_state = self.dropout(encoder_output)
        # --> batch, decoder_embed_hidden, topic_len
        projected = self.atten_weights(context_state).transpose(1, 2)

        attention_w = decoder_input.bmm(projected)  # batch, seq_len, topic_len
        attention = self.softmax(attention_w)  # normalised over the topic slots

        # batch, seq_len, topic_len + embed_hidden
        context_concat = torch.cat([decoder_input, attention], dim=2)
        out = self.context_out(context_concat)  # batch, seq_len, embed_hidden
        out = self.activation_out(out)
        return out

LSTM 解碼器 (Decoder)

# model_rnn.py
class PoetryDecoder(nn.Module):
    """LSTM decoder with a dense head that scores the next word at every step.

    NOTE(review): several pieces of this class were cut off in the source and
    are restored here - confirm each against the original model_rnn.py:
    the end of the ``__init__`` signature (``freeze_embed``, default assumed),
    the tail of the ``nn.Sequential`` in ``dense_layer`` (the ``activation``
    argument is assumed to be its second layer), the ``@staticmethod`` on
    ``mask_zeros`` (it has no ``self`` but is called through ``self``), and
    the initialiser call inside ``init_rnn_xavier_normal``.
    """

    def __init__(self, decoder_embed,
                 rnn_hidden_dim, rnn_layers, rnn_bidre, rnn_dropout,
                 dense_dim, dense_h_dropout,
                 freeze_embed=True):
        """
        :param decoder_embed: pre-trained embedding matrix (vocab x embed_dim)
        :param rnn_hidden_dim: LSTM output size (split over both directions
            when *rnn_bidre* is true)
        :param rnn_layers: number of LSTM layers
        :param rnn_bidre: whether the LSTM is bidirectional
        :param rnn_dropout: dropout between LSTM layers
        :param dense_dim: width of the dense hidden layer
        :param dense_h_dropout: dropout of the dense hidden layer
        :param freeze_embed: forwarded to ``nn.Embedding.from_pretrained``
        """
        super(PoetryDecoder, self).__init__()

        # pre-trained word embedding
        self.embed = nn.Embedding.from_pretrained(decoder_embed, freeze=freeze_embed)
        self.vocab_dim, self.embed_hidden = decoder_embed.size()

        self.rnn_hidden_dim = rnn_hidden_dim // 2 if rnn_bidre else rnn_hidden_dim
        self.rnn_layers = rnn_layers
        self.bi_direc = 2 if rnn_bidre else 1
        self.rnn = nn.LSTM(self.embed_hidden, self.rnn_hidden_dim, batch_first=True,
                           num_layers=rnn_layers, bidirectional=rnn_bidre, dropout=rnn_dropout)

        # dense hidden layers
        self.dense_h_dropout = dense_h_dropout
        self.dense_h0 = self.dense_layer(rnn_hidden_dim, dense_dim, nn.SELU(), dropout=True)

        # output layer
        self.output_linear = nn.Linear(dense_dim, self.vocab_dim)
        self.log_softmax = nn.LogSoftmax(dim=1)

    def forward_(self, batch_input, sorted_lens, encoder_output, rnn_hidden, attention):
        """Run embedding -> attention -> LSTM -> dense head.

        :param batch_input: (batch, seq_len) word indices, sorted by length
        :param sorted_lens: (batch,) true lengths, descending
        :param encoder_output: (batch, topic_len, hidden_dim)
        :param rnn_hidden: ``(h, c)``, each (num_layers, batch, hidden_dim)
        :param attention: module whose ``forward(word_vec, encoder_output)``
            returns the attended decoder input
        :return: ``(decoded, hidden)``; ``decoded`` is a list with one
            (length_i, vocab) tensor of unnormalised scores per sequence
        """
        word_vec = self.embed(batch_input)  # batch, seq_len, embed_dim
        # mask out zeros for padded 0s
        word_vec = self.mask_zeros(word_vec, sorted_lens)
        # attention
        word_vec = attention.forward(word_vec, encoder_output)

        word_vec = pack_padded_sequence(word_vec, sorted_lens.tolist(), batch_first=True)
        rnn_out, hidden = self.rnn(word_vec, rnn_hidden)  # hidden : layer*direction, batch, hidden dim
        rnn_out, _ = pad_packed_sequence(rnn_out, batch_first=True, total_length=batch_input.size(1))

        # dense layers, applied to the un-padded part of every sequence only
        unpad = [rnn_out[i, :int(sorted_lens[i]), :] for i in range(len(sorted_lens))]
        decoded = [self.forward_dense(x) for x in unpad]

        # final output
        return decoded, hidden

    def forward(self, batch_input, sorted_lens, encoder_output, rnn_hidden, attention):
        """Return log-probabilities of shape (sum of lengths, vocab) and the LSTM state."""
        decoded, hidden = self.forward_(batch_input, sorted_lens, encoder_output, rnn_hidden, attention)
        target_score = [self.log_softmax(x) for x in decoded]
        target_score = torch.cat(target_score)  # batch*seq_len, vocab_size
        return target_score, hidden

    def forward_dense(self, rnn_out):
        """Map LSTM outputs to unnormalised vocabulary scores."""
        # hidden layers
        dense_h0 = self.dense_h0(rnn_out)
        decoded = self.output_linear(dense_h0)
        return decoded

    def dense_layer(self, input_dim, output_dim, activation, dropout=True):
        """Build ``Linear -> activation [-> Dropout]`` as an ``nn.Sequential``."""
        dense = nn.Sequential(
            nn.Linear(input_dim, output_dim),
            activation,
        )
        if dropout:
            dense.add_module("Dropout", nn.Dropout(self.dense_h_dropout))
        return dense

    @staticmethod
    def mask_zeros(word_vec, sorted_lens):
        """Zero the embeddings of padded positions, in place.

        :param word_vec: (batch, seq_len, embed_dim)
        :param sorted_lens: true length of every row
        :return: the same tensor, modified
        """
        # Each example has different lengths, but the padded 0s have value after the embedding layer, so mask them 0
        for i in range(len(sorted_lens)):
            valid = int(sorted_lens[i])
            if valid < word_vec.size(1):
                word_vec[i, valid:, :] = 0
        return word_vec

    def init_rnn_xavier_normal(self):
        """Re-initialise every LSTM parameter, viewed as a single column.

        NOTE(review): only the ``view(-1, 1)`` line survived in the source;
        the Xavier-normal call is inferred from the method name - confirm.
        """
        with torch.no_grad():
            for name, weights in self.rnn.named_parameters():
                # The view shares storage with the parameter, so the in-place
                # initialiser updates the parameter itself.
                nn.init.xavier_normal_(weights.view(-1, 1))

    def predict_softmax_score(self, batch_input, sorted_lens, encoder_output, rnn_hidden, attention):
        """Like ``forward`` but returns probabilities; evaluation mode only."""
        assert not self.training
        decoded, hidden = self.forward_(batch_input, sorted_lens, encoder_output, rnn_hidden, attention)
        target_score = [F.softmax(x, dim=1) for x in decoded]
        target_score = torch.cat(target_score)  # batch*seq_len, vocab_size
        return target_score, hidden

古詩生成: Beam Search

生成古詩時, 將選取的第一個字當成下一個字的輸入數據。如果只根據softmax的值來選往後的字,得到的結果未必是最好的。更好的方法(?) 是按softmax選取前N個字, 然後用它們各生成下一步的N個字,得到N*N個字,再按softmax選前N個。


# generate.py

# Advance a set of cached candidate sequences by one decoding step.
# `cache` is iterated as (score, init, hidden) triples; for each one the decoder's
# predict_softmax_score is called on the last token of `init` and the resulting
# scores are sorted in descending order. Returns a new list `caches` of
# [score, token list, hidden] entries.
# NOTE(review): this snippet is incomplete as extracted - confirm against generate.py:
#   * the signature is cut off after `skip_words=non_words,`; `remove_punctuation`
#     is read below and passed by beam_search, so it is presumably a parameter;
#   * the predict_softmax_score call is cut off (the decoder method also takes an
#     `attention` argument);
#   * the body of `if idx.item() not in skip_words:` is missing, so `s` / `IDS`
#     are never filled in the visible code;
#   * `hiddens` is indexed in the last loop but never appended to here.
def beam_search_forward(model, cache, encoder_output, beam_size, skip_words=non_words,
caches = []
scores = []
score_idx = []
hiddens = []

# every candidate is extended by exactly one token, hence length 1
sorted_lens = torch.LongTensor([1])
for score, init, hidden in cache:
# target_score, hidden_state = model.predict_softmax_score(init[-1], hidden)
target_score, hidden_state = model.decoder.predict_softmax_score(init[-1], sorted_lens,
encoder_output, hidden,

best_score, index = torch.sort(target_score.squeeze(), dim=0, descending=True)

if remove_punctuation:
# look at twice the beam width so enough candidates can remain after skipping
best_score = best_score[:beam_size * 2]
index = index[:beam_size * 2]

s = []
IDS = []
for i, idx in enumerate(index):
if idx.item() not in skip_words:

best_score = torch.FloatTensor(s[:beam_size])
index = torch.LongTensor(IDS[:beam_size])
best_score = best_score[:beam_size]
index = index[:beam_size]

scores += list(best_score.data.numpy())
score_idx += list(index.data.numpy())

# NOTE(review): `indexs` holds positions inside `scores`, not word ids; the word
# ids collected in `score_idx` are not used below, and `top_scores[i]` is added to
# the score of cache entry i regardless of which entry produced it - verify.
topN, indexs = torch.sort(torch.FloatTensor(scores), dim=0, descending=True)
top_scores = topN[:beam_size]
chosen_word = indexs[:beam_size]
chosen_word = [torch.LongTensor([i]).view(1, 1) for i in chosen_word]
for i, (score, init, hidden) in enumerate(cache):
caches.append([top_scores[i] + score, init + [chosen_word[i]], hiddens[i]])
return caches

def beam_search(model, init_word, encoder_output, rnn_hidden,
                beam_size=3, text_length=100, remove_punctuation=True):
    """Generate a continuation of *init_word* with beam search.

    :param model: object exposing ``decoder.predict_softmax_score`` and
        ``attention``
    :param init_word: (1, n) tensor of the word indices that start the text
    :param encoder_output: encoder output passed through to the decoder
    :param rnn_hidden: decoder state to start from
    :param beam_size: number of candidates kept after every step
    :param text_length: number of ``beam_search_forward`` steps to run
    :param remove_punctuation: forwarded to ``beam_search_forward``
    :return: ``(text, hidden)`` - word ids of the best-scoring candidate and
        its cached decoder state

    The decoder call was cut off in the source; its last argument is restored
    as ``model.attention`` to match ``PoetryDecoder.predict_softmax_score``.
    """
    sorted_lens = torch.LongTensor([init_word.size(1)])
    target_score, hidden_state = model.decoder.predict_softmax_score(init_word, sorted_lens,
                                                                     encoder_output, rnn_hidden,
                                                                     model.attention)
    # Scores of all positions of init_word are summed into one vector over the vocabulary.
    target_score = torch.sum(target_score, dim=0).squeeze()
    sorted_score, index = torch.sort(target_score, descending=True)

    # One cache entry per beam: [score, [tokens so far], decoder state].
    cache = [[sorted_score[i],
              [torch.LongTensor([index[i].item()]).view(1, 1)],
              hidden_state] for i in range(beam_size)]

    for _ in range(text_length):
        cache = beam_search_forward(model, cache, encoder_output, beam_size, remove_punctuation=remove_punctuation)

    # First entry with the highest score wins, as with list.index(max(...)).
    max_id = max(range(beam_size), key=lambda i: float(cache[i][0]))
    text = [token.item() for token in cache[max_id][1]]
    return text, cache[max_id][2]  # text, last_hidden_state


# Generate a poem: encode the keywords, feed the title through the decoder to get
# its state, then for each group of sentence keywords run beam_search and append
# keywords + generated words + a punctuation mark to the output list.
# Returns the accumulated list `generated_text`.
# NOTE(review): this snippet is incomplete as extracted - confirm against generate.py:
#   * the signature is cut off after `beam_search_size=7,`;
#   * `comma`, `period` and `title_end` are not defined in the visible code
#     (possibly the missing parameters or module-level constants);
#   * the calls to model.decoder.forward and beam_search are cut off.
def generate_my_poetry(model, title, title_key_word, text_key_words, sentence_len=7, beam_search_size=7,
assert not model.training

# number of topic slots, recovered from the attention layer's Linear dimensions
topic_lengths = model.attention.context_out.in_features - model.attention.context_out.out_features
# punctuation marks alternate between sentences: comma, period, comma, ...
iter_punctuation = cycle([comma, period])

# topic --> encoder
key_word_flatted = title_key_word + list(chain.from_iterable(text_key_words))
key_word_flatted += [0] * (topic_lengths - len(key_word_flatted))
topics = torch.LongTensor(key_word_flatted).view(1, -1)
# NOTE(review): the length is taken after zero-padding, so it counts the pad slots too
topic_lens = torch.LongTensor([len(key_word_flatted)])
encoder_output, encoder_hidden = model.encoder.forward(topics, topic_lens)

# title
title_words = title + [title_end]
generated_text = title_words
sorted_lens = torch.LongTensor([len(title_words)])
title_words = torch.LongTensor(title_words).view(1, -1)
# only the decoder state after reading the title is kept; the scores are discarded
_, rnn_hidden = model.decoder.forward(title_words, sorted_lens,
encoder_output, encoder_hidden,

# main text
for words in text_key_words:

text_len = sentence_len - len(words) - 1 # exclude punctuation
init_word = torch.LongTensor(words).view(1, -1)
text, rnn_hidden = beam_search(model, init_word, encoder_output, rnn_hidden,
beam_size=beam_search_size, text_length=text_len,
punctuation = [next(iter_punctuation)]
generated_text += words + text + punctuation

return generated_text


