接下来是核心实现的部分:
for variable, target in product(self.categorical_features, self.target_values):
    nf_name = '{}_pred_{}'.format(variable, target)
    X_new.loc[:, nf_name] = np.nan
    for large_ind, small_ind in skf.split(y, y):
        nf_large, nf_small, prior, col_avg_y = MeanEncoder.mean_encode_subroutine(
            X_new.iloc[large_ind], y.iloc[large_ind], X_new.iloc[small_ind], variable, target, self.prior_weight_func)
        X_new.iloc[small_ind, -1] = nf_small
        self.learned_stats[nf_name].append((prior, col_avg_y))
X_new中通过loc函数先占个坑,然后进入交叉验证:(补充:建议原始的输入变数X和y先shuffle一下再进入计算)
然后我们就进入了核心的实现 MeanEncoder.mean_encode_subroutine(静态函数):
@staticmethod
def mean_encode_subroutine(X_train, y_train, X_test, variable, target, prior_weight_func):
    """Learn a smoothed mean encoding for `variable` on the train part and apply it to both parts.

    :param X_train: DataFrame holding the "large" fold used to learn the encoding
    :param y_train: target values aligned with X_train
    :param X_test: DataFrame holding the held-out "small" fold to be encoded
    :param variable: name of the categorical column to encode
    :param target: class label for classification (one-vs-rest), or None for regression
    :param prior_weight_func: maps category count -> weight given to the global prior
    :return: (nf_train, nf_test, prior, col_avg_y) — encoded columns for both folds,
             the global prior, and the per-category encoding table
    """
    X_train = X_train[[variable]].copy()
    X_test = X_test[[variable]].copy()

    if target is not None:
        nf_name = '{}_pred_{}'.format(variable, target)
        X_train['pred_temp'] = (y_train == target).astype(int)  # classification: one-vs-rest indicator
    else:
        nf_name = '{}_pred'.format(variable)
        X_train['pred_temp'] = y_train  # regression: raw target
    prior = X_train['pred_temp'].mean()  # global target mean, used as the prior

    # Per-category mean and count; beta becomes the weight of the prior
    # (few observations -> beta close to 1 -> shrink toward the prior).
    col_avg_y = X_train.groupby(by=variable)['pred_temp'].agg(mean='mean', beta='size')
    col_avg_y['beta'] = prior_weight_func(col_avg_y['beta'])
    col_avg_y[nf_name] = col_avg_y['beta'] * prior + (1 - col_avg_y['beta']) * col_avg_y['mean']
    col_avg_y.drop(['beta', 'mean'], axis=1, inplace=True)

    nf_train = X_train.join(col_avg_y, on=variable)[nf_name].values
    # Categories unseen in the train fold fall back to the prior.
    nf_test = X_test.join(col_avg_y, on=variable).fillna(prior, inplace=False)[nf_name].values

    return nf_train, nf_test, prior, col_avg_y
还是举个例子好理解吧,假设这里categorical_features=['f1','f2','f3'],target=[0,1,2],那么这里我们以variable='f1',target=0为
例来计算,首先是取原始数据中标签为0的样本的f1特征:
X_train['pred_temp'] = (y_train == target).astype(int)
然后根据X_train['pred_temp']来计算target为0的样本的占比情况,以作为prior先验概率的值。
prior = X_train['pred_temp'].mean()
然后接下来的计算方式和target encoding是一致的:
col_avg_y = X_train.groupby(by=variable)['pred_temp'].agg(mean='mean', beta='size')
col_avg_y['beta'] = prior_weight_func(col_avg_y['beta'])
col_avg_y[nf_name] = col_avg_y['beta'] * prior + (1 - col_avg_y['beta']) * col_avg_y['mean']
col_avg_y.drop(['beta', 'mean'], axis=1, inplace=True)
唯一不同的方式是,mean encoding这里用到了交叉计算的方式,以5折交叉为例,在80%的数据上计算编码结果得到转换的规则,
然后将剩下20%的数据按照转换规则进行转换,最后将结果返回:
nf_train = X_train.join(col_avg_y, on=variable)[nf_name].values
nf_test = X_test.join(col_avg_y, on=variable).fillna(prior, inplace=False)[nf_name].values
return nf_train, nf_test, prior, col_avg_y
最后:
X_new.iloc[small_ind, -1] = nf_small
把测试集的转换结果赋给原始数据的copy。
综上所述,mean encoding的原理和target encoding基本是一样的,只不过比target encoding多了一个交叉计算的步骤,假设有10000条数据,
target encoding是直接在这10000条数据上进行编码结果的计算的,而mean encoding则是每次用类似于模型训练的方法,比如五折交叉计算,用
80%的数据计算编码结果然后赋给剩下的20%,重复5次则所有特征都编码完毕,这样的好处就是一定程度上降低过拟合的影响。
完整代码如下,回归的原理类似,公式见代码,自己看吧:(注意,输入X、y都必须是pandas格式的,否则会报错)
import numpy as np
import pandas as pd
from sklearn.model_selection import StratifiedKFold,KFold
from itertools import product
class MeanEncoder:
    """Mean (target) encoder with out-of-fold encoding to limit target leakage.

    For each categorical feature (and, for classification, each target class),
    a smoothed category -> target-mean mapping is learned on the large part of
    every CV split and applied to the held-out part, so no row is encoded with
    statistics derived from its own target value.
    """

    def __init__(self, categorical_features, n_splits=5, target_type='classification', prior_weight_func=None):
        """
        :param categorical_features: list of str, the name of the categorical columns to encode
        :param n_splits: the number of splits used in mean encoding
        :param target_type: str, 'regression' or 'classification'
        :param prior_weight_func:
            a function that takes in the number of observations, and outputs prior weight
            when a dict is passed, the default exponential decay function will be used:
            k: the number of observations needed for the posterior to be weighted equally as the prior
            f: larger f --> smaller slope
        """
        self.categorical_features = categorical_features
        self.n_splits = n_splits
        self.learned_stats = {}

        if target_type == 'classification':
            self.target_type = target_type
            self.target_values = []  # filled with the sorted class labels in fit_transform
        else:
            self.target_type = 'regression'
            self.target_values = None

        if isinstance(prior_weight_func, dict):
            # Build the default exponential-decay weight from the supplied k/f;
            # eval makes the dict's k and f the free variables of the lambda.
            self.prior_weight_func = eval('lambda x: 1 / (1 + np.exp((x - k) / f))',
                                          dict(prior_weight_func, np=np))
        elif callable(prior_weight_func):
            self.prior_weight_func = prior_weight_func
        else:
            self.prior_weight_func = lambda x: 1 / (1 + np.exp((x - 2) / 1))

    @staticmethod
    def mean_encode_subroutine(X_train, y_train, X_test, variable, target, prior_weight_func):
        """Learn a smoothed encoding for `variable` on (X_train, y_train), apply to both sets.

        :param target: class label for classification (one-vs-rest), or None for regression
        :return: (nf_train, nf_test, prior, col_avg_y)
        """
        X_train = X_train[[variable]].copy()
        X_test = X_test[[variable]].copy()

        if target is not None:
            nf_name = '{}_pred_{}'.format(variable, target)
            X_train['pred_temp'] = (y_train == target).astype(int)  # classification: one-vs-rest indicator
        else:
            nf_name = '{}_pred'.format(variable)
            X_train['pred_temp'] = y_train  # regression: raw target
        prior = X_train['pred_temp'].mean()  # global target mean, used as the prior

        # Per-category mean and count; beta becomes the weight of the prior
        # (few observations -> beta close to 1 -> shrink toward the prior).
        col_avg_y = X_train.groupby(by=variable)['pred_temp'].agg(mean='mean', beta='size')
        col_avg_y['beta'] = prior_weight_func(col_avg_y['beta'])
        col_avg_y[nf_name] = col_avg_y['beta'] * prior + (1 - col_avg_y['beta']) * col_avg_y['mean']
        col_avg_y.drop(['beta', 'mean'], axis=1, inplace=True)

        nf_train = X_train.join(col_avg_y, on=variable)[nf_name].values
        # Categories unseen in the train fold fall back to the prior.
        nf_test = X_test.join(col_avg_y, on=variable).fillna(prior, inplace=False)[nf_name].values

        return nf_train, nf_test, prior, col_avg_y

    def fit_transform(self, X, y):
        """
        :param X: pandas DataFrame, n_samples * n_features
        :param y: pandas Series or numpy array, n_samples
        :return X_new: the transformed pandas DataFrame containing mean-encoded categorical features
        """
        X_new = X.copy()
        if self.target_type == 'classification':
            skf = StratifiedKFold(self.n_splits)
        else:
            skf = KFold(self.n_splits)

        if self.target_type == 'classification':
            self.target_values = sorted(set(y))
            self.learned_stats = {'{}_pred_{}'.format(variable, target): [] for variable, target in
                                  product(self.categorical_features, self.target_values)}
            for variable, target in product(self.categorical_features, self.target_values):
                nf_name = '{}_pred_{}'.format(variable, target)
                X_new.loc[:, nf_name] = np.nan  # placeholder column, filled fold by fold
                for large_ind, small_ind in skf.split(y, y):
                    nf_large, nf_small, prior, col_avg_y = MeanEncoder.mean_encode_subroutine(
                        X_new.iloc[large_ind], y.iloc[large_ind], X_new.iloc[small_ind],
                        variable, target, self.prior_weight_func)
                    # nf_name was just appended, so it is the last column (-1).
                    X_new.iloc[small_ind, -1] = nf_small
                    self.learned_stats[nf_name].append((prior, col_avg_y))
        else:
            self.learned_stats = {'{}_pred'.format(variable): [] for variable in self.categorical_features}
            for variable in self.categorical_features:
                nf_name = '{}_pred'.format(variable)
                X_new.loc[:, nf_name] = np.nan
                for large_ind, small_ind in skf.split(y, y):
                    nf_large, nf_small, prior, col_avg_y = MeanEncoder.mean_encode_subroutine(
                        X_new.iloc[large_ind], y.iloc[large_ind], X_new.iloc[small_ind],
                        variable, None, self.prior_weight_func)
                    X_new.iloc[small_ind, -1] = nf_small
                    self.learned_stats[nf_name].append((prior, col_avg_y))
        return X_new

    def transform(self, X):
        """
        :param X: pandas DataFrame, n_samples * n_features
        :return X_new: the transformed pandas DataFrame containing mean-encoded categorical features
        """
        X_new = X.copy()
        if self.target_type == 'classification':
            for variable, target in product(self.categorical_features, self.target_values):
                nf_name = '{}_pred_{}'.format(variable, target)
                X_new[nf_name] = 0
                # Average the encodings learned on each of the n_splits folds.
                for prior, col_avg_y in self.learned_stats[nf_name]:
                    X_new[nf_name] += X_new[[variable]].join(col_avg_y, on=variable).fillna(
                        prior, inplace=False)[nf_name]
                X_new[nf_name] /= self.n_splits
        else:
            for variable in self.categorical_features:
                nf_name = '{}_pred'.format(variable)
                X_new[nf_name] = 0
                for prior, col_avg_y in self.learned_stats[nf_name]:
                    X_new[nf_name] += X_new[[variable]].join(col_avg_y, on=variable).fillna(
                        prior, inplace=False)[nf_name]
                X_new[nf_name] /= self.n_splits
        return X_new
推荐阅读: