最近又碰到了高基數類別特徵的處理問題,正好也要把相關的解決方案添加到現有的線上機器學習系統裏,這裡總結一下以後免得又忘記了。

在特徵工程裏,特徵編碼是佔比很重的一塊,在kaggle的結構化數據比賽中,最終幫助選手勝利的關鍵因素之一往往是高級特徵的構造和特徵編碼(很多時候特徵編碼也是在構造高級的特徵),下面就來總結一下吧。

1、labelencoder 標籤編碼

如果是無序的非數值離散特徵,一般直接用onehot獨熱編碼了,有序的非數值離散特徵才會用到標籤編碼,因為大部分演算法是沒有內置自動識別類別特徵的功能的,所以需要做這麼一步簡單的轉換,原理很easy了不用廢話了,為了文章看起來完整才寫的,使用labelencoder或者自己用字典來做映射即可。

2、onehotencoder 獨熱編碼

針對類別特徵,例如【男人,女人】,【晴天,雨天,陰天】,類別型特徵,無序,最簡單快捷的方式是通過獨熱編碼轉化為【0,1】或者【0,0,1】這樣的形式,模型才能識別,同時也起到了擴充特徵的作用(例如邏輯在特徵進行onehot展開之後表達能力一般能夠得到較好的提高)。sklearn的onehot,pandas的get_dummies或者自己用字典映射均可。

  • 優點:獨熱編碼解決了分類器不好處理屬性數據的問題,在一定程度上也起到了擴充特徵的作用。它的值只有0和1,不同的類型存儲在垂直的空間。
  • 缺點:1、當類別的數量很多時,特徵空間會變得非常大。2、對於特定任務,例如詞向量化,直接使用onehot的方式是無法考慮到詞之間的交互關係的,onehot之後損失了部分信息。推而廣之,如果特徵之間是非獨立的(比如上下文的詞之間是存在交互關係,時間序列數據之間存在某些內在關係),就不能簡單的使用onehot功能

3、label_binarize 二值化編碼

舉個例子就知道是幹嘛用的了,比如特徵為【晴天,雨天,陰天,雷暴】則特徵轉化為【是否晴天,是否雨天,是否陰天,是否雷暴】,用數字來表示【雷暴】就是[0,0,0,1],和onthot看起來很類似,很多時候不那麼嚴格界定,其實等同於onehot,一般來說獨熱編碼的結果是多個0和1個1組成的比如類別特徵的處理,但是也存在處理之後出現多個1和多個0的情況,比如文本問題,whatever,不做嚴格區分,因為很多文章都不劃分那麼細,反正自己心裡有數就行了,實現使用sklearn的label_binarize或者自己用字典來實現。

4、直方圖編碼

直方圖編碼,主要針對類別型特徵與類別型標籤的一種編碼方式,還是舉個例子來說明什麼是直方圖編碼吧,最好理解了:

假設類別特徵f1=【A,A,B,B,B,C,C】,對應的二分類標籤為【0,1,0,1,1,0,0】,則我們是這樣來計算類別特徵f1中對應的類別的編碼值的:

以A為例,類別特徵f1的值為A的樣本有兩個,這兩個樣本的標籤分別為【0,1】,則A被直方圖編碼為【1/2,1/2】=【0.5,0.5】(A的樣本一共有2個所以分母為2,其中一個樣本標籤為1,一個樣本標籤為0),實際上就是計算取值為A的樣本中,不同類別樣本的比例,然後用這個比例來替換原始的類別標籤,這裡需要強調的是,無論是直方圖編碼還是我們後面要介紹的target encoding,本質上都是用類別特徵的統計量來代替原來的類別值的,沒什麼神祕的地方,很好理解。

如法炮製,我們來對B進行類別編碼,f1值為B的一共3個樣本,其中一個樣本標籤為0,兩個樣本標籤為1,所以B被編碼為【1/3,2/3】,很好理解了。同樣對於C,一共兩個樣本,並且兩個樣本標籤均為0,則編碼為【2/2,0】。

直方圖編碼實際上存在著比較多的問題,我們目前針對高基類特徵的常用的目標編碼或者均值編碼實際上可以看作是在直方圖編碼之上的問題改進。

直方圖編碼存在以下問題:

1、沒有考慮到類別特徵中不同類別的數量的影響,舉個例子,假設樣本的某個類別特徵為【A,A,A,A,A,A,B】,對應的標籤為【0,0,0,1,1,1,0】,則根據直方圖編碼的公式得到的結果為A:【1/2,1/2】,B:【1,0】,然而這實際上對於A來說是很不公平的,因為B的樣本數量太少,計算出來的結果根本不能算是明顯的統計特徵,而很可能是一種噪音,這實際上是一種非常「過擬合」的計算方式,因為一旦測試集中的樣本有多個B之後,B的直方圖編碼的結果很可能發生非常大的變化;

2、假設沒有1中出現的情況,所有的類別A,B的數量都比較均勻,直方圖編碼還是存在著一個潛在的隱患,直方圖編碼的計算非常依賴於訓練集中的樣本標籤的分佈情況,以f1特徵的那個例子為例,實際上直方圖這麼計算的隱含的假設是潛在的所有的數據的在類別f1上的每一個類別計算出來的結果可以用訓練集的結果來近似代替,簡單說比如我在訓練集中算出來A的直方圖編碼為【1/2,1/2】,即類別為A的樣本中有一半標籤0的樣本,一半標籤1的樣本,那麼一旦測試集的分佈情況發生改變,或者是訓練集本身的採樣過程就是有偏的,則直方圖編碼的結果就是完全錯誤的,(比如全樣本中,類別為A的樣本其實只有10%是標籤為0的,90%標籤為1的,則這個時候A的直方圖編碼為【1/10,9/10】,訓練集的產生可能是有偏的);

所以在可用的資料和kaggle比賽中很少有人會用到直方圖編碼,更多的使用target encoding和mean encoding。下面是簡單的直方圖編碼的實現,因為不怎麼用就懶得優化了。

def histogram_encoding(X,y):
category=list(set(X))
labels=list(set(y))
data=pd.concat([X,pd.DataFrame(y)],axis=1)
data.columns=[data,labels]
dictionary={}
for item in category:
temp=data[data[data]==item]
tp=temp[labels].value_counts()
if tp.shape[0]<len(labels):
for label in labels:
if label not in tp.index:
tp[label]=0
nums=tp.tolist()
sums=sum(nums)
nums=[items*1.0/sums for items in nums] ### 這裡sums如果-1就是one leave out的分類問題形式
##其實問題差別不是很大,數據量一般都是至少幾十萬的級別的這麼一個數據點的刪除與否沒什麼大影響
dictionary[item]=nums
hs_enc=X.copy()
hs_enc=hs_enc.values.tolist()
for i in range(len(hs_enc)):
hs_enc[i]=dictionary[hs_enc[i]]
return hs_enc,dictionary

5、WOE編碼

實際上這裡細心一點就可以發現,woe編碼僅僅針對於二分類問題,woe編碼如下:

原理很簡單就是根據woe的公式來計算即可。實際上woe編碼的方法很容易就可以擴展到多類,後面會寫。

單純從woe的公式就可以看出woe編碼存在的問題:

1、分母可能為0的問題;

2、類似於直方圖編碼,沒有考慮到不同類別數量的大小,例如類別特徵為【A,A,A,A,A,A,B】而標籤為【0,0,0,1,1,1,1】這樣的情況計算出來的woe明顯對A這個類別不公平

3、應用侷限性太大了,只能針對二分類問題,並且特徵也必須為離散特徵。

4、訓練集計算的woe編碼結果可能和測試集計算的woe編碼結果存在較大差異(所有基於統計特徵的編碼方式的通病)

首先我們調個包,使用到的是註明scikit-learn contrib分支中的category_encoders:

from category_encoders import *
import pandas as pd
from sklearn.datasets import load_boston
bunch = load_boston()
y = bunch.target > 22.5
X = pd.DataFrame(bunch.data, columns=bunch.feature_names)
enc = WOEEncoder(cols=[CHAS, RAD]).fit(X, y)
numeric_dataset = enc.transform(X)

通過查看內部核心實現代碼,對比原始公式:

github.com/scikit-learn 源代碼地址太多了自己看

源代碼中大致實現了上圖的計算邏輯,為了避免除0的問題,引入了「regulation」這個參數(用戶自定義,默認為1)來進行拉普拉斯平滑。

核心代碼:

nominator = (stats[sum] + self.regularization) / (self._sum + 2*self.regularization)
denominator = ((stats[count] - stats[sum]) + self.regularization) / (self._count - self._sum + 2*self.regularization)

所以對於很小樣本的數據進行woe編碼計算的結果會和實際計算上有出入,不過說實話如果樣本數量很小個人認為沒有太多編碼的必要吧,統計學意義非常不明顯。

這裡我們只解決了上面除0的問題,對於問題二,我們可以使用IV值的思路,針對類別特徵中不同類別的數量給woe的公式施加一個懲罰項:

這樣就把樣本數量的問題也考慮進去了。實現也很簡單,計算出woe編碼結果之後再計算懲罰項然後相乘即可,不贅述了。

針對問題3,如果要拓展多多分類,我想到的思路是使用直方圖編碼的思路:

這是原始的woe編碼公式,修改的思路是,分子為類別特徵中第i個類別中的 y_i/y_sum,分母為所有訓練樣本中的yi/y_sum,舉個例子把,例如類別特徵為【A,A,A,B,B】,標籤為【0,0,1,2,1】,則對於A,類別0的編碼的計算過程為ln(2/3 / 2/5)依次類推,不過就是不知道這種編碼結果效果好不好,下次打比賽的時候試試看好了。

針對問題4,沒想出來什麼好的辦法

6、target encoding 目標編碼

先調一波包

from category_encoders import *
import pandas as pd
from sklearn.datasets import load_boston
bunch = load_boston()
y = bunch.target
X = pd.DataFrame(bunch.data, columns=bunch.feature_names)
enc = TargetEncoder(cols=[CHAS, RAD]).fit(X, y)
numeric_dataset = enc.transform(X)
print(numeric_dataset.info())

原理也不難;

分類問題

對於C分類問題,目標編碼(target encode)後只需要增加C?1個屬性列,如果C遠遠小於N,則相對one-hot-encoding可以節省很多內存. 其出發點是用概率P(y=yi|x=xi)代替屬性值x, 其中x表示屬性值,y表示類別值. 但實際問題中,經常會遇到x=xi對應的樣本數目比較少,導致對P(y=yi|x=xi)的計算不準確. 所以後來的改進結果是引入先驗概率P(y=yi),公式轉換成 :

細心一點就可以發現,如果上述不引入先驗概率P以及lambda項,其實就是我們前面提到的直方圖編碼。直方圖編碼是target encode和mean encode的前輩了。

其中j∈[0,C),ni是訓練集中xi的樣本個數,λ(ni)∈[0,1]負責計算兩個概率值的可靠性,針對應用有不同的定義方法,如下是一個例子 :

(我們的category_encoders庫使用的就是上面這個例子的計算方式,其中參數k和f分別是我們的min_sample_leaf和smoothing參數),二者都是一個可調參數,當x在訓練集中出現次數n=k時,λ(n)=0.5,兩個概率的可靠性相等,隨者n的增大,先驗概率P(y=yi)的可靠性逐漸降低.

我第一次接觸這裡的入(n)還是比較奇怪的,長得很奇怪,不過其實帶幾個數進去算一算也能理解這個項的意義了,公式轉換成這樣主要是考慮到有的類別xi的數量太少從而編碼結果不精確(原因在直方圖編碼那邊已經描述過了),對於數量很大的xi來說,入(n)的引入幾乎沒有影響,比如n=100000,此時入(n)的計算結果趨近於1,先驗項的係數趨近於0,則target_encode計算的結果和直方圖編碼的計算結果是基本近似的。如果n很小,比如n=2,則入(n)=0.731,此時根據先驗項的係數為0.269,即最終編碼結果部分受到先驗項的影響,從而通過這種方式降低由於n數量太小而導致的編碼不精確的問題(這尼瑪也行。。。類似於用先驗的統計值對原來的編碼結果進行一個調和加權平均),所以顯然,這裡的k越大,則意味著先驗的影響越大。

回歸問題

回歸問題同樣可以使用均值編碼,只需要把概率換成均值

其中

表示x=xi對應的y均值,

是整個訓練集上y的均值

我們來看一看源代碼:

下面是核心實現代碼:

def fit_target_encoding(self, X, y):
mapping = {}

for switch in self.ordinal_encoder.category_mapping:
col = switch.get(col)
values = switch.get(mapping)

prior = self._mean = y.mean()

stats = y.groupby(X[col]).agg([count, mean])

smoove = 1 / (1 + np.exp(-(stats[count] - self.min_samples_leaf) / self.smoothing))
smoothing = prior * (1 - smoove) + stats[mean] * smoove
smoothing[stats[count] == 1] = prior

if self.handle_unknown == return_nan:
smoothing.loc[-1] = np.nan
elif self.handle_unknown == value:
smoothing.loc[-1] = prior

if self.handle_missing == return_nan:
smoothing.loc[values.loc[np.nan]] = np.nan
elif self.handle_missing == value:
smoothing.loc[-2] = prior

mapping[col] = smoothing

return mapping

核心中的核心代碼:

prior = self._mean = y.mean() ##計算連續值標籤的均值

stats = y.groupby(X[col]).agg([count, mean]) #根據類別特徵中的不同的類分別進行groupby,聚合函數為計數group和
#求平均值mean

smoove = 1 / (1 + np.exp(-(stats[count] - self.min_samples_leaf) / self.smoothing))
#計算smoove值,
smoothing = prior * (1 - smoove) + stats[mean] * smoove#計算最終的編碼結果smoothing值
smoothing[stats[count] == 1] = prior#出現次數為1的類別直接用先驗值prior代替

為了便於理解還是舉個例子吧。

假設特徵f1為【A,A,A,B,B,C,C,C,C,D】,對應的連續值標籤為【1,2,3,4,5,6,7,8,9,10】則根據上面的源代碼我們計算結果如下:

對於A,對應的子數據集為【A,A,A】。【1,2,3】,則y.mean()=(1+2+3+4+。。。+10)/10=5.5,groupby之後的結果為count=3,mean=(1+2+3)/3=2,因為min_samples_leaf和smoothing默認值為1,則smoove=1/(1+np.exp(-3-min_samples_leaf))=0.982

5.5*(1-0.892)+2*0.892=2.378,調包試了一下,結果差不多,思路沒什麼問題。

target encode是針對高基數類別特徵進行處理手段的最好的選擇之一。但它也有缺點,就是容易過擬合,因為所有的統計計算都是基於訓練集來的,所以一旦新數據集的分佈發生變化,就會產生類似於過擬合所產生的不良的訓練效果,所以接下來我們要介紹target encode 的升級版,也是目前最常用的特徵編碼方法之一,mean encoding。

7、mean encoding 均值編碼

網上有實現的源碼,就不費心思去看論文了,直接根據代碼來解釋均值編碼的原理吧,均值編碼的原理和target encoding非常非常類似,只不過為了避免過擬合加入了一些特別的手段而已。

首先來看一下初始化的部分:

self.categorical_features 用來指定特徵變數中的類別變數的變數名;

self.n_splits 用與指定後面交叉驗證的折數(後文詳述)

self.learned_stats 用於統計量的存放

然後做了一個分類和回歸的判斷,分類和回歸下的均值編碼略有不同

判斷是否存在先驗權重計算函數,沒有的話則默認使用下面的公式並且k和f是根據用戶給定的字典類型的參數取值的,有的話則使用用戶給定的先驗權重計算函數來計算先驗權重:

如果用戶沒有提供先驗權重計算函數也沒有提供k和f的參數值則k,f則使用默認值分別為k=2,f=1

然後我們看一下「fit_transform」部分:

首先是copy一個新的特徵矩陣Xnew,然後根據分類還是回歸問題選擇不同的抽樣方式(分層抽樣or普通抽樣),然後我們生成一個字典learned_stas用於存放編碼之後的結果類似這樣:

接下來是核心實現的部分:

for variable, target in product(self.categorical_features, self.target_values):
nf_name = {}_pred_{}.format(variable, target)
X_new.loc[:, nf_name] = np.nan
for large_ind, small_ind in skf.split(y, y):
nf_large, nf_small, prior, col_avg_y = MeanEncoder.mean_encode_subroutine(
X_new.iloc[large_ind], y.iloc[large_ind], X_new.iloc[small_ind], variable, target, self.prior_weight_func)
X_new.iloc[small_ind, -1] = nf_small
self.learned_stats[nf_name].append((prior, col_avg_y))

X_new中通過loc函數先佔個坑,然後進入交叉驗證:(補充:建議原始的輸入變數X和y先shuffle一下再進入計算)

然後我們就進入了核心的實現 MeanEncoder.mean_encode_subroutine(靜態函數):

@staticmethod
def mean_encode_subroutine(X_train, y_train, X_test, variable, target, prior_weight_func):
X_train = X_train[[variable]].copy()
X_test = X_test[[variable]].copy()

if target is not None:
nf_name = {}_pred_{}.format(variable, target)
X_train[pred_temp] = (y_train == target).astype(int) # classification
else:
nf_name = {}_pred.format(variable)
X_train[pred_temp] = y_train # regression
prior = X_train[pred_temp].mean()

col_avg_y = X_train.groupby(by=variable, axis=0)[pred_temp].agg({mean: mean, beta: size})
col_avg_y[beta] = prior_weight_func(col_avg_y[beta])
col_avg_y[nf_name] = col_avg_y[beta] * prior + (1 - col_avg_y[beta]) * col_avg_y[mean]
col_avg_y.drop([beta, mean], axis=1, inplace=True)

nf_train = X_train.join(col_avg_y, on=variable)[nf_name].values
nf_test = X_test.join(col_avg_y, on=variable).fillna(prior, inplace=False)[nf_name].values

return nf_train, nf_test, prior, col_avg_y

還是舉個例子好理解吧,假設這裡categorical_features=[f1,f2,f3],target=[0,1,2],那麼這裡我們以variable=『f1』,target=0為

例來計算,首先是去原始數據中標籤為0的樣本的f1特徵:

X_train[pred_temp] = (y_train == target).astype(int)

然後是根據X_train[pred_temp]的來計算target為0的樣本的佔比情況以作為prior先驗概率的值。

prior = X_train[pred_temp].mean()

然後接下來的計算方式和target encoding是一致的:

col_avg_y = X_train.groupby(by=variable, axis=0)[pred_temp].agg({mean: mean, beta: size})
col_avg_y[beta] = prior_weight_func(col_avg_y[beta])
col_avg_y[nf_name] = col_avg_y[beta] * prior + (1 - col_avg_y[beta]) * col_avg_y[mean]
col_avg_y.drop([beta, mean], axis=1, inplace=True)

唯一不同的方式是,mean encoding這裡用到了交叉計算的方式,以5折交叉為例,在80%的數據上計算編碼結果得到轉換的規則,

然後將剩下20%的數據按照轉換規則進行轉換,最後將結果返回:

nf_train = X_train.join(col_avg_y, on=variable)[nf_name].values
nf_test = X_test.join(col_avg_y, on=variable).fillna(prior, inplace=False)[nf_name].values

return nf_train, nf_test, prior, col_avg_y

最後:

X_new.iloc[small_ind, -1] = nf_small

把測試集的轉換結果賦給原始數據的copy。

綜上所屬,mean encoding的原理和target encoding基本是一樣的,只不過比target encoding多了一個交叉計算的步驟,假設有10000條數據,

target encoding是直接在這10000條數據上進行編碼結果的計算的,而mean encoding則是每次用類似與模型訓練的方法,比如五折交叉計算,用

80%的數據計算編碼結果然後賦給剩下的20%,重複5次則所有特徵都編碼完畢,這樣的好處就是一定程度上降低過擬合的影響。

完整代碼如下,回歸的原理類似公式如下,自己看吧:(注意,輸入X,y都必須是pandas格式的否則會報錯)

import numpy as np
import pandas as pd
from sklearn.model_selection import StratifiedKFold,KFold
from itertools import product

class MeanEncoder:
def __init__(self, categorical_features, n_splits=5, target_type=classification, prior_weight_func=None):
"""
:param categorical_features: list of str, the name of the categorical columns to encode

:param n_splits: the number of splits used in mean encoding

:param target_type: str, regression or classification

:param prior_weight_func:
a function that takes in the number of observations, and outputs prior weight
when a dict is passed, the default exponential decay function will be used:
k: the number of observations needed for the posterior to be weighted equally as the prior
f: larger f --> smaller slope
"""

self.categorical_features = categorical_features
self.n_splits = n_splits
self.learned_stats = {}

if target_type == classification:
self.target_type = target_type
self.target_values = []
else:
self.target_type = regression
self.target_values = None

if isinstance(prior_weight_func, dict):
self.prior_weight_func = eval(lambda x: 1 / (1 + np.exp((x - k) / f)), dict(prior_weight_func, np=np))
elif callable(prior_weight_func):
self.prior_weight_func = prior_weight_func
else:
self.prior_weight_func = lambda x: 1 / (1 + np.exp((x - 2) / 1))

@staticmethod
def mean_encode_subroutine(X_train, y_train, X_test, variable, target, prior_weight_func):
X_train = X_train[[variable]].copy()
X_test = X_test[[variable]].copy()

if target is not None:
nf_name = {}_pred_{}.format(variable, target)
X_train[pred_temp] = (y_train == target).astype(int) # classification
else:
nf_name = {}_pred.format(variable)
X_train[pred_temp] = y_train # regression
prior = X_train[pred_temp].mean()

col_avg_y = X_train.groupby(by=variable, axis=0)[pred_temp].agg({mean: mean, beta: size})
col_avg_y[beta] = prior_weight_func(col_avg_y[beta])
col_avg_y[nf_name] = col_avg_y[beta] * prior + (1 - col_avg_y[beta]) * col_avg_y[mean]
col_avg_y.drop([beta, mean], axis=1, inplace=True)

nf_train = X_train.join(col_avg_y, on=variable)[nf_name].values
nf_test = X_test.join(col_avg_y, on=variable).fillna(prior, inplace=False)[nf_name].values

return nf_train, nf_test, prior, col_avg_y

def fit_transform(self, X, y):

"""
:param X: pandas DataFrame, n_samples * n_features
:param y: pandas Series or numpy array, n_samples
:return X_new: the transformed pandas DataFrame containing mean-encoded categorical features
"""
X_new = X.copy()
if self.target_type == classification:
skf = StratifiedKFold(self.n_splits)
else:
skf = KFold(self.n_splits)

if self.target_type == classification:
self.target_values = sorted(set(y))
self.learned_stats = {{}_pred_{}.format(variable, target): [] for variable, target in
product(self.categorical_features, self.target_values)}
for variable, target in product(self.categorical_features, self.target_values):
nf_name = {}_pred_{}.format(variable, target)
X_new.loc[:, nf_name] = np.nan
for large_ind, small_ind in skf.split(y, y):
nf_large, nf_small, prior, col_avg_y = MeanEncoder.mean_encode_subroutine(
X_new.iloc[large_ind], y.iloc[large_ind], X_new.iloc[small_ind], variable, target, self.prior_weight_func)
X_new.iloc[small_ind, -1] = nf_small
self.learned_stats[nf_name].append((prior, col_avg_y))
else:
self.learned_stats = {{}_pred.format(variable): [] for variable in self.categorical_features}
for variable in self.categorical_features:
nf_name = {}_pred.format(variable)
X_new.loc[:, nf_name] = np.nan
for large_ind, small_ind in skf.split(y, y):
nf_large, nf_small, prior, col_avg_y = MeanEncoder.mean_encode_subroutine(
X_new.iloc[large_ind], y.iloc[large_ind], X_new.iloc[small_ind], variable, None, self.prior_weight_func)
X_new.iloc[small_ind, -1] = nf_small
self.learned_stats[nf_name].append((prior, col_avg_y))
return X_new

def transform(self, X):
"""
:param X: pandas DataFrame, n_samples * n_features
:return X_new: the transformed pandas DataFrame containing mean-encoded categorical features
"""
X_new = X.copy()

if self.target_type == classification:
for variable, target in product(self.categorical_features, self.target_values):
nf_name = {}_pred_{}.format(variable, target)
X_new[nf_name] = 0
for prior, col_avg_y in self.learned_stats[nf_name]:
X_new[nf_name] += X_new[[variable]].join(col_avg_y, on=variable).fillna(prior, inplace=False)[
nf_name]
X_new[nf_name] /= self.n_splits
else:
for variable in self.categorical_features:
nf_name = {}_pred.format(variable)
X_new[nf_name] = 0
for prior, col_avg_y in self.learned_stats[nf_name]:
X_new[nf_name] += X_new[[variable]].join(col_avg_y, on=variable).fillna(prior, inplace=False)[
nf_name]
X_new[nf_name] /= self.n_splits

return X_new

推薦閱讀:

相關文章