WDL架构图 与Wide & Deep模型相比,Deep FM的低阶特征组合与高阶特征组合共享输入,避免了Wide & Deep模型需要预处理低阶特征组合模型的输入,同时通过对低阶特征组合模型与高阶特征组合模型的组合优化,获得了更好的泛化能力。
模型训练
建模主要使用了tensorflow的keras API,对sparse特征进行label encoder之后,使用embedding layer输入;对连续特征归一化后进dense layer,参照了DeepFM with tensorflow 以及另一位大神的思路。这里没有连续特征,所以只对sparse特征进行处理,进embedding层之前首先需要判断embeding层的size,代码如下:
SingleFeat = collections . namedtuple (
SingleFeat , [ name , dimension , ])
sparse_feature_list = [ SingleFeat ( feat , data [ feat ] . nunique ())
for feat in sparse_features ]
使用SingleFeat保存sparse特征在embedding层的权重数目。
模型训练代码如下所示:
import pandas as pd
from sklearn.preprocessing import LabelEncoder , MinMaxScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import log_loss , roc_auc_score
from deepctr.models import DeepFM
from deepctr import SingleFeat
# from tensorflow.keras.utils import multi_gpu_model
import argparse
import os
from tensorflow.python.keras.metrics import binary_crossentropy
from deepctr.udf_metrics import auc
from tensorflow.python.keras.callbacks import EarlyStopping
import tensorflow as tf
from tensorflow import keras
import shutil
import pickle
import numpy as np
def batcher ( X_ , y_ = None , batch_size =- 1 ):
"""
keras fit_generator
:param X_:
:param y_:
:param batch_size:
:return:
"""
while True :
n_samples = X_ [ 0 ] . shape [ 0 ]
assert n_samples == y_ . shape [ 0 ], "input data sample shape not equal to label shape"
if batch_size == - 1 :
batch_size = n_samples
if batch_size < 1 :
raise ValueError ( Parameter batch_size={} is unsupported . format ( batch_size ))
for i in range ( 0 , n_samples , batch_size ):
upper_bound = min ( i + batch_size , n_samples )
feature_batch = []
for fea in X_ :
feature_batch . append ( fea [ i : upper_bound ])
ret_y = None
if y_ is not None :
ret_y = y_ [ i : upper_bound ]
yield ( feature_batch , ret_y )
if __name__ == "__main__" :
parser = argparse . ArgumentParser ()
parser . add_argument ( -m , --mode , action = store , default = "gpu" , dest = mode , help = "run mode" )
args = parser . parse_args ()
data_dir = os . path . abspath ( f "{os.path.abspath(os.path.dirname(os.path.realpath(__file__)))}/../../Data/avazu_ctr/" )
model_dir = os . path . abspath (
f "{os.path.abspath(os.path.dirname(os.path.realpath(__file__)))}/../../Data/avazu_serving/" )
data = pd . read_csv ( f {data_dir}/train_sample.txt )
# data_online =
# data = pd.read_csv(f{data_dir}/train.txt)
# sparse_features = [C + str(i) for i in range(1, 27)]
# dense_features = [I+str(i) for i in range(1, 14)]
sparse_features = [ hour , C1 , banner_pos , site_id , site_domain ,
site_category , app_id , app_domain , app_category , device_id ,
device_ip , device_model , device_type , device_conn_type , C14 ,
C15 , C16 , C17 , C18 , C19 , C20 , C21 ]
dense_features = []
data [ sparse_features ] = data [ sparse_features ] . fillna ( -1 , )
data [ dense_features ] = data [ dense_features ] . fillna ( 0 , )
target = [ click ]
# 1.Label Encoding for sparse features,and do simple Transformation for dense features
les = {}
for feat in sparse_features :
lbe = LabelEncoder ()
data [ feat ] = lbe . fit_transform ( data [ feat ])
les [ feat ] = lbe
loabel_encoder_file = f "{data_dir}/label_encodel.pkl"
with open ( loabel_encoder_file , wb ) as f :
pickle . dump ( les , f , - 1 )
# mms = MinMaxScaler(feature_range=(0, 1))
# data[dense_features] = mms.fit_transform(data[dense_features])
# 2.count #unique features for each sparse field,and record dense feature field name
sparse_feature_list = [ SingleFeat ( feat , data [ feat ] . nunique ())
for feat in sparse_features ]
dense_feature_list = [ SingleFeat ( feat , 0 )
for feat in dense_features ]
# 3.generate input data for model
train , test = train_test_split ( data , test_size = 0.2 )
train_model_input = [ train [ feat . name ] . values . reshape (( - 1 , 1 )) for feat in sparse_feature_list ]
test_model_input = [ test [ feat . name ] . values . reshape (( - 1 , 1 )) for feat in sparse_feature_list ]
refit_data_input = [ data [ feat . name ] . values . reshape (( - 1 , 1 )) for feat in sparse_feature_list ]
# 4.Define Model,train,predict and evaluate
model = DeepFM ({ "sparse" : sparse_feature_list ,
}, final_activation = sigmoid )
auc_stop = EarlyStopping ( monitor = val_auc , min_delta = 0.0001 , patience = 0 , verbose = 1 , mode = max )
call_backs = [ auc_stop ]
model . summary ()
if args . mode == cpu :
print ( f "run mode in cpu" )
# history = model.fit_generator(train_model_input, train[target].values,
# batch_size=256, epochs=10, verbose=2, validation_split=0.2, )
model . compile ( "adam" , "binary_crossentropy" ,
metrics = [ binary_crossentropy , auc ], )
with keras . backend . get_session () as sess :
init_op = tf . global_variables_initializer ()
sess . run ( init_op )
history = model . fit_generator (
batcher ( train_model_input , train [ target ] . values . reshape (( - 1 , 1 )), 256 ),
train_model_input [ 0 ] . shape [ 0 ] // 256 ,
epochs = 1 ,
verbose = 1 ,
validation_data = ( test_model_input , test [ target ] . values . reshape (( - 1 , 1 ))),
use_multiprocessing = True ,
callbacks = call_backs
)
pred_ans = model . predict ( test_model_input , batch_size = 256 )
print ( "test LogLoss" , round ( log_loss ( test [ target ] . values , pred_ans ), 4 ))
print ( "test AUC" , round ( roc_auc_score ( test [ target ] . values , pred_ans ), 4 ))
pred_all = model . predict ( refit_data_input , batch_size = 256 )
print ( "all AUC" , round ( roc_auc_score ( data [ target ] . values , pred_all ), 4 ))
pred_score_file = f "{data_dir}/all_predict.npy"
np . save ( pred_score_file , pred_all )
print ( f "all predict score is {pred_all}" )
model_dir = f "{model_dir}/1"
if os . path . isdir ( model_dir ):
print (
Already saved a model, cleaning up
)
shutil . rmtree ( model_dir )
tf . saved_model . simple_save (
sess ,
model_dir ,
inputs = { i . name : i for i in model . inputs },
outputs = { "ctr" : model . output })
else :
print ( f "run mode in gpu" )
pass
# gpu_model = multi_gpu_model(
# model=model,
# gpus=[1, 2, 3]
# )
# gpu_model.compile("adam", "binary_crossentropy",
# metrics=[binary_crossentropy, auc], )
# history = gpu_model.fit_generator(
# batcher(train_model_input, train[target].values.reshape((-1, 1)), 256),
# train_model_input[0].shape[0] // 256,
# epochs=10,
# verbose=1,
# validation_data=(test_model_input, test[target].values.reshape((-1, 1))),
# callbacks=call_backs
# )
# pred_ans = gpu_model.predict(test_model_input, batch_size=256)
# print("test LogLoss", round(log_loss(test[target].values, pred_ans), 4))
# print("test AUC", round(roc_auc_score(test[target].values, pred_ans), 4))
支持CPU和GPU两种训练模式,具体哪一种训练模式需要根据命令行参数决定;这里对Deep FM模型做了一个整体的封装,这种封装形式将sparse特征和连续特征的处理方式标准化,而且支持LR+FM+DEEP参数配置,可以选择不同的特征组合方式来进行训练,比如只使用DEEP、只使用FM、只使用LR、或者两两组合。
增加了early stopping,模型在训练的时候可以根据valid数据集上面的AUC分数是否还在继续优化决定当前epoch模型训练是否退出。
模型保存
模型保存代码如下所示:
model_dir = f "{model_dir}/1"
if os . path . isdir ( model_dir ):
print (
Already saved a model, cleaning up
)
shutil . rmtree ( model_dir )
tf . saved_model . simple_save (
sess ,
model_dir ,
inputs = { i . name : i for i in model . inputs },
outputs = { "ctr" : model . output })
需要注意的是,模型保存的时候要带上版本号,否则在模型载入的时候会报错;此外,官方给的例子只有这几行代码,但是实际上需要模型训练的时候进行全局初始化,否则模型保存时候会报出有未初始化的值。
docker安装
环境配置的难题
软体开发最大的麻烦事之一,就是环境配置。用户计算机的环境都不相同,你怎么知道自家的软体,能在那些机器跑起来?
用户必须保证两件事:操作系统的设置,各种库和组件的安装。只有它们都正确,软体才能运行。举例来说,安装一个 Python 应用,计算机必须有 Python 引擎,还必须有各种依赖,可能还要配置环境变数。
如果某些老旧的模块与当前环境不兼容,那就麻烦了。开发者常常会说:"它在我的机器可以跑了"(It works on my machine),言下之意就是,其他机器很可能跑不了。
环境配置如此麻烦,换一台机器,就要重来一次,旷日费时。很多人想到,能不能从根本上解决问题,软体可以带环境安装?也就是说,安装的时候,把原始环境一模一样地复制过来。
虚拟机
虚拟机(virtual machine)就是带环境安装的一种解决方案。它可以在一种操作系统里面运行另一种操作系统,比如在 Windows 系统里面运行 Linux 系统。应用程序对此毫无感知,因为虚拟机看上去跟真实系统一模一样,而对于底层系统来说,虚拟机就是一个普通文件,不需要了就删掉,对其他部分毫无影响。
虽然用户可以通过虚拟机还原软体的原始环境。但是,这个方案有几个缺点。
资源占用多
虚拟机会独占一部分内存和硬碟空间。它运行的时候,其他程序就不能使用这些资源了。哪怕虚拟机里面的应用程序,真正使用的内存只有 1MB,虚拟机依然需要几百 MB 的内存才能运行。
冗余步骤多
虚拟机是完整的操作系统,一些系统级别的操作步骤,往往无法跳过,比如用户登录。
启动慢
启动操作系统需要多久,启动虚拟机就需要多久。可能要等几分钟,应用程序才能真正运行。
Linux 容器
由于虚拟机存在这些缺点,Linux 发展出了另一种虚拟化技术:Linux 容器(Linux Containers,缩写为 LXC)。
Linux 容器不是模拟一个完整的操作系统,而是对进程进行隔离。 或者说,在正常进程的外面套了一个保护层。对于容器里面的进程来说,它接触到的各种资源都是虚拟的,从而实现与底层系统的隔离。
由于容器是进程级别的,相比虚拟机有很多优势。
启动快
容器里面的应用,直接就是底层系统的一个进程,而不是虚拟机内部的进程。所以,启动容器相当于启动本机的一个进程,而不是启动一个操作系统,速度就快很多。
资源占用少
容器只占用需要的资源,不占用那些没有用到的资源;虚拟机由于是完整的操作系统,不可避免要占用所有资源。另外,多个容器可以共享资源,虚拟机都是独享资源。
体积小
容器只要包含用到的组件即可,而虚拟机是整个操作系统的打包,所以容器文件比虚拟机文件要小很多。
总之,容器有点像轻量级的虚拟机,能够提供虚拟化的环境,但是成本开销小得多。
docker微服务 Docker 属于 Linux 容器的一种封装,提供简单易用的容器使用介面。 它是目前最流行的 Linux 容器解决方案。
Docker 将应用程序与该程序的依赖,打包在一个文件里面。运行这个文件,就会生成一个虚拟容器。程序在这个虚拟容器里运行,就好像在真实的物理机上运行一样。有了 Docker,就不用担心环境问题。
总体来说,Docker 的介面相当简单,用户可以方便地创建和使用容器,把自己的应用放入容器。容器还可以进行版本管理、复制、分享、修改,就像管理普通的代码一样。
Docker 的主要用途,目前有三大类。
(1)提供一次性的环境。 比如,本地测试他人的软体、持续集成的时候提供单元测试和构建的环境。
(2)提供弹性的云服务。 因为 Docker 容器可以随开随关,很适合动态扩容和缩容。
(3)组建微服务架构。 通过多个容器,一台机器可以跑多个服务,因此在本机就可以模拟出微服务架构。
安装流程
以macOS为例,docker安装非常的简单:brew cask install homebrew/cask/docker
,点击鲸鱼可以弹出操作菜单:
docker下拉菜单
镜像加速:
镜像加速配置 在terminal运行docker info
来查看是否配置成功:
Registry Mirrors:
http://hub-mirror.c.163.com/
Live Restore Enabled: false
Product License: Community Engine
TensorFlow serving
启动TensorFlow serving服务
#!/usr/bin/env bash
docker run -p 8500 :8501 --mount type = bind,source= $MODEL_PATH , target = /models/deepFM -e MODEL_NAME = deepFM -t tensorflow/serving &
使用上述脚本即可启动一个TensorFlow serving,其中MODEL_PATH对应的就是上面代码输出的训练模型路径。
模型启动
获取TensorFlow serving预测结果
post man获取结果
postman api测试结果 body里面的signature_name是用户在生成模型时候定义的,暂时不管。instance代表要预测的数据是按行输入,数据结构为List of dictionary,每一个dictionary代买一行需要预测的样本,每一个dictionary的key值代表输入到模型里面去的variable name,也就是前面模型输出的时候定义的inputs里面的key值。
可以通过saved_model_cli查看保存模型需要的输入特征称:
MetaGraphDef with tag-set: serve contains the following SignatureDefs:
signature_def[ serving_default ] :
The given SavedModel SignatureDef contains the following input( s) :
inputs[ sparse_0-hour:0 ] tensor_info:
dtype: DT_FLOAT
shape: ( -1, 1 )
name: sparse_0-hour:0
inputs[ sparse_1-C1:0 ] tensor_info:
dtype: DT_FLOAT
shape: ( -1, 1 )
name: sparse_1-C1:0
inputs[ sparse_10-device_ip:0 ] tensor_info:
dtype: DT_FLOAT
shape: ( -1, 1 )
name: sparse_10-device_ip:0
inputs[ sparse_11-device_model:0 ] tensor_info:
dtype: DT_FLOAT
shape: ( -1, 1 )
name: sparse_11-device_model:0
inputs[ sparse_12-device_type:0 ] tensor_info:
dtype: DT_FLOAT
shape: ( -1, 1 )
name: sparse_12-device_type:0
inputs[ sparse_13-device_conn_type:0 ] tensor_info:
dtype: DT_FLOAT
shape: ( -1, 1 )
name: sparse_13-device_conn_type:0
inputs[ sparse_14-C14:0 ] tensor_info:
dtype: DT_FLOAT
shape: ( -1, 1 )
name: sparse_14-C14:0
inputs[ sparse_15-C15:0 ] tensor_info:
dtype: DT_FLOAT
shape: ( -1, 1 )
name: sparse_15-C15:0
inputs[ sparse_16-C16:0 ] tensor_info:
dtype: DT_FLOAT
shape: ( -1, 1 )
name: sparse_16-C16:0
inputs[ sparse_17-C17:0 ] tensor_info:
dtype: DT_FLOAT
shape: ( -1, 1 )
name: sparse_17-C17:0
inputs[ sparse_18-C18:0 ] tensor_info:
dtype: DT_FLOAT
shape: ( -1, 1 )
name: sparse_18-C18:0
inputs[ sparse_19-C19:0 ] tensor_info:
dtype: DT_FLOAT
shape: ( -1, 1 )
name: sparse_19-C19:0
inputs[ sparse_2-banner_pos:0 ] tensor_info:
dtype: DT_FLOAT
shape: ( -1, 1 )
name: sparse_2-banner_pos:0
inputs[ sparse_20-C20:0 ] tensor_info:
dtype: DT_FLOAT
shape: ( -1, 1 )
name: sparse_20-C20:0
inputs[ sparse_21-C21:0 ] tensor_info:
dtype: DT_FLOAT
shape: ( -1, 1 )
name: sparse_21-C21:0
inputs[ sparse_3-site_id:0 ] tensor_info:
dtype: DT_FLOAT
shape: ( -1, 1 )
name: sparse_3-site_id:0
inputs[ sparse_4-site_domain:0 ] tensor_info:
dtype: DT_FLOAT
shape: ( -1, 1 )
name: sparse_4-site_domain:0
inputs[ sparse_5-site_category:0 ] tensor_info:
dtype: DT_FLOAT
shape: ( -1, 1 )
name: sparse_5-site_category:0
inputs[ sparse_6-app_id:0 ] tensor_info:
dtype: DT_FLOAT
shape: ( -1, 1 )
name: sparse_6-app_id:0
inputs[ sparse_7-app_domain:0 ] tensor_info:
dtype: DT_FLOAT
shape: ( -1, 1 )
name: sparse_7-app_domain:0
inputs[ sparse_8-app_category:0 ] tensor_info:
dtype: DT_FLOAT
shape: ( -1, 1 )
name: sparse_8-app_category:0
inputs[ sparse_9-device_id:0 ] tensor_info:
dtype: DT_FLOAT
shape: ( -1, 1 )
name: sparse_9-device_id:0
The given SavedModel SignatureDef contains the following output( s) :
outputs[ ctr ] tensor_info:
dtype: DT_FLOAT
shape: ( -1, 1 )
name: prediction_layer/Reshape:0
Method name is: tensorflow/serving/predict
使用Restful API调用模型
前面post man的形式只是测试API调用是否成功,实际上预估的结果肯定是不对的,因为模型训练的时候有特征转换的操作,而这里直接传输的是原始数据。需要将前训练的时候得到的Label Encoder模型load起来,并对原始数据做特征转换之后在发送request请求到TensorFlow serving里面。
"""
本文件主要用于向deepFM模型发起restful api请求, 获取ctr预估值
请求数据源来自于train_sample.txt,需要保证以下几点:
1. 逐条预测得到的ctr值与模型直接预测的值一模一样。
2. 一次预测的值与模型预测的一模一样。
3. 所有的请求预测得到的AUC与模型直接预测得到的一模一样。
pandas int64的type是无法被序列化的
"""
import numpy
import tensorflow as tf
from tensorflow_serving.apis import predict_pb2
from tensorflow_serving.apis import prediction_service_pb2_grpc
import os
import pandas as pd
import requests
import pickle
import json
import numpy as np
from sklearn.metrics import roc_auc_score
SERVING_URL = "http://localhost:8500/v1/models/deepFM:predict"
data_dir = f "{os.path.dirname(__file__)}/../../Data"
predict_data = f "{os.path.abspath(data_dir)}/avazu_ctr/train_sample.txt"
sparse_features = [ hour , C1 , banner_pos , site_id , site_domain ,
site_category , app_id , app_domain , app_category , device_id ,
device_ip , device_model , device_type , device_conn_type , C14 ,
C15 , C16 , C17 , C18 , C19 , C20 , C21 ]
encoding_file = f "{os.path.abspath(data_dir)}/avazu_ctr/label_encodel.pkl"
with open ( encoding_file , rb ) as f :
les = pickle . load ( f )
df = pd . read_csv ( predict_data )
for col in sparse_features :
df [ col ] = les [ col ] . transform ( df [ col ])
df [ col ] = df [ col ]
rename_features = { j : f "sparse_{i}-{j}:0" for i , j in enumerate ( sparse_features )}
df = df . rename ( columns = rename_features )
X = df [ list ( rename_features . values ())] . to_json ( orient = records , lines = True )
X = X . split ( "
" )
y = df [ click ]
predict_data = []
for data in X :
data = json . loads ( data )
data_new = { x : [ y ] for x , y in data . items ()}
request_body = {
"signature_name" : "serving_default" ,
"instances" : [ data_new ]
}
print ( f "send request to tensorflow with data :{request_body}" )
headers = { "Content-type" : "application/json" }
data_post = json . dumps ( request_body )
print ( data_post )
response = requests . post ( SERVING_URL , data = data_post , headers = headers )
response_data = response . json ()
print ( response_data )
response . raise_for_status ()
predict_data . append ( response_data [ predictions ][ 0 ])
predict_data = np . array ( predict_data )
auc_score = roc_auc_score ( y , predict_data )
print ( f "tensorflow restful api get auc score is {auc_score}" )
日志输入如下:
send request to tensorflow with data :{signature_name: serving_default, instances: [{sparse_0-hour:0: [0], sparse_1-C1:0: [2], sparse_2-banner_pos:0: [0], sparse_3-site_id:0: [1], sparse_4-site_domain:0: [1], sparse_5-site_category:0: [7], sparse_6-app_id:0: [69], sparse_7-app_domain:0: [6], sparse_8-app_category:0: [0], sparse_9-device_id:0: [86], sparse_10-device_ip:0: [908], sparse_11-device_model:0: [291], sparse_12-device_type:0: [1], sparse_13-device_conn_type:0: [0], sparse_14-C14:0: [55], sparse_15-C15:0: [2], sparse_16-C16:0: [1], sparse_17-C17:0: [40], sparse_18-C18:0: [0], sparse_19-C19:0: [0], sparse_20-C20:0: [0], sparse_21-C21:0: [20]}]}
{"signature_name": "serving_default", "instances": [{"sparse_0-hour:0": [0], "sparse_1-C1:0": [2], "sparse_2-banner_pos:0": [0], "sparse_3-site_id:0": [1], "sparse_4-site_domain:0": [1], "sparse_5-site_category:0": [7], "sparse_6-app_id:0": [69], "sparse_7-app_domain:0": [6], "sparse_8-app_category:0": [0], "sparse_9-device_id:0": [86], "sparse_10-device_ip:0": [908], "sparse_11-device_model:0": [291], "sparse_12-device_type:0": [1], "sparse_13-device_conn_type:0": [0], "sparse_14-C14:0": [55], "sparse_15-C15:0": [2], "sparse_16-C16:0": [1], "sparse_17-C17:0": [40], "sparse_18-C18:0": [0], "sparse_19-C19:0": [0], "sparse_20-C20:0": [0], "sparse_21-C21:0": [20]}]}
{predictions: [[0.474098]]}
....
send request to tensorflow with data :{signature_name: serving_default, instances: [{sparse_0-hour:0: [0], sparse_1-C1:0: [2], sparse_2-banner_pos:0: [0], sparse_3-site_id:0: [17], sparse_4-site_domain:0: [103], sparse_5-site_category:0: [1], sparse_6-app_id:0: [69], sparse_7-app_domain:0: [6], sparse_8-app_category:0: [0], sparse_9-device_id:0: [86], sparse_10-device_ip:0: [515], sparse_11-device_model:0: [79], sparse_12-device_type:0: [1], sparse_13-device_conn_type:0: [0], sparse_14-C14:0: [21], sparse_15-C15:0: [2], sparse_16-C16:0: [1], sparse_17-C17:0: [14], sparse_18-C18:0: [0], sparse_19-C19:0: [0], sparse_20-C20:0: [0], sparse_21-C21:0: [12]}]}
{"signature_name": "serving_default", "instances": [{"sparse_0-hour:0": [0], "sparse_1-C1:0": [2], "sparse_2-banner_pos:0": [0], "sparse_3-site_id:0": [17], "sparse_4-site_domain:0": [103], "sparse_5-site_category:0": [1], "sparse_6-app_id:0": [69], "sparse_7-app_domain:0": [6], "sparse_8-app_category:0": [0], "sparse_9-device_id:0": [86], "sparse_10-device_ip:0": [515], "sparse_11-device_model:0": [79], "sparse_12-device_type:0": [1], "sparse_13-device_conn_type:0": [0], "sparse_14-C14:0": [21], "sparse_15-C15:0": [2], "sparse_16-C16:0": [1], "sparse_17-C17:0": [14], "sparse_18-C18:0": [0], "sparse_19-C19:0": [0], "sparse_20-C20:0": [0], "sparse_21-C21:0": [12]}]}
{predictions: [[0.473931]]}
tensorflow restful api get auc score is 0.7531147050014975
最终样本通过模型在线预测的AUC与离线计算的AUC一致,模型serving成功。
后续工作
1. 请求服务化: 将上述发起模型预测请求的代码使用tornado框架服务化。
2. schedule更新、载入模型 :可以使用crontab定时更新模型,tornado定时载入训练模型以及特征转换模型。
3. docker容器化
4. 模型优化
参考文献
DeepFM: A Factorization-Machine based Neural Network for CTR
Wide & Deep Learning for Recommender Systems
Deploying Keras models using TensorFlow Serving and Flask
tensorflow-DeepFM
项目代码(待传)
推荐阅读: