Problems solved:

  1. The dispatcher engine adapts to the number of fields in a packet and batch-processes all fields of the whole packet
  2. The attack-detection engines adapt to the number of fields in a packet and batch-process all fields of the whole packet
  3. The dispatcher engine uses a recurrent neural network and can recognize normal traffic, XSS, SQL, and PHP injection
  4. The dispatcher aggregates attack fields (field → engine) onto the specific attack-detection engines, which batch-process the attack fields
  5. The attack-detection engines use support vector machines and can recognize normal traffic, XSS, SQL, and PHP injection
  6. The attack-detection engines reverse-look-up the effective attack fields (effective attack → field); see the sketch right after this list
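
To make the field → engine aggregation and the effective attack → field reverse lookup concrete, here is a minimal runnable sketch of the whole flow, with trivial stand-ins for the real models (fake_dispatch and fake_engine are illustrative names only; in the full code below the dispatcher is an LSTM and the engines are SVMs):

def fake_dispatch(fields):
    # Toy dispatcher: 1 = XSS, 2 = SQL, 0 = normal.
    labels = []
    for f in fields:
        if "<script>" in f:
            labels.append(1)
        elif "select" in f:
            labels.append(2)
        else:
            labels.append(0)
    return labels

def fake_engine(batch):
    # Toy attack engine: confirms every field it receives.
    return [1 for _ in batch]

fields = ["<script>alert(1)</script>", "abc", "-1 union select 1,2"]

# Step 4: aggregate attack fields per engine (field -> engine).
buckets = {1: [], 2: []}
for f, label in zip(fields, fake_dispatch(fields)):
    if label in buckets:
        buckets[label].append(f)

# Steps 5-6: batch-verify each bucket, then reverse-look-up the
# effective attacks (effective attack -> field).
hits = {}
for engine_id, batch in buckets.items():
    for f, effective in zip(batch, fake_engine(batch)):
        if effective:
            hits[f] = engine_id
print(hits)  # e.g. {'<script>alert(1)</script>': 1, '-1 union select 1,2': 2}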

Here is an example: the test code below feeds 10 samples to the WAF model: 3 normal samples, 4 XSS injections, 2 SQL injections, and 1 PHP injection. The WAF model spits back 4 XSS attack samples and 1 SQL injection sample; since the model only spits out attacks, the 3 normal samples are digested by it.

Now imagine a real Internet environment where 100,000 packets are fed in at once: the WAF model digests 99,900 normal packets and spits out 100 attack packets. Pretty sweet, right? Heh, I am sneaking in a bit of a conceptual swap there; bear with me~

Test fields (the leading N# marks the expected class: 0 = normal, 1 = XSS, 2 = SQL, 3 = PHP):

0# x_payload_1 = "1234567890"
0# x_payload_2 = "hello/world/waf.php"
1# x_payload_3 = "<script>alert(1)</script>"
2# x_payload_4 = "-2 union select group_concat(Username),2,3 from Person#"
0# x_payload_5 = "hi welcome to learn web security"
3# (dispatcher miss) x_payload_6 = "ex1.php?dir=| cat /etc/passwd"
1# x_payload_7 = "<script>alert( and 2=2#)</script>"
1# x_payload_8 = "<script>alert(2)</script>"
1# x_payload_9 = "<script>alert(3)</script>"
2# (attack engine miss) x_payload_10 = " and 1=1#"

Run results:

Of the 10 test samples, 8 succeeded and 2 failed. One failure is a dispatcher miss: although I added a strategy to compensate for the dispatcher's accuracy (it hands each field to the engines of its top two predicted classes; see the small illustration below), the PHP-injection corpus is only about 1k samples and needs to grow to 10k, otherwise the dispatcher cannot learn the features of PHP injection. The other failure is in the SQL-injection attack engine: too few features are extracted, so it cannot recognize payloads like " and 1=1#"; more attack features are needed. I feel my own wits falling short here, and valuable suggestions from the experts are very welcome.
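
In case the compensation strategy is hard to see in the code: predict_label returns, for every field, the class indices sorted by descending probability, and classifier_predict keeps the first two columns. A borderline field whose best guess is "normal" but whose runner-up is an attack class therefore still reaches an engine for verification. A tiny illustration with made-up numbers:

import numpy as np

# Hypothetical predict_label-style output: one row per field, class
# indices sorted by descending probability (0 = normal, 1 = XSS,
# 2 = SQL, 3 = PHP). The second field's best guess is "normal", but
# its runner-up is SQL, so the top-2 handout still routes it to the
# SQL engine.
distributions = np.array([[1, 0, 2, 3],
                          [0, 2, 1, 3]])
handout = distributions[:, 0:2].tolist()
print(handout)  # [[1, 0], [0, 2]]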

data ok!
load classifier ok!
load atks ok!

{"<script>alert( and 2=2#)</script>": 1,
"-2 union select group_concat(Username),2,3 from Person#": 2,
<script>alert(3)</script>: 1,
<script>alert(2)</script>: 1,
<script>alert(1)</script>: 1}
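
The values in this dict are the engine-id bitmasks built by get_engine_id in the code below: the XSS engine (index 1) contributes 2**(1-1) = 1, the SQL engine (index 2) contributes 2**(2-1) = 2, and the PHP engine (index 3) would contribute 2**(3-1) = 4. A field confirmed by several engines accumulates the sum, so a field flagged by both the XSS and SQL engines would map to 3. Here the four XSS hits each carry 1 and the union select payload carries 2.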

Full code:

## waf.py
# Note: this is Python 2 / old-API code (urllib.unquote,
# sklearn.externals.joblib), matching the original environment.
import sys
import urllib
import numpy as np
import tensorflow as tf
import tflearn
from tflearn.data_utils import to_categorical, pad_sequences
from sklearn.model_selection import train_test_split
import time
import re
from sklearn.externals import joblib

def get_feature(line):
    # Map each character to its lowercase ordinal, producing the
    # integer sequence the LSTM dispatcher consumes.
    x = []
    for c in line:
        x.append(ord(c.lower()))
    return x

def load_file(filename, label, ms=[], ns=[]):
    # Mutable default arguments are kept from the original code;
    # load_files() below always passes fresh lists in.
    with open(filename) as f:
        for line in f:
            line = line.strip()
            line = urllib.unquote(line)
            if len(line) <= 100:
                ms.append(get_feature(line))
                ns.append(label)  # label is already 0/1/2/3
    print(len(ms))

def load_files(file1, file2, file3, file4):
    # One corpus per class: 0 = normal, 1 = XSS, 2 = SQL, 3 = PHP.
    xs = []
    ys = []
    load_file(file1, 0, xs, ys)
    load_file(file2, 1, xs, ys)
    load_file(file3, 2, xs, ys)
    load_file(file4, 3, xs, ys)
    return xs, ys

class XSS_FEATURE(object):
    def __init__(self):
        return
    def get_len(self, url):
        return len(url)
    def get_url_count(self, url):
        if re.search("(http://)|(https://)", url, re.IGNORECASE):
            return 1
        else:
            return 0
    def get_evil_char(self, url):
        return len(re.findall('[<>,"/]', url, re.IGNORECASE))
    def get_evil_word(self, url):
        return len(re.findall("(alert)|(script=)|(%3c)|(%3e)|(%20)|(onerror)|(onload)|(eval)|(src=)|(prompt)", url, re.IGNORECASE))

class SQL_FEATURE(object):
    def __init__(self):
        return
    def get_len(self, url):
        return len(url)
    def get_url_count(self, url):
        if re.search("(http://)|(https://)", url, re.IGNORECASE):
            return 1
        else:
            return 0
    def get_evil_char(self, url):
        return len(re.findall('[-,"*/]', url, re.IGNORECASE))
    def get_evil_word(self, url):
        return len(re.findall("(SELECT)|(CASE)|(WHEN)|(ORDER)|(GROUP)|(count)|(%2C%20)|(char)|(NULL)|(AND)", url, re.IGNORECASE))

class PHP_FEATURE(object):
    def __init__(self):
        return
    def get_len(self, url):
        return len(url)
    def get_url_count(self, url):
        if re.search("(http://)|(https://)", url, re.IGNORECASE):
            return 1
        else:
            return 0
    def get_evil_char(self, url):
        return len(re.findall('[${"=}.|]', url, re.IGNORECASE))
    def get_evil_func(self, url):
        return len(re.findall("(print)|(assert)|(system)|(preg_replace)|(create_function)|(call_user_func)|(call_user_func_array)|(eval)|(array_map)|(ob_start)|(shell_exec)|(passthru)|(escapeshellcmd)|(proc_popen)|(pcntl_exec)|(phpinfo)|(exit)", url, re.IGNORECASE))
    def get_evil_word(self, url):
        return len(re.findall("(chr)|(%27)|(passwd)|(whoami)|(base64_decode)", url, re.IGNORECASE))

def get_xss_feature(pkt_loads):
    # 4 numeric features per payload for the XSS SVM engine.
    xss_f = XSS_FEATURE()
    xss_features = []
    for pkt_load in pkt_loads:
        f1 = xss_f.get_len(pkt_load)
        f2 = xss_f.get_url_count(pkt_load)
        f3 = xss_f.get_evil_char(pkt_load)
        f4 = xss_f.get_evil_word(pkt_load)
        xss_features.append([f1, f2, f3, f4])
    return xss_features

def get_sql_feature(pkt_loads):
    # 4 numeric features per payload for the SQL SVM engine.
    sql_f = SQL_FEATURE()
    sql_features = []
    for pkt_load in pkt_loads:
        f1 = sql_f.get_len(pkt_load)
        f2 = sql_f.get_url_count(pkt_load)
        f3 = sql_f.get_evil_char(pkt_load)
        f4 = sql_f.get_evil_word(pkt_load)
        sql_features.append([f1, f2, f3, f4])
    return sql_features

def get_php_feature(pkt_loads):
    # 5 numeric features per payload for the PHP SVM engine.
    php_f = PHP_FEATURE()
    php_features = []
    for pkt_load in pkt_loads:
        f1 = php_f.get_len(pkt_load)
        f2 = php_f.get_url_count(pkt_load)
        f3 = php_f.get_evil_char(pkt_load)
        f4 = php_f.get_evil_func(pkt_load)
        f5 = php_f.get_evil_word(pkt_load)
        php_features.append([f1, f2, f3, f4, f5])
    return php_features

def get_pkt_feature(atk_engine_info, pkt_loads):
    # atk_engine_info is (engine, class_index); pick the matching
    # feature extractor: 1 = XSS, 2 = SQL, 3 = PHP.
    if atk_engine_info[1] == 1:
        pkt_feature = get_xss_feature(pkt_loads)
    elif atk_engine_info[1] == 2:
        pkt_feature = get_sql_feature(pkt_loads)
    elif atk_engine_info[1] == 3:
        pkt_feature = get_php_feature(pkt_loads)
    return pkt_feature

def init_atk_engines():
    # The three SVM engines are trained separately and loaded from disk.
    xss_engine = joblib.load("./model/train_model_xss.m")
    sql_engine = joblib.load("./model/train_model_sql.m")
    php_engine = joblib.load("./model/train_model_php.m")
    atk_engines_index = (xss_engine, sql_engine, php_engine)
    print("load atks ok!\n")
    return atk_engines_index

def get_engine_by_index(atk_engines_index, atk_class_index):
    atk_engine = atk_engines_index[atk_class_index - 1]
    atk_engine_info = (atk_engine, atk_class_index)
    return atk_engine_info

def atk_predict(atk_engine_info, pkt_loads):
    # Batch-verify the fields the dispatcher routed to this engine.
    if len(pkt_loads) == 0:
        return []
    pkt_features = get_pkt_feature(atk_engine_info, pkt_loads)
    atk_predict_01s = atk_engine_info[0].predict(pkt_features)
    return atk_predict_01s

def classifier_train(x, y):
    with tf.Graph().as_default():
        x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2, random_state=0)
        x_train = pad_sequences(x_train, maxlen=100, value=0.)
        x_test = pad_sequences(x_test, maxlen=100, value=0.)
        y_train = to_categorical(y_train, nb_classes=4)
        y_test = to_categorical(y_test, nb_classes=4)

        # Character-level LSTM over byte values (input_dim=256),
        # 4 output classes: normal / XSS / SQL / PHP.
        net = tflearn.input_data([None, 100])
        net = tflearn.embedding(net, input_dim=256, output_dim=128)
        net = tflearn.lstm(net, 128, dropout=0.8)
        net = tflearn.fully_connected(net, 4, activation='softmax')
        net = tflearn.regression(net, optimizer='adam', learning_rate=0.1,
                                 loss='categorical_crossentropy')

        model = tflearn.DNN(net, tensorboard_verbose=3)
        model.fit(x_train, y_train, n_epoch=10, validation_set=(x_test, y_test),
                  show_metric=True, batch_size=200, run_id="waf-demo-2018")
        model.save("./model/waf-demo-2018.tfl")

def classifier_load():
    with tf.Graph().as_default():
        # Rebuild the exact training topology before loading weights.
        net = tflearn.input_data([None, 100])
        net = tflearn.embedding(net, input_dim=256, output_dim=128)
        net = tflearn.lstm(net, 128, dropout=0.8)
        net = tflearn.fully_connected(net, 4, activation='softmax')
        net = tflearn.regression(net, optimizer='adam', learning_rate=0.1,
                                 loss='categorical_crossentropy')
        model = tflearn.DNN(net, tensorboard_verbose=3)
        model.load("./model/waf-demo-2018.tfl")
        print("load classifier ok!\n")
        return (model,)

def classifier_handout(handout, payloads):
    # handout holds the dispatcher's top-2 class guesses per field.
    # A field is routed to every attack engine among those two guesses,
    # which compensates for borderline dispatcher calls.
    atk_engine_collect_fields = [[], [], []]
    field_num = 0
    for atk_id in handout:
        for j in range(2):
            if atk_id[j] == 1:    # xss
                atk_engine_collect_fields[0].append(payloads[field_num])
            elif atk_id[j] == 2:  # sql
                atk_engine_collect_fields[1].append(payloads[field_num])
            elif atk_id[j] == 3:  # php
                atk_engine_collect_fields[2].append(payloads[field_num])
        field_num += 1
    return atk_engine_collect_fields

def classifier_predict(model, payloads):
    model = model[0]
    fields = []
    for payload in payloads:
        fields.append(get_feature(urllib.unquote(payload)))
    fields = pad_sequences(fields, maxlen=100, value=0.)
    # predict_label returns class indices sorted by descending
    # probability; keep the top two per field.
    distributions = model.predict_label(fields)
    handout = distributions[:, 0:2].tolist()
    atk_engine_collect_fields = classifier_handout(handout, payloads)
    return atk_engine_collect_fields

def data_pre_treat():
    # 10 test fields: 3 normal, 4 XSS, 2 SQL, 1 PHP (see the
    # annotated list above).
    x_payload_1 = "1234567890"
    x_payload_2 = "hello/world/waf.php"
    x_payload_3 = "<script>alert(1)</script>"
    x_payload_4 = "-2 union select group_concat(Username),2,3 from Person#"
    x_payload_5 = "hi welcome to learn web security"
    x_payload_6 = "ex1.php?dir=| cat /etc/passwd"
    x_payload_7 = "<script>alert( and 2=2#)</script>"
    x_payload_8 = "<script>alert(2)</script>"
    x_payload_9 = "<script>alert(3)</script>"
    x_payload_10 = " and 1=1#"
    pkt_loads = [x_payload_1, x_payload_2, x_payload_3, x_payload_4,
                 x_payload_5, x_payload_6, x_payload_7, x_payload_8,
                 x_payload_9, x_payload_10]
    print("data ok!\n")
    return pkt_loads

def get_engine_id(engine_index, atk_pkt_loads_value=0):
    # Bitmask per engine: XSS (index 1) -> 1, SQL (2) -> 2, PHP (3) -> 4.
    # When a field is already flagged, accumulate so the final value
    # records every engine that confirmed it.
    if atk_pkt_loads_value:
        return 2 ** (engine_index - 1) + atk_pkt_loads_value
    return 2 ** (engine_index - 1)

def reverse_look(atk_effect_lists, atk_engine_collect_fields):
    # Map each confirmed attack back to its original field
    # (effective attack -> field), tagging it with the engine id.
    engine_index = 1
    atk_pkt_loads = {}
    for atk_effect_list in atk_effect_lists:
        field_index = 0
        for atk_effect in atk_effect_list:
            if atk_effect:
                key = atk_engine_collect_fields[engine_index - 1][field_index]
                if atk_pkt_loads.get(key) is None:
                    atk_pkt_loads[key] = get_engine_id(engine_index)
                else:
                    atk_pkt_loads[key] = get_engine_id(engine_index, atk_pkt_loads[key])
            field_index += 1
        engine_index += 1
    return atk_pkt_loads

def test():
    payloads = data_pre_treat()
    model = classifier_load()
    atk_engine_collect_fields = classifier_predict(model, payloads)

    atk_engines_info = init_atk_engines()
    xss_engine = get_engine_by_index(atk_engines_info, atk_class_index=1)
    sql_engine = get_engine_by_index(atk_engines_info, atk_class_index=2)
    php_engine = get_engine_by_index(atk_engines_info, atk_class_index=3)

    xss = atk_predict(xss_engine, atk_engine_collect_fields[0])
    sql = atk_predict(sql_engine, atk_engine_collect_fields[1])
    php = atk_predict(php_engine, atk_engine_collect_fields[2])

    atk_effect_lists = (xss, sql, php)
    atk_pkt_loads = reverse_look(atk_effect_lists, atk_engine_collect_fields)
    print(atk_pkt_loads)

if __name__ == "__main__":
    # argv: normal, XSS, SQL, PHP corpora (one sample per line).
    xs, ys = load_files(sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4])
    classifier_train(xs, ys)
    test()
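
To run the script end to end, pass the four training corpora (one payload per line) on the command line; the file names below are placeholders, not fixed names. Note that the main block only trains and saves the LSTM dispatcher, so the three SVM engine files under ./model/ must already exist:

python waf.py normal.txt xss.txt sql.txt php.txt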

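Since waf.py only loads the SVM engines and never trains them, the engine-training step is not shown in this post. Purely as a reference, here is a minimal sketch of how the XSS engine file could be produced, reusing get_xss_feature from above on a labeled corpus (the script name, the toy data, and the default SVC parameters are all my assumptions, not something from the original project):

## train_xss.py (hypothetical companion script, not part of waf.py)
from sklearn import svm
from sklearn.externals import joblib  # old-sklearn API, matching waf.py

from waf import get_xss_feature

# Toy corpus; a real run would load thousands of labeled lines from files.
payloads = ["<script>alert(1)</script>", "onerror=alert(1)",
            "hello world", "plain text"]
labels = [1, 1, 0, 0]  # 1 = XSS, 0 = benign

clf = svm.SVC()  # default parameters; an assumption, not the author's choice
clf.fit(get_xss_feature(payloads), labels)
joblib.dump(clf, "./model/train_model_xss.m")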