PyTorch 学习笔记: 古诗 Seq2seq with Attention
前排重要提示: 并不是标准的Seq2seq,纯粹练手用, 效果一般。Pytorch下改成标准的模型用GPU运行的话慢大概10倍左右。
样板
柚子水
夏空日月明山色,
彼方美人不可为。
天神上下不相知,
乱漫山中有水流。
魔女人间不可寻,
夜宴深溪上清风。
千恋里中无一事,
万花顷刻一枝新。
模型
用Google的P-100, 全部数据算一次要7分钟,K-80则需要半小时。每个batch大小为 256x100,需要12G的GPU显存
Loss
Mini batch (1 step) 是 256 (古诗)x 100(字元数,不够的用0补足),
11
缺陷
- 关键词全部为单字, 部分抽出来的字意义不明。用jieba分词出来的效果也不太理想,比如:「千山/ 万山/ 如火发」千山和万山都是虚数,应该是同类,「如 火 发」则各有含义。
- LSTM 和 GRU 都当成黑盒用,Pytorch只能输出最后的Hidden state 和各阶段的 output。 如果改用循环来获取各阶段的hidden state, 要慢10倍左右 ><。 因此Attention 部分用GRU的output代替 (输出的激活函数是 tanh)。
数据处理
数据全为唐宋诗(json格式), 只保留逗号和句号。标题后加上<SOP>; 句尾加上<EOP>. 输出一个大 list [[古诗1], [古诗2], [古诗3], ...]. 然后用来训练一个 Word2vec 模型,用其字向量加Textrank找出每句2个关键字. 格式如下。 最后导入DataLoader
[ [[关键字1, 2],[古诗1]], [[关键字1, 2],[古诗2]], ... ]
# preprocess_data.py
patterns = [(.*), "{.*}", "《.*》", "[.*]", "<.*>", ")", "』", ":", "「.*」",
[, 」, ;, 》, (, ), /, `, 、, :,
《, *, -, =, {, }]
def clean_sentence(sentence):
for x in patterns:
sentence = re.sub(x, , sentence)
return sentence.strip()
def split_poetry_2_list(poetry):
# one entry in a json file
# return a flatten list of words
text = poetry.get(paragraphs) # may be []
if text:
text = [clean_sentence(x.strip()) for x in text]
text = list(chain.from_iterable(text)) # flatten list of sentence
text = [<SOP>] + text
text[-1] = "<EOP>"
title = poetry.get(title)
title = "".join(title.split())
title = clean_sentence(title)
text = list(title) + text
return text
def process_data(json_file):
    """Load a json file of poems and collect them plus their vocabulary.

    :param json_file: path to a json array of poetry entries
    :return: (nested list of per-poem word lists, set of every word seen)
    """
    with open(json_file, "rb") as f:
        data = json.load(f)
    poetry_text = []  # nested list
    word_set = set()
    for poetry in data:
        text = split_poetry_2_list(poetry)  # flat char list; [] when empty entry
        if text:
            word_set.update(text)
            poetry_text.append(text)
    return poetry_text, word_set
批量化
按每首古诗处理,最长为100字元,不足够的补0。 关键字最多为20个,同样不够的用0补
# model_rnn.py
class PoetryRNNData(Dataset):
    """Dataset of (topic keywords, input sequence, target sequence, lengths).

    Poems and keyword lists are 0-padded to fixed widths; the target is the
    input shifted left by one token (next-character prediction).
    """

    def __init__(self, data, max_topic_counts=20, max_poetry_length=100):
        # when chunk size = 120, evenly divide; = 259, leave one out
        # most poetries have length around 40 - 80
        # data is nested list of word idx
        assert any(isinstance(i, list) for i in data)
        # topic words: data[i][0] is the keyword list for poem i
        topics = [i[0] for i in data]
        self.topic_lens = torch.LongTensor([min(len(x), max_topic_counts) for x in topics])
        # poetry text: data[i][1] is the poem itself
        data = [i[1] for i in data]
        # -1 because the last token has no "next" token to predict
        self.lengths = torch.LongTensor([min(len(x), max_poetry_length) - 1 for x in data])
        self.lens = len(self.lengths)
        # pad data with zeros up to the longest (truncated) poem
        max_len = min(max(self.lengths), max_poetry_length)
        self.topics = torch.zeros((self.lens, max_topic_counts)).long()
        self.data = torch.zeros((self.lens, max_len)).long()
        self.target = torch.zeros((self.lens, max_len)).long()
        for i in range(self.lens):
            TL = min(self.topic_lens[i], max_topic_counts)
            self.topics[i, :TL] = torch.LongTensor(topics[i][:TL])
            L = min(self.lengths[i], max_poetry_length)
            self.data[i, :L] = torch.LongTensor(data[i][:L])
            # target = same sequence shifted one step to the left
            self.target[i, :L] = torch.LongTensor(data[i][1:(L + 1)])
        # use_cuda is a module-level flag; move everything to GPU once up front
        if use_cuda:
            self.topics = self.topics.cuda()
            self.topic_lens = self.topic_lens.cuda()
            self.data = self.data.cuda()
            self.target = self.target.cuda()
            self.lengths = self.lengths.cuda()

    def __len__(self):
        # number of poems
        return self.lens

    def __getitem__(self, index):
        """Return (topics, topic_len, input, target, length) for one poem."""
        out = (
            self.topics[index, :], self.topic_lens[index], self.data[index, :], self.target[index, :],
            self.lengths[index])
        return out
def prepare_data_loader(data, max_topic_len, max_length, batch_size, shuffle):
    """Wrap raw nested data in a PoetryRNNData dataset and return a DataLoader."""
    dataset = PoetryRNNData(
        data,
        max_topic_counts=max_topic_len,
        max_poetry_length=max_length,
    )
    return DataLoader(dataset, batch_size=batch_size, shuffle=shuffle)
Pytorch 的 RNN都接受不同长度的数据, 处理顺序如下
将古诗组合成矩阵(用0补足长度),记录每行实际长度, 输入RNN,分解矩阵。
pack sequence->recurrent network->unpack sequence
Frequently Asked Questions
但要按长度排,每个batch输入前要按长度整理。
# model_rnn.py
def sort_batches(batches):
    """Reorder a batch by sequence length, longest first, for pack_padded_sequence.

    batches: (topics, topic_lens, x, y, lengths) as produced by the DataLoader.
    Returns (sorted_topics, topic_lens, sorted_x, flat_unpadded_y, lens) where
    the targets have their padding removed and are concatenated into one 1-D
    tensor, matching the flattened scores the decoder emits.
    """
    topics, topic_lens, x, y, lengths = batches

    # Keywords: rows ordered by keyword count, descending.
    topic_lens_sorted, topic_order = topic_lens.sort(dim=0, descending=True)
    topics_sorted = topics.index_select(dim=0, index=topic_order.squeeze()).contiguous()
    topic_seq_lens = topic_lens_sorted.squeeze()

    # Poetry text: rows ordered by text length, descending.
    lens_sorted, order = lengths.sort(dim=0, descending=True)
    x_sorted = x.index_select(dim=0, index=order.squeeze()).contiguous()
    y_sorted = y.index_select(dim=0, index=order.squeeze())
    seq_lens = lens_sorted.squeeze()

    # Strip padding from every target row, then concatenate.
    y_flat = torch.cat(
        [y_sorted[i, :seq_lens[i]] for i in range(len(seq_lens))]
    ).contiguous()

    return topics_sorted, topic_seq_lens, x_sorted, y_flat, seq_lens
Word2Vec
用Pytorch 写了 skip-gram 和 negative sampling,用了2个word embedding。 理论上是可以用2个 全链接层(Dense Layer), 未测试过速度,但估计会更慢: 1) embedding 层是直接选取字的向量,2)torch.bmm有优化
Google的 K-80下全部数据运行一次要约11小时, 只用CPU的话要超过24小时
# model_negative_sampling.py
class SkipGramNegaSampling(nn.Module):
    """Skip-gram word2vec trained with negative sampling.

    Uses two embedding tables: one for center words (hidden) and one for
    context/negative words (output). forward returns the mean negative
    log-sigmoid objective over the batch.
    """

    def __init__(self, vocab_size, embed_dim):
        super(SkipGramNegaSampling, self).__init__()
        self.embed_hidden = nn.Embedding(vocab_size, embed_dim, sparse=True)
        self.embed_output = nn.Embedding(vocab_size, embed_dim, sparse=True)
        self.log_sigmoid = nn.LogSigmoid()

    def forward(self, input_batch, negative_batch):
        """Compute the loss for a batch of (center, context) pairs.

        input_batch: tuple of (N x 1) index tensors (center, context).
        negative_batch: (N x k) indices of negative samples.
        """
        center, context = input_batch
        h = self.embed_hidden(center)             # N x 1 x D
        pos = self.embed_output(context)          # N x 1 x D
        neg = -self.embed_output(negative_batch)  # N x k x D (negated for the loss)
        h_t = h.transpose(1, 2)                   # N x D x 1
        score_pos = pos.bmm(h_t).squeeze(2)                               # N x 1
        score_neg = neg.bmm(h_t).squeeze(2).sum(dim=1, keepdim=True)      # N x 1
        objective = self.log_sigmoid(score_pos) + self.log_sigmoid(score_neg)
        return -objective.mean()
实际上是换成 Gensim的Word2vec模块出来的结果 。 2332
# word2vec_gensim.py
# Hyper-parameters for the gensim Word2Vec model; keys follow the gensim API.
word2vec_params = {
    "sg": 1,  # 0: CBOW; 1: skip-gram
    "size": 300,
    "alpha": 0.01,
    "min_alpha": 0.0005,
    "window": 10,
    "min_count": 1,
    "seed": 1,
    "workers": 6,
    "negative": 0,
    "hs": 1,  # 0: negative sampling, 1: Hierarchical softmax
    "compute_loss": True,
    "iter": 50,
    "cbow_mean": 0,
}
# Train the embedding on the preprocessed poem list (nested list of chars).
with open("./data/poetry.json", "rb") as f:
    sentences = json.load(f)
model = Word2Vec(**word2vec_params)
model.build_vocab(sentences)
trained_word_count, raw_word_count = model.train(sentences, compute_loss=True,
                                                 total_examples=model.corpus_count,
                                                 epochs=model.epochs)
Text Rank
model 来自上面的 gensim.Word2vec。按每首古诗, 每次处理一小句。权重换成了 Word2vec 的 cosine similarity
# add_keywords_poetry.py
def create_word_similar_board(sentence, model):
    """Build the pairwise similarity matrix for the words of one sentence.

    sentence: list of words; model: word-vector model consumed by
    compute_similarity. Returns an (n, n) numpy array whose diagonal is 0.
    """
    n = len(sentence)
    board = np.zeros((n, n))
    for row in range(n):
        for col in range(n):
            if row == col:
                continue  # self-similarity stays 0
            board[row][col] = compute_similarity(sentence[row], sentence[col], model)
    return board
def word_score_diff(score_old, score_new):
    """Return True while any score still changed by more than the tolerance.

    Serves as the TextRank convergence test; iteration continues while this
    holds.
    """
    min_diff = 1e-3
    # np.any already yields the boolean we need; no if/else ladder required
    return bool(np.any(np.abs(score_old - score_new) > min_diff))
def compute_word_scores(weight_board, score_old):
    """One TextRank iteration: s_i <- (1-d) + d * sum_j w_ij * s_j / colsum_j.

    Columns whose sum is 0 are skipped to avoid division by zero; the
    diagonal of weight_board is 0, so a word never votes for itself.
    """
    d = 0.85  # damping factor
    col_sums = np.sum(weight_board, axis=0)   # loop-invariant denominators
    valid = np.where(col_sums > 0)            # keep only non-zero denominators
    score_new = np.zeros_like(score_old)
    for i in range(len(score_old)):
        weighted = weight_board[i] * score_old
        score_new[i] = (1 - d) + d * np.sum(weighted[valid] / col_sums[valid])
    return score_new
def word_rank(sentence, model, max_iter=100):
    """Run TextRank over one sentence until scores converge or max_iter hits.

    Returns the final numpy score vector, aligned with `sentence`.
    """
    weight_board = create_word_similar_board(sentence, model)
    score_old = np.zeros(len(sentence))
    score_new = np.ones_like(score_old)
    steps = 0
    while steps < max_iter and word_score_diff(score_old, score_new):
        score_old, score_new = score_new, compute_word_scores(weight_board, score_new)
        steps += 1
    return score_new
def get_n_highest_rank(sentence, n, model, with_value=False):
    """Return the n highest-ranked distinct words of a sentence.

    with_value=True keeps [word, score] pairs; otherwise only words are
    returned. Duplicates are collapsed before ranking.
    """
    unique_words = list(set(sentence))
    scores = word_rank(unique_words, model)
    ranked = sorted(
        ([word, score] for word, score in zip(unique_words, scores)),
        key=lambda pair: pair[1],
        reverse=True,
    )
    if not with_value:
        ranked = [pair[0] for pair in ranked]
    return ranked[:min(len(sentence), n)]
模型核心
class PoetryRNN(nn.Module):
    """Full model: keyword encoder -> keyword attention -> poetry decoder."""

    def __init__(self, encoder_embed, encoder_topic_len,
                 decoder_embed, rnn_hidden_dim, rnn_layers, rnn_bidre, rnn_dropout,
                 dense_dim, dense_h_dropout,
                 freeze_embed=True):
        super(PoetryRNN, self).__init__()
        # encoder shares rnn_hidden_dim with the decoder so its hidden state
        # can seed the decoder directly
        self.encoder = PoetryEncoder(encoder_embed,
                                     rnn_hidden_dim, rnn_layers,
                                     freeze_embed)
        self.attention = KeywordAttention(encoder_topic_len, rnn_hidden_dim, encoder_embed.size(1))
        self.decoder = PoetryDecoder(decoder_embed,
                                     rnn_hidden_dim, rnn_layers, rnn_bidre, rnn_dropout,
                                     dense_dim, dense_h_dropout,
                                     freeze_embed)

    def forward(self, batch_topic, topic_lens, batch_input, sorted_lens):
        """Encode the topic keywords, then decode the poem with attention.

        Returns (target_score, hidden) exactly as produced by the decoder.
        """
        encoded_output, encoded_hidden = self.encoder.forward(batch_topic, topic_lens)
        target_score, hidden = self.decoder.forward(batch_input, sorted_lens, encoded_output, encoded_hidden,
                                                    self.attention)
        return target_score, hidden
GRU 编码器 (Encoder)
# model_rnn.py
class PoetryEncoder(nn.Module):
    """Bidirectional GRU encoder over topic keyword indices.

    Each direction receives half of the requested hidden size so the
    concatenated bidirectional output matches rnn_hidden_dim.
    """

    def __init__(self, encoder_embed,
                 rnn_hidden_dim, rnn_layers,
                 freeze_embed=False):
        super(PoetryEncoder, self).__init__()
        self.embed = nn.Embedding.from_pretrained(encoder_embed, freeze=freeze_embed)
        self.vocab_dim, self.embed_hidden = encoder_embed.size()
        # GRU: the output looks similar to the hidden state of a standard RNN
        self.rnn_hidden_dim = rnn_hidden_dim // 2
        self.rnn_layers = rnn_layers
        self.rnn = nn.GRU(self.embed_hidden, self.rnn_hidden_dim, batch_first=True,
                          num_layers=self.rnn_layers, bidirectional=True)

    def forward(self, batch_input, sorted_lens):
        """Encode keyword indices.

        batch_input: (batch, seq_len) indices, rows sorted by length desc.
        sorted_lens: (batch,) true lengths, descending.
        Returns (rnn_out, (h, h)): rnn_out is (batch, seq_len, rnn_hidden_dim)
        and h is (rnn_layers, batch, rnn_hidden_dim) for the decoder's LSTM.
        """
        embedded = self.embed(batch_input)
        packed = pack_padded_sequence(embedded, sorted_lens.tolist(), batch_first=True)
        output, hidden = self.rnn(packed)  # hidden: layers * directions, batch, dim
        output, _ = pad_packed_sequence(output, batch_first=True,
                                        total_length=batch_input.size(1))
        return output, self.reshape_bidirec_hidden(hidden)

    def merge_bidirec_hidden(self, hidden_state):
        """Average the two directional states of each layer (unused alternative)."""
        h = hidden_state
        averaged = [((h[i] + h[i + 1]) / 2).unsqueeze(0)
                    for i in range(0, self.rnn_layers * 2, 2)]
        h = torch.cat(averaged)
        # duplicated so it can stand in for an LSTM's (h, c) pair
        return (h, h)

    def reshape_bidirec_hidden(self, hidden_state):
        """Fold (layers*2, batch, dim) into (layers, batch, dim*2).

        NOTE(review): reshape merges by raw memory order, which mixes rows
        across the batch dimension rather than concatenating the two
        directions per example — verify this is intended.
        """
        layers_x2, batch, _ = hidden_state.size()
        h = hidden_state.reshape(layers_x2 // 2, batch, -1)
        # duplicated so it can stand in for an LSTM's (h, c) pair
        return (h, h)
Attention
# model_rnn.py
class KeywordAttention(nn.Module):
    """Dot-product attention of decoder inputs over encoded topic keywords."""

    def __init__(self, encoder_topic_len, encoder_hidden_dim, decoder_embed_hidden, attention_dropout=0.1):
        super(KeywordAttention, self).__init__()
        self.atten_weights = nn.Linear(encoder_hidden_dim, decoder_embed_hidden)
        self.softmax = nn.Softmax(dim=2)
        self.context_out = nn.Linear(encoder_topic_len + decoder_embed_hidden, decoder_embed_hidden)
        self.dropout = nn.Dropout(attention_dropout)
        self.activation_out = nn.SELU()

    def forward(self, decoder_input, encoder_output):
        """Mix attention weights into the decoder input.

        decoder_input: (batch, seq_len, embed_hidden)
        encoder_output: (batch, topic_len, rnn_hidden)
        Returns (batch, seq_len, embed_hidden).
        """
        keys = self.dropout(encoder_output)
        # project encoder states into the decoder embedding space
        proj = self.atten_weights(keys).transpose(1, 2)   # batch, embed_hidden, topic_len
        weights = self.softmax(decoder_input.bmm(proj))   # batch, seq_len, topic_len
        # NOTE(review): the attention weights themselves are concatenated with
        # the input instead of forming a weighted sum of encoder states —
        # verify this is the intended design.
        combined = torch.cat([decoder_input, weights], dim=2)
        return self.activation_out(self.context_out(combined))
LSTM 解码器 (Decoder)
# model_rnn.py
class PoetryDecoder(nn.Module):
    """LSTM decoder generating the poem conditioned on keyword attention.

    Embeds input tokens, applies KeywordAttention against the encoder output,
    runs a (possibly bidirectional) LSTM seeded with the encoder hidden
    state, and maps each non-padded timestep through dense layers to
    vocabulary scores.
    """

    def __init__(self, decoder_embed,
                 rnn_hidden_dim, rnn_layers, rnn_bidre, rnn_dropout,
                 dense_dim, dense_h_dropout,
                 freeze_embed=False):
        # pre-trained word embedding
        super(PoetryDecoder, self).__init__()
        self.embed = nn.Embedding.from_pretrained(decoder_embed, freeze=freeze_embed)
        self.vocab_dim, self.embed_hidden = decoder_embed.size()
        # LSTM: halve the per-direction size when bidirectional so the
        # concatenated output keeps the nominal rnn_hidden_dim
        self.rnn_hidden_dim = rnn_hidden_dim // 2 if rnn_bidre else rnn_hidden_dim
        self.rnn_layers = rnn_layers
        self.bi_direc = 2 if rnn_bidre else 1
        self.rnn = nn.LSTM(self.embed_hidden, self.rnn_hidden_dim, batch_first=True,
                           num_layers=rnn_layers, bidirectional=rnn_bidre, dropout=rnn_dropout)
        # self.init_rnn_xavier_normal()
        # dense hidden layers
        self.dense_h_dropout = dense_h_dropout
        self.dense_h0 = self.dense_layer(rnn_hidden_dim, dense_dim, nn.SELU(), dropout=True)
        # output layer
        self.output_linear = nn.Linear(dense_dim, self.vocab_dim)
        self.log_softmax = nn.LogSoftmax(dim=1)

    def forward_(self, batch_input, sorted_lens, encoder_output, rnn_hidden, attention):
        """Shared forward pass used by training and inference entry points.

        batch_input: (batch, seq_len) token indices, rows sorted by length.
        sorted_lens: (batch,) true lengths, descending.
        encoder_output: (batch, topic_len, hidden_dim) from the encoder.
        rnn_hidden: (h, c) initial LSTM state.
        attention: a KeywordAttention module.
        Returns (list of per-example (len_i, vocab) score tensors, hidden).
        """
        word_vec = self.embed(batch_input)
        # mask out zeros for padded 0s
        word_vec = self.mask_zeros(word_vec, sorted_lens)
        # attention over the encoded keywords before entering the LSTM
        word_vec = attention.forward(word_vec, encoder_output)
        word_vec = pack_padded_sequence(word_vec, sorted_lens.tolist(), batch_first=True)
        rnn_out, hidden = self.rnn(word_vec, rnn_hidden)  # hidden : layer*direction, batch, hidden dim
        rnn_out, _ = pad_packed_sequence(rnn_out, batch_first=True, total_length=batch_input.size(1))
        # dense layers: feed only the non-padded timesteps of each example
        unpad = [rnn_out[i, :sorted_lens[i], :] for i in range(len(sorted_lens))]
        decoded = [self.forward_dense(x) for x in unpad]
        # final output
        return decoded, hidden

    def forward(self, batch_input, sorted_lens, encoder_output, rnn_hidden, attention):
        """Training forward: log-softmax scores flattened to (sum(lens), vocab)."""
        decoded, hidden = self.forward_(batch_input, sorted_lens, encoder_output, rnn_hidden, attention)
        target_score = [self.log_softmax(x) for x in decoded]
        target_score = torch.cat(target_score)  # batch*seq_len, vocab_size
        return target_score, hidden

    def forward_dense(self, rnn_out):
        # hidden layers, then projection to vocabulary logits
        dense_h0 = self.dense_h0(rnn_out)
        decoded = self.output_linear(dense_h0)
        return decoded

    def dense_layer(self, input_dim, output_dim, activation, dropout=True):
        """Build a Linear -> LayerNorm -> activation (-> Dropout) block."""
        dense = nn.Sequential(
            nn.Linear(input_dim, output_dim),
            nn.LayerNorm(output_dim),
            activation
        )
        if dropout:
            dense.add_module("Dropout", nn.Dropout(self.dense_h_dropout))
        return dense

    @staticmethod
    def mask_zeros(word_vec, sorted_lens):
        # word_vec: batch, seq_len, embed_dim
        # Each example has different lengths, but the padded 0s have value after the embedding layer, so mask them 0
        # (in-place write into word_vec)
        for i in range(len(sorted_lens)):
            if sorted_lens[i] < word_vec[i].size(0):
                word_vec[i, sorted_lens[i]:, :] = 0
        return word_vec

    def init_rnn_xavier_normal(self):
        """Xavier-normal init for all RNN weights (call commented out in __init__)."""
        for name, weights in self.rnn.named_parameters():
            # view shares storage, so the in-place init reaches the parameter
            weights = weights.view(-1, 1)
            torch.nn.init.xavier_normal_(weights)
            weights.squeeze()

    def predict_softmax_score(self, batch_input, sorted_lens, encoder_output, rnn_hidden, attention):
        """Inference forward: plain softmax probabilities; requires eval mode."""
        assert not self.training
        decoded, hidden = self.forward_(batch_input, sorted_lens, encoder_output, rnn_hidden, attention)
        target_score = [F.softmax(x, dim=1) for x in decoded]
        target_score = torch.cat(target_score)  # batch*seq_len, vocab_size
        return target_score, hidden
古诗生成: Beam Search
生成古诗时, 将选取的第一个字当成下一个字的输入数据。如果只根据softmax的值来选往后的字,得到的结果未必是最好的。更好的方法(?) 是按softmax选取前N个字, 然后用它们各生成下一步的N个字,得到N*N个字,再按softmax选前N个。
下面代码效率不高,有待改进.....
# generate.py
def beam_search_forward(model, cache, encoder_output, beam_size, skip_words=non_words,
                        remove_punctuation=True):
    """Advance every beam in `cache` by one word and keep the best candidates.

    cache: list of [accumulated score, [chosen word tensors], hidden state].
    encoder_output: encoded topic keywords, reused unchanged at every step.
    skip_words: word ids (punctuation etc.) filtered out when
    remove_punctuation is True. Returns the next cache of beam_size entries.
    """
    caches = []
    scores = []
    score_idx = []
    hiddens = []
    sorted_lens = torch.LongTensor([1])  # one token is fed per step
    for score, init, hidden in cache:
        # target_score, hidden_state = model.predict_softmax_score(init[-1], hidden)
        # feed only the most recent word; the hidden state carries the history
        target_score, hidden_state = model.decoder.predict_softmax_score(init[-1], sorted_lens,
                                                                         encoder_output, hidden,
                                                                         model.attention)
        best_score, index = torch.sort(target_score.squeeze(), dim=0, descending=True)
        if remove_punctuation:
            # take twice the beam size so filtering skip_words still leaves
            # enough candidates
            best_score = best_score[:beam_size * 2]
            index = index[:beam_size * 2]
            s = []
            IDS = []
            for i, idx in enumerate(index):
                if idx.item() not in skip_words:
                    s.append(best_score[i])
                    IDS.append(index[i])
            best_score = torch.FloatTensor(s[:beam_size])
            index = torch.LongTensor(IDS[:beam_size])
        else:
            best_score = best_score[:beam_size]
            index = index[:beam_size]
        scores += list(best_score.data.numpy())
        score_idx += list(index.data.numpy())
        hiddens.append(hidden_state)
    # rank all candidate continuations from every beam together
    topN, indexs = torch.sort(torch.FloatTensor(scores), dim=0, descending=True)
    top_scores = topN[:beam_size]
    chosen_word = indexs[:beam_size]
    # NOTE(review): `indexs` are positions in the flattened score list, not
    # vocabulary ids — `score_idx` (the actual word ids) is collected but never
    # used here. Verify whether chosen_word should be score_idx[indexs].
    chosen_word = [torch.LongTensor([i]).view(1, 1) for i in chosen_word]
    for i, (score, init, hidden) in enumerate(cache):
        caches.append([top_scores[i] + score, init + [chosen_word[i]], hiddens[i]])
    return caches
def beam_search(model, init_word, encoder_output, rnn_hidden,
                beam_size=3, text_length=100, remove_punctuation=True):
    """Generate `text_length` words from `init_word` using beam search.

    init_word: (1, k) tensor of seed word indices for this sentence.
    Returns (list of generated word ids, last hidden state) for the beam
    with the highest accumulated score.
    """
    sorted_lens = torch.LongTensor([init_word.size(1)])
    target_score, hidden_state = model.decoder.predict_softmax_score(init_word, sorted_lens,
                                                                     encoder_output, rnn_hidden,
                                                                     model.attention)
    # sum scores over the seed positions, then seed one beam per top candidate
    target_score = torch.sum(target_score, dim=0).squeeze()
    sorted_score, index = torch.sort(target_score, descending=True)
    cache = [[sorted_score[i],
              [torch.LongTensor([index[i]]).view(1, 1)],
              hidden_state] for i in range(beam_size)]
    for i in range(text_length):
        cache = beam_search_forward(model, cache, encoder_output, beam_size, remove_punctuation=remove_punctuation)
    # keep the beam with the highest accumulated score
    scores = [cache[i][0] for i in range(beam_size)]
    max_id = scores.index(max(scores))
    text = [i.item() for i in cache[max_id][1]]
    return text, cache[max_id][2]  # text, last_hidden_state
```
def generate_my_poetry(model, title, title_key_word, text_key_words, sentence_len=7, beam_search_size=7,
                       remove_punctuation=True):
    """Generate a complete poem: title first, then one sentence per keyword group.

    title / title_key_word are lists of word indices; text_key_words is a list
    of keyword-index lists, one per sentence. The model must be in eval mode.
    Returns the generated poem as a flat list of word indices.
    """
    assert not model.training
    # recover the padded keyword capacity from the attention layer's dimensions
    # (context_out maps topic_len + embed_hidden -> embed_hidden)
    topic_lengths = model.attention.context_out.in_features - model.attention.context_out.out_features
    # alternate sentence-ending punctuation; assumes comma/period are
    # module-level word indices — TODO confirm
    iter_punctuation = cycle([comma, period])
    # topic --> encoder
    key_word_flatted = title_key_word + list(chain.from_iterable(text_key_words))
    key_word_flatted += [0] * (topic_lengths - len(key_word_flatted))  # pad with 0
    topics = torch.LongTensor(key_word_flatted).view(1, -1)
    topic_lens = torch.LongTensor([len(key_word_flatted)])
    encoder_output, encoder_hidden = model.encoder.forward(topics, topic_lens)
    # title: run through the decoder only to warm up the hidden state
    title_words = title + [title_end]
    generated_text = title_words
    sorted_lens = torch.LongTensor([len(title_words)])
    title_words = torch.LongTensor(title_words).view(1, -1)
    _, rnn_hidden = model.decoder.forward(title_words, sorted_lens,
                                          encoder_output, encoder_hidden,
                                          model.attention)
    # main text: each sentence starts from its keywords, then beam search
    for words in text_key_words:
        text_len = sentence_len - len(words) - 1  # exclude punctuation
        init_word = torch.LongTensor(words).view(1, -1)
        text, rnn_hidden = beam_search(model, init_word, encoder_output, rnn_hidden,
                                       beam_size=beam_search_size, text_length=text_len,
                                       remove_punctuation=remove_punctuation)
        punctuation = [next(iter_punctuation)]
        generated_text += words + text + punctuation
    return generated_text
参考文献
Translation with a Sequence to Sequence Network and Attention (PyTorch 官方教程)
Chinese Poetry Generation with Planning based Neural Network — Zhe Wang, Wei He, Hua Wu, Haiyang Wu, Wei Li, Haifeng Wang, Enhong Chen