緣起

DeepFM不算什麼新技術了,用TensorFlow實現DeepFM也有開源實現,那我為什麼要炒這個冷飯,重複造輪子?

用Google搜索「TensorFlow+DeepFM」,一般都能搜索到「ChenglongChen/tensorflow-DeepFM」和「lambdaJi的TensorFlow Estimator of DeepFM」這二位的實現。二位不僅用TensorFlow實現了DeepFM,還在Criteo數據集上,給出了完整的訓練、測試的代碼,的確給了我很大的啟發,在這裡要表示感謝。

但是,同樣是由於二位的實現都是根據Criteo簡單數據集的,使他們的代碼,如果移植到實際的推薦系統中,存在一定困難。比如:

稀疏要求。儘管criteo的原始數據集是排零存儲的,但是以上的兩個實現,都是用稠密矩陣來表示輸入,將0又都補了回來。這種做法,在criteo這種只有39列的簡單數據集上是可行的,但是實際系統中,特徵數量以千、萬計,這種稀疏轉稠密的方式是不可取的。

一列多值的要求。Criteo數據集有13列numeric特徵+26列categorical特徵,所有列都只有一個值。但是,在實際系統中,一個field下往往有多個<feature:value>對。比如,我們用三個field來描述一個用戶的手機 使用習慣,「近xxx天活躍app」+「近xxx天新安裝app」+「近xxx天卸載app」。每個field下,再有「微信:0.9,微博:0.5,淘寶:0.3,……」等一系列的feature和它們的數值。

這個要求固然可以通過,去除field這個「特徵單位」,只針對一個個獨立的feature來建模。但是,這樣一來,既憑空增加了模型的規模,又破壞模型的「層次化」與「模塊化」,使代碼不易擴展與維護。

權值共享的要求。Criteo數據集經過脫敏感處理,我們無法知道每列的具體含義,自然也就沒有列與列之間共享權重的需求,以上提到的兩個實現也就只用一整塊稠密矩陣來建模embedding矩陣。

但是,以上面提到的「近xxx天活躍app」+「近xxx天新安裝app」+「近xxx天卸載app」這三個field為例,這些 field中的feature都來源於同一個」app字典」。如果不做權重共享,

  • 每個field都使用獨立的embedding矩陣來映射app向量,整個模型需要優化的變數是共享權重模型的3倍,既耗費了更多的計算資源,也容易導致過擬合。
  • 每個field的稀疏程度是不一樣的,同一個app,在「活躍列表」中出現得更頻繁,其embedding向量就有更多的訓練機會,而在「卸載列表」中較少出現,其embedding向量得不到足夠訓練,恐怕最後與隨機初始化無異。

因此,在實際系統中,「共享權重」是必須的,

  • 減小優化變數的數目,既節省計算資源,又減輕「過擬合」風險
  • 同一個embedding矩陣,為多個field提供映射向量,類似於「多任務學習」,使每個embedding向量得到更多的訓練機會,同時也要滿足多個field的需求(比如同一個app的向量,既要體現『經常使用它』對y的影響,也要體現『卸載它』對y值的影響),也降低了「過擬合」的風險。

正因為在目前我能夠找到的基於TensorFlow實現的DeepFM中,沒有一個能夠滿足以上「稀疏」、「多值」、「共享權重」這三個要求的,所以,我自己動手實現了一個,代碼見我的github。接下來,我簡單講解一下我的代碼。

數據預處理

我依然用criteo數據集來做演示之用。為了演示「一列多值」和「稀疏」,我把criteo中的特徵分為兩個field,所有數值特徵I1~I13歸為numeric field,所有類別特徵C1~C26歸為categorical field。

需要特別指出的是:

  • 這種處理方法,不是為了提高criteo數據集上的模型性能,只是為了模擬實際系統中將會遇到的「一列多值」和「稀疏」數據集。接下來會看到,DeepFM中,FM中的二階交叉,不會受拆分成兩個field的影響。受影響的主要是Deep側的輸入層,詳情見」DNN預測部分」一節 。
  • 另外,criteo數據集無法演示「權重共享」的功能。

對criteo中數值特徵與類別特徵,都是最常規的預處理,不是這次演示的重點

  • 數值特徵,因為多數表示"次數",因此先做了一個log變化,減弱長尾數據的影響,再做了一個min/max scaling,畢竟底層還是線性演算法,要排除特徵間不同scale的影響。注意,千萬不能做「zero mean, unit variance」的standardize,因為那樣會破壞數據的稀疏性
  • 類別特徵,剔除了一些生僻的tag,建立字典,將原始數據中的字元串tag轉化為整數的index

預處理的代碼見criteo_data_preproc.py,處理好的數據文件如下所示,圖中的亮塊是列分隔符。可以看到,每列是由多個tag_index:value「鍵值對」組成的,而不同行中「鍵值對」個數互不同,而value絕沒有0,實現排零、稀疏存儲

輸入數據

input_fn

為了配合TensorFlow Estimator,我們需要定義input_fn來讀取上圖所示的數據。

看似簡單的任務,實現起來,卻很花費了我一番功夫:

  • 網上能夠搜到的TensorFlow讀文本文件的代碼,都是讀「每列只有一個值的csv」這樣規則的數據格式。但是,上圖所示的數據,卻非常不規則,每行先是由「 」分隔,第列中再由「,」分隔成數目不同的「鍵值對」,每個『鍵值對』再由「:」分隔
  • 我希望提供給model稀疏矩陣,方便model中排零計算,提升效率。

最終,解析一行文本的代碼如下。

def _decode_tsv(line):
columns = tf.decode_csv(line, record_defaults=DEFAULT_VALUES, field_delim= )
y = columns[0]

feat_columns = dict(zip((t[0] for t in COLUMNS_MAX_TOKENS), columns[1:]))
X = {}
for colname, max_tokens in COLUMNS_MAX_TOKENS:
# 調用string_split時,第一個參數必須是一個list,所以要把columns[colname]放在[]中
# 這時每個kv還是k:v這樣的字元串
kvpairs = tf.string_split([feat_columns[colname]], ,).values[:max_tokens]

# k,v已經拆開, kvpairs是一個SparseTensor,因為每個kvpair格式相同,都是"k:v"
# 既不會出現"k",也不會出現"k:v1:v2:v3:..."
# 所以,這時的kvpairs實際上是一個滿陣
kvpairs = tf.string_split(kvpairs, :)

# kvpairs是一個[n_valid_pairs,2]矩陣
kvpairs = tf.reshape(kvpairs.values, kvpairs.dense_shape)

feat_ids, feat_vals = tf.split(kvpairs, num_or_size_splits=2, axis=1)
feat_ids = tf.string_to_number(feat_ids, out_type=tf.int32)
feat_vals = tf.string_to_number(feat_vals, out_type=tf.float32)

# 不能調用squeeze, squeeze的限制太多, 當原始矩陣有1行或0行時,squeeze都會報錯
X[colname + "_ids"] = tf.reshape(feat_ids, shape=[-1])
X[colname + "_values"] = tf.reshape(feat_vals, shape=[-1])

return X, y

然後,將整個文件轉化成TensorFlow Dataset的代碼如下所示。每一個field「xxx」在dataset中將由兩個SparseTensor表示,「xxx_ids」表示sparse ids,「xxx_values」表示sparse values。

def input_fn(data_file, n_repeat, batch_size, batches_per_shuffle):
# ----------- prepare padding
pad_shapes = {}
pad_values = {}
for c, max_tokens in COLUMNS_MAX_TOKENS:
pad_shapes[c + "_ids"] = tf.TensorShape([max_tokens])
pad_shapes[c + "_values"] = tf.TensorShape([max_tokens])

pad_values[c + "_ids"] = -1 # 0 is still valid token-id, -1 for padding
pad_values[c + "_values"] = 0.0

# no need to pad labels
pad_shapes = (pad_shapes, tf.TensorShape([]))
pad_values = (pad_values, 0)

# ----------- define reading ops
dataset = tf.data.TextLineDataset(data_file).skip(1) # skip the header
dataset = dataset.map(_decode_tsv, num_parallel_calls=4)

if batches_per_shuffle > 0:
dataset = dataset.shuffle(batches_per_shuffle * batch_size)

dataset = dataset.repeat(n_repeat)
dataset = dataset.padded_batch(batch_size=batch_size,
padded_shapes=pad_shapes,
padding_values=pad_values)

iterator = dataset.make_one_shot_iterator()
dense_Xs, ys = iterator.get_next()

# ----------- convert dense to sparse
sparse_Xs = {}
for c, _ in COLUMNS_MAX_TOKENS:
for suffix in ["ids", "values"]:
k = "{}_{}".format(c, suffix)
sparse_Xs[k] = tf_utils.to_sparse_input_and_drop_ignore_values(dense_Xs[k])

# ----------- return
return sparse_Xs, ys

其中也不得不調用padded_batch補齊,這一步也將稀疏格式轉化成了稠密格式,不過只是在一個batch(batch_size=128已經算很大了)中臨時稠密一下,很快就又通過調用to_sparse_input_and_drop_ignore_values這個函數重新轉化成稀疏格式了。to_sparse_input_and_drop_ignore_values實際上是從feature_column.py這個module中的_to_sparse_input_and_drop_ignore_values函數拷貝而來,因為原函數不是public的,無法在featurecolumn.py以外調用,所以我將它的代碼拷貝到tf_utils.py中。

建立共享權重

重申幾個概念。比如我們的特徵集中包括active_pkgs(app活躍情況)、install_pkgs(app安裝情況)、uninstall_pkgs(app卸載情況)。每列包含的內容是一系列feature和其數值,比如qq:0.1, weixin:0.9, taobao:1.1, ……。這些feature都來源於同一份名為package的字典

  • field就是active_pkgs、install_pkgs、uninstall_pkgs這些大類,是DataFrame中的每一列
  • feature就是每個field下包含的具體內容,一個field下允許存在多個feature,比如前面提到的qq, weixin, taobao這樣的app名稱。
  • vocabulary對應例子中的「package字典」。不同field下的feature可以來自同一個vocabulary,即若干field共享vocabulary

建立共享權重的代碼如下所示:

  • 一個vocab對應兩個embedding矩陣,一個對應FM中的線性部分的權重,另一個對應FM與DNN共享的隱向量(用於二階與高階交叉)。
  • 所有embedding矩陣,以」字典名」存入dict。不同field只要指定相同的「字典名」,就可以共享同一套embedding矩陣

class EmbeddingTable:
def __init__(self):
self._weights = {}

def add_weights(self, vocab_name, vocab_size, embed_dim):
"""
:param vocab_name: 一個field擁有兩個權重矩陣,一個用於線性連接,另一個用於非線性(二階或更高階交叉)連接
:param vocab_size: 字典總長度
:param embed_dim: 二階權重矩陣shape=[vocab_size, order2dim],映射成的embedding
既用於接入DNN的第一屋,也是用於FM二階交互的隱向量
:return: None
"""
linear_weight = tf.get_variable(name={}_linear_weight.format(vocab_name),
shape=[vocab_size, 1],
initializer=tf.glorot_normal_initializer(),
dtype=tf.float32)

# 二階(FM)與高階(DNN)的特徵交互,共享embedding矩陣
embed_weight = tf.get_variable(name={}_embed_weight.format(vocab_name),
shape=[vocab_size, embed_dim],
initializer=tf.glorot_normal_initializer(),
dtype=tf.float32)

self._weights[vocab_name] = (linear_weight, embed_weight)

def get_linear_weights(self, vocab_name): return self._weights[vocab_name][0]

def get_embed_weights(self, vocab_name): return self._weights[vocab_name][1]

def build_embedding_table(params):
embed_dim = params[embed_dim] # 必須有統一的embedding長度

embedding_table = EmbeddingTable()
for vocab_name, vocab_size in params[vocab_sizes].items():
embedding_table.add_weights(vocab_name=vocab_name, vocab_size=vocab_size, embed_dim=embed_dim)

return embedding_table

線性預測部分

def output_logits_from_linear(features, embedding_table, params):
field2vocab_mapping = params[field_vocab_mapping]
combiner = params.get(multi_embed_combiner, sum)

fields_outputs = []
# 當前field下有一系列的<tag:value>對,每個tag對應一個bias(待優化),
# 將所有tag對應的bias,按照其value進行加權平均,得到這個field對應的bias
for fieldname, vocabname in field2vocab_mapping.items():
sp_ids = features[fieldname + "_ids"]
sp_values = features[fieldname + "_values"]

linear_weights = embedding_table.get_linear_weights(vocab_name=vocabname)

# weights: [vocab_size,1]
# sp_ids: [batch_size, max_tags_per_example]
# sp_weights: [batch_size, max_tags_per_example]
# output: [batch_size, 1]
output = embedding_ops.safe_embedding_lookup_sparse(linear_weights, sp_ids, sp_values,
combiner=combiner,
name={}_linear_output.format(fieldname))

fields_outputs.append(output)

# 因為不同field可以共享同一個vocab的linear weight,所以將各個field的output相加,會損失大量的信息
# 因此,所有field對應的output拼接起來,反正每個field的output都是[batch_size,1],拼接起來,並不佔多少空間
# whole_linear_output: [batch_size, total_fields]
whole_linear_output = tf.concat(fields_outputs, axis=1)
tf.logging.info("linear output, shape={}".format(whole_linear_output.shape))

# 再映射到final logits(二分類,也是[batch_size,1])
# 這時,就不要用任何activation了,特別是ReLU
return tf.layers.dense(whole_linear_output, units=1, use_bias=True, activation=None)

二階交互預測部分

二階交互部分與DeepFM論文中稍有不同,而是使用了《Neural Factorization Machines for Sparse Predictive Analytics》中Bi-Interaction的公式。這也是網上實現的通用做法。

而我的實現與上邊公式最大的不同,就是不再只有一個embedding矩陣V,而是每個feature根據自己所在的field,再根據超參指定的field與vocabulary的映射關係,找到自己對應的embedding矩陣。某個field對應的embedding矩陣有可能是與另外一個field共享的。

另外, x_iv_i 實現了稀疏矩陣相乘,基於embedding_ops.safe_embedding_lookup_sparse實現。

def output_logits_from_bi_interaction(features, embedding_table, params):
field2vocab_mapping = params[field_vocab_mapping]

# 論文上的公式就是要求sum,而且我也試過mean和sqrtn,都比用mean要差上很多
# 但是,這種情況,僅僅是針對criteo數據的,還是理論上就必須用sum,而不能用mean和sqrtn
# 我還不太確定,所以保留一個介面能指定其他combiner的方法
combiner = params.get(multi_embed_combiner, sum)

# 見《Neural Factorization Machines for Sparse Predictive Analytics》論文的公式(4)
fields_embeddings = []
fields_squared_embeddings = []

for fieldname, vocabname in field2vocab_mapping.items():
sp_ids = features[fieldname + "_ids"]
sp_values = features[fieldname + "_values"]

# --------- embedding
embed_weights = embedding_table.get_embed_weights(vocabname)
# embedding: [batch_size, embed_dim]
embedding = embedding_ops.safe_embedding_lookup_sparse(embed_weights, sp_ids, sp_values,
combiner=combiner,
name={}_embedding.format(fieldname))
fields_embeddings.append(embedding)

# --------- square of embedding
squared_emb_weights = tf.square(embed_weights)

squared_sp_values = tf.SparseTensor(indices=sp_values.indices,
values=tf.square(sp_values.values),
dense_shape=sp_values.dense_shape)

# squared_embedding: [batch_size, embed_dim]
squared_embedding = embedding_ops.safe_embedding_lookup_sparse(squared_emb_weights, sp_ids, squared_sp_values,
combiner=combiner,
name={}_squared_embedding.format(fieldname))
fields_squared_embeddings.append(squared_embedding)

# calculate bi-interaction
sum_embedding_then_square = tf.square(tf.add_n(fields_embeddings)) # [batch_size, embed_dim]
square_embedding_then_sum = tf.add_n(fields_squared_embeddings) # [batch_size, embed_dim]
bi_interaction = 0.5 * (sum_embedding_then_square - square_embedding_then_sum) # [batch_size, embed_dim]
tf.logging.info("bi-interaction, shape={}".format(bi_interaction.shape))

# calculate logits
logits = tf.layers.dense(bi_interaction, units=1, use_bias=True, activation=None)

# 因為FM與DNN共享embedding,所以除了logits,還返回各field的embedding,方便搭建DNN
return logits, fields_embeddings

DNN預測部分

再次聲明,將criteo中原來的39列,拆分成2個field,並不是為了提升預測性能,只是為了模擬實際場景。導致的後果就是,Deep側第一層的輸入由原來的[batch_size, 39*embed_dim]變成了[batch_size, 2*embed_dim],使Deep側交叉不足。

儘管在criteo數據集上,deep側的輸入由feature_size*embed_dim變成了field_size*embed_dim,限制了交叉能力。但是,在實際系統中,field_size已經是成千上萬了,而每個field下的feature又是成千上萬,而且,因為embedding是稠密的,沒有稀疏優化的可能性。因此,在接入deep側之前,每個field內部先做一層pooling,將deep側輸入由feature_size*embed_dim壓縮成field_size*embed_dim,對於大規模機器學習,是十分必要的。

DNN的代碼如下所示。可以看到,其中沒有加入L1/L2 regularization,這是模仿TensorFlow自帶的Wide & Deep實現DNNLinearCombinedClassifier的寫法。L1/L2正則將通過設置optimizer的參數來實現。

def output_logits_from_dnn(fields_embeddings, params, is_training):
dropout_rate = params[dropout_rate]
do_batch_norm = params[batch_norm]

X = tf.concat(fields_embeddings, axis=1)
tf.logging.info("initial input to DNN, shape={}".format(X.shape))

for idx, n_units in enumerate(params[hidden_units], start=1):
X = tf.layers.dense(X, units=n_units, activation=tf.nn.relu)
tf.logging.info("layer[{}] output shape={}".format(idx, X.shape))

X = tf.layers.dropout(inputs=X, rate=dropout_rate, training=is_training)
if is_training:
tf.logging.info("layer[{}] dropout {}".format(idx, dropout_rate))

if do_batch_norm:
# BatchNormalization的調用、參數,是從DNNLinearCombinedClassifier源碼中拷貝過來的
batch_norm_layer = normalization.BatchNormalization(momentum=0.999, trainable=True,
name=batchnorm_{}.format(idx))
X = batch_norm_layer(X, training=is_training)

if is_training:
tf.logging.info("layer[{}] batch-normalize".format(idx))

# connect to final logits, [batch_size,1]
return tf.layers.dense(X, units=1, use_bias=True, activation=None)

model_fn

前面的代碼完成了「線性預測」+「二次交叉預測」+「深度預測」,則model_fn的實現就非常簡單了,只不過將三個部分得到的logits相加就可以了。

def model_fn(features, labels, mode, params):
for featname, featvalues in features.items():
if not isinstance(featvalues, tf.SparseTensor):
raise TypeError("feature[{}] isnt SparseTensor".format(featname))

# ============= build the graph
embedding_table = build_embedding_table(params)

linear_logits = output_logits_from_linear(features, embedding_table, params)

bi_interact_logits, fields_embeddings = output_logits_from_bi_interaction(features, embedding_table, params)

dnn_logits = output_logits_from_dnn(fields_embeddings, params, (mode == tf.estimator.ModeKeys.TRAIN))

general_bias = tf.get_variable(name=general_bias, shape=[1], initializer=tf.constant_initializer(0.0))

logits = linear_logits + bi_interact_logits + dnn_logits
logits = tf.nn.bias_add(logits, general_bias) # bias_add,獲取broadcasting的便利

# reshape [batch_size,1] to [batch_size], to match the shape of labels
logits = tf.reshape(logits, shape=[-1])

probabilities = tf.sigmoid(logits)

# ============= predict spec
if mode == tf.estimator.ModeKeys.PREDICT:
return tf.estimator.EstimatorSpec(
mode=mode,
predictions={probabilities: probabilities})

# ============= evaluate spec
# STUPID TENSORFLOW CANNOT AUTO-CAST THE LABELS FOR ME
loss = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(logits=logits, labels=tf.cast(labels, tf.float32)))

eval_metric_ops = {auc: tf.metrics.auc(labels, probabilities)}
if mode == tf.estimator.ModeKeys.EVAL:
return tf.estimator.EstimatorSpec(
mode=mode,
loss=loss,
eval_metric_ops=eval_metric_ops)

# ============= train spec
assert mode == tf.estimator.ModeKeys.TRAIN
train_op = params[optimizer].minimize(loss, global_step=tf.train.get_global_step())
return tf.estimator.EstimatorSpec(mode,
loss=loss,
train_op=train_op,
eval_metric_ops=eval_metric_ops)

訓練與評估

完成了model_fn之後,拜TensorFlow Estimator框架所賜,訓練與評估變得非常簡單,設定超參數之後(注意在指定optimizer時設置了L1/L2的正則權重),調用tf.estimator.train_and_evaluate即可。

def get_hparams():
vocab_sizes = {
numeric: 13,
# there are totally 14738 categorical tags occur >= 200
# since 0 is reserved for OOV, so total vocab_size=14739
categorical: 14739
}

optimizer = tf.train.ProximalAdagradOptimizer(
learning_rate=0.01,
l1_regularization_strength=0.001,
l2_regularization_strength=0.001)

return {
embed_dim: 128,
vocab_sizes: vocab_sizes,
# 在這個case中,沒有多個field共享同一個vocab的情況,而且field_name和vocab_name相同
field_vocab_mapping: {numeric: numeric, categorical: categorical},
dropout_rate: 0.3,
batch_norm: False,
hidden_units: [64, 32],
optimizer: optimizer
}

if __name__ == "__main__":
tf.logging.set_verbosity(tf.logging.INFO)
tf.set_random_seed(999)

hparams = get_hparams()
deepfm = tf.estimator.Estimator(model_fn=model_fn,
model_dir=models/criteo,
params=hparams)

train_spec = tf.estimator.TrainSpec(input_fn=lambda: input_fn(data_file=dataset/criteo/whole_train.tsv,
n_repeat=10,
batch_size=128,
batches_per_shuffle=10))

eval_spec = tf.estimator.EvalSpec(input_fn=lambda: input_fn(data_file=dataset/criteo/whole_test.tsv,
n_repeat=1,
batch_size=128,
batches_per_shuffle=-1))

tf.estimator.train_and_evaluate(deepfm, train_spec, eval_spec)

測試集上的部分結果所下所示,測試集上的AUC在0.765左右,沒有Kaggle solution上0.8+的AUC高。正如前文所說的,將原來criteo數據集中的39列拆分成2個field,只是為了演示「一列多值」、「稀疏」的DeepFM實現,但限制了Deep側的交叉能力,對最終模型的性能造成一定負面影響。不過,仍然證明,文中展示的DeepFM實現是正確的。

小結

本文展示了我寫的一套基於TensorFlow的DeepFM的實現。重點闡述了「一列多值」、「稀疏」、「權重共享」在實際推薦系統中的重要性,和我是如何在DeepFM中實現以上需求的。歡迎各位看官指正。


推薦閱讀:
查看原文 >>
相關文章