resultbrowser: remove circular detaildealer import

This commit is contained in:
2026-01-18 20:58:13 +01:00
parent eaba782ca6
commit 4f65e53392
5 changed files with 202 additions and 95 deletions

View File

@ -1,9 +1,12 @@
from pprint import pprint # from pprint import pprint
from . import details
from . import model # from . import details
from app.detaildealer import detaildealer
def scrub(table_name): def scrub(table_name):
return ''.join( chr for chr in table_name if chr.isalnum() or chr == '_' ) return "".join(chr for chr in table_name if chr.isalnum() or chr == "_")
class Resulttype: class Resulttype:
def __init__(self, name, count): def __init__(self, name, count):
@ -16,6 +19,7 @@ class Resulttype:
def getCount(self): def getCount(self):
return self.count return self.count
class Variant: class Variant:
def __init__(self, id, name, table, benchmark, detail): def __init__(self, id, name, table, benchmark, detail):
self.id = id self.id = id
@ -26,7 +30,6 @@ class Variant:
self.results = {} self.results = {}
self.totalresults = 0 self.totalresults = 0
def getMapper(self): def getMapper(self):
mapper = self.benchmark.getMapper() mapper = self.benchmark.getMapper()
if not mapper: # try benchmark mapper if not mapper: # try benchmark mapper
@ -34,10 +37,9 @@ class Variant:
if not mapper: # of not there, try parent tables mapper if not mapper: # of not there, try parent tables mapper
mapper = self.parenttable.getMapper() mapper = self.parenttable.getMapper()
if not mapper: # no mapper found at all, try default mapper if not mapper: # no mapper found at all, try default mapper
mapper = model.detaildealer.getDefaultMapper() mapper = detaildealer.getDefaultMapper()
return mapper return mapper
def addResulttype(self, name, count): def addResulttype(self, name, count):
mapper = self.getMapper() mapper = self.getMapper()
label = mapper.getLabel(name) label = mapper.getLabel(name)
@ -70,7 +72,16 @@ class Variant:
return self.totalresults return self.totalresults
def __str__(self): def __str__(self):
ret = "Variant: " + self.getDetails().getTitle() + " - " + self.getBenchmarkDetails().getTitle() +" (id: " + str( self.id )+ ")" + " " ret = (
"Variant: "
+ self.getDetails().getTitle()
+ " - "
+ self.getBenchmarkDetails().getTitle()
+ " (id: "
+ str(self.id)
+ ")"
+ " "
)
ret += "Total Results: " + str(self.totalresults) + "\n" ret += "Total Results: " + str(self.totalresults) + "\n"
for v in self.results: for v in self.results:
ret += "\t" + v.name + ": " + str(v.count) + "\n" ret += "\t" + v.name + ": " + str(v.count) + "\n"
@ -78,9 +89,11 @@ class Variant:
__repr__ = __str__ __repr__ = __str__
'''A ResultTable contains n Variants'''
class ResultTable:
"""A ResultTable contains n Variants"""
class ResultTable:
def __init__(self, name, cfg): def __init__(self, name, cfg):
self.name = scrub(name) self.name = scrub(name)
self.details = cfg.getTable(name) self.details = cfg.getTable(name)
@ -113,9 +126,13 @@ class ResultTable:
for k, v in self.variants.items(): for k, v in self.variants.items():
ret += "\t" + str(v) + "\n" ret += "\t" + str(v) + "\n"
return ret return ret
__repr__ = __str__ __repr__ = __str__
'''Overview has n ResultTables'''
"""Overview has n ResultTables"""
class Overview: class Overview:
def __init__(self): def __init__(self):
self.tables = {} self.tables = {}
@ -139,5 +156,3 @@ class Overview:
def length(self): def length(self):
return len(self.tables) return len(self.tables)

View File

@ -0,0 +1,4 @@
# Instantiate global detail dealer, will be initialized in reloadOverview
from app import details
detaildealer = details.DetailDealer()

View File

@ -1,31 +1,60 @@
#!/usr/bin/env python #!/usr/bin/env python
import MySQLdb
import MySQLdb.cursors
import yaml
import sys
import os.path import os.path
from pprint import pprint # import yaml
from . import data import sys
from . import details
import MySQLdb
import MySQLdb.cursors
from app.detaildealer import detaildealer
# from pprint import pprint
from . import data, details
"""Get command line options""" """Get command line options"""
from optparse import OptionParser from optparse import OptionParser
parser = OptionParser() parser = OptionParser()
parser.add_option("-c", "--conf", type="string", help="MySQL config file", dest="config", default= os.path.join(os.path.expanduser("~"),".my.cnf")) parser.add_option(
parser.add_option("-s", "--host", type="string", help="Webserver hostname", dest="host", default="localhost") "-c",
parser.add_option("-d", "--details", type="string", help="Detailed information (YAML configuration file)", dest="details", default=None) "--conf",
parser.add_option("-p", "--port", type="string", help="Webserver port", dest="port", default="5000") type="string",
help="MySQL config file",
dest="config",
default=os.path.join(os.path.expanduser("~"), ".my.cnf"),
)
parser.add_option(
"-s",
"--host",
type="string",
help="Webserver hostname",
dest="host",
default="localhost",
)
parser.add_option(
"-d",
"--details",
type="string",
help="Detailed information (YAML configuration file)",
dest="details",
default=None,
)
parser.add_option(
"-p", "--port", type="string", help="Webserver port", dest="port", default="5000"
)
opts, args = parser.parse_args() opts, args = parser.parse_args()
"""Check if configuration files exist""" """Check if configuration files exist"""
def checkConfigFile(msg, fname): def checkConfigFile(msg, fname):
if not os.path.isfile(fname): if not os.path.isfile(fname):
sys.exit("Error: '" + fname + "' not found") sys.exit("Error: '" + fname + "' not found")
else: else:
print(msg, "->", fname) print(msg, "->", fname)
# Check sql config # Check sql config
sqlconfig = opts.config sqlconfig = opts.config
checkConfigFile("MySQL config", sqlconfig) checkConfigFile("MySQL config", sqlconfig)
@ -35,50 +64,74 @@ if opts.details:
checkConfigFile("Details", opts.details) checkConfigFile("Details", opts.details)
# Instantiate global detail dealer, will be initialized in reloadOverview # Instantiate global detail dealer, will be initialized in reloadOverview
detaildealer = details.DetailDealer() # detaildealer = details.DetailDealer()
"""Remove all characters from string except alphanuermics and _""" """Remove all characters from string except alphanuermics and _"""
def scrub(table_name): def scrub(table_name):
return ''.join( chr for chr in table_name if chr.isalnum() or chr == '_' ) return "".join(chr for chr in table_name if chr.isalnum() or chr == "_")
"""Global mysql handles""" """Global mysql handles"""
db = None db = None
cur = None cur = None
def loadSession(dbconf): def loadSession(dbconf):
global db global db
if db: if db:
db.close() db.close()
db = MySQLdb.connect(read_default_file=dbconf, cursorclass=MySQLdb.cursors.DictCursor) db = MySQLdb.connect(
read_default_file=dbconf, cursorclass=MySQLdb.cursors.DictCursor
)
return db.cursor() return db.cursor()
def closeSession(): def closeSession():
if cur: cur.close() if cur:
cur.close()
global db global db
db.close() db.close()
db = None db = None
'''Populate variant results for overview data''' """Populate variant results for overview data"""
def getVariants(cur, table): def getVariants(cur, table):
restbl = table.getDetails().getDBName() restbl = table.getDetails().getDBName()
cur.execute("""SELECT sum((t.time2 - t.time1 + 1) * width) AS total, resulttype,variant, v.id as variant_id, benchmark, details FROM variant v JOIN trace t ON v.id = t.variant_id JOIN fspgroup g ON g.variant_id = t.variant_id AND g.instr2 = t.instr2 AND g.data_address = t.data_address JOIN %s r ON r.pilot_id = g.pilot_id JOIN fsppilot p ON r.pilot_id = p.id GROUP BY v.id, resulttype, details""" % (restbl)) # % is used here, as a tablename must not be quoted cur.execute(
"""SELECT sum((t.time2 - t.time1 + 1) * width) AS total, resulttype,variant, v.id as variant_id, benchmark, details FROM variant v JOIN trace t ON v.id = t.variant_id JOIN fspgroup g ON g.variant_id = t.variant_id AND g.instr2 = t.instr2 AND g.data_address = t.data_address JOIN %s r ON r.pilot_id = g.pilot_id JOIN fsppilot p ON r.pilot_id = p.id GROUP BY v.id, resulttype, details"""
% (restbl)
) # % is used here, as a tablename must not be quoted
res = cur.fetchall() res = cur.fetchall()
rdic = {} rdic = {}
# Build dict with variant id as key # Build dict with variant id as key
for r in res: for r in res:
# if variant entry already exists: # if variant entry already exists:
variant = table.getVariant(int(r['variant_id'])) variant = table.getVariant(int(r["variant_id"]))
if not variant: # if variant did not exist yet, create it: if not variant: # if variant did not exist yet, create it:
variant_details = detaildealer.getVariant(restbl, r['variant']) variant_details = detaildealer.getVariant(restbl, r["variant"])
benchmark_details = detaildealer.getBenchmark(restbl, r['variant'], r['benchmark']) benchmark_details = detaildealer.getBenchmark(
restbl, r["variant"], r["benchmark"]
)
table_details = detaildealer.getTable(restbl) table_details = detaildealer.getTable(restbl)
variant = data.Variant(int(r['variant_id']), r['variant'], table_details, benchmark_details, variant_details) variant = data.Variant(
variant.addResulttype(r['resulttype'], r['total']) int(r["variant_id"]),
r["variant"],
table_details,
benchmark_details,
variant_details,
)
variant.addResulttype(r["resulttype"], r["total"])
table.addVariant(variant) table.addVariant(variant)
'''Get overview data for index page'''
"""Get overview data for index page"""
def reloadOverview(): def reloadOverview():
overview = data.Overview() overview = data.Overview()
detaildealer.reload(opts.details) detaildealer.reload(opts.details)
@ -94,28 +147,37 @@ def reloadOverview():
overview.add(table) overview.add(table)
# Check if objdump table exists # Check if objdump table exists
cur.execute("SHOW TABLES like 'objdump'") cur.execute("SHOW TABLES like 'objdump'")
objdump_exists = (len(cur.fetchall()) == 1) objdump_exists = len(cur.fetchall()) == 1
closeSession() closeSession()
return overview, objdump_exists return overview, objdump_exists
"""Load overview data at server startup""" """Load overview data at server startup"""
print("Loading overview data from database. This may take a while ...") print("Loading overview data from database. This may take a while ...")
overview_data, objdump_exists = reloadOverview() overview_data, objdump_exists = reloadOverview()
print("done.") print("done.")
## Get overview data for views.index() ## Get overview data for views.index()
def getOverview(): def getOverview():
return overview_data return overview_data
def objdumpExists(): def objdumpExists():
return objdump_exists return objdump_exists
"""Get Results for one variant id""" """Get Results for one variant id"""
def getVariantResult(table, variantid): def getVariantResult(table, variantid):
cur = loadSession(sqlconfig) cur = loadSession(sqlconfig)
restbl = scrub(table) restbl = scrub(table)
stmt = "SELECT resulttype, count(*) as total from %s r join fsppilot on r.pilot_id=fsppilot.id join variant on fsppilot.variant_id=variant.id" % (restbl) stmt = (
"SELECT resulttype, count(*) as total from %s r join fsppilot on r.pilot_id=fsppilot.id join variant on fsppilot.variant_id=variant.id"
% (restbl)
)
where = " WHERE variant.id = %s group by resulttype ORDER BY resulttype " where = " WHERE variant.id = %s group by resulttype ORDER BY resulttype "
stmt = stmt + where stmt = stmt + where
cur.execute(stmt, variantid) cur.execute(stmt, variantid)
@ -123,10 +185,13 @@ def getVariantResult(table, variantid):
closeSession() closeSession()
return res return res
'''Show objdump together with according injection result types.'''
"""Show objdump together with according injection result types."""
def getCode(result_table, variant_id, resultlabel=None): def getCode(result_table, variant_id, resultlabel=None):
result_table = scrub(result_table) result_table = scrub(result_table)
filt = '' filt = ""
if not variant_id or not result_table: if not variant_id or not result_table:
return None return None
variant = overview_data.getVariantById(variant_id) variant = overview_data.getVariantById(variant_id)
@ -143,7 +208,10 @@ def getCode(result_table, variant_id, resultlabel=None):
# I especially like this one: # I especially like this one:
select = "SELECT instr_address, opcode, disassemble, comment, sum(t.time2 - t.time1 + 1) as totals, GROUP_CONCAT(DISTINCT resulttype SEPARATOR ', ') as results FROM variant v " select = "SELECT instr_address, opcode, disassemble, comment, sum(t.time2 - t.time1 + 1) as totals, GROUP_CONCAT(DISTINCT resulttype SEPARATOR ', ') as results FROM variant v "
join = " JOIN trace t ON v.id = t.variant_id JOIN fspgroup g ON g.variant_id = t.variant_id AND g.instr2 = t.instr2 AND g.data_address = t.data_address JOIN %s r ON r.pilot_id = g.pilot_id JOIN fsppilot p ON r.pilot_id = p.id JOIN objdump ON objdump.variant_id = v.id AND objdump.instr_address = injection_instr_absolute " %(scrub(result_table)) join = (
" JOIN trace t ON v.id = t.variant_id JOIN fspgroup g ON g.variant_id = t.variant_id AND g.instr2 = t.instr2 AND g.data_address = t.data_address JOIN %s r ON r.pilot_id = g.pilot_id JOIN fsppilot p ON r.pilot_id = p.id JOIN objdump ON objdump.variant_id = v.id AND objdump.instr_address = injection_instr_absolute "
% (scrub(result_table))
)
where = "WHERE v.id = %s " where = "WHERE v.id = %s "
group = "GROUP BY injection_instr_absolute ORDER BY totals DESC " group = "GROUP BY injection_instr_absolute ORDER BY totals DESC "
@ -156,24 +224,32 @@ def getCode(result_table, variant_id, resultlabel=None):
resulttypes = variant.getResultLabels() resulttypes = variant.getResultLabels()
return dump, resulttypes return dump, resulttypes
def getCodeExcerpt(variant_id, instr_addr): def getCodeExcerpt(variant_id, instr_addr):
code = {} code = {}
limit = 8 limit = 8
cur = loadSession(sqlconfig) cur = loadSession(sqlconfig)
cur.execute( """(SELECT instr_address, opcode, disassemble, comment FROM objdump \ cur.execute(
"""(SELECT instr_address, opcode, disassemble, comment FROM objdump \
WHERE instr_address < %s AND variant_id = %s \ WHERE instr_address < %s AND variant_id = %s \
ORDER BY instr_address DESC LIMIT %s) \ ORDER BY instr_address DESC LIMIT %s) \
ORDER BY instr_address ASC""" , (instr_addr, variant_id, limit)) ORDER BY instr_address ASC""",
(instr_addr, variant_id, limit),
)
below = cur.fetchall() below = cur.fetchall()
code['below'] = below code["below"] = below
cur.execute("""SELECT instr_address, opcode, disassemble, comment FROM objdump \ cur.execute(
"""SELECT instr_address, opcode, disassemble, comment FROM objdump \
WHERE instr_address >= %s AND variant_id = %s \ WHERE instr_address >= %s AND variant_id = %s \
ORDER BY instr_address ASC LIMIT %s""", (instr_addr, variant_id, limit+1)) ORDER BY instr_address ASC LIMIT %s""",
(instr_addr, variant_id, limit + 1),
)
upper = cur.fetchall() upper = cur.fetchall()
code['upper'] = upper code["upper"] = upper
closeSession() closeSession()
return code return code
def getResultsbyInstruction(result_table, variant_id, instr_addr, resultlabel=None): def getResultsbyInstruction(result_table, variant_id, instr_addr, resultlabel=None):
restypefilter = None restypefilter = None
if resultlabel: if resultlabel:
@ -187,7 +263,10 @@ def getResultsbyInstruction(result_table, variant_id, instr_addr, resultlabel=No
restypefilter += "resulttype = '" + dbn + "' OR " restypefilter += "resulttype = '" + dbn + "' OR "
restypefilter += "resulttype = '" + dbnames[-1] + "' ) " restypefilter += "resulttype = '" + dbnames[-1] + "' ) "
select = "SELECT bitoffset as 'Bit Offset', hex(injection_instr_absolute) as 'Instruction Address', hex(original_value) as 'Original Value', hex(data_address) as 'Data Address', resulttype as 'Result Type', details as 'Details' from %s " % scrub(result_table) select = (
"SELECT bitoffset as 'Bit Offset', hex(injection_instr_absolute) as 'Instruction Address', hex(original_value) as 'Original Value', hex(data_address) as 'Data Address', resulttype as 'Result Type', details as 'Details' from %s "
% scrub(result_table)
)
join = "JOIN fsppilot ON pilot_id = fsppilot.id " join = "JOIN fsppilot ON pilot_id = fsppilot.id "
where = "WHERE variant_id = %s and injection_instr_absolute = %s " where = "WHERE variant_id = %s and injection_instr_absolute = %s "
order = "ORDER BY data_address, bitoffset" order = "ORDER BY data_address, bitoffset"
@ -204,8 +283,7 @@ def getResultsbyInstruction(result_table, variant_id, instr_addr, resultlabel=No
closeSession() closeSession()
return res return res
def showDBstatus(): def showDBstatus():
res = "TODO" res = "TODO"
return res return res

View File

@ -1,42 +1,53 @@
from flask import render_template, request from flask import render_template, request
from app import app from app import app
# import model # import model
# import data # import data
from . import model from . import model
from . import data
@app.route('/') # from . import data
@app.route('/index')
@app.route("/")
@app.route("/index")
def index(): def index():
reload_overview = request.args.get('reload', False) reload_overview = request.args.get("reload", False)
if reload_overview: if reload_overview:
print("Reloading overview...") print("Reloading overview...")
model.reloadOverview() model.reloadOverview()
return render_template("index.html", overview=model.getOverview(), objdump_there = model.objdumpExists()) return render_template(
"index.html", overview=model.getOverview(), objdump_there=model.objdumpExists()
)
@app.route('/code')
@app.route("/code")
def code(): def code():
variant_id = request.args.get('variant_id', None) variant_id = request.args.get("variant_id", None)
resulttype = request.args.get('resulttype', None) resulttype = request.args.get("resulttype", None)
table = request.args.get('table', None) table = request.args.get("table", None)
res, restypes = model.getCode(table, variant_id, resulttype) res, restypes = model.getCode(table, variant_id, resulttype)
var_dets = model.getOverview().getVariantById(variant_id) var_dets = model.getOverview().getVariantById(variant_id)
return render_template("code.html", results=res, resulttypes=restypes, variant_details=var_dets ) return render_template(
"code.html", results=res, resulttypes=restypes, variant_details=var_dets
)
@app.route('/instr_details')
@app.route("/instr_details")
def instr_details(): def instr_details():
table = request.args.get('table', None) table = request.args.get("table", None)
variant_id = request.args.get('variant_id', None) variant_id = request.args.get("variant_id", None)
instr_addr = request.args.get('instr_address', None) instr_addr = request.args.get("instr_address", None)
resulttype = request.args.get('resulttype', None) resulttype = request.args.get("resulttype", None)
codeexcerpt = model.getCodeExcerpt(variant_id, instr_addr) codeexcerpt = model.getCodeExcerpt(variant_id, instr_addr)
var_dets = model.getOverview().getVariantById(variant_id) var_dets = model.getOverview().getVariantById(variant_id)
results = model.getResultsbyInstruction(table, variant_id, instr_addr, resulttype) results = model.getResultsbyInstruction(table, variant_id, instr_addr, resulttype)
return render_template("instr_details.html", code=codeexcerpt, result=results, variant_details=var_dets) return render_template(
"instr_details.html", code=codeexcerpt, result=results, variant_details=var_dets
)
@app.route('/about')
@app.route("/about")
def about(): def about():
stat = model.showDBstatus() stat = model.showDBstatus()
return render_template("about.html", status=stat) return render_template("about.html", status=stat)

View File

@ -1,5 +1,4 @@
#!/usr/bin/env python #!/usr/bin/env python
from app import app from app import app, model
from app import model
app.run(debug=False, port=int(model.opts.port), host=model.opts.host) app.run(debug=False, port=int(model.opts.port), host=model.opts.host)