torchtext是PyTorch中用於處理文本預處理的包,高度封裝使用起來非常簡單快捷。

本文介紹如何保存Datasets對象到本地並重新載入。

如果使用過torchtext,預處理階段使用的時間一定會讓你印象深刻,如果你需要預處理的文本數據較大的話。我們以自然語言推斷(NLI)中經典的數據集SNLI為例。SNLI數據大小為570K,本人在實驗室伺服器上(TiTan XP顯卡,128G內存,28Core)預處理花費的時間為300s左右。單次處理可以勉強接受,但是如果每一次調整參數或者debug都需要等待5分鐘,相信沒有人可以接受。

預處理SNLI的代碼如下代碼所示:

from torchtext import data
from torchtext import datasets
from torchtext.vocab import GloVe

from nltk import word_tokenize
import numpy as np

class SNLI():
def __init__(self, args):
self.TEXT = data.Field(batch_first=True, include_lengths=True, tokenize=word_tokenize, lower=True)
self.LABEL = data.Field(sequential=False, unk_token=None)

self.train, self.dev, self.test = datasets.SNLI.splits(self.TEXT, self.LABEL)

self.TEXT.build_vocab(self.train, self.dev, self.test, vectors=GloVe(name=840B, dim=300))
self.LABEL.build_vocab(self.train)

self.train_iter, self.dev_iter, self.test_iter =
data.BucketIterator.splits((self.train, self.dev, self.test),
batch_size=args.batch_size,
device=args.gpu)

如果不知道上述基礎代碼什麼意思,建議參考torchtext文檔或者其他基礎教程,本文不展開介紹。

實驗中約9成的時間花費在以下數據劃分中

self.train, self.dev, self.test = datasets.SNLI.splits(self.TEXT, self.LABEL)

得到的self.train等是不可以直接序列化的,pickle和dill都不可以。通過debug可以發現處理後的數據保存在datasets.examples中,而examples是可以通過dill保存到本地的列表。

為了能夠在代碼中復用保存到本地的examples,我們需要根據examples重建datasets,即是上述的self.train,self.dev,self.test。目前官方沒有給出解決方案,思路我們參考了下面的文章,基本的思路也是保存dataset的examples。如果有其他思路,可以和我分享一下。

Use torchtext to Load NLP Datasets?

towardsdatascience.com

torchtext.dataset.SNLI類繼承自TabularDataset,而後者繼承自Dataset。Dataset新建時需要傳遞examples和fields,如下所示。

class torchtext.data.Dataset(examples, fields, filter_pred=None)

examples即是我們保存到本地的數據,使用dill再次載入即可。fields是一個字典,可以debug看具體信息,SNLI預處理中如下。

fields = {premise: self.TEXT, hypothesis: self.TEXT, label: self.LABEL}

按照如下代碼即可從本地載入example並構建datasets

# 從本地載入切分好的數據集
def load_split_datasets(self, fields):
# 載入examples
with open(snli_train_examples_path, rb)as f:
train_examples = dill.load(f)
with open(snli_dev_examples_path, rb)as f:
dev_examples = dill.load(f)
with open(snli_test_examples_path, rb)as f:
test_examples = dill.load(f)

# 恢複數據集
train = SNLIDataset(examples=train_examples, fields=fields)
dev = SNLIDataset(examples=dev_examples, fields=fields)
test = SNLIDataset(examples=test_examples, fields=fields)
return train, dev, test

總體的思路大概就是這樣,本文只是一個簡單的記錄,沒有展開過多的細節。如果遇到了相應的問題,上述對於解決問題基本也是足夠了。優化後處理時間縮短到6秒左右,本地保存的examples有200M左右。優化後的結果如下:

優化後的SNLI處理時間

最後給出優化後的全部代碼:

from torchtext import data
from torchtext.data import Dataset
from torchtext import datasets
from torchtext.vocab import GloVe

from nltk import word_tokenize
import time
import dill
from config import *

class SNLIDataset(Dataset):
@staticmethod
def sort_key(ex):
return data.interleave_keys(
len(ex.premise), len(ex.hypothesis))

class SNLI(object):
def __init__(self, args):
start_time = time.clock()
# 定義如何處理數據和標籤
print(1定義如何處理文本和標籤)
self.TEXT = data.Field(batch_first=True,
include_lengths=True, # 是否(返回)一個包含最小batch的句子長度
tokenize=word_tokenize, # 分詞
lower=True) # 數據轉換成小寫

self.LABEL = data.Field(sequential=False, # 是否把數據表示成序列,如果是False, 不能使用分詞
unk_token=None) # unk的默認為<unk>,todo 為什麼改變
step1_time = time.clock()
print( 耗時%.4fs % (step1_time - start_time))

# 劃分數據集
print(2劃分數據集)

if self.if_split_already():
print(從本地載入劃分好的數據集...)
fields = {premise: self.TEXT, hypothesis: self.TEXT, label: self.LABEL}
self.train, self.dev, self.test = self.load_split_datasets(fields)
else:
print(本地沒有發現數據集,開始劃分數據集...)
self.train, self.dev, self.test = datasets.SNLI.splits(self.TEXT, self.LABEL, root=.data)
self.dump_examples(self.train, self.dev, self.test)

step2_time = time.clock()
print( 耗時%.4fs % (step2_time - step1_time))

# 創建辭彙表
print(3創建辭彙表)
# self.TEXT.build_vocab(self.train, self.dev, self.test, vectors=GloVe(name=840B, dim=300))
# self.LABEL.build_vocab(self.train)
if os.path.exists(snli_text_vocab_path) and os.path.exists(snli_label_vocab_path):
print(載入已創建的辭彙表...)
with open(snli_text_vocab_path, rb)as f:
self.TEXT.vocab = dill.load(f)
with open(snli_label_vocab_path, rb)as f:
self.LABEL.vocab = dill.load(f)
else:
print(本地沒有發現辭彙表,新建辭彙表...)
self.TEXT.build_vocab(self.train, self.dev, self.test, vectors=GloVe(name=840B, dim=300))
self.LABEL.build_vocab(self.train)
with open(snli_text_vocab_path, wb)as f:
dill.dump(self.TEXT.vocab, f)
with open(snli_label_vocab_path, wb)as f:
dill.dump(self.LABEL.vocab, f)

step3_time = time.clock()
print( 耗時%.4fs % (step3_time - step2_time))

# 生成Batch迭代器
print(4生成Batch迭代器)
self.train_iter, self.dev_iter, self.test_iter =
data.BucketIterator.splits((self.train, self.dev, self.test),
batch_size=args.batch_size,
device=args.gpu)
step4_time = time.clock()
print( 耗時%.4fs % (step4_time - step3_time))

# 判斷是否已經劃分好數據並保存到本地
# 全部3個文件全部存在則返回True;否則返回False
def if_split_already(self):
for path in snli_split_path_lst:
if not os.path.exists(path):
return False
return True

# 從本地載入切分好的數據集
def load_split_datasets(self, fields):
# 載入examples
with open(snli_train_examples_path, rb)as f:
train_examples = dill.load(f)
with open(snli_dev_examples_path, rb)as f:
dev_examples = dill.load(f)
with open(snli_test_examples_path, rb)as f:
test_examples = dill.load(f)

# 恢複數據集
train = SNLIDataset(examples=train_examples, fields=fields)
dev = SNLIDataset(examples=dev_examples, fields=fields)
test = SNLIDataset(examples=test_examples, fields=fields)
return train, dev, test

# 將切分好的數據集保存到本地
def dump_examples(self, train, dev, test):
# 保存examples
if not os.path.exists(snli_train_examples_path):
with open(snli_train_examples_path, wb)as f:
dill.dump(train.examples, f)
if not os.path.exists(snli_dev_examples_path):
with open(snli_dev_examples_path, wb)as f:
dill.dump(dev.examples, f)
if not os.path.exists(snli_test_examples_path):
with open(snli_test_examples_path, wb)as f:
dill.dump(test.examples, f)

代碼中用到了很多路徑,可以自行修改,本人的路徑設置如下。

import os
from os.path import join

# 新建的文件夾
new_dir = []
# 當前配置文件路徑
config_path = os.path.abspath(__file__)
# 項目文件夾路徑;默認config文件在項目的一級子目錄下
project_dir_path = os.path.dirname(config_path)
# 項目的數據文件夾路徑
data_dir_path = join(project_dir_path, .data)
# 劃分後的snli數據集保存的文件夾
snli_split_dir_path = join(data_dir_path, snli_split)
snli_train_examples_path = join(snli_split_dir_path, train_examples)
snli_dev_examples_path = join(snli_split_dir_path, dev_examples)
snli_test_examples_path = join(snli_split_dir_path, test_examples)

snli_split_path_lst = [snli_train_examples_path, snli_dev_examples_path, snli_test_examples_path]

snli_text_vocab_path = join(snli_split_dir_path, text_vocab)
snli_label_vocab_path = join(snli_split_dir_path, label_vocab)

new_dir.append(snli_split_dir_path)

# 新建文件夾
for dir in new_dir:
if not os.path.exists(dir):
print(新建文件夾:, dir)
os.mkdir(dir)

推薦閱讀:

相关文章