解決的問題:

  1. 分發器引擎自適應報文欄位數量,批量處理整個報文的所有欄位
  2. 攻擊檢測引擎自適應報文欄位數量,批量處理整個報文的所有欄位
  3. 分發器引擎採用循環神經網路,可以識別正常、XSS、SQL、PHP
  4. 分發器將攻擊欄位聚合(欄位-引擎)到具體的攻擊檢測引擎,批處理攻擊欄位
  5. 攻擊檢測引擎採用支持向量機,可以識別正常、XSS、SQL、PHP
  6. 攻擊檢測引擎反查有效攻擊欄位(有效攻擊-欄位)

舉個栗子:下面的測試代碼中,向WAF模型中餵了10個樣本,其中3條正常樣本,4條XSS注入,2條SQL注入,1條PHP注入,WAF模型給我們吐出來4條XSS攻擊樣本和1條SQL注入樣本,因為WAF模型只吐攻擊,所以3條正常樣本被WAF模型消化了。

可以設想一下,在真實的互聯網環境中一次餵食了10萬條報文,WAF模型消化了9.99萬條正常報文,吐出100條攻擊報文,是不是美滋滋。嘿嘿,這裡我偷換了一下概念,加油哈~

測試欄位:

0# x_payload_1 = "1234567890"
0# x_payload_2 = "hello/world/waf.php"
1# x_payload_3 = "<script>alert(1)</script>"
2# x_payload_4 = "-2 union select group_concat(Username),2,3 from Person#"
0# x_payload_5 = "hi welecome to learn web security"
3# (分發器檢測失敗) x_payload_6 = "ex1.php?dir=| cat /etc/passwd"
1# x_payload_7 = "<script>alert( and 2=2#)</script>"
1# x_payload_8 = "<script>alert(2)</script>"
1# x_payload_9 = "<script>alert(3)</script>"
2# (攻擊引擎檢測失敗) x_payload_10 = " and 1=1#"

運行結果:

測試了10條數據,8條成功,2條失敗。其中一條因為分發器檢測失敗,雖然我做了策略彌補分發器的準確率,但是PHP注入攻擊樣本約1k,需要將樣本數量提高到10k,否則分發器學習不到PHP注入的特徵。另一條是SQL注入攻擊引擎,特徵提取數量太少,沒法識別類似 and 1=1#,需要提取更多攻擊的特徵。我深感自己的智商捉急,歡迎大佬提寶貴的建議。

data ok!
load classifier ok!
load atks ok!

{"<script>alert( and 2=2#)</script>": 1,
"-2 union select group_concat(Username),2,3 from Person#": 2,
"<script>alert(3)</script>": 1,
"<script>alert(2)</script>": 1,
"<script>alert(1)</script>": 1}

完整代碼:

## waf.py
import sys
import urllib
import numpy as np
import tensorflow as tf
import tflearn
from tflearn.data_utils import to_categorical, pad_sequences
from sklearn.model_selection import train_test_split
import time
import re
from sklearn.externals import joblib

def get_feature(line):
    """Encode *line* as the list of code points of its lowercased chars.

    Each character is lowercased individually and mapped through ord();
    the resulting integer sequence is what the distributor RNN consumes.
    """
    # The original loop used enumerate() but never used the index.
    return [ord(c.lower()) for c in line]

def load_file(filename, label, ms=None, ns=None):
    """Append features and labels for every usable line of *filename*.

    Parameters
    ----------
    filename : corpus file path, one (possibly percent-encoded) payload
        per line.
    label : int class id; 1/2/3 (XSS/SQL/PHP) are kept as-is, anything
        else is coerced to 0 (benign) — same mapping as the original
        if/elif chain.
    ms, ns : optional output lists appended to in place.  Fresh lists
        are created when omitted; the original used mutable default
        arguments, which silently accumulated state across calls.

    Prints the running sample count, matching the original behaviour.
    """
    if ms is None:
        ms = []
    if ns is None:
        ns = []
    # Label mapping is line-invariant, so hoist it out of the loop.
    n = label if label in (1, 2, 3) else 0
    with open(filename) as f:
        for line in f:
            line = urllib.unquote(line.strip())
            # Lines longer than 100 chars are dropped: the classifier
            # pads/crops sequences to maxlen=100.
            if len(line) <= 100:
                ms.append(get_feature(line))
                ns.append(n)
    print(len(ms))

def load_files(file1, file2, file3, file4):
    """Load the four training corpora (benign, XSS, SQL, PHP) into one
    feature list and one label list and return both."""
    xs, ys = [], []
    for path, label in ((file1, 0), (file2, 1), (file3, 2), (file4, 3)):
        load_file(path, label, xs, ys)
    return xs, ys

class XSS_FEATURE(object):
    """Hand-crafted features for the XSS detection SVM.

    Every method takes one payload string and returns a scalar feature.
    NOTE(review): the pasted source lost the quotes around the regex
    literals below; they are restored as (raw) string literals.
    """

    def get_len(self, url):
        # Raw payload length.
        return len(url)

    def get_url_count(self, url):
        # 1 when the payload embeds an absolute URL, else 0.
        if re.search(r"(http://)|(https://)", url, re.IGNORECASE):
            return 1
        return 0

    def get_evil_char(self, url):
        # Characters commonly used to break out into markup.
        return len(re.findall(r'[<>,"/]', url, re.IGNORECASE))

    def get_evil_word(self, url):
        # Typical XSS keywords / encoded tokens.  The paste was also
        # missing a "|" between (script=) and (%3c); restored.
        return len(re.findall(
            r"(alert)|(script=)|(%3c)|(%3e)|(%20)|(onerror)|(onload)|"
            r"(eval)|(src=)|(prompt)",
            url, re.IGNORECASE))

class SQL_FEATURE(object):
    """Hand-crafted features for the SQL-injection detection SVM.

    Every method takes one payload string and returns a scalar feature.
    NOTE(review): the pasted source lost the quotes around the regex
    literals below; they are restored as (raw) string literals.
    """

    def get_len(self, url):
        # Raw payload length.
        return len(url)

    def get_url_count(self, url):
        # 1 when the payload embeds an absolute URL, else 0.
        if re.search(r"(http://)|(https://)", url, re.IGNORECASE):
            return 1
        return 0

    def get_evil_char(self, url):
        # Comment / quote / wildcard characters typical of injection.
        return len(re.findall(r'[-,"*/]', url, re.IGNORECASE))

    def get_evil_word(self, url):
        # Common SQL keywords (matched case-insensitively).
        return len(re.findall(
            r"(SELECT)|(CASE)|(WHEN)|(ORDER)|(GROUP)|(count)|(%2C%20)|"
            r"(char)|(NULL)|(AND)",
            url, re.IGNORECASE))

class PHP_FEATURE(object):
    """Hand-crafted features for the PHP code-injection detection SVM.

    Every method takes one payload string and returns a scalar feature.
    NOTE(review): the pasted source lost the quotes around the regex
    literals below; they are restored as (raw) string literals.
    """

    def get_len(self, url):
        # Raw payload length.
        return len(url)

    def get_url_count(self, url):
        # 1 when the payload embeds an absolute URL, else 0.
        if re.search(r"(http://)|(https://)", url, re.IGNORECASE):
            return 1
        return 0

    def get_evil_char(self, url):
        # Interpolation / piping characters typical of PHP injection.
        return len(re.findall(r'[${"=}.|]', url, re.IGNORECASE))

    def get_evil_func(self, url):
        # Dangerous PHP function names.
        return len(re.findall(
            r"(print)|(assert)|(system)|(preg_replace)|(create_function)|"
            r"(call_user_func)|(call_user_func_array)|(eval)|(array_map)|"
            r"(ob_start)|(shell_exec)|(passthru)|(escapeshellcmd)|"
            r"(proc_popen)|(pcntl_exec)|(phpinfo)|(exit)",
            url, re.IGNORECASE))

    def get_evil_word(self, url):
        # Misc suspicious tokens (encoded quote, recon targets, ...).
        return len(re.findall(
            r"(chr)|(%27)|(passwd)|(whoami)|(base64_decode)",
            url, re.IGNORECASE))

def get_xss_feature(pkt_loads):
    """Vectorise each payload with the four XSS features."""
    extractor = XSS_FEATURE()
    return [
        [
            extractor.get_len(load),
            extractor.get_url_count(load),
            extractor.get_evil_char(load),
            extractor.get_evil_word(load),
        ]
        for load in pkt_loads
    ]

def get_sql_feature(pkt_loads):
    """Vectorise each payload with the four SQL-injection features."""
    extractor = SQL_FEATURE()
    return [
        [
            extractor.get_len(load),
            extractor.get_url_count(load),
            extractor.get_evil_char(load),
            extractor.get_evil_word(load),
        ]
        for load in pkt_loads
    ]

def get_php_feature(pkt_loads):
    """Vectorise each payload with the five PHP-injection features."""
    extractor = PHP_FEATURE()
    return [
        [
            extractor.get_len(load),
            extractor.get_url_count(load),
            extractor.get_evil_char(load),
            extractor.get_evil_func(load),
            extractor.get_evil_word(load),
        ]
        for load in pkt_loads
    ]

def get_pkt_feature(atk_engine_info, pkt_loads):
    """Vectorise *pkt_loads* with the feature set matching the engine.

    *atk_engine_info* is an (engine, class_index) pair, class index
    1=XSS, 2=SQL, 3=PHP.  Raises ValueError for an unknown index — the
    original fell through and crashed with UnboundLocalError instead.
    """
    atk_class = atk_engine_info[1]
    if atk_class == 1:
        return get_xss_feature(pkt_loads)
    if atk_class == 2:
        return get_sql_feature(pkt_loads)
    if atk_class == 3:
        return get_php_feature(pkt_loads)
    raise ValueError("unknown attack class index: %r" % (atk_class,))

def init_atk_engines():
    """Load the three pre-trained SVM attack engines from disk.

    Returns a tuple indexed by (attack class - 1): (xss, sql, php).
    The pasted source had the print literal broken across lines and a
    stray trailing semicolon; both are repaired here.
    """
    xss_engine = joblib.load("./model/train_model_xss.m")
    sql_engine = joblib.load("./model/train_model_sql.m")
    php_engine = joblib.load("./model/train_model_php.m")
    atk_engines_index = (xss_engine, sql_engine, php_engine)
    print("load atks ok!\n\n")
    return atk_engines_index

def get_engine_by_index(atk_engines_index, atk_class_index):
    """Pair the engine for 1-based *atk_class_index* with that index."""
    return (atk_engines_index[atk_class_index - 1], atk_class_index)

def atk_predict(atk_engine_info, pkt_loads):
    """Run one attack engine over its collected payload batch.

    Returns the engine's per-payload predictions, or [] when no payload
    was routed to this engine.
    """
    if not pkt_loads:
        return []
    features = get_pkt_feature(atk_engine_info, pkt_loads)
    return atk_engine_info[0].predict(features)

def classifier_train(x, y):
    """Train the LSTM distributor on char-code sequences.

    x: list of ord() sequences (see get_feature); y: int labels 0-3
    (benign/XSS/SQL/PHP).  Saves the model to ./model/waf-demo-2018.tfl.

    The pasted source dropped the quotes around the "softmax", "adam"
    and "categorical_crossentropy" keyword values; restored here.
    """
    with tf.Graph().as_default():
        x_train, x_test, y_train, y_test = train_test_split(
            x, y, test_size=0.2, random_state=0)
        # Crop/pad every sequence to exactly 100 char codes.
        x_train = pad_sequences(x_train, maxlen=100, value=0.)
        x_test = pad_sequences(x_test, maxlen=100, value=0.)
        # One-hot over the 4 classes.
        y_train = to_categorical(y_train, nb_classes=4)
        y_test = to_categorical(y_test, nb_classes=4)

        net = tflearn.input_data([None, 100])
        net = tflearn.embedding(net, input_dim=256, output_dim=128)
        net = tflearn.lstm(net, 128, dropout=0.8)
        net = tflearn.fully_connected(net, 4, activation="softmax")
        net = tflearn.regression(net, optimizer="adam", learning_rate=0.1,
                                 loss="categorical_crossentropy")

        model = tflearn.DNN(net, tensorboard_verbose=3)
        model.fit(x_train, y_train, n_epoch=10,
                  validation_set=(x_test, y_test), show_metric=True,
                  batch_size=200, run_id="waf-demo-2018")
        model.save("./model/waf-demo-2018.tfl")

def classifier_load():
    """Rebuild the distributor network and load its trained weights.

    The architecture must match classifier_train() exactly.  Returns a
    1-tuple (model,) so callers pass the handle around uniformly.

    The pasted source dropped the quotes around "softmax" / "adam" /
    "categorical_crossentropy" and broke the print literal across
    lines; both are restored here.
    """
    with tf.Graph().as_default():
        net = tflearn.input_data([None, 100])
        net = tflearn.embedding(net, input_dim=256, output_dim=128)
        net = tflearn.lstm(net, 128, dropout=0.8)
        net = tflearn.fully_connected(net, 4, activation="softmax")
        net = tflearn.regression(net, optimizer="adam", learning_rate=0.1,
                                 loss="categorical_crossentropy")
        model = tflearn.DNN(net, tensorboard_verbose=3)
        model.load("./model/waf-demo-2018.tfl")
        print("load classifier ok!\n\n")
        return (model,)

def classifier_handout(handout, payloads):
    """Route payloads into the per-engine batches.

    *handout* holds, per payload, the distributor's top-2 class labels;
    a payload is appended to every attack bucket (xss/sql/php) whose
    label appears among those two.  Label 0 (benign) is dropped.
    Returns [xss_batch, sql_batch, php_batch].
    """
    buckets = [[], [], []]  # indexed by attack class - 1
    for payload, top2 in zip(payloads, handout):
        for label in top2[:2]:
            if label in (1, 2, 3):
                buckets[label - 1].append(payload)
    return buckets

def classifier_predict(model, payloads):
    """Run the distributor over *payloads* and group suspected attacks.

    *model* is the (model,) tuple from classifier_load().  Returns the
    three per-engine payload batches built by classifier_handout().
    """
    distributor = model[0]
    # Percent-decode each payload, then encode it as char codes and pad
    # to the fixed sequence length the network expects.
    encoded = [get_feature(urllib.unquote(p)) for p in payloads]
    encoded = pad_sequences(encoded, maxlen=100, value=0.)
    # predict_label() returns labels ordered by confidence; keep the
    # top two per payload for routing.
    ranked = distributor.predict_label(encoded)
    top2 = ranked[:, 0:2].tolist()
    return classifier_handout(top2, payloads)

def data_pre_treat():
    """Build the 10 demo payloads (3 benign, 4 XSS, 2 SQL, 1 PHP).

    The broken multi-line print of the pasted source is restored to a
    single literal with two trailing newlines.  Payload strings are
    kept byte-for-byte (including the "welecome" typo) so results stay
    comparable with the blog post's run.
    """
    pkt_loads = [
        "1234567890",
        "hello/world/waf.php",
        "<script>alert(1)</script>",
        "-2 union select group_concat(Username),2,3 from Person#",
        "hi welecome to learn web security",
        "ex1.php?dir=| cat /etc/passwd",
        "<script>alert( and 2=2#)</script>",
        "<script>alert(2)</script>",
        "<script>alert(3)</script>",
        " and 1=1#",
    ]
    print("data ok!\n\n")
    return pkt_loads

def get_engine_id(engine_index, atk_pkt_loads_value=0):
    """Fold engine *engine_index* (1-based) into a bit-flag id.

    Engine k contributes 2**(k-1); an existing accumulated value is
    carried over, so a payload flagged by several engines ends up with
    the sum of their flags.  (The original's if/else branches both
    reduce to this single expression, since the default value is 0.)
    """
    return 2 ** (engine_index - 1) + atk_pkt_loads_value

def reverse_look(atk_effect_lists, atk_engine_collect_fields):
    """Map each confirmed-attack payload back to its engine flags.

    *atk_effect_lists* holds per-engine 0/1 verdict sequences, aligned
    index-for-index with the payload batches in
    *atk_engine_collect_fields*.  Returns {payload: combined flags}.
    """
    atk_pkt_loads = {}
    for engine_pos, verdicts in enumerate(atk_effect_lists, start=1):
        batch = atk_engine_collect_fields[engine_pos - 1]
        for slot, verdict in enumerate(verdicts):
            if verdict:
                payload = batch[slot]
                # Accumulate this engine's flag on top of any flags the
                # payload already collected from earlier engines.
                atk_pkt_loads[payload] = get_engine_id(
                    engine_pos, atk_pkt_loads.get(payload, 0))
    return atk_pkt_loads

def test():
    """End-to-end demo: distribute the sample payloads, run each attack
    engine on its routed batch, then print payload -> engine-flag map."""
    payloads = data_pre_treat()
    distributor = classifier_load()
    collected = classifier_predict(distributor, payloads)

    engines = init_atk_engines()
    infos = [get_engine_by_index(engines, atk_class_index=k)
             for k in (1, 2, 3)]

    # One verdict list per engine, aligned with `collected`.
    verdicts = tuple(atk_predict(info, batch)
                     for info, batch in zip(infos, collected))

    print(reverse_look(verdicts, collected))

if __name__ == "__main__":
    # argv: <benign_file> <xss_file> <sql_file> <php_file>
    xs, ys = load_files(sys.argv[1], sys.argv[2],
                        sys.argv[3], sys.argv[4])
    classifier_train(xs, ys)
    test()

推薦閱讀:

查看原文 >>
相关文章