前言:
本文章將通過pytorch作為主要工具實現不同的模型(包括HMM,CRF,Bi-LSTM,Bi-LSTM+CRF)來解決中文命名實體識別問題,文章不會涉及過多的數學推導,但會從直觀上簡單解釋模型的原理,主要的內容會集中在代碼部分。
本文的目錄結構如下:
首先,我們明確一下命名實體識別的概念:
命名實體識別(英語:Named Entity Recognition),簡稱NER,是指識別文本中具有特定意義的實體,主要包括人名、地名、機構名、專有名詞等,以及時間、數量、貨幣、比例數值等文字。
舉個例子,假如有這麼一句話:
ACM宣佈,深度學習的三位創造者Yoshua Bengio, Yann LeCun, 以及Geoffrey Hinton獲得了2019年的圖靈獎。
那麼NER的任務就是從這句話中提取出
目前在NER上表現較好的模型都是基於深度學習或者是統計學習的方法的,這些方法共同的特點都是需要大量的數據來進行學習,所以接下來我先介紹一下本項目使用的數據集的格式,好讓讀者在閱讀模型的代碼之前對數據的形式有個直觀的認識。
數據集用的是論文ACL 2018Chinese NER using Lattice LSTM中從新浪財經收集的簡曆數據,數據的格式如下,它的每一行由一個字及其對應的標註組成,標註集採用BIOES(B表示實體開頭,E表示實體結尾,I表示在實體內部,O表示非實體),句子之間用一個空行隔開。
美 B-LOC 國 E-LOC 的 O 華 B-PER 萊 I-PER 士 E-PER
我 O 跟 O 他 O 談 O 笑 O 風 O 生 O
下面是四種不同的模型以及這Ensemble這四個模型預測結果的準確率(取最好):
最後一行Ensemble是將這四個模型的預測結果結合起來,使用「投票表決」的方法得出最後的預測結果。
下面本文將詳細接受每種模型的實現:
隱馬爾可夫模型描述由一個隱藏的馬爾科夫鏈隨機生成不可觀測的狀態隨機序列,再由各個狀態生成一個觀測而產生觀測隨機序列的過程(李航 統計學習方法)。隱馬爾可夫模型由初始狀態分佈,狀態轉移概率矩陣以及觀測概率矩陣所確定。
上面的定義太過學術看不懂沒關係,我們只需要知道,NER本質上可以看成是一種序列標註問題(預測每個字的BIOES標記),在使用HMM解決NER這種序列標註問題的時候,我們所能觀測到的是字組成的序列(觀測序列),觀測不到的是每個字對應的標註(狀態序列)。
對應的,HMM的三個要素可以解釋為,初始狀態分佈就是每一個標註作為句子第一個字的標註的概率,狀態轉移概率矩陣就是由某一個標註轉移到下一個標註的概率(設狀態轉移矩陣為 ,那麼若前一個詞的標註為 ,則下一個詞的標註為 的概率為 ),觀測概率矩陣就是指在某個標註下,生成某個詞的概率。根據HMM的三個要素,我們可以定義如下的HMM模型:
class HMM(object): def __init__(self, N, M): """Args: N: 狀態數,這裡對應存在的標註的種類 M: 觀測數,這裡對應有多少不同的字 """ self.N = N self.M = M
# 狀態轉移概率矩陣 A[i][j]表示從i狀態轉移到j狀態的概率 self.A = torch.zeros(N, N) # 觀測概率矩陣, B[i][j]表示i狀態下生成j觀測的概率 self.B = torch.zeros(N, M) # 初始狀態概率 Pi[i]表示初始時刻為狀態i的概率 self.Pi = torch.zeros(N)
有了模型定義,接下來的問題就是訓練模型了。HMM模型的訓練過程對應隱馬爾可夫模型的學習問題(李航 統計學習方法),實際上就是根據訓練數據根據最大似然的方法估計模型的三個要素,即上文提到的初始狀態分佈、狀態轉移概率矩陣以及觀測概率矩陣。舉個例子幫助理解,在估計初始狀態分佈的時候,假如某個標記在數據集中作為句子第一個字的標記的次數為k,句子的總數為N,那麼該標記作為句子第一個字的概率可以近似估計為k/N,很簡單對吧,使用這種方法,我們近似估計HMM的三個要素,代碼如下(出現過的函數將用省略號代替):
k/N
class HMM(object): def __init__(self, N, M): .... def train(self, word_lists, tag_lists, word2id, tag2id): """HMM的訓練,即根據訓練語料對模型參數進行估計, 因為我們有觀測序列以及其對應的狀態序列,所以我們 可以使用極大似然估計的方法來估計隱馬爾可夫模型的參數 參數: word_lists: 列表,其中每個元素由字組成的列表,如 [擔,任,科,員] tag_lists: 列表,其中每個元素是由對應的標註組成的列表,如 [O,O,B-TITLE, E-TITLE] word2id: 將字映射為ID tag2id: 字典,將標註映射為ID """
assert len(tag_lists) == len(word_lists)
# 估計轉移概率矩陣 for tag_list in tag_lists: seq_len = len(tag_list) for i in range(seq_len - 1): current_tagid = tag2id[tag_list[i]] next_tagid = tag2id[tag_list[i+1]] self.A[current_tagid][next_tagid] += 1 # 一個重要的問題:如果某元素沒有出現過,該位置為0,這在後續的計算中是不允許的 # 解決方法:我們將等於0的概率加上很小的數 self.A[self.A == 0.] = 1e-10 self.A = self.A / self.A.sum(dim=1, keepdim=True)
# 估計觀測概率矩陣 for tag_list, word_list in zip(tag_lists, word_lists): assert len(tag_list) == len(word_list) for tag, word in zip(tag_list, word_list): tag_id = tag2id[tag] word_id = word2id[word] self.B[tag_id][word_id] += 1 self.B[self.B == 0.] = 1e-10 self.B = self.B / self.B.sum(dim=1, keepdim=True)
# 估計初始狀態概率 for tag_list in tag_lists: init_tagid = tag2id[tag_list[0]] self.Pi[init_tagid] += 1 self.Pi[self.Pi == 0.] = 1e-10 self.Pi = self.Pi / self.Pi.sum()
模型訓練完畢之後,要利用訓練好的模型進行解碼,就是對給定的模型未見過的句子,求句子中的每個字對應的標註,針對這個解碼問題,我們使用的是維特比(viterbi)演算法。關於該演算法的數學推導,可以查閱一下李航統計學習方法10.4.2,或者是Speech and Language Processing8.4.5,這兩本參考書都講的十分詳細並且容易理解,建議閱讀過相關部分再來看看代碼加深理解。
它實現的細節如下:
class HMM(object): ... def decoding(self, word_list, word2id, tag2id): """ 使用維特比演算法對給定觀測序列求狀態序列, 這裡就是對字組成的序列,求其對應的標註。 維特比演算法實際是用動態規劃解隱馬爾可夫模型預測問題,即用動態規劃求概率最大路徑(最優路徑) 這時一條路徑對應著一個狀態序列 """ # 問題:整條鏈很長的情況下,十分多的小概率相乘,最後可能造成下溢 # 解決辦法:採用對數概率,這樣源空間中的很小概率,就被映射到對數空間的大的負數 # 同時相乘操作也變成簡單的相加操作 A = torch.log(self.A) B = torch.log(self.B) Pi = torch.log(self.Pi)
# 初始化 維比特矩陣viterbi 它的維度為[狀態數, 序列長度] # 其中viterbi[i, j]表示標註序列的第j個標註為i的所有單個序列(i_1, i_2, ..i_j)出現的概率最大值 seq_len = len(word_list) viterbi = torch.zeros(self.N, seq_len) # backpointer是跟viterbi一樣大小的矩陣 # backpointer[i, j]存儲的是 標註序列的第j個標註為i時,第j-1個標註的id # 等解碼的時候,我們用backpointer進行回溯,以求出最優路徑 backpointer = torch.zeros(self.N, seq_len).long()
# self.Pi[i] 表示第一個字的標記為i的概率 # Bt[word_id]表示字為word_id的時候,對應各個標記的概率 # self.A.t()[tag_id]表示各個狀態轉移到tag_id對應的概率
# 所以第一步為 start_wordid = word2id.get(word_list[0], None) Bt = B.t() if start_wordid is None: # 如果字不再字典裏,則假設狀態的概率分佈是均勻的 bt = torch.log(torch.ones(self.N) / self.N) else: bt = Bt[start_wordid] viterbi[:, 0] = Pi + bt backpointer[:, 0] = -1
# 遞推公式: # viterbi[tag_id, step] = max(viterbi[:, step-1]* self.A.t()[tag_id] * Bt[word]) # 其中word是step時刻對應的字 # 由上述遞推公式求後續各步 for step in range(1, seq_len): wordid = word2id.get(word_list[step], None) # 處理字不在字典中的情況 # bt是在t時刻字為wordid時,狀態的概率分佈 if wordid is None: # 如果字不再字典裏,則假設狀態的概率分佈是均勻的 bt = torch.log(torch.ones(self.N) / self.N) else: bt = Bt[wordid] # 否則從觀測概率矩陣中取bt for tag_id in range(len(tag2id)): max_prob, max_id = torch.max( viterbi[:, step-1] + A[:, tag_id], dim=0 ) viterbi[tag_id, step] = max_prob + bt[tag_id] backpointer[tag_id, step] = max_id
# 終止, t=seq_len 即 viterbi[:, seq_len]中的最大概率,就是最優路徑的概率 best_path_prob, best_path_pointer = torch.max( viterbi[:, seq_len-1], dim=0 )
# 回溯,求最優路徑 best_path_pointer = best_path_pointer.item() best_path = [best_path_pointer] for back_step in range(seq_len-1, 0, -1): best_path_pointer = backpointer[best_path_pointer, back_step] best_path_pointer = best_path_pointer.item() best_path.append(best_path_pointer)
# 將tag_id組成的序列轉化為tag assert len(best_path) == len(word_list) id2tag = dict((id_, tag) for tag, id_ in tag2id.items()) tag_list = [id2tag[id_] for id_ in reversed(best_path)]
return tag_list
以上就是HMM的實現了,全部代碼可見文末。
上面講的HMM模型中存在兩個假設,一是輸出觀察值之間嚴格獨立,二是狀態轉移過程中當前狀態只與前一狀態有關。也就是說,在命名實體識別的場景下,HMM認為觀測到的句子中的每個字都是相互獨立的,而且當前時刻的標註只與前一時刻的標註相關。但實際上,命名實體識別往往需要更多的特徵,比如詞性,詞的上下文等等,同時當前時刻的標註應該與前一時刻以及後一時刻的標註都相關聯。由於這兩個假設的存在,顯然HMM模型在解決命名實體識別的問題上是存在缺陷的。
而條件隨機場就沒有這種問題,它通過引入自定義的特徵函數,不僅可以表達觀測之間的依賴,還可表示當前觀測與前後多個狀態之間的複雜依賴,可以有效克服HMM模型面臨的問題。
下面是條件隨機場的數學形式(如果覺得不好理解。也可以直接跳到代碼部分):
為了建立一個條件隨機場,我們首先要定義一個特徵函數集,該函數集內的每個特徵函數都以標註序列作為輸入,提取的特徵作為輸出。假設該函數集為:
其中 表示觀測序列, 表示狀態序列。然後,條件隨機場使用對數線性模型來計算給定觀測序列下狀態序列的條件概率:
其中 是是所有可能的狀態序列, 是條件隨機場模型的參數,可以把它看成是每個特徵函數的權重。CRF模型的訓練其實就是對參數 的估計。假設我們有 個已經標註好的數據 ,
則其對數似然函數的正則化形式如下:
那麼,最優參數 就是:
模型訓練結束之後,對給定的觀測序列 ,它對應的最優狀態序列應該是:
解碼的時候與HMM類似,也可以採用維特比演算法。
下面是代碼實現:
from sklearn_crfsuite import CRF # CRF的具體實現太過複雜,這裡我們藉助一個外部的庫
def word2features(sent, i): """抽取單個字的特徵""" word = sent[i] prev_word = "<s>" if i == 0 else sent[i-1] next_word = "</s>" if i == (len(sent)-1) else sent[i+1] # 因為每個詞相鄰的詞會影響這個詞的標記 # 所以我們使用: # 前一個詞,當前詞,後一個詞, # 前一個詞+當前詞, 當前詞+後一個詞 # 作為特徵 features = { w: word, w-1: prev_word, w+1: next_word, w-1:w: prev_word+word, w:w+1: word+next_word, bias: 1 } return features
def sent2features(sent): """抽取序列特徵""" return [word2features(sent, i) for i in range(len(sent))]
class CRFModel(object): def __init__(self, algorithm=lbfgs, c1=0.1, c2=0.1, max_iterations=100, all_possible_transitions=False ):
self.model = CRF(algorithm=algorithm, c1=c1, c2=c2, max_iterations=max_iterations, all_possible_transitions=all_possible_transitions)
def train(self, sentences, tag_lists): """訓練模型""" features = [sent2features(s) for s in sentences] self.model.fit(features, tag_lists)
def test(self, sentences): """解碼,對給定句子預測其標註""" features = [sent2features(s) for s in sentences] pred_tag_lists = self.model.predict(features) return pred_tag_lists
除了以上兩種基於概率圖模型的方法,LSTM也常常被用來解決序列標註問題。和HMM、CRF不同的是,LSTM是依靠神經網路超強的非線性擬合能力,在訓練時將樣本通過高維空間中的複雜非線性變換,學習到從樣本到標註的函數,之後使用這個函數為指定的樣本預測每個token的標註。下方就是使用雙向LSTM(雙向能夠更好的捕捉序列之間的依賴關係)進行序列標註的示意圖:
LSTM比起CRF模型最大的好處就是簡單粗暴,不需要做繁雜的特徵工程,直接訓練即可,同時比起HMM,LSTM的準確率也比較高。
下面是基於雙向LSTM的序列標註模型的實現:
import torch import torch.nn as nn from torch.nn.utils.rnn import pad_packed_sequence, pack_padded_sequence
class BiLSTM(nn.Module): def __init__(self, vocab_size, emb_size, hidden_size, out_size): """初始化參數: vocab_size:字典的大小 emb_size:詞向量的維數 hidden_size:隱向量的維數 out_size:標註的種類 """ super(BiLSTM, self).__init__() self.embedding = nn.Embedding(vocab_size, emb_size) self.bilstm = nn.LSTM(emb_size, hidden_size, batch_first=True, bidirectional=True)
self.lin = nn.Linear(2*hidden_size, out_size)
def forward(self, sents_tensor, lengths): emb = self.embedding(sents_tensor) # [B, L, emb_size]
packed = pack_padded_sequence(emb, lengths, batch_first=True) rnn_out, _ = self.bilstm(packed) # rnn_out:[B, L, hidden_size*2] rnn_out, _ = pad_packed_sequence(rnn_out, batch_first=True)
scores = self.lin(rnn_out) # [B, L, out_size]
return scores
def test(self, sents_tensor, lengths, _): """解碼""" logits = self.forward(sents_tensor, lengths) # [B, L, out_size] _, batch_tagids = torch.max(logits, dim=2)
return batch_tagid
簡單的LSTM的優點是能夠通過雙向的設置學習到觀測序列(輸入的字)之間的依賴,在訓練過程中,LSTM能夠根據目標(比如識別實體)自動提取觀測序列的特徵,但是缺點是無法學習到狀態序列(輸出的標註)之間的關係,要知道,在命名實體識別任務中,標註之間是有一定的關係的,比如B類標註(表示某實體的開頭)後面不會再接一個B類標註,所以LSTM在解決NER這類序列標註任務時,雖然可以省去很繁雜的特徵工程,但是也存在無法學習到標註上下文的缺點。
相反,CRF的優點就是能對隱含狀態建模,學習狀態序列的特點,但它的缺點是需要手動提取序列特徵。所以一般的做法是,在LSTM後面再加一層CRF,以獲得兩者的優點。
下面是給Bi-LSTM加一層CRF的代碼實現:
class BiLSTM_CRF(nn.Module): def __init__(self, vocab_size, emb_size, hidden_size, out_size): """初始化參數: vocab_size:字典的大小 emb_size:詞向量的維數 hidden_size:隱向量的維數 out_size:標註的種類 """ super(BiLSTM_CRF, self).__init__() # 這裡的BiLSTM就是LSTM模型部分所定義的BiLSTM模型 self.bilstm = BiLSTM(vocab_size, emb_size, hidden_size, out_size)
# CRF實際上就是多學習一個轉移矩陣 [out_size, out_size] 初始化為均勻分佈 self.transition = nn.Parameter( torch.ones(out_size, out_size) * 1/out_size) # self.transition.data.zero_()
def forward(self, sents_tensor, lengths): # [B, L, out_size] emission = self.bilstm(sents_tensor, lengths)
# 計算CRF scores, 這個scores大小為[B, L, out_size, out_size] # 也就是每個字對應對應一個 [out_size, out_size]的矩陣 # 這個矩陣第i行第j列的元素的含義是:上一時刻tag為i,這一時刻tag為j的分數 batch_size, max_len, out_size = emission.size() crf_scores = emission.unsqueeze( 2).expand(-1, -1, out_size, -1) + self.transition.unsqueeze(0)
return crf_scores
def test(self, test_sents_tensor, lengths, tag2id): """使用維特比演算法進行解碼""" start_id = tag2id[<start>] end_id = tag2id[<end>] pad = tag2id[<pad>] tagset_size = len(tag2id)
crf_scores = self.forward(test_sents_tensor, lengths) device = crf_scores.device # B:batch_size, L:max_len, T:target set size B, L, T, _ = crf_scores.size() # viterbi[i, j, k]表示第i個句子,第j個字對應第k個標記的最大分數 viterbi = torch.zeros(B, L, T).to(device) # backpointer[i, j, k]表示第i個句子,第j個字對應第k個標記時前一個標記的id,用於回溯 backpointer = (torch.zeros(B, L, T).long() * end_id).to(device) lengths = torch.LongTensor(lengths).to(device) # 向前遞推 for step in range(L): batch_size_t = (lengths > step).sum().item() if step == 0: # 第一個字它的前一個標記只能是start_id viterbi[:batch_size_t, step, :] = crf_scores[: batch_size_t, step, start_id, :] backpointer[: batch_size_t, step, :] = start_id else: max_scores, prev_tags = torch.max( viterbi[:batch_size_t, step-1, :].unsqueeze(2) + crf_scores[:batch_size_t, step, :, :], # [B, T, T] dim=1 ) viterbi[:batch_size_t, step, :] = max_scores backpointer[:batch_size_t, step, :] = prev_tags
# 在回溯的時候我們只需要用到backpointer矩陣 backpointer = backpointer.view(B, -1) # [B, L * T] tagids = [] # 存放結果 tags_t = None for step in range(L-1, 0, -1): batch_size_t = (lengths > step).sum().item() if step == L-1: index = torch.ones(batch_size_t).long() * (step * tagset_size) index = index.to(device) index += end_id else: prev_batch_size_t = len(tags_t)
new_in_batch = torch.LongTensor( [end_id] * (batch_size_t - prev_batch_size_t)).to(device) offset = torch.cat( [tags_t, new_in_batch], dim=0 ) # 這個offset實際上就是前一時刻的 index = torch.ones(batch_size_t).long() * (step * tagset_size) index = index.to(device) index += offset.long()
tags_t = backpointer[:batch_size_t].gather( dim=1, index=index.unsqueeze(1).long()) tags_t = tags_t.squeeze(1) tagids.append(tags_t.tolist())
# tagids:[L-1](L-1是因為扣去了end_token),大小的liebiao # 其中列表內的元素是該batch在該時刻的標記 # 下面修正其順序,並將維度轉換為 [B, L] tagids = list(zip_longest(*reversed(tagids), fillvalue=pad)) tagids = torch.Tensor(tagids).long()
# 返回解碼的結果 return tagids
以上就是這四個模型的具體實現了,模型的效果比較在前面已經給出了。
全部的代碼地址在
(這個項目後續會保持更新,同時嘗試使用其他模型來做這個問題,如果覺得有幫助可以點個star哦~)
CRF Layer on the Top of BiLSTM - 5?createmomo.github.ioPENG:Bi-LSTM-CRF for Sequence Labeling?zhuanlan.zhihu.comPytorch Bi-LSTM + CRF 代碼詳解?blog.csdn.net 推薦閱讀: