接下來是核心實現的部分:
# Excerpt from fit_transform: one encoded column per (feature, class) pair,
# filled fold-by-fold with out-of-fold statistics.
for variable, target in product(self.categorical_features, self.target_values):
    nf_name = '{}_pred_{}'.format(variable, target)
    X_new.loc[:, nf_name] = np.nan  # pre-allocate so nf_name is the last column
    for large_ind, small_ind in skf.split(y, y):
        nf_large, nf_small, prior, col_avg_y = MeanEncoder.mean_encode_subroutine(
            X_new.iloc[large_ind], y.iloc[large_ind], X_new.iloc[small_ind], variable, target, self.prior_weight_func)
        X_new.iloc[small_ind, -1] = nf_small  # write out-of-fold encodings into the last column
        self.learned_stats[nf_name].append((prior, col_avg_y))
X_new中通過loc函數先佔個坑,然後進入交叉驗證:(補充:建議原始的輸入變數X和y先shuffle一下再進入計算)
然後我們就進入了核心的實現 MeanEncoder.mean_encode_subroutine(靜態函數):
@staticmethod
def mean_encode_subroutine(X_train, y_train, X_test, variable, target, prior_weight_func):
    """Encode one categorical `variable` on (X_train, y_train) and apply it to X_test.

    :param target: a class label for classification (one-vs-rest), or None for regression
    :param prior_weight_func: maps category size -> weight of the global prior in [0, 1]
    :return: (nf_train, nf_test, prior, col_avg_y) where `prior` is the global target
        mean and `col_avg_y` maps each category to its smoothed encoding; categories
        unseen in X_train fall back to `prior` in nf_test.
    """
    X_train = X_train[[variable]].copy()
    X_test = X_test[[variable]].copy()

    if target is not None:
        nf_name = '{}_pred_{}'.format(variable, target)
        X_train['pred_temp'] = (y_train == target).astype(int)  # classification: one-vs-rest indicator
    else:
        nf_name = '{}_pred'.format(variable)
        X_train['pred_temp'] = y_train  # regression: raw target
    prior = X_train['pred_temp'].mean()

    # Named aggregation: dict-of-str agg on a SeriesGroupBy was removed in pandas 1.0.
    col_avg_y = X_train.groupby(by=variable)['pred_temp'].agg(mean='mean', beta='size')
    col_avg_y['beta'] = prior_weight_func(col_avg_y['beta'])
    # Shrink each category's mean toward the prior; small categories shrink more.
    col_avg_y[nf_name] = col_avg_y['beta'] * prior + (1 - col_avg_y['beta']) * col_avg_y['mean']
    col_avg_y.drop(['beta', 'mean'], axis=1, inplace=True)

    nf_train = X_train.join(col_avg_y, on=variable)[nf_name].values
    nf_test = X_test.join(col_avg_y, on=variable).fillna(prior, inplace=False)[nf_name].values

    return nf_train, nf_test, prior, col_avg_y
還是舉個例子好理解吧,假設這裡categorical_features=[f1,f2,f3],target=[0,1,2],那麼這裡我們以variable=『f1』,target=0為
例來計算,首先是取原始數據中標籤為0的樣本的f1特徵:
X_train['pred_temp'] = (y_train == target).astype(int)  # 0/1 indicator of the current target class
然後是根據X_train[pred_temp]的來計算target為0的樣本的佔比情況以作為prior先驗概率的值。
prior = X_train['pred_temp'].mean()  # global share of the target class = prior probability
然後接下來的計算方式和target encoding是一致的:
# Per-category mean and smoothing weight beta (named aggregation; the
# dict-of-str form was removed in pandas 1.0).
col_avg_y = X_train.groupby(by=variable)['pred_temp'].agg(mean='mean', beta='size')
col_avg_y['beta'] = prior_weight_func(col_avg_y['beta'])
col_avg_y[nf_name] = col_avg_y['beta'] * prior + (1 - col_avg_y['beta']) * col_avg_y['mean']
col_avg_y.drop(['beta', 'mean'], axis=1, inplace=True)
唯一不同的方式是,mean encoding這裡用到了交叉計算的方式,以5折交叉為例,在80%的數據上計算編碼結果得到轉換的規則,
然後將剩下20%的數據按照轉換規則進行轉換,最後將結果返回:
nf_train = X_train.join(col_avg_y, on=variable)[nf_name].values  # encoded values for the training fold
nf_test = X_test.join(col_avg_y, on=variable).fillna(prior, inplace=False)[nf_name].values  # unseen categories -> prior
return nf_train, nf_test, prior, col_avg_y
最後:
X_new.iloc[small_ind, -1] = nf_small  # write out-of-fold encodings into the just-added last column
把測試集的轉換結果賦給原始數據的copy。
綜上所述,mean encoding的原理和target encoding基本是一樣的,只不過比target encoding多了一個交叉計算的步驟,假設有10000條數據,
target encoding是直接在這10000條數據上進行編碼結果的計算的,而mean encoding則是每次用類似於模型訓練的方法,比如五折交叉計算,用
80%的數據計算編碼結果然後賦給剩下的20%,重複5次則所有特徵都編碼完畢,這樣的好處就是一定程度上降低過擬合的影響。
完整代碼如下,回歸的原理類似公式如下,自己看吧:(注意,輸入X,y都必須是pandas格式的否則會報錯)
import numpy as np
import pandas as pd
from sklearn.model_selection import StratifiedKFold,KFold
from itertools import product
class MeanEncoder:
    """Out-of-fold mean (target) encoder for categorical features.

    Classification: adds one column '{var}_pred_{class}' per (feature, class) pair.
    Regression: adds one column '{var}_pred' per feature. fit_transform encodes
    each sample using only statistics computed on the other folds, which limits
    target leakage compared with plain target encoding.
    """

    def __init__(self, categorical_features, n_splits=5, target_type='classification', prior_weight_func=None):
        """
        :param categorical_features: list of str, the name of the categorical columns to encode
        :param n_splits: the number of splits used in mean encoding
        :param target_type: str, 'regression' or 'classification'
        :param prior_weight_func:
            a function that takes in the number of observations, and outputs prior weight
            when a dict is passed, the default exponential decay function will be used:
            k: the number of observations needed for the posterior to be weighted equally as the prior
            f: larger f --> smaller slope
        """
        self.categorical_features = categorical_features
        self.n_splits = n_splits
        self.learned_stats = {}

        if target_type == 'classification':
            self.target_type = target_type
            self.target_values = []  # filled with the sorted class labels in fit_transform
        else:
            self.target_type = 'regression'
            self.target_values = None

        if isinstance(prior_weight_func, dict):
            # Fixed literal evaluated with k/f supplied by the caller's dict;
            # no untrusted input reaches eval here.
            self.prior_weight_func = eval('lambda x: 1 / (1 + np.exp((x - k) / f))',
                                          dict(prior_weight_func, np=np))
        elif callable(prior_weight_func):
            self.prior_weight_func = prior_weight_func
        else:
            # Default: logistic decay with k=2, f=1.
            self.prior_weight_func = lambda x: 1 / (1 + np.exp((x - 2) / 1))

    @staticmethod
    def mean_encode_subroutine(X_train, y_train, X_test, variable, target, prior_weight_func):
        """Encode one categorical `variable` on (X_train, y_train) and apply it to X_test.

        :param target: a class label for classification (one-vs-rest), or None for regression
        :return: (nf_train, nf_test, prior, col_avg_y) where `prior` is the global target
            mean and `col_avg_y` maps each category to its smoothed encoding; categories
            unseen in X_train fall back to `prior` in nf_test.
        """
        X_train = X_train[[variable]].copy()
        X_test = X_test[[variable]].copy()

        if target is not None:
            nf_name = '{}_pred_{}'.format(variable, target)
            X_train['pred_temp'] = (y_train == target).astype(int)  # classification: one-vs-rest indicator
        else:
            nf_name = '{}_pred'.format(variable)
            X_train['pred_temp'] = y_train  # regression: raw target
        prior = X_train['pred_temp'].mean()

        # Named aggregation: dict-of-str agg on a SeriesGroupBy was removed in pandas 1.0.
        col_avg_y = X_train.groupby(by=variable)['pred_temp'].agg(mean='mean', beta='size')
        col_avg_y['beta'] = prior_weight_func(col_avg_y['beta'])
        # Shrink each category's mean toward the prior; small categories shrink more.
        col_avg_y[nf_name] = col_avg_y['beta'] * prior + (1 - col_avg_y['beta']) * col_avg_y['mean']
        col_avg_y.drop(['beta', 'mean'], axis=1, inplace=True)

        nf_train = X_train.join(col_avg_y, on=variable)[nf_name].values
        nf_test = X_test.join(col_avg_y, on=variable).fillna(prior, inplace=False)[nf_name].values

        return nf_train, nf_test, prior, col_avg_y

    def fit_transform(self, X, y):
        """
        :param X: pandas DataFrame, n_samples * n_features
        :param y: pandas Series, n_samples (must support .iloc)
        :return X_new: the transformed pandas DataFrame containing mean-encoded categorical features
        """
        X_new = X.copy()
        if self.target_type == 'classification':
            skf = StratifiedKFold(self.n_splits)
        else:
            skf = KFold(self.n_splits)

        if self.target_type == 'classification':
            self.target_values = sorted(set(y))
            self.learned_stats = {'{}_pred_{}'.format(variable, target): [] for variable, target in
                                  product(self.categorical_features, self.target_values)}
            for variable, target in product(self.categorical_features, self.target_values):
                nf_name = '{}_pred_{}'.format(variable, target)
                X_new.loc[:, nf_name] = np.nan  # pre-allocate so nf_name is the last column
                for large_ind, small_ind in skf.split(y, y):
                    nf_large, nf_small, prior, col_avg_y = MeanEncoder.mean_encode_subroutine(
                        X_new.iloc[large_ind], y.iloc[large_ind], X_new.iloc[small_ind], variable, target, self.prior_weight_func)
                    X_new.iloc[small_ind, -1] = nf_small  # out-of-fold values into the last column
                    self.learned_stats[nf_name].append((prior, col_avg_y))
        else:
            self.learned_stats = {'{}_pred'.format(variable): [] for variable in self.categorical_features}
            for variable in self.categorical_features:
                nf_name = '{}_pred'.format(variable)
                X_new.loc[:, nf_name] = np.nan
                for large_ind, small_ind in skf.split(y, y):
                    nf_large, nf_small, prior, col_avg_y = MeanEncoder.mean_encode_subroutine(
                        X_new.iloc[large_ind], y.iloc[large_ind], X_new.iloc[small_ind], variable, None, self.prior_weight_func)
                    X_new.iloc[small_ind, -1] = nf_small
                    self.learned_stats[nf_name].append((prior, col_avg_y))
        return X_new

    def transform(self, X):
        """
        :param X: pandas DataFrame, n_samples * n_features
        :return X_new: the transformed pandas DataFrame containing mean-encoded categorical features
        """
        X_new = X.copy()

        if self.target_type == 'classification':
            for variable, target in product(self.categorical_features, self.target_values):
                nf_name = '{}_pred_{}'.format(variable, target)
                X_new[nf_name] = 0
                # Average the per-fold mappings learned in fit_transform.
                for prior, col_avg_y in self.learned_stats[nf_name]:
                    X_new[nf_name] += X_new[[variable]].join(col_avg_y, on=variable).fillna(prior, inplace=False)[
                        nf_name]
                X_new[nf_name] /= self.n_splits
        else:
            for variable in self.categorical_features:
                nf_name = '{}_pred'.format(variable)
                X_new[nf_name] = 0
                for prior, col_avg_y in self.learned_stats[nf_name]:
                    X_new[nf_name] += X_new[[variable]].join(col_avg_y, on=variable).fillna(prior, inplace=False)[
                        nf_name]
                X_new[nf_name] /= self.n_splits
        return X_new
推薦閱讀: