在實際項目或者刷競賽的時候,經常會遇到訓練數據非常大導致一些演算法實際上不能操作的問題。比如在廣告行業中,因為DSP的請求數據量特別大,一個星期的數據往往有上百G,這種級別的數據在訓練的時候,直接套用一些演算法框架是沒辦法訓練的,基本上在特徵工程的階段就一籌莫展。通常採用採樣、截斷的方式獲取更小的數據集,或者使用大數據集群的方式進行訓練,但是這兩種方式在作者看來目前存在兩個問題:
我自己以前在刷競賽的時候,看到別人使用過FM+FTRL的模型實現了一個CTR演算法,印象很深。自己使用的是DNN+Embedding的方式做的一個演算法模型,從理論上看embedding肯定比one-hot encoder的方式更加先進且能真實反饋特徵數據的相關性,但是實際效果看對方的FM_FRTL得到的AUC比我高近一個百分點,而且可以在10G的數據上一個多小時跑完,而我的DNN+Embedding演算法,因為沒有GPU主機,跑一次需要十二個小時,嚴重影響了調參的積極性,這也是我非常想掌握FTRL的出發點。
更重要的是,考慮到刷競賽與實際演算法是否可工程化的的角度,FTRL結合LR或者FM是一個非常好的方向,比Top1開源的代碼使用了PCA、NLP處理特徵以及多個模型stacking的技巧,更具有學習或者借鑒的價值。
本文主要根據谷歌給出的FTRL理論論文,以及FTRL+LR的工程化實現論文,從理論到工程化實現LR+FTRL的開發,任一後端開發人員都能根據文末給出的python代碼,簡單的開發就能實現一個簡單、高性能、高可靠的CTR預測模型。
關鍵概念點:
1.logistic distribution 2.幾率 3.對數幾率 4.損失函數 5.參數估計 6.誤差計算 7.隨機梯度下降
LR(logistic regression model)是一種分類模型,我們通常都知道:
但是對其實際的數學原理卻不是很了解。logistics regression中的logistic指的是統計學中的logistic distribution:
可以說LR是線性回歸模型通過logistic的分布函數,將預測結果映射到概率空間,進而預測不同分類的概率,其概率由條件概率P(Y|X)表示。
LR模型既可以做二分類也可以做多分類,因為從事的是廣告、營銷行業的模型預估,本文只針對二分類的LR模型(binary logistic regression model)做原理講解以及模型工程化實現。
二分類的LR模型是如下條件概率的分布:
幾率:如果一個事件發生的概率是p,那麼該事件發生的幾率是發生的概率/不發生的概率:
對數幾率:對幾率取對數。
代入公式(2)和公式(3)得到:
可以看出LR也可以認為是用線性回歸模型的預測結果來預測事件發生的對數幾率。
LR有很多優點,比如:
邏輯回歸模型學習時,對於給定的訓練集,可以使用極大似然估計法估計模型參數(將權重向量w和輸入向量x擴充一位),設:
似然函數是:
對數似然函數:
這樣邏輯回歸的參數優化問題就變成了以對數似然函數為目標的最優化問題,也就是對公式10求最大值時的參數w;
實際應用中,通常是使用最小化損失函數的概念來尋找最佳參數空間,所以對公式10取負號得到邏輯回歸模型的損失函數:
也就是我們常說的交叉熵損失函數,是一個高階可導連續凸函數。根據凸函數優化理論,可以使用梯度下降法、牛頓法、trust-region等方法進行優化。
以梯度下降法為例,計算l(w)的梯度為:
所以樣本第j個參數的優化方式為:
所有樣本的第j個參數更新方式為:
當樣本數據里N很大的時候,通常採用的是隨機梯度下降法,演算法如下所示:
while { for i in range(0,m): w_j = w_j + a * g_j }
隨機梯度下降的好處是可以實現分散式並行化,具體計算流程是:
主要是需要實現參數更新計算以及損失函數計算。
廣告、營銷、推薦行業傳統的機器學習開發流程基本是以下步驟:
這種方式主要存在兩個瓶頸:
針對這兩個問題,一般而言有兩種解決方式:
傳統的LR或者FM離線訓練方法中,通常使用L1正則化的方法獲取稀疏解;但是在線學習的時候,針對每一個樣本的梯度下降方向並不是全局的,而是一個隨機梯度下降的方式,簡單的使用L1正則化並不能獲得正確的稀疏解。
比較出名的在線最優化的方法有:
其中TG截斷比較武斷;FOBOS能獲得較好的精度,但是稀疏性較差;RDA演算法會在犧牲一定的精度條件下,獲得較好的稀疏性;而FTRL演算法技能提高OGD(online-gradient-descent)的精確度,又能獲得更好的稀疏性。
參數權重更新
FTRL演算法權重更新公式為:
由於 相對於W來說是一個常數,並且令 :
公式15等價於:
針對特徵權重的各個維度將其拆解成N個單獨的標量最小化問題:
公式17是一個無約束的非平滑參數優化問題,其中第二項 在 處不可導。假設 是其最優解,並且定義 是 在? 的次導數,那麼有:
對公式17進行求導,並等於0,有:
由於 大於0,針對公式19分三種情況:
綜合上面的分析,可以得到FTRL特徵權重各個維度更新的公式為:
per-coordinate learning rates
在FTRL中,每個維度的學習率都是單獨考慮的。標準的OGD理論對每一個樣本都是使用的相同的學習率: 。
一個簡單的思維推理就能證明這種情況並不一定是最理想的:假如我們每一輪扔十個硬幣,然後使用LR去預估i個硬幣正面朝上的概率: 。假設使用相同的學習率 ,其中 是 正面朝上的次數。如果 比 正面的次數更多,那麼 的學習率應該比 下降的更快,表現為 具有更多的數據。 的學習率保持的比較高,意味著對 的新數據更敏感。
從另一個方面說,如果 從沒有投出正面,但是我們依舊在調低 的學習率,這明顯是不合理的。
因此,至少對於某些問題而言,使用per-coordinate learning rates是有好處的。谷歌使用以下的公式計算per-coordinate learning rates:
其中 是第s個樣本計算的梯度向量 中 coordinate gradient。
谷歌經驗表面, 通常是要調節的, 取1就好,且在使用了per-coordinate learning rates之後,AUCLoss下降了11.2%,對於谷歌的廣告系統而言,1%的AUCLoss降低就被認為是很好的優化了。
工程化實現偽代碼
谷歌給出了FTRL的工程化演算法偽碼,其中
其它谷歌實現的FTRL工程化技巧
根據前面的學習,可以使用LR作為基本學習器,使用FTRL作為在線最優化的方法來獲取LR的權重係數,從而達到在不損失精度的前提下獲得稀疏解的目標。
工程化實現的幾個核心點是:
演算法代碼實現如下所示,這裡只給出了核心部分代碼,主要做了以下優化:
from datetime import datetime from csv import DictReader from math import exp, log, sqrt import gzip import random import json import argparse
class FTRLProximal(object): """ FTRL Proximal engineer project with logistic regression Reference: https://static.googleusercontent.com/media/research.google.com/zh-CN//pubs/archive/41159.pdf """
def __init__(self, alpha, beta, L1, L2, D, interaction=False, dropout=1.0, dayfeature=True, device_counters=False):
# parameters self.alpha = alpha self.beta = beta self.L1 = L1 self.L2 = L2 self.dayfeature = dayfeature self.device_counters = device_counters
# feature related parameters self.D = D self.interaction = interaction self.dropout = dropout
# model self.n = [0.] * D self.z = [0.] * D self.w = [0.] * D
def _indices(self, x): A helper generator that yields the indices in x The purpose of this generator is to make the following code a bit cleaner when doing feature interaction.
for i in x: yield i
if self.interaction: D = self.D L = len(x) for i in range(1, L): # skip bias term, so we start at 1 for j in range(i + 1, L): # one-hot encode interactions with hash trick yield abs(hash(str(x[i]) + _ + str(x[j]))) % D
def predict(self, x, dropped=None): """ use x and computed weight to get predict :param x: :param dropped: :return: """ # wTx is the inner product of w and x wTx = 0. for j, i in enumerate(self._indices(x)):
if dropped is not None and dropped[j]: continue
wTx += self.w[i]
if dropped is not None: wTx /= self.dropout
# bounded sigmoid function, this is the probability estimation return 1. / (1. + exp(-max(min(wTx, 35.), -35.)))
def update(self, x, y): """ update weight and coordinate learning rate based on x and y :param x: :param y: :return: """
ind = [i for i in self._indices(x)]
if self.dropout == 1: dropped = None else: dropped = [random.random() > self.dropout for i in range(0, len(ind))]
p = self.predict(x, dropped)
# gradient under logloss g = p - y
# update z and n for j, i in enumerate(ind):
# implement dropout as overfitting prevention if dropped is not None and dropped[j]: continue
g_i = g * i sigma = (sqrt(self.n[i] + g_i * g_i) - sqrt(self.n[i])) / self.alpha self.z[i] += g_i - sigma * self.w[i] self.n[i] += g_i * g_i
sign = -1. if self.z[i] < 0 else 1. # get sign of z[i]
# build w on the fly using z and n, hence the name - lazy weights - if sign * self.z[i] <= self.L1: # w[i] vanishes due to L1 regularization self.w[i] = 0. else: # apply prediction time L1, L2 regularization to z and get self.w[i] = (sign * self.L1 - self.z[i]) / ((self.beta + sqrt(self.n[i])) / self.alpha + self.L2)
def save_model(self, save_file): """ 保存weight數據到本地 :param save_file: :return: """ with open(save_file, "w") as f: w = {k: v for k, v in enumerate(self.w) if v != 0} z = {k: v for k, v in enumerate(self.z) if v != 0} n = {k: v for k, v in enumerate(self.n) if v != 0} data = { w: w, z: z, n: n } json.dump(data, f)
def load_weight(self, model_file, D): """ loada weight data :param model_file: :return: """ with open(model_file, "r") as f: data = json.load(f) self.w = data.get(w, [0.] * D) self.z = data.get(z, [0.] * D) self.n = data.get(n, [0.] * D)
@staticmethod def loss(y, y_pred): """ log loss for LR model :param y: :param y_pred: :return: """ p = max(min(y_pred, 1. - 10e-15), 10e-15) return -log(p) if y == 1. else -log(1. - p)
def data(f_train, D, dayfilter=None, dayfeature=True, counters=False): GENERATOR: Apply hash-trick to the original csv row and for simplicity, we one-hot-encode everything INPUT: path: path to training or testing file D: the max index that we can hash to YIELDS: ID: id of the instance, mainly useless x: a list of hashed and one-hot-encoded indices we only need the index since all values are either 0 or 1 y: y = 1 if we have a click, else we have y = 0
device_ip_counter = {} device_id_counter = {}
for t, row in enumerate(DictReader(f_train)): # process id ID = row[id] del row[id]
# process clicks y = 0. if click in row: if row[click] == 1: y = 1. del row[click]
# turn hour really into hour, it was originally YYMMDDHH
date = row[hour][0:6] row[hour] = row[hour][6:]
if dayfilter != None and not date in dayfilter: continue
if dayfeature: # extract date row[wd] = str(int(date) % 7) row[wd_hour] = "%s_%s" % (row[wd], row[hour])
if counters: d_ip = row[device_ip] d_id = row["device_id"] try: device_ip_counter[d_ip] += 1 device_id_counter[d_id] += 1 except KeyError: device_ip_counter[d_ip] = 1 device_id_counter[d_id] = 1 row["ipc"] = str(min(device_ip_counter[d_ip], 8)) row["idc"] = str(min(device_id_counter[d_id], 8))
# build x x = [0] # 0 is the index of the bias term for key in row: value = row[key] # one-hot encode everything with hash trick index = abs(hash(key + _ + value)) % D x.append(index) yield t, ID, x, y
推薦閱讀: