1. 程式人生 > >閱讀sqlmap源代碼,編寫burpsuite插件--sqlmapapi

閱讀sqlmap源代碼,編寫burpsuite插件--sqlmapapi

burpsuite burpsuite插件 sqlmapapi

burpsuite插件編寫---sql injection

0x00 概要

在安全測試過程中,大部分人會使用burpsuite的scanner模塊進行測試,可以發現一些淺顯的漏洞:比如xss、sql injection、c***f、xxe、Arbitrary file existence disclosure in Act、明文傳輸等。
說到sql injection,測試人員都會有一種想法是否存在一款自動化工具,可以將某一網站的所有鏈接都去嘗試一邊,盡可能的發現所有的sql injection。有了這種想法後大家會去想解決方案,有一種解決方案是編寫burpsuite插件。

0x01 為什麽sqlmapapi只能檢測get請求是否存在註入?

編寫burpsuite插件檢查sql injection,網上已經存在了很多代碼。
技術分享圖片
如圖中代碼一樣,網上所有的資料在調用sqlmapapi進行掃描時只能掃描get請求的鏈接,比如:http://www.xxx.com/index.php?id=1 對post提交的參數無法進行掃描,開始我一直很迷惑到底是sqlmap沒有提供該方法還是前輩們沒有寫,所以我就查看了一下sqlmap的源代碼。
調用sqlmapapi的基本過程:
技術分享圖片
接下來我們看看為什麽sqlmapapi不能進行post掃描:
1、首先查看sqlmapapi.py文件:

    # Start the client or the server
    if args.server is True:
        server(args.host, args.port, adapter=args.adapter, username=args.username, password=args.password)
    elif args.client is True:
        client(args.host, args.port, username=args.username, password=args.password)
    else:
        apiparser.print_help()

sqlmapapi開啟服務後,我們在插件中的請求就是一個客戶端,所以我們需要關註 client(args.host, args.port, username=args.username, password=args.password)
技術分享圖片
2、cilent函數所在文件:
技術分享圖片
3、由於在api.py中存在大量代碼,逐行查找很費勁,我們直接搜索def client
技術分享圖片
4、分析cilent函數

def client(host=RESTAPI_DEFAULT_ADDRESS, port=RESTAPI_DEFAULT_PORT, username=None, password=None):
    """
    REST-JSON API client
    """

    DataStore.username = username
    DataStore.password = password

    dbgMsg = "Example client access from command line:"
    dbgMsg += "\n\t$ taskid=$(curl http://%s:%d/task/new 2>1 | grep -o -I ‘[a-f0-9]\{16\}‘) && echo $taskid" % (host, port)
    dbgMsg += "\n\t$ curl -H \"Content-Type: application/json\" -X POST -d ‘{\"url\": \"http://testphp.vulnweb.com/artists.php?artist=1\"}‘ http://%s:%d/scan/$taskid/start" % (host, port)
    dbgMsg += "\n\t$ curl http://%s:%d/scan/$taskid/data" % (host, port)
    dbgMsg += "\n\t$ curl http://%s:%d/scan/$taskid/log" % (host, port)
    logger.debug(dbgMsg)

    addr = "http://%s:%d" % (host, port)
    logger.info("Starting REST-JSON API client to ‘%s‘..." % addr)

    try:
        _client(addr)
    except Exception, ex:
        if not isinstance(ex, urllib2.HTTPError) or ex.code == httplib.UNAUTHORIZED:
            errMsg = "There has been a problem while connecting to the "
            errMsg += "REST-JSON API server at ‘%s‘ " % addr
            errMsg += "(%s)" % ex
            logger.critical(errMsg)
            return

我們查看的重點代碼是

    dbgMsg = "Example client access from command line:"
    dbgMsg += "\n\t$ taskid=$(curl http://%s:%d/task/new 2>1 | grep -o -I ‘[a-f0-9]\{16\}‘) && echo $taskid" % (host, port)
    dbgMsg += "\n\t$ curl -H \"Content-Type: application/json\" -X POST -d ‘{\"url\": \"http://testphp.vulnweb.com/artists.php?artist=1\"}‘ http://%s:%d/scan/$taskid/start" % (host, port)
    dbgMsg += "\n\t$ curl http://%s:%d/scan/$taskid/data" % (host, port)
    dbgMsg += "\n\t$ curl http://%s:%d/scan/$taskid/log" % (host, port)
    logger.debug(dbgMsg)

在 dbgMsg += "\n\t$ curl -H \"Content-Type: application/json\" -X POST -d ‘{\"url\": \"http://testphp.vulnweb.com/artists.php?artist=1\"}‘ http://%s:%d/scan/$taskid/start" % (host, port)這一行代碼表示開啟一個任務掃描,Content-Type: application/json為請求頭的部分;http://%s:%d/scan/$taskid/start 開啟的具體那個任務;-X POST -d ‘{\"url\": \"http://testphp.vulnweb.com/artists.php?artist=1\"}‘ post請求參數,其中只有url,沒有關於post請求的data參數,因此sqlmapapi只能進行get請求的sql injection 檢測。

0x02繼承IHttpListener接口(方法一),存在缺陷

先上源代碼:

from burp import IBurpExtender
from burp import IHttpListener
from java.io import PrintWriter
import re
import urllib
import urllib2
import time
import json
from threading import Thread
import requests

class BurpExtender(IBurpExtender, IHttpListener):

    #
    #implement IBurpExtender
    #
    def    registerExtenderCallbacks(self, callbacks):
        # keep a reference to our callbacks object
        self._callbacks = callbacks

        # set our extension name
        callbacks.setExtensionName("fanyingjie")

        # obtain our output stream
        self._stdout = PrintWriter(callbacks.getStdout(), True)

        self._helpers  = callbacks.getHelpers()

        # register ourselves as an
        callbacks.registerHttpListener(self)

    def processHttpMessage(self,toolFlag,messageIsRequest, messageInfo):
        if(messageIsRequest):
            a=self._helpers.analyzeRequest(messageInfo)
            method=a.getMethod()
            url=str(a.getUrl())
            if(("?" in url) and (method=="GET")):
                self._stdout.println("start")
                t=AutoSqli(target=url,stdout=self._stdout,method=method)
                t.run()

class AutoSqli(Thread):
    def __init__(self,target,stdout,method):
        self.server="http://192.168.159.134:8775"
        self.taskid = ‘‘
        self.target=target
        self.method=method
        self._stdout=stdout
        self.start_time = time.time()

    def task_new(self):
        self.taskid = json.loads(urllib2.urlopen(self.server + ‘/task/new‘).read())[‘taskid‘]
        self._stdout.println(‘Created new task: ‘ + self.taskid )
        if len(self.taskid) > 0:
            return True
        return False

    def task_delete(self):
        if json.loads(urllib2.urlopen(self.server + ‘/task/‘ + self.taskid + ‘/delete‘).read())[‘success‘]:
            self._stdout.println(‘[%s] Deleted task‘ % (self.taskid))
            return True
        return False

    def scan_start(self):
        headers = {‘Content-Type‘: ‘application/json‘}
        payload = {‘url‘:self.target}
        url = self.server + ‘/scan/‘ + self.taskid + ‘/start‘
        #t = json.loads(requests.post(url, data=json.dumps(payload), headers=headers).text)

        req=urllib2.Request(url,data=json.dumps(payload),headers=headers)
        t=json.loads(urllib2.urlopen(req).read())
        self._stdout.println("start "+ self.taskid)

        if len(str(t[‘engineid‘])) > 0 and t[‘success‘]:
            return True
        return False

    def scan_status(self):
        status = json.loads(urllib2.urlopen(self.server + ‘/scan/‘ + self.taskid + ‘/status‘).read())[‘status‘]
        if status == ‘running‘:
            return ‘running‘
        if status == ‘terminated‘:
            return ‘terminated‘
        return "error"

    def scan_data(self):

        data = json.loads(urllib2.urlopen(self.server + ‘/scan/‘ + self.taskid + ‘/data‘).read())[‘data‘]
        if len(data) == 0:
            self._stdout.println(‘not injection:\t‘ + self.target)
            return False
        else:
            self._stdout.println(‘injection:\t‘ + self.target)
            return True

    def scan_kill(self):
        json.loads(rurllib2.urlopen(self.server + ‘/scan/‘ + self.taskid + ‘/kill‘).read())[‘success‘]
        self._stdout.println("%s kill")%(self.taskid)

    def scan_stop(self):
        json.loads(urllib2.urlopen(self.server + ‘/scan/‘ + self.taskid + ‘/stop‘).read())[‘success‘]
        self._stdout.println("%s stop")%(self.taskid)

    def run(self):
        try:
            if not self.task_new():
                return False
            if not self.scan_start():
                return False
            while True:
                if self.scan_status() == ‘running‘:
                    time.sleep(10)
                elif self.scan_status() == ‘terminated‘:
                    break
                else:
                    break
                #print self.target + ":\t" + str(time.time() - self.start_time)
                if time.time() - self.start_time > 500:
                    self.scan_stop()
                    self.scan_kill()
                    break
            self.scan_data()
            #self.task_delete()

        except Exception as e:
            pass

在本插件中繼承了IHttpListener接口,繼承該接口後,每點擊一次鏈接就會執行一次插件,只有當插件完成執行完成後,才能進行下一步:
技術分享圖片
訪問鏈接後,瀏覽器一直等待回應,繼承IHttpListener接口的方法很影響測試效率,不過優點是可以檢查出存在註入的鏈接
接下來對該代碼進行簡單的講解一下:

        if(messageIsRequest):  #當包是請求包時執行sql injection檢查
            a=self._helpers.analyzeRequest(messageInfo)  #這是burp提供的一個函數,可以從請求包中獲取到url method header等
            method=a.getMethod()
            url=str(a.getUrl())
            if(("?" in url) and (method=="GET")): #當請求是get請求和鏈接中存在參數時進行sql injection 檢查
                self._stdout.println("start")
                t=AutoSqli(target=url,stdout=self._stdout,method=method)
                t.run()
class AutoSqli(Thread): 
    def __init__(self,target,stdout,method):
        self.server="http://192.168.159.134:8775"#開啟sqlmapapi服務的ip
        self.taskid = ‘‘
        self.target=target
        self.method=method
        self._stdout=stdout
        self.start_time = time.time()

    def task_new(self):#創建一個新的任務,並獲取taskid
        self.taskid = json.loads(urllib2.urlopen(self.server + ‘/task/new‘).read())[‘taskid‘]
        self._stdout.println(‘Created new task: ‘ + self.taskid )
        if len(self.taskid) > 0:
            return True
        return False

    def task_delete(self):#通過taskid刪除某一個任務
        if json.loads(urllib2.urlopen(self.server + ‘/task/‘ + self.taskid + ‘/delete‘).read())[‘success‘]:
            self._stdout.println(‘[%s] Deleted task‘ % (self.taskid))
            return True
        return False

    def scan_start(self):#開始一個掃描,傳入需要掃描的地址
        headers = {‘Content-Type‘: ‘application/json‘}
        payload = {‘url‘:self.target}
        url = self.server + ‘/scan/‘ + self.taskid + ‘/start‘
        #t = json.loads(requests.post(url, data=json.dumps(payload), headers=headers).text)

        req=urllib2.Request(url,data=json.dumps(payload),headers=headers)
        t=json.loads(urllib2.urlopen(req).read())
        self._stdout.println("start "+ self.taskid)

        if len(str(t[‘engineid‘])) > 0 and t[‘success‘]:
            return True
        return False

    def scan_status(self):#查看是否掃描完成,通過status判斷,terminated是掃描完成
        status = json.loads(urllib2.urlopen(self.server + ‘/scan/‘ + self.taskid + ‘/status‘).read())[‘status‘]
        if status == ‘running‘:
            return ‘running‘
        if status == ‘terminated‘:
            return ‘terminated‘
        return "error"

    def scan_data(self):#獲取掃描完成的結果,如果data有值表名存在註入,否則不存在

        data = json.loads(urllib2.urlopen(self.server + ‘/scan/‘ + self.taskid + ‘/data‘).read())[‘data‘]
        if len(data) == 0:
            self._stdout.println(‘not injection:\t‘ + self.target)
            return False
        else:
            self._stdout.println(‘injection:\t‘ + self.target)
            return True

    def scan_kill(self):
        json.loads(rurllib2.urlopen(self.server + ‘/scan/‘ + self.taskid + ‘/kill‘).read())[‘success‘]
        self._stdout.println("%s kill")%(self.taskid)

    def scan_stop(self):
        json.loads(urllib2.urlopen(self.server + ‘/scan/‘ + self.taskid + ‘/stop‘).read())[‘success‘]
        self._stdout.println("%s stop")%(self.taskid)

    def run(self):
        try:
            if not self.task_new():
                return False
            if not self.scan_start():
                return False
            while True:
                if self.scan_status() == ‘running‘:
                    time.sleep(10)
                elif self.scan_status() == ‘terminated‘:
                    break
                else:
                    break
                #print self.target + ":\t" + str(time.time() - self.start_time)
                if time.time() - self.start_time > 500:
                    self.scan_stop()
                    self.scan_kill()
                    break
            self.scan_data()
            #self.task_delete()

        except Exception as e:
            pass

在下一篇文章中,我會記錄一下第二種方法,可以在不影響測試效率的情況下,依然可以有效的檢查出註入;在設置一些sqlmapapi參數的前提下,比較一下sqlmapapi和burpsuite scanner模塊那個更有效率。

閱讀sqlmap源代碼,編寫burpsuite插件--sqlmapapi