Datacon Competition Record
Created November 20, 2023
1. 简介
2022年12月打的是物联网安全赛道:把实验室先前论文SFuzz的代码改了改,能较好地解决RTOS固件的相关题目,但第三题(整数溢出漏洞检测)在短时间内写出来的工具效果还是不佳。
2023年11月又带学弟们打了漏洞分析赛道,前两题基本是苦力活;第三题挖0day时,因为没提前买设备、固件模拟也不熟练,导致没能写出exp,略遗憾。
后面如果再打Datacon的话,就只想直接做第三题了(瘫)
2. IDA脚本
人工审计二进制代码漏洞时,需要快速判断source点到sink点之间是否存在call trace。写了个简单的IDA脚本实现这一功能。
import idautils
import idc
import ida_idp
import ida_funcs
import ida_nalt
import time
# Optional log sink. While it is None nothing is logged; when it holds an open,
# writable file object, print_path() also writes every printed path to it and
# the __main__ block closes it before exiting.
log_file = None
#
# Basic Tools
#
def printh(num):
    """Print ``num`` in hexadecimal notation (the ``h`` stands for hex)."""
    hex_text = hex(num)
    print(hex_text)
def printh_list(nums):
    """Print every number of ``nums`` in hexadecimal, one per line."""
    for value in nums:
        printh(value)
def printf(func_ea):
    """Print ``name(address)`` for the function at ``func_ea`` (``f`` = function).

    Nothing is printed when ``idc.get_func_name`` yields an empty name.
    """
    name = idc.get_func_name(func_ea)
    if not name:
        return
    print("%s(%s)" % (name, hex(func_ea)))
def get_addr_32(num) -> str:
    """Format ``num`` as ``0x``-prefixed hex, zero-padded to at least 8 digits.

    Wider values are not truncated; they simply yield more than eight digits.
    """
    digits = hex(num)[2:]  # drop the leading "0x" produced by hex()
    return "0x" + digits.zfill(8)
def is_func_entry(ea):
    """Return True only when ``ea`` is the start address of a function.

    ``ida_funcs.get_func`` supplies the function containing ``ea``; the
    address qualifies when that function's ``start_ea`` equals ``ea``.
    """
    containing = ida_funcs.get_func(ea)
    return bool(containing and containing.start_ea == ea)
def parse_func(func):
    """Resolve a function given either by name or by address.

    Args:
        func: A symbol name (``str``) or an address (``int``) lying anywhere
            inside a function.

    Returns:
        A ``(func_name, func_entry)`` tuple.

        * For a name: the name itself and the address ``idc.get_name_ea``
          resolves it to.
        * For an address inside a function: the function's name and its
          start address.
        * Otherwise (address outside any function, ``BADADDR``, or any other
          argument type): ``("", idc.BADADDR)``.
    """
    # IDA names are qualified with ``idc.`` (as printf() already does) so the
    # function does not depend on them being pre-imported into the namespace
    # the script happens to run in.
    func_name = ""
    func_entry = idc.BADADDR
    if isinstance(func, str):
        func_name = func
        func_entry = idc.get_name_ea(idc.BADADDR, func_name)
    elif isinstance(func, int) and func != idc.BADADDR:
        owner = ida_funcs.get_func(func)
        if owner:
            func_entry = owner.start_ea
            func_name = idc.get_func_name(func_entry)
    return func_name, func_entry
def parse_func_entry(func):
    """Return only the entry address that ``parse_func`` resolves for ``func``."""
    return parse_func(func)[1]
def parse_func_name(func):
    """Return only the function name that ``parse_func`` resolves for ``func``."""
    return parse_func(func)[0]
#
# Control Flow Tools
#
# Module-wide cache of call edges, filled in by build_call_map():
#   call_map[function entry] = {call-site address: callee address}
# An entry with an empty dict means the function was visited and no outgoing
# edges were recorded for it.
call_map = {}
# get call-sites!
def get_caller(func):
    """Collect the addresses that reference the function ``func``.

    Args:
        func: Function name or an address inside the function.

    Returns:
        A list of referencing addresses: first the code references reported
        by ``idc.get_first_fcref_to`` / ``idc.get_next_fcref_to``, then the
        data references (function pointers) that lie inside some function.
        Data references located outside any function are ignored. The list
        is empty when ``func`` cannot be resolved.
    """
    func_entry = parse_func_entry(func)
    if func_entry == idc.BADADDR:
        return []

    callers = []

    # Direct references (call sites).
    # NOTE(review): enumeration also stops as soon as a reference located at
    # the function's own entry address is met -- confirm this is intended.
    site = idc.get_first_fcref_to(func_entry)
    while site != idc.BADADDR and site != func_entry:
        callers.append(site)
        site = idc.get_next_fcref_to(func_entry, site)

    # Function pointers: keep only data references that sit inside a function.
    ref = idc.get_first_dref_to(func_entry)
    while ref != idc.BADADDR:
        if ida_funcs.get_func(ref):
            callers.append(ref)
        ref = idc.get_next_dref_to(func_entry, ref)

    return callers
def get_callee(func):
    """Map every referencing instruction of ``func`` to the function it targets.

    Args:
        func: Function name or an address inside the function.

    Returns:
        A dict ``{instruction address: target address}``, in instruction
        order, with one entry for

        * each instruction whose first data reference points at a function
          entry (a function pointer being taken), and
        * each call instruction, mapped to its first code-reference target.
          This overrides a data-reference entry for the same instruction and
          is stored even when it is ``BADADDR`` (presumably a call IDA could
          not resolve; build_call_map() handles that value explicitly).

        The dict is empty when ``func`` cannot be resolved.
    """
    call_sites = {}
    func_entry = parse_func_entry(func)
    if func_entry == idc.BADADDR:
        return call_sites

    for item_ea in idautils.FuncItems(func_entry):
        # Function pointer taken by this instruction.
        # TODO: VxWorks-specific indirect function pointers are still not handled.
        target = idc.get_first_dref_from(item_ea)
        if is_func_entry(target):
            call_sites[item_ea] = target

        # Ordinary function call.
        if ida_idp.is_call_insn(item_ea):
            call_sites[item_ea] = idc.get_first_fcref_from(item_ea)

    return call_sites
# DFS
# updating call_map
def build_call_map(start_func):
    """Record in ``call_map`` every call edge reachable from ``start_func``.

    Functions already present in ``call_map`` are not analysed again, so the
    call is cheap once a region of the call graph has been explored. An
    unresolvable function (``BADADDR``) gets an empty entry.

    Args:
        start_func: Function name or an address inside the function.
    """
    # An explicit stack replaces recursion: a depth-first walk over a large
    # binary can be far deeper than Python's recursion limit allows.
    pending = [start_func]
    while pending:
        entry = parse_func_entry(pending.pop())
        if entry in call_map:
            continue
        # Register the node before expanding it so call cycles terminate.
        call_map[entry] = {}
        if entry == idc.BADADDR:
            continue
        for call_site, callee in get_callee(entry).items():
            call_map[entry][call_site] = callee
            if callee not in call_map:
                pending.append(callee)
# DFS
# using call_map
# "source" is the function containing source point
def check_connection(source, sink):
    """Tell whether some chain of calls leads from ``source`` to ``sink``.

    Args:
        source: Name or address of the function containing the source point.
        sink: Name or address of the sink function.

    Returns:
        True when ``sink`` is the target of a call edge reachable from
        ``source`` through ``call_map``; False otherwise, including when
        either function cannot be resolved.
    """
    source_entry = parse_func_entry(source)
    sink_entry = parse_func_entry(sink)
    # An unresolved sink must not match the BADADDR placeholder stored for
    # unresolved callees.
    if source_entry == idc.BADADDR or sink_entry == idc.BADADDR:
        return False

    # Iterative DFS; the visited set keeps recursive and mutually recursive
    # functions from being expanded forever.
    visited = {source_entry}
    pending = [source_entry]
    while pending:
        current = pending.pop()
        build_call_map(current)
        for callee in call_map[current].values():
            if callee == sink_entry:
                return True
            if callee in visited:
                continue
            callee_entry = parse_func_entry(callee)
            if callee_entry == idc.BADADDR or callee_entry in visited:
                continue
            visited.add(callee_entry)
            pending.append(callee_entry)
    return False
def print_path(path, simple):
    """Print one call path and mirror it to ``log_file`` when that is set.

    Args:
        path: In simple mode a list of functions; otherwise a list of
            ``(call_site, callee)`` pairs whose first pair stands for the
            starting function.
        simple: Selects the layout. Simple mode prints ``a -> b -> c``;
            otherwise every hop after the first is prefixed with the
            call-site address, as in ``a -> (0x401000)b``.

    An empty path prints nothing.
    """
    global log_file
    if not path:
        return

    if simple:
        labels = [parse_func_name(entry) for entry in path]
    else:
        labels = ["%s" % (parse_func_name(path[0][1]))]
        for hop in path[1:]:
            labels.append("(%s)%s" % (hex(hop[0]), parse_func_name(hop[1])))

    text = " -> ".join(labels)
    print(text)
    if log_file:
        log_file.write(text + "\n")
# DFS
# using call_map
# "func" is the function containing source point
def find_path(func, sink_list, path=None, simple=False):
    """Print every call path from ``func`` to a function named in ``sink_list``.

    Args:
        func: Name or address of the function to start from (the function
            containing the source point).
        sink_list: Names of the sink functions. A function whose name is a
            sink name prefixed with ``.`` also counts (x86 libc functions).
        path: Path walked so far; leave it unset (or empty) for a top-level
            call. In simple mode it is a list of functions, otherwise a list
            of ``(call_site, callee)`` pairs.
        simple: When True, paths carry functions only and several calls to
            the same callee from one function are explored once. When False,
            paths carry call-site addresses and every call site is explored.

    Returns:
        True if at least one path to a sink was found and printed.
    """
    func_name, func_entry = parse_func(func)

    # Start a fresh path for a top-level call. A new list is built instead of
    # appending to the argument, so neither a shared default value nor the
    # caller's list is ever modified.
    if not path:
        path = [func_entry if simple else (idc.BADADDR, func_entry)]

    # Termination checks.
    if func_entry == idc.BADADDR:
        return False
    if func_name in sink_list or (
        func_name.startswith(".") and func_name[1:] in sink_list
    ):
        print_path(path, simple)
        return True

    build_call_map(func_entry)

    # Functions that must not be entered again from here: those already on
    # the path and, in simple mode, callees already explored from this function.
    if simple:
        skip = set(path)
    else:
        skip = {node[1] for node in path}

    found = False
    for call_site, callee in call_map[func_entry].items():
        if callee in skip:
            continue
        step = callee if simple else (call_site, callee)
        if find_path(callee, sink_list, path + [step], simple):
            found = True
        if simple:
            skip.add(callee)
    return found
# using call_map
def find_source_to_sink(source_list, sink_list, simple=False, black_list=None):
    """Search for call paths from every user of a source to any sink.

    For each source function, every function referencing it (see
    get_caller()) is used as the starting point of find_path().

    Args:
        source_list: Names or addresses of the source functions.
        sink_list: Names of the sink functions.
        simple: Passed on to find_path(); in simple mode a referencing
            function that was already explored is not explored again.
        black_list: Names of referencing functions to skip. Defaults to none.

    Returns:
        The number of distinct referencing functions from which at least one
        path to a sink was found.
    """
    if black_list is None:
        black_list = ()
    explored_caller_functions = set()
    unsafe_functions = set()

    for source in source_list:
        source_name = parse_func_name(source)
        callers = get_caller(source)
        print("\n%s is referenced %d times" % (source_name, len(callers)))
        for caller in callers:
            # Resolve the function that contains the reference.
            func_name, func = parse_func(caller)
            if func == idc.BADADDR or func_name in black_list:
                continue
            # In simple mode a caller function is explored only once.
            if simple and func in explored_caller_functions:
                continue
            print("[%s referenced at %s (0x%x)]: " % (source_name, func_name, caller))
            # A fresh list is passed explicitly so every search starts from
            # an empty path.
            if find_path(func, sink_list, [], simple):
                unsafe_functions.add(func)
            explored_caller_functions.add(func)

    return len(unsafe_functions)
def check_to_sinks(func, sink_list):
    """Return True as soon as ``func`` is found connected to one of the sinks.

    Sinks are tried in order with check_connection(); the remaining ones are
    not checked after the first hit.
    """
    return any(check_connection(func, sink) for sink in sink_list)
# For Datacon2023 Challenge2
def get_datacon_callsites():
    """Print the call sites of candidate functions (Datacon 2023, challenge 2).

    A function is a candidate when its name is one of the buffer-overflow
    sinks below or when ``ida_funcs.calc_func_size`` reports a size above 8
    (presumably to leave out tiny stubs -- confirm). Every address
    referencing a candidate (see get_caller()) is printed as
    ``<FileName>:0x<address>``, followed by summary counters.

    Relies on the module-level ``FileName`` that the ``__main__`` block sets
    before calling this function.
    """
    bof_sinks = ('strcpy', 'strcat', 'strncpy', 'memcpy', 'memmove', 'snprintf')
    total = 0
    candidate_funcs = set()  # entry addresses
    call_sites = set()

    # 1. Get candidate functions!
    for func in idautils.Functions():
        total += 1
        func_name, func_entry = parse_func(func)
        func_obj = ida_funcs.get_func(func_entry)
        if not func_obj:
            print("Failed to get function object:", func_name)
            # Without a function object there is no size to compute.
            continue
        func_size = ida_funcs.calc_func_size(func_obj)
        if func_name in bof_sinks or func_size > 8:
            candidate_funcs.add(func_entry)

    # 2. Get call sites!
    for func_entry in candidate_funcs:
        call_sites.update(get_caller(func_entry))

    # 3. Print all answers, in address order so the output is reproducible.
    for addr in sorted(call_sites):
        addr32 = get_addr_32(addr)
        print(FileName+":"+addr32)
    print("Function Sum:", total)
    print("Candidates:", len(candidate_funcs))
    print("Call Sites:", len(call_sites))
if __name__ == "__main__":
    # FileName is read by get_datacon_callsites(): the last component of the
    # input file path, splitting on '/' when present, otherwise on '\'.
    FileName = ""
    FilePath = ida_nalt.get_input_file_path()
    if '/' in FilePath:
        FileName = FilePath.rsplit('/', 1)[-1]
    elif '\\' in FilePath:
        FileName = FilePath.rsplit('\\', 1)[-1]
    else:
        FileName = FilePath

    start_time = int(time.time())
    print("START!")

    # Ready-made name lists for the analysis helpers (not used by the
    # default run below).
    socket_list = ["socket", "connect", "bind", "listen", "accept", "recv", "recvfrom"]
    source = ["recv", "SSL_read"]

    # Usage: find_path, find_source_to_sink, check_to_sinks, get_caller, get_callee.
    get_datacon_callsites()

    # Release the optional log file if one was opened.
    if log_file:
        log_file.close()

    print('')
    end_time = int(time.time())
    # Whole seconds elapsed since START.
    print("Usage: %ds" % (end_time - start_time))
    print("EXIT!\n")