閱讀sqlmap源代碼,編寫burpsuite插件--sqlmapapi
0x00 概要
在安全測試過程中,大部分人會使用burpsuite的scanner模塊進行測試,可以發現一些淺顯的漏洞:比如xss、sql injection、c***f、xxe、Arbitrary file existence disclosure in Act、明文傳輸等。
說到sql injection,測試人員都會有一種想法是否存在一款自動化工具,可以將某一網站的所有鏈接都去嘗試一邊,盡可能的發現所有的sql injection。有了這種想法後大家會去想解決方案,有一種解決方案是編寫burpsuite插件。
0x01 為什麽sqlmapapi只能檢測get請求是否存在註入?
編寫burpsuite插件檢查sql injection,網上已經存在了很多代碼。
如圖中代碼一樣,網上所有的資料在調用sqlmapapi進行掃描時只能掃描get請求的鏈接,比如:http://www.xxx.com/index.php?id=1 對post提交的參數無法進行掃描,開始我一直很迷惑到底是sqlmap沒有提供該方法還是前輩們沒有寫,所以我就查看了一下sqlmap的源代碼。
調用sqlmapapi的基本過程:
接下來我們看看為什麽sqlmapapi不能進行post掃描:
1、首先查看sqlmapapi.py文件:
# Start the client or the server if args.server is True: server(args.host, args.port, adapter=args.adapter, username=args.username, password=args.password) elif args.client is True: client(args.host, args.port, username=args.username, password=args.password) else: apiparser.print_help()
sqlmapapi開啟服務後,我們在插件中的請求就是一個客戶端,所以我們需要關註 client(args.host, args.port, username=args.username, password=args.password)
2、cilent函數所在文件:
3、由於在api.py中存在大量代碼,逐行查找很費勁,我們直接搜索def client
4、分析cilent函數
def client(host=RESTAPI_DEFAULT_ADDRESS, port=RESTAPI_DEFAULT_PORT, username=None, password=None): """ REST-JSON API client """ DataStore.username = username DataStore.password = password dbgMsg = "Example client access from command line:" dbgMsg += "\n\t$ taskid=$(curl http://%s:%d/task/new 2>1 | grep -o -I ‘[a-f0-9]\{16\}‘) && echo $taskid" % (host, port) dbgMsg += "\n\t$ curl -H \"Content-Type: application/json\" -X POST -d ‘{\"url\": \"http://testphp.vulnweb.com/artists.php?artist=1\"}‘ http://%s:%d/scan/$taskid/start" % (host, port) dbgMsg += "\n\t$ curl http://%s:%d/scan/$taskid/data" % (host, port) dbgMsg += "\n\t$ curl http://%s:%d/scan/$taskid/log" % (host, port) logger.debug(dbgMsg) addr = "http://%s:%d" % (host, port) logger.info("Starting REST-JSON API client to ‘%s‘..." % addr) try: _client(addr) except Exception, ex: if not isinstance(ex, urllib2.HTTPError) or ex.code == httplib.UNAUTHORIZED: errMsg = "There has been a problem while connecting to the " errMsg += "REST-JSON API server at ‘%s‘ " % addr errMsg += "(%s)" % ex logger.critical(errMsg) return
我們查看的重點代碼是
dbgMsg = "Example client access from command line:"
dbgMsg += "\n\t$ taskid=$(curl http://%s:%d/task/new 2>1 | grep -o -I ‘[a-f0-9]\{16\}‘) && echo $taskid" % (host, port)
dbgMsg += "\n\t$ curl -H \"Content-Type: application/json\" -X POST -d ‘{\"url\": \"http://testphp.vulnweb.com/artists.php?artist=1\"}‘ http://%s:%d/scan/$taskid/start" % (host, port)
dbgMsg += "\n\t$ curl http://%s:%d/scan/$taskid/data" % (host, port)
dbgMsg += "\n\t$ curl http://%s:%d/scan/$taskid/log" % (host, port)
logger.debug(dbgMsg)
在 dbgMsg += "\n\t$ curl -H \"Content-Type: application/json\" -X POST -d ‘{\"url\": \"http://testphp.vulnweb.com/artists.php?artist=1\"}‘ http://%s:%d/scan/$taskid/start" % (host, port)這一行代碼表示開啟一個任務掃描,Content-Type: application/json為請求頭的部分;http://%s:%d/scan/$taskid/start
開啟的具體那個任務;-X POST -d ‘{\"url\": \"http://testphp.vulnweb.com/artists.php?artist=1\"}‘
post請求參數,其中只有url,沒有關於post請求的data參數,因此sqlmapapi只能進行get請求的sql injection 檢測。
0x02繼承IHttpListener接口(方法一),存在缺陷
先上源代碼:
from burp import IBurpExtender
from burp import IHttpListener
from java.io import PrintWriter
import re
import urllib
import urllib2
import time
import json
from threading import Thread
import requests
class BurpExtender(IBurpExtender, IHttpListener):
#
#implement IBurpExtender
#
def registerExtenderCallbacks(self, callbacks):
# keep a reference to our callbacks object
self._callbacks = callbacks
# set our extension name
callbacks.setExtensionName("fanyingjie")
# obtain our output stream
self._stdout = PrintWriter(callbacks.getStdout(), True)
self._helpers = callbacks.getHelpers()
# register ourselves as an
callbacks.registerHttpListener(self)
def processHttpMessage(self,toolFlag,messageIsRequest, messageInfo):
if(messageIsRequest):
a=self._helpers.analyzeRequest(messageInfo)
method=a.getMethod()
url=str(a.getUrl())
if(("?" in url) and (method=="GET")):
self._stdout.println("start")
t=AutoSqli(target=url,stdout=self._stdout,method=method)
t.run()
class AutoSqli(Thread):
def __init__(self,target,stdout,method):
self.server="http://192.168.159.134:8775"
self.taskid = ‘‘
self.target=target
self.method=method
self._stdout=stdout
self.start_time = time.time()
def task_new(self):
self.taskid = json.loads(urllib2.urlopen(self.server + ‘/task/new‘).read())[‘taskid‘]
self._stdout.println(‘Created new task: ‘ + self.taskid )
if len(self.taskid) > 0:
return True
return False
def task_delete(self):
if json.loads(urllib2.urlopen(self.server + ‘/task/‘ + self.taskid + ‘/delete‘).read())[‘success‘]:
self._stdout.println(‘[%s] Deleted task‘ % (self.taskid))
return True
return False
def scan_start(self):
headers = {‘Content-Type‘: ‘application/json‘}
payload = {‘url‘:self.target}
url = self.server + ‘/scan/‘ + self.taskid + ‘/start‘
#t = json.loads(requests.post(url, data=json.dumps(payload), headers=headers).text)
req=urllib2.Request(url,data=json.dumps(payload),headers=headers)
t=json.loads(urllib2.urlopen(req).read())
self._stdout.println("start "+ self.taskid)
if len(str(t[‘engineid‘])) > 0 and t[‘success‘]:
return True
return False
def scan_status(self):
status = json.loads(urllib2.urlopen(self.server + ‘/scan/‘ + self.taskid + ‘/status‘).read())[‘status‘]
if status == ‘running‘:
return ‘running‘
if status == ‘terminated‘:
return ‘terminated‘
return "error"
def scan_data(self):
data = json.loads(urllib2.urlopen(self.server + ‘/scan/‘ + self.taskid + ‘/data‘).read())[‘data‘]
if len(data) == 0:
self._stdout.println(‘not injection:\t‘ + self.target)
return False
else:
self._stdout.println(‘injection:\t‘ + self.target)
return True
def scan_kill(self):
json.loads(rurllib2.urlopen(self.server + ‘/scan/‘ + self.taskid + ‘/kill‘).read())[‘success‘]
self._stdout.println("%s kill")%(self.taskid)
def scan_stop(self):
json.loads(urllib2.urlopen(self.server + ‘/scan/‘ + self.taskid + ‘/stop‘).read())[‘success‘]
self._stdout.println("%s stop")%(self.taskid)
def run(self):
try:
if not self.task_new():
return False
if not self.scan_start():
return False
while True:
if self.scan_status() == ‘running‘:
time.sleep(10)
elif self.scan_status() == ‘terminated‘:
break
else:
break
#print self.target + ":\t" + str(time.time() - self.start_time)
if time.time() - self.start_time > 500:
self.scan_stop()
self.scan_kill()
break
self.scan_data()
#self.task_delete()
except Exception as e:
pass
在本插件中繼承了IHttpListener接口,繼承該接口後,每點擊一次鏈接就會執行一次插件,只有當插件完成執行完成後,才能進行下一步:
訪問鏈接後,瀏覽器一直等待回應,繼承IHttpListener接口的方法很影響測試效率,不過優點是可以檢查出存在註入的鏈接
接下來對該代碼進行簡單的講解一下:
if(messageIsRequest): #當包是請求包時執行sql injection檢查
a=self._helpers.analyzeRequest(messageInfo) #這是burp提供的一個函數,可以從請求包中獲取到url method header等
method=a.getMethod()
url=str(a.getUrl())
if(("?" in url) and (method=="GET")): #當請求是get請求和鏈接中存在參數時進行sql injection 檢查
self._stdout.println("start")
t=AutoSqli(target=url,stdout=self._stdout,method=method)
t.run()
class AutoSqli(Thread):
def __init__(self,target,stdout,method):
self.server="http://192.168.159.134:8775"#開啟sqlmapapi服務的ip
self.taskid = ‘‘
self.target=target
self.method=method
self._stdout=stdout
self.start_time = time.time()
def task_new(self):#創建一個新的任務,並獲取taskid
self.taskid = json.loads(urllib2.urlopen(self.server + ‘/task/new‘).read())[‘taskid‘]
self._stdout.println(‘Created new task: ‘ + self.taskid )
if len(self.taskid) > 0:
return True
return False
def task_delete(self):#通過taskid刪除某一個任務
if json.loads(urllib2.urlopen(self.server + ‘/task/‘ + self.taskid + ‘/delete‘).read())[‘success‘]:
self._stdout.println(‘[%s] Deleted task‘ % (self.taskid))
return True
return False
def scan_start(self):#開始一個掃描,傳入需要掃描的地址
headers = {‘Content-Type‘: ‘application/json‘}
payload = {‘url‘:self.target}
url = self.server + ‘/scan/‘ + self.taskid + ‘/start‘
#t = json.loads(requests.post(url, data=json.dumps(payload), headers=headers).text)
req=urllib2.Request(url,data=json.dumps(payload),headers=headers)
t=json.loads(urllib2.urlopen(req).read())
self._stdout.println("start "+ self.taskid)
if len(str(t[‘engineid‘])) > 0 and t[‘success‘]:
return True
return False
def scan_status(self):#查看是否掃描完成,通過status判斷,terminated是掃描完成
status = json.loads(urllib2.urlopen(self.server + ‘/scan/‘ + self.taskid + ‘/status‘).read())[‘status‘]
if status == ‘running‘:
return ‘running‘
if status == ‘terminated‘:
return ‘terminated‘
return "error"
def scan_data(self):#獲取掃描完成的結果,如果data有值表名存在註入,否則不存在
data = json.loads(urllib2.urlopen(self.server + ‘/scan/‘ + self.taskid + ‘/data‘).read())[‘data‘]
if len(data) == 0:
self._stdout.println(‘not injection:\t‘ + self.target)
return False
else:
self._stdout.println(‘injection:\t‘ + self.target)
return True
def scan_kill(self):
json.loads(rurllib2.urlopen(self.server + ‘/scan/‘ + self.taskid + ‘/kill‘).read())[‘success‘]
self._stdout.println("%s kill")%(self.taskid)
def scan_stop(self):
json.loads(urllib2.urlopen(self.server + ‘/scan/‘ + self.taskid + ‘/stop‘).read())[‘success‘]
self._stdout.println("%s stop")%(self.taskid)
def run(self):
try:
if not self.task_new():
return False
if not self.scan_start():
return False
while True:
if self.scan_status() == ‘running‘:
time.sleep(10)
elif self.scan_status() == ‘terminated‘:
break
else:
break
#print self.target + ":\t" + str(time.time() - self.start_time)
if time.time() - self.start_time > 500:
self.scan_stop()
self.scan_kill()
break
self.scan_data()
#self.task_delete()
except Exception as e:
pass
在下一篇文章中,我會記錄一下第二種方法,可以在不影響測試效率的情況下,依然可以有效的檢查出註入;在設置一些sqlmapapi參數的前提下,比較一下sqlmapapi和burpsuite scanner模塊那個更有效率。
閱讀sqlmap源代碼,編寫burpsuite插件--sqlmapapi