企業安全建設之搭建開源SIEM平臺
https://www.freebuf.com/special/127172.html
https://www.freebuf.com/special/127264.html
https://www.freebuf.com/articles/network/127988.html
前言
SIEM(security information and event management),顧名思義就是針對安全信息和事件的管理系統,針對大多數企業是不便宜的安全系統,本文結合作者的經驗介紹下如何使用開源軟件搭建企業的SIEM系統,數據深度分析在下篇。
SIEM的發展
對比Gartner2009年和2016年的全球SIEM廠商排名,可以清楚看出,基於大數據架構的廠商Splunk迅速崛起,傳統四強依托完整的安全產品線和成熟市場渠道,依然占據領導者象限,其他較小的廠商逐漸離開領導者象限。最重要的存儲架構也由盤櫃(可選)+商業數據庫逐漸轉變為可橫向擴展的大數據架構,支持雲環境也成為趨勢。
開源SIEM領域,比較典型的就是ossim和Opensoc,ossim存儲架構是mysql,支持多種日誌格式,包括鼎鼎大名的Snort、Nmap、 Nessus以及Ntop等,對於數據規模不大的情況是個不錯的選擇,新版界面很酷炫。
完整的SIEM至少會包括以下功能:
- 漏洞管理
- 資產發現
- 入侵檢測
- 行為分析
- 日誌存儲、檢索
- 報警管理
- 酷炫報表
其中最核心的我認為是入侵檢測、行為分析和日誌存儲檢索,本文重點集中討論支撐上面三個功能的技術架構。
Opensoc簡介
Opensoc是思科2014年在BroCon大會上公布的開源項目,但是沒有真正開源其源代碼,只是發布了其技術框架。我們參考了Opensoc發布的架構,結合公司實際落地了一套方案。Opensoc完全基於開源的大數據框架kafka、storm、spark和es等,天生具有強大的橫向擴展能力,本文重點講解的也是基於Opensoc的siem搭建。
上圖是Opensoc給出的框架,初次看非常費解,我們以數據存儲與數據處理兩個緯度來細化,以常見的linux服務器ssh登錄日誌搜集為例。
數據搜集緯度
數據搜集緯度需求是搜集原始數據,存儲,提供用戶交互式檢索的UI接口,典型場景就是出現安全事件後,通過檢索日誌回溯攻擊行為,定損。
logtash其實可以直接把數據寫es,但是考慮到storm也要數據處理,所以把數據切分放到logstash,切分後的數據發送kafka,提供給storm處理和logstash寫入es。數據檢索可以直接使用kibana,非常方便。數據切分也可以在storm裏面完成。這個就是大名鼎鼎的ELK架構。es比較適合存儲較短時間的熱數據的實時檢索查詢,對於需要長期存儲,並且希望使用hadoop或者spark進行大時間跨度的離線分析時,還需要存儲到hdfs上,所以比較常見的數據流程圖為:
數據處理緯度
這裏以數據實時流式處理為例,storm從kafka中訂閱切分過的ssh登錄日誌,匹配檢測規則,檢測結果的寫入mysql或者es。
在這個例子中,孤立看一條登錄日誌難以識別安全問題,最多識別非跳板機登錄,真正運行還需要參考知識庫中的常見登錄IP、時間、IP情報等以及臨時存儲處理狀態的狀態庫中最近該IP的登錄成功與失敗情況。比較接近實際運行情況的流程如下:
具體判斷邏輯舉例如下,實際中使用大量代理IP同時暴力破解,打一槍換一個地方那種無法覆蓋,這裏只是個舉例:
擴展數據源
生產環境中,處理安全事件,分析入侵行為,只有ssh登錄日誌肯定是不夠,我們需要盡可能多的搜集數據源,以下作為參考:
- linux/window系統安全日誌/操作日誌
- web服務器訪問日誌
- 數據庫SQL日誌
- 網絡流量日誌
簡化後的系統架構如下,報警也存es主要是查看報警也可以通過kibana,人力不足界面都不用開發了:
storm拓撲
storm拓撲支持python開發,以處理SQL日誌為例子:
假設SQL日誌的格式
"Feb 16 06:32:50 " "127.0.0.1" "root@localhost" "select * from user where id=1"
一般storm的拓撲結構
簡化後spout是通用的從kafka讀取數據的,就一個bolt處理SQL日誌,匹配規則,命中策略即輸出”alert”:”原始SQL日誌”
核心bolt代碼doSQLCheckBolt偽碼
import storm
class doSQLCheckBolt(storm.BasicBolt):
def process(self, tup):
words = tup.values[0].split(" ")
sql = word[3]
if re.match(規則,sql):
storm.emit(["sqli",tup.values[0]])
doSQLCheckBolt().run()
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("sqlLog", new kafkaSpout(), 10);
builder.setBolt("sqliAlert", new doSQLCheckBolt(), 3)
.shuffleGrouping("sqlLog");
拓撲提交示例
Config conf = new Config();
conf.setDebug(true);
conf.setNumWorkers(2);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("doSQL", conf, builder.createTopology());
Utils.sleep(10000);
cluster.killTopology("doSQL");
cluster.shutdown();
logstash
在本文環節中,logstash的配置量甚至超過了storm的拓撲腳本開發量,下面講下比較重點的幾個點,切割日誌與檢索需求有關系,非常個性化,這裏就不展開了。
從文件讀取
input
file {
path => ["/var/log/*.log", "/var/log/message"]
type => "system"
start_position => "beginning"
}
}
從kafka中訂閱
input {
kafka {
zk_connect => "localhost:2181"
group_id => "logstash"
topic_id => "test"
reset_beginning => false # boolean (optional), default: false
consumer_threads => 5 # number (optional), default: 1
decorate_events => true # boolean (optional), default: false
}
}
寫kafka
output {
kafka {
broker_list => "localhost:9092"
topic_id => "test"
compression_codec => "snappy" # string (optional), one of ["none", "gzip", "snappy"], default: "none"
}
}
寫hdfs
output {
hadoop_webhdfs {
workers => 2
server => "localhost:14000"
user => "flume"
path => "/user/flume/logstash/dt=%{+Y}-%{+M}-%{+d}/logstash-%{+H}.log"
flush_size => 500
compress => "snappy"
idle_flush_time => 10
retry_interval => 0.5
}
}
寫es
output {
elasticsearch {
host => "localhost"
protocol => "http"
index => "logstash-%{type}-%{+YYYY.MM.dd}"
index_type => "%{type}"
workers => 5
template_overwrite => true
}
}
前言
SIEM(security information and event management),顧名思義就是針對安全信息和事件的管理系統,針對大多數企業是不便宜的安全系統,本文結合作者的經驗介紹如何使用開源軟件離線分析數據,使用攻擊建模的方式識別攻擊行為。
回顧系統架構
以數據庫為例,通過logstash搜集mysql的查詢日誌,近實時備份到hdfs集群上,通過hadoop腳本離線分析攻擊行為。
數據庫日誌搜集
常見的數據日誌搜集方式有三種:
鏡像方式
大多數數據庫審計產品都支持這種模式,通過分析數據庫流量,解碼數據庫協議,識別SQL預計,抽取出SQL日誌
代理方式
比較典型的就是db-proxy方式,目前百度、搜狐、美團、京東等都有相關開源產品,前端通過db-proxy訪問後端的真實數據庫服務器。SQL日誌可以直接在db-proxy上搜集。
客戶端方式
通過在數據庫服務器安裝客戶端搜集SQL日誌,比較典型的方式就是通過logstash來搜集,本文以客戶端方式進行講解,其余方式本質上也是類似的。
logstash配置
安裝
下載logstash https://www.elastic.co/downloads/logstash 目前最新版本5.2.1版
開啟mysql查詢日誌
mysql查詢日誌
配置logstash
input {
file {
type => "mysql_sql_file"
path => "/var/log/mysql/mysql.log"
start_position => "beginning"
sincedb_path => "/dev/null"
}
}
output {
kafka { broker_list => "localhost:9092" topic_id => "test" compression_codec => "snappy"
運行logstash
bin/logstash -f mysql.conf
日誌舉例
2017-02-16T23:29:00.813Z localhost 170216 19:10:15 37 Connect
debian-sys-maint@localhost on
2017-02-16T23:29:00.813Z localhost 37 Quit
2017-02-16T23:29:00.813Z localhost 38 Connect debian-sys-maint@localhost on
2017-02-16T23:29:00.813Z localhost 38 Query SHOW VARIABLES LIKE ‘pid_file‘
切詞
最簡化操作是不用進行切詞,如果喜歡自動切分出數據庫名,時間等字段,請參考:
grok語法
https://github.com/elastic/logstash/blob/v1.4.2/patterns/grok-patterns
grok語法調試
http://grokdebug.herokuapp.com/
常見攻擊特征
以常見的wavsep搭建靶場環境,請參考我的另外一篇文章《基於WAVSEP的靶場搭建指南》
使用SQL掃描鏈接
分析攻擊特征,下列列舉兩個,更多攻擊特征請大家自行總結
特征一
2017-02-16T23:29:00.993Z localhost 170216 19:19:12 46 Query SELECT username, password FROM users WHERE username=‘textvalue‘ UNION ALL SELECT NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL#‘ AND password=‘textvalue2‘
使用聯合查詢枚舉數據時會產生大量的NULL字段
特征二、三
枚舉數據庫結構時會使用INFORMATION_SCHEMA,另外個別掃描器會使用GROUP BY x)a)
2017-02-16T23:29:00.998Z localhost 46 Query SELECT username, password FROM users WHERE username=‘textvalue‘ AND (SELECT 7473 FROM(SELECT COUNT(*),CONCAT(0x7171716271,(SELECT (CASE WHEN (8199= 8199) THEN 1 ELSE 0 END)),0x717a627871,FLOOR(RAND(0)*2))x FROM INFORMATION_SCHEMA.PLUGINS GROUP BY x)a)-- LFpQ‘ AND password=‘textvalue2‘
hadoop離線處理
hadoop是基於map,reduce模型
簡化理解就是:
cat data.txt | ./map | ./reduce
最簡化期間,我們可以只開發map程序,在map中逐行處理日誌數據,匹配攻擊行為。
以perl腳本開發,python類似
在hadoop下運行即可。
生產環境
生產環境中的規則會比這復雜很多,需要你不斷補充,這裏只是舉例;
單純只編寫map會有大量的重復報警,需要開發reduce用於聚合;
應急響應時需要知道SQL註入的是那個庫,使用的是哪個賬戶,這個需要在logstash切割字段時補充;
應急響應時最好可以知道SQL註入對應的鏈接,這個需要將web的accesslog與SQL日誌關聯分析,比較成熟的方案是基於機器學習,學習出基於時間的關聯矩陣;
客戶端直接搜集SQL數據要求mysql也開啟查詢日誌,這個對服務器性能有較大影響,我知道的大型公司以db-prxoy方式接入為主,建議可以在db-proxy上搜集;
基於規則識別SQL註入存在瓶頸,雖然相對web日誌層面以及流量層面有一定進步,SQL語義成為必然之路。
後繼
前言
SIEM(security information and event management),顧名思義就是針對安全信息和事件的管理系統,針對大多數企業是不便宜的安全系統,本文結合作者的經驗介紹如何使用開源軟件離線分析數據,使用算法挖掘未知攻擊行為。上集傳送門
回顧系統架構
以WEB服務器日誌為例,通過logstash搜集WEB服務器的查詢日誌,近實時備份到hdfs集群上,通過hadoop腳本離線分析攻擊行為。
自定義日誌格式
開啟httpd自定義日誌格式,記錄User-Agen以及Referer
<IfModule logio_module>
# You need to enable mod_logio.c to use %I and %O
LogFormat "%h %l %u %t \"%r\" %>s %b \"%{Referer}i\" \"%{User-Agent}i\" %I %O" combinedio
</IfModule>
CustomLog "logs/access_log" combined
日誌舉例
180.76.152.166 - - [26/Feb/2017:13:12:37 +0800] "GET /wordpress/ HTTP/1.1" 200 17443 "http://180.76.190.79:80/" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.21 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.21"
180.76.152.166 - - [26/Feb/2017:13:12:37 +0800] "GET /wordpress/wp-json/ HTTP/1.1" 200 51789 "-" "print `env`"
180.76.152.166
- - [26/Feb/2017:13:12:38 +0800] "GET
/wordpress/wp-admin/load-styles.php?c=0&dir=ltr&load[]=dashicons,buttons,forms,l10n,login&ver=Li4vLi4vLi4vLi4vLi4vLi4vLi4vLi4vLi4vLi4vZXRjL3Bhc3N3ZAAucG5n
HTTP/1.1" 200 35841 "http://180.76.190.79:80/" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.21 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.21"
180.76.152.166 - - [26/Feb/2017:13:12:38 +0800] "GET /wordpress/ HTTP/1.1" 200 17442 "http://180.76.190.79:80/" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.21 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.21"
測試環境
在wordpress目錄下添加測試代碼1.php,內容為phpinfo
針對1.php的訪問日誌
[root@instance-8lp4smgv logs]# cat access_log | grep ‘wp-admin/1.php‘
125.33.206.140
- - [26/Feb/2017:13:09:47 +0800] "GET /wordpress/wp-admin/1.php
HTTP/1.1" 200 17 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_3)
AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.102
Safari/537.36"
125.33.206.140 - - [26/Feb/2017:13:11:19 +0800]
"GET /wordpress/wp-admin/1.php HTTP/1.1" 200 17 "-" "Mozilla/5.0
(Macintosh; Intel Mac OS X 10_12_3) AppleWebKit/537.36 (KHTML, like
Gecko) Chrome/50.0.2661.102 Safari/537.36"
125.33.206.140 - -
[26/Feb/2017:13:13:44 +0800] "GET /wordpress/wp-admin/1.php HTTP/1.1"
200 17 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_3)
AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.102
Safari/537.36"
127.0.0.1 - - [26/Feb/2017:13:14:19 +0800] "GET
/wordpress/wp-admin/1.php HTTP/1.1" 200 17 "-" "curl/7.19.7
(x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.14.0.0 zlib/1.2.3
libidn/1.18 libssh2/1.4.2"
127.0.0.1 - - [26/Feb/2017:13:16:04
+0800] "GET /wordpress/wp-admin/1.php HTTP/1.1" 200 107519 "-"
"curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.14.0.0
zlib/1.2.3 libidn/1.18 libssh2/1.4.2"
125.33.206.140 - -
[26/Feb/2017:13:16:12 +0800] "GET /wordpress/wp-admin/1.php HTTP/1.1"
200 27499 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_3)
AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.102
Safari/537.36"
[root@instance-8lp4smgv logs]#
hadoop離線處理
hadoop是基於map,reduce模型
map腳本
localhost:work maidou$ cat mapper-graph.pl
#!/usr/bin/perl -w
#180.76.152.166 - - [26/Feb/2017:13:12:37 +0800] "GET /wordpress/ HTTP/1.1" 200 17443 "http://180.76.190.79:80/" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.21 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.21"
my $line="";
while($line=<>)
{
if( $line=~/"GET (\S+) HTTP\/1.[01]" 2\d+ \d+ "(\S+)"/ )
{
my $path=$1;
my $ref=$2;
if( $path=~/(\S+)\?(\S+)/ )
{
$path=$1;
}
if( $ref=~/(\S+)\?(\S+)/ )
{
$ref=$1;
}
if( ($ref=~/^http:\/\/180/)||( "-" eq $ref ) )
{
my $line=$ref."::".$path."\n";
#printf("$ref::$path\n");
print($line);
}
}
}
reducer腳本
localhost:work maidou$ cat reducer-graph.pl
#!/usr/bin/perl -w
my %result;
my $line="";
while($line=<>)
{
if( $line=~/(\S+)\:\:(\S+)/ )
{
unless( exists($result{$line}) )
{
$result{$line}=1;
}
}
}
foreach $key (sort keys %result)
{
if( $key=~/(\S+)\:\:(\S+)/ )
{
my $ref=$1;
my $path=$2;#這裏是舉例,過濾你關註的webshell文件後綴,常見的有php、jsp,白名單形式過濾存在漏報風險;也可以以黑名單形式過濾你忽略的文件類型
if( $path=~/(\.php)$/ )
{
my $output=$ref." -> ".$path."\n";
print($output);
}
}
}
生成結果示例為:
- -> http://180.76.190.79/wordpress/wp-admin/1.php
- -> http://180.76.190.79/wordpress/wp-admin/admin-ajax.php
- -> http://180.76.190.79/wordpress/wp-admin/customize.php
http://180.76.190.79/wordpress/ -> http://180.76.190.79/wordpress/wp-admin/edit-comments.php
http://180.76.190.79/wordpress/ -> http://180.76.190.79/wordpress/wp-admin/profile.php
http://180.76.190.79/wordpress/ -> http://180.76.190.79/wordpress/wp-login.php
http://180.76.190.79/wordpress/ -> http://180.76.190.79/wordpress/xmlrpc.php
圖算法
講生成數據導入圖數據庫neo4j,滿足webshell特征的為:
入度出度均為0
入度出度均為1且自己指向自己
neo4j
neo4j是一個高性能的,NOSQL圖形數據庫,它將結構化數據存儲在網絡上而不是表中,因其嵌入式、高性能、輕量級等優勢,越來越受到關註。
neo4j安裝
https://neo4j.com/ 上下載安裝包安裝,默認配置即可
ne04j啟動
以我的mac為例子,通過gui啟動即可,默認密碼為ne04j/ne04j,第一次登錄會要求更改密碼
GUI管理界面
python api庫安裝
sudo pip install neo4j-driver
下載JPype
https://pypi.python.org/pypi/JPype1
安裝JPype
tar -zxvf JPype1-0.6.2.tar.gz
cd JPype1-0.6.2
sudo python setup.py install
將數據導入圖數據庫代碼如下:
B0000000B60544:freebuf liu.yan$ cat load-graph.py
import re
from neo4j.v1 import GraphDatabase, basic_auth
nodes={}
index=1
driver = GraphDatabase.driver("bolt://localhost:7687",auth=basic_auth("neo4j","maidou"))
session = driver.session()
file_object = open(‘r-graph.txt‘, ‘r‘)
try:
for line in file_object:
matchObj = re.match( r‘(\S+) -> (\S+)‘, line, re.M|re.I)
if matchObj:
path = matchObj.group(1);
ref = matchObj.group(2);
if path in nodes.keys():
path_node = nodes[path]
else:
path_node = "Page%d" % index
nodes[path]=path_node
sql = "create (%s:Page {url:\"%s\" , id:\"%d\",in:0,out:0})" %(path_node,path,index)
index=index+1
session.run(sql)
#print sql
if ref in nodes.keys():
ref_node = nodes[ref]
else:
ref_node = "Page%d" % index
nodes[ref]=ref_node
sql = "create (%s:Page {url:\"%s\",id:\"%d\",in:0,out:0})" %(ref_node,ref,index)
index=index+1
session.run(sql)
#print sql
sql = "create (%s)-[:IN]->(%s)" %(path_node,ref_node)
session.run(sql)
#print sql
sql = "match (n:Page {url:\"%s\"}) SET n.out=n.out+1" % path
session.run(sql)
#print sql
sql = "match (n:Page {url:\"%s\"}) SET n.in=n.in+1" % ref
session.run(sql)
#print sql
finally:
file_object.close( )
session.close()
生成有向圖如下
查詢入度為1出度均為0的結點或者查詢入度出度均為1且指向自己的結點,由於把ref為空的情況也識別為”-”結點,所以入度為1出度均為0。
優化點
生產環境實際使用中,我們遇到誤報分為以下幾種:
主頁,各種index頁面(第一個誤報就是這種)
phpmyadmin、zabbix等運維管理後臺
hadoop、elk等開源軟件的控制臺
API接口
這些通過短期加白可以有效解決,比較麻煩的是掃描器對結果的影響(第二個誤報就是這種),這部分需要通過掃描器指紋或者使用高大上的人機算法來去掉幹擾。
後記
使用算法來挖掘未知攻擊行為是目前非常流行的一個研究方向,本文只是介紹了其中比較好理解和實現的一種算法,該算法並非我首創,不少安全公司也都或多或少有過實踐。篇幅有限,我將陸續在企業安全建設專題其他文章中由淺入深介紹其他算法。算法或者說機器學習本質是科學規律在大數據集集合上趨勢體現,所以很難做到精準報警,目前階段還是需要通過各種規則和模型來輔助,不過對於挖掘未知攻擊行為確實是一支奇兵。
企業安全建設之搭建開源SIEM平臺