angr原始碼分析——DFG 資料流圖
阿新 • • 發佈:2019-01-02
這篇文章主要講述angr中資料流圖(Data Flow Graph,DFG)的構建。
DFG恢復的是CFG中每個基本塊的資料流!
DFG為CFG的每個基本塊構建一個數據流圖(DFG)
DFG可以通過字典self.dfgs獲得,其中key為基本塊的地址,value為該基本塊對應的DFG。
param CFG:用於獲得所有基本塊的CFG
param annocfg:一個由後向切片(backward slice)構建的AnnotatedCFG,用於只對白名單中的語句構建DFG。
建構函式:
def __init__(self, cfg=None, annocfg=None):
    """
    Build a Data Flow Graph (DFG) for every basic block of a CFG.

    The graphs are stored in the dict ``self.dfgs``, where each key is a
    basic block address and each value is the DFG built for that block.

    :param cfg: A CFG used to get all the basic blocks. When it is None,
                one is generated with ``self.project.analyses.CFGAccurate()``.
    :param annocfg: An AnnotatedCFG built from a backward slice, used to only
                    build the DFG on the whitelisted statements.
    """
    # Fall back to a freshly generated CFG only when the caller gave none.
    self._cfg = self.project.analyses.CFGAccurate() if cfg is None else cfg
    self._annocfg = annocfg
    # _construct() reads the attributes set above, so it has to run last.
    self.dfgs = self._construct()
如果沒有cfg就構建cfg。
然後,呼叫_construct()函式構建DFG。這個函式,有點長,不過也是構造資料流的主要函式。下面開始分析吧。
def _construct(self):
    """
    Build one data flow graph per basic block of ``self._cfg``.

    We want to build the type of DFG that's used in "Automated Ident. of
    Crypto Primitives in Binary Code with Data Flow Graph Isomorphisms."
    Unlike that paper, however, we're building it on Vex IR instead of
    assembly instructions.

    :return: A dict mapping a basic block address to the ``DiGraph`` built
             for that block. A block is only recorded when its graph still
             reports ``size() > 0`` after isolated nodes have been removed.
    """
    cfg = self._cfg
    p = self.project
    dfgs = {}
    l.debug("Building Vex DFG...")

    for node in cfg.nodes():
        # Lift the node to a VEX IRSB. SimProcedure nodes are skipped, and a
        # block that fails to lift is logged and ignored (best effort).
        try:
            if node.simprocedure_name is None:
                irsb = p.factory.block(node.addr).vex
            else:
                l.debug("Cannot process SimProcedures, ignoring %s", node.simprocedure_name)
                continue
        except Exception as e:
            l.debug(e)
            continue

        tmpsnodes = {}  # tmp index -> graph node currently standing for that tmp
        putsnodes = {}  # register offset -> last Put statement seen in this block
        dfg = DiGraph()

        def source_of(expr):
            # A temp read is represented by the node recorded for that temp;
            # any other expression is used as a graph node directly.
            if expr.tag == 'Iex_RdTmp':
                return tmpsnodes[expr.tmp]
            return expr

        for stmt_idx, stmt in enumerate(irsb.statements):
            # We want to skip over certain types, such as Imarks
            if self._need_to_ignore(node.addr, stmt, stmt_idx):
                continue

            # break statement down into sub-expressions
            exprs = stmt.expressions
            stmt_node = stmt
            dfg.add_node(stmt)

            if stmt.tag == 'Ist_WrTmp':
                tmpsnodes[stmt.tmp] = stmt_node
                rhs_tag = exprs[0].tag
                if rhs_tag == 'Iex_Binop':
                    dfg.add_edge(source_of(exprs[1]), stmt_node)
                    dfg.add_edge(source_of(exprs[2]), stmt_node)
                elif rhs_tag == 'Iex_Unop':
                    # Unary ops get no node of their own: the written tmp is
                    # mapped to (a relabelled copy of) the operand's node.
                    dfg.remove_node(stmt_node)
                    if exprs[1].tag == 'Iex_RdTmp':
                        tmpsnodes[stmt.tmp] = copy(tmpsnodes[exprs[1].tmp])
                        tmpsnodes[stmt.tmp].tmp = stmt.tmp
                    else:
                        tmpsnodes[stmt.tmp] = exprs[1]
                elif rhs_tag == 'Iex_RdTmp':
                    # Plain tmp-to-tmp move: alias the source tmp's node.
                    tmpsnodes[stmt.tmp] = copy(tmpsnodes[exprs[0].tmp])
                    tmpsnodes[stmt.tmp].tmp = stmt.tmp
                elif rhs_tag == 'Iex_Get':
                    # `in` instead of dict.has_key(), which Python 3 removed.
                    if exprs[0].offset in putsnodes:
                        dfg.add_edge(putsnodes[exprs[0].offset], stmt_node)
                    if len(exprs) > 1:
                        dfg.add_edge(source_of(exprs[1]), stmt_node)
                elif rhs_tag == 'Iex_Load':
                    dfg.add_edge(source_of(exprs[1]), stmt_node)
                else:
                    # Take a guess by assuming exprs[0] is the op and any
                    # other expressions are args
                    for e in exprs[1:]:
                        dfg.add_edge(source_of(e), stmt_node)

            elif stmt.tag == 'Ist_Store':
                # First expression: only tmp reads and constants add an edge.
                if exprs[0].tag in ('Iex_RdTmp', 'Iex_Const'):
                    dfg.add_edge(source_of(exprs[0]), stmt_node)
                dfg.add_edge(source_of(exprs[1]), stmt_node)

            elif stmt.tag == 'Ist_Put':
                if exprs[0].tag in ('Iex_RdTmp', 'Iex_Const'):
                    dfg.add_edge(source_of(exprs[0]), stmt_node)
                # Remember this Put so a later Get of the same offset links to it.
                putsnodes[stmt.offset] = stmt_node

            elif stmt.tag == 'Ist_Exit':
                if exprs[0].tag == 'Iex_RdTmp':
                    dfg.add_edge(tmpsnodes[exprs[0].tmp], stmt_node)

            elif stmt.tag == 'Ist_Dirty':
                tmpsnodes[stmt.tmp] = stmt_node

            elif stmt.tag == 'Ist_CAS':
                tmpsnodes[stmt.oldLo] = stmt_node

            else:
                for e in exprs:
                    dfg.add_edge(source_of(e), stmt_node)

        # Drop nodes that ended up with no edges. Iterate over a snapshot
        # because the graph is mutated inside the loop.
        for vtx in list(dfg.nodes()):
            if dfg.degree(vtx) == 0:
                dfg.remove_node(vtx)

        if dfg.size() > 0:
            dfgs[node.addr] = dfg

    return dfgs
根據不同statements的型別,標記不同的點。
其實不僅可以用cfg來恢復資料流圖,任意的一個block都可以利用這個方法恢復資料流。
唯一的遺憾就是,恢復的資料流是block的,要想恢復函式間的資料流,就應該恢復資料依賴圖。
下面是我寫的恢復任意一個block的測試程式碼,當然呼叫仍然是construct函式。
def main():
    """
    Build the DFG of a single block of ``test2.bin`` and plot it.

    Loads the binary without its shared libraries, runs ``constructDFG`` on
    one hard-coded block address, prints how many DFGs were produced and
    hands the graph for that address to ``plot_common``.
    """
    proj = angr.Project("test2.bin", load_options={'auto_load_libs': False})
    start_addr = 0x1F065405
    dfgs = constructDFG([start_addr], proj)
    # Function-call form of print: valid on Python 3 and prints the same
    # single value on Python 2.
    print(len(dfgs))
    # NOTE(review): raises KeyError if constructDFG recorded no graph for
    # start_addr -- confirm that is acceptable for this test script.
    plot_common(dfgs[start_addr], "dfg_1F065405")
constructDFG只是對construct的一點更改
def constructDFG(addrs, project):
    """
    Build a data flow graph for each block address in ``addrs``.

    Each address is lifted with ``project.factory.block(addr).vex`` and its
    VEX statements are turned into a ``DiGraph``, following the same
    per-statement rules as the DFG analysis' ``_construct``, but without a
    CFG and without any statement filtering.

    :param addrs: Iterable of block start addresses.
    :param project: Object exposing ``factory.block(addr).vex`` (an angr
                    project in the accompanying ``main``).
    :return: A dict mapping an address to its ``DiGraph``. An address is only
             recorded when its graph still reports ``size() > 0`` after
             isolated nodes have been removed.
    """
    dfgs = {}
    for addr in addrs:
        irsb = project.factory.block(addr).vex
        if irsb is None:
            continue

        tmpsnodes = {}  # tmp index -> graph node currently standing for that tmp
        putsnodes = {}  # register offset -> last Put statement seen in this block
        dfg = DiGraph()

        def source_of(expr):
            # A temp read is represented by the node recorded for that temp;
            # any other expression is used as a graph node directly.
            if expr.tag == 'Iex_RdTmp':
                return tmpsnodes[expr.tmp]
            return expr

        for stmt in irsb.statements:
            # break statement down into sub-expressions
            exprs = stmt.expressions
            stmt_node = stmt
            dfg.add_node(stmt)

            if stmt.tag == 'Ist_WrTmp':
                tmpsnodes[stmt.tmp] = stmt_node
                rhs_tag = exprs[0].tag
                if rhs_tag == 'Iex_Binop':
                    dfg.add_edge(source_of(exprs[1]), stmt_node)
                    dfg.add_edge(source_of(exprs[2]), stmt_node)
                elif rhs_tag == 'Iex_Unop':
                    # Unary ops get no node of their own: the written tmp is
                    # mapped to (a relabelled copy of) the operand's node.
                    dfg.remove_node(stmt_node)
                    if exprs[1].tag == 'Iex_RdTmp':
                        tmpsnodes[stmt.tmp] = copy(tmpsnodes[exprs[1].tmp])
                        tmpsnodes[stmt.tmp].tmp = stmt.tmp
                    else:
                        tmpsnodes[stmt.tmp] = exprs[1]
                elif rhs_tag == 'Iex_RdTmp':
                    # Plain tmp-to-tmp move: alias the source tmp's node.
                    tmpsnodes[stmt.tmp] = copy(tmpsnodes[exprs[0].tmp])
                    tmpsnodes[stmt.tmp].tmp = stmt.tmp
                elif rhs_tag == 'Iex_Get':
                    # `in` instead of dict.has_key(), which Python 3 removed.
                    if exprs[0].offset in putsnodes:
                        dfg.add_edge(putsnodes[exprs[0].offset], stmt_node)
                    if len(exprs) > 1:
                        dfg.add_edge(source_of(exprs[1]), stmt_node)
                elif rhs_tag == 'Iex_Load':
                    dfg.add_edge(source_of(exprs[1]), stmt_node)
                else:
                    # Take a guess by assuming exprs[0] is the op and any
                    # other expressions are args
                    for e in exprs[1:]:
                        dfg.add_edge(source_of(e), stmt_node)

            elif stmt.tag == 'Ist_Store':
                # First expression: only tmp reads and constants add an edge.
                if exprs[0].tag in ('Iex_RdTmp', 'Iex_Const'):
                    dfg.add_edge(source_of(exprs[0]), stmt_node)
                dfg.add_edge(source_of(exprs[1]), stmt_node)

            elif stmt.tag == 'Ist_Put':
                if exprs[0].tag in ('Iex_RdTmp', 'Iex_Const'):
                    dfg.add_edge(source_of(exprs[0]), stmt_node)
                # Remember this Put so a later Get of the same offset links to it.
                putsnodes[stmt.offset] = stmt_node

            elif stmt.tag == 'Ist_Exit':
                if exprs[0].tag == 'Iex_RdTmp':
                    dfg.add_edge(tmpsnodes[exprs[0].tmp], stmt_node)

            elif stmt.tag == 'Ist_Dirty':
                tmpsnodes[stmt.tmp] = stmt_node

            elif stmt.tag == 'Ist_CAS':
                tmpsnodes[stmt.oldLo] = stmt_node

            else:
                for e in exprs:
                    dfg.add_edge(source_of(e), stmt_node)

        # Drop nodes that ended up with no edges. Iterate over a snapshot
        # because the graph is mutated inside the loop.
        for vtx in list(dfg.nodes()):
            if dfg.degree(vtx) == 0:
                dfg.remove_node(vtx)

        if dfg.size() > 0:
            dfgs[addr] = dfg

    return dfgs
最終畫出的圖為: