-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathapp.py
More file actions
168 lines (148 loc) · 7.37 KB
/
app.py
File metadata and controls
168 lines (148 loc) · 7.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
import re
import os
import time
import logging
import hashlib

from waitress import serve
from werkzeug.utils import secure_filename
from flask import Flask, request, render_template, jsonify

from picblocks.blockhasher import BlockHasher
from picblocks.blockhashmatcher import BlockHashMatcher

logging.basicConfig(level=logging.INFO, format="%(asctime)-15s: %(name)-30s - %(message)s")
LOG = logging.getLogger("flask-app")

# TODO: refactoring needed! Import this configuration from a single external source.
# Optional MongoDB backend: only used by the /stats endpoint; everything else
# works off the JSON blocks DB loaded below.
USE_DB = False
db = None
if USE_DB:
    try:
        from pymongo import MongoClient
        c = MongoClient("mongodb://localhost:27017")
        db = c['malpedia']
        f_to_id = db['family_to_id']
        f_to_f = db['family_id_to_family']
        blocks = db['blockhashes']
        s_to_s = db['sample_id_to_sample']
        s_s = db['statistics']
    except Exception:
        # Narrowed from a bare `except:` so SystemExit/KeyboardInterrupt are not
        # swallowed during startup; log the traceback instead of a bare message.
        db = None
        LOG.exception("Could not initialize database.")

app = Flask(__name__)
matcher = BlockHashMatcher()

# Load the pre-computed PIC block hashes database; timed for the startup log.
start = time.time()
LOG.info("Loading BlocksDB")
if os.path.exists("db/picblocksdb.json"):
    matcher.loadDb("db/picblocksdb.json")
LOG.info("Done! (%5.2fs)", (time.time() - start))
def render_report(report, template):
    """Render a BlockHashMatcher report as an HTML family-match table.

    Args:
        report: dict produced by ``matcher.match(...)``; must contain the
            input metadata keys and a ``family_matches`` list of per-family
            score dicts.
        template: name of the Flask template to render (e.g. ``report.html``).

    Returns:
        The rendered template, with the generated table passed as ``out_html``.
    """
    file_name = report['input_filename']
    sha256 = report['sha256']
    bitness = report['bitness']
    extracted = report['input_block_hashes']
    unmatched = report['unmatched_blocks']
    unmatch_sc = report['unmatched_score']

    def _cell(css_class, content, align_right=True):
        # One table cell; numeric columns are right-aligned, the index and
        # family-name columns are not.
        style = " style='text-align:right'" if align_right else ""
        return f"<td class='{css_class}'{style}>{content}</td>"

    # collect output and deliver at the end
    output = ""
    output += "<table>\n<tr><th>#</th><th>family</th><th colspan='3'>direct match</th><th colspan='3'>libraries excluded</th><th colspan='3'>frequency adjusted</th><th colspan='3'>uniquely matched</th></tr>\n"
    index = 0
    alternate = 0
    for entry in report["family_matches"]:
        # Zebra striping: css classes alternate per *rendered* row.
        even = alternate % 2 == 0
        dark = "ed" if even else "od"
        light = "el" if even else "ol"
        green = "gd" if even else "gl"
        # Show the first 20 families, plus every family with unique byte hits.
        if entry['uniq_bytes'] > 0 or index < 20:
            # Separator under the 20th entry marks the cut-off line.
            style20 = " style='border-bottom: 2px solid black;'" if index == 19 else ""
            family = entry['family']
            # NOTE(review): family names come from the local blocks DB and are
            # inserted into HTML unescaped -- confirm they are trusted input.
            malpedia_link = (
                f"<a href='https://malpedia.caad.fkie.fraunhofer.de/details/{family}'"
                f" target='_blank'>{family}</a>"
            )
            row = f"<tr{style20}>"
            row += _cell(light, f"{entry['index']:>5,d}", align_right=False)
            row += _cell(light, malpedia_link, align_right=False)
            row += _cell(dark, f"{entry['direct_bytes']:,d}")
            row += _cell(dark, f"{entry['direct_blocks']:,d}")
            row += _cell(dark, f"{entry['direct_perc']:>5.2f}%")
            row += _cell(light, f"{entry['nonlib_bytes']:,d}")
            row += _cell(light, f"{entry['nonlib_blocks']:,d}")
            row += _cell(light, f"{entry['nonlib_perc']:>5.2f}%")
            row += _cell(dark, f"{entry['freq_bytes']:,d}")
            # NOTE(review): freq_blocks is rendered as a float (5.2f) while all
            # other block counts use ,d -- looks inconsistent, confirm intended.
            row += _cell(dark, f"{entry['freq_blocks']:5.2f}")
            row += _cell(dark, f"{entry['freq_perc']:>5.2f}%")
            # Highlight the "uniquely matched" columns when there is any hit.
            uniq_class = green if entry['uniq_bytes'] > 0 else light
            row += _cell(uniq_class, f"{entry['uniq_bytes']:,d}")
            row += _cell(uniq_class, f"{entry['uniq_blocks']:,d}")
            row += _cell(uniq_class, f"{entry['uniq_perc']:>5.2f}%")
            output += row + "</tr>"
            alternate += 1
        index += 1
    output += "</table>\n"
    output += "<p></p>"
    output += "<h3>Information</h3>"
    output += "<p>results per matching class shown as (bytes, blocks, percent of bytes).<br />"
    output += "libraries excluded: filter out blocks known from a set of 3rd party libraries, including MSVC.<br />"
    output += "frequency adjusted: for the remainder, block scores are increasingly penalized when occurring in three or more families.<br />"
    output += "uniquely matched: Block score for blocks only found in this family.</p>"
    return render_template(template, file_name=file_name, sha256=sha256, bitness=bitness, extracted=extracted, unmatched=unmatched, unmatch_sc=unmatch_sc, out_html=output)
@app.route("/")
def index():
    """Landing page; reports the timestamp of the loaded blocks DB."""
    LOG.info("request to /index")
    timestamp = matcher.db_timestamp
    return render_template('index.html', db_timestamp=timestamp)
@app.route("/about")
def about():
    """About page: summary statistics of the loaded blocks DB."""
    LOG.info("request to /about")
    stats = matcher.getDbStats()
    # Forward each statistic under its own name to the template.
    stat_keys = (
        "num_families", "num_libraries", "num_files", "num_functions",
        "num_hashes", "num_hash_and_sizes", "num_bytes", "num_bytes_unique",
    )
    context = {key: stats[key] for key in stat_keys}
    return render_template('about.html', db_timestamp=matcher.db_timestamp, **context)
@app.route('/stats', methods=['GET'])
def get_stats():
    """Statistics page; only functional when the optional MongoDB backend is up."""
    LOG.info("request to /stats")
    if request.method == 'GET':
        # Guard clause: without the DB backend the page is simply disabled.
        if not (USE_DB and db):
            return render_template('disabled.html')
        # NOTE(review): Cursor.count() is deprecated in newer pymongo releases
        # (use count_documents) -- confirm against the pinned pymongo version.
        family_count = f_to_id.find({}).count()
        sample_count = s_to_s.find({}).count()
        block_count = blocks.find({}).count()
        stats = list(s_s.find({}))
        return render_template('stats.html', db_online="online", tracked_families=family_count, number_samples=sample_count, number_blocks=block_count, s_stats=stats)
@app.route('/blocks', methods=['GET', 'POST'])
def upload_file():
    """Accept an uploaded binary, hash its PIC blocks, and render the match report.

    POST expects a multipart form with a ``binary`` file field and optional
    ``bitness`` (32/64) and ``baseaddress`` (0x-prefixed hex) fields.
    """
    LOG.info("request to /blocks")
    if request.method == 'POST':
        f = request.files['binary']
        binary = f.read()
        # Lazy %-formatting so the digest is only built when INFO is enabled.
        LOG.info("received binary with sha256: %s", hashlib.sha256(binary).hexdigest())
        # Only accept the two supported bitness values; anything else -> auto-detect.
        form_bitness = None
        if request.form.get("bitness") in ("32", "64"):
            form_bitness = int(request.form["bitness"])
        # Base address must be 0x-prefixed hex (up to 64 bit); otherwise ignored.
        form_baseaddress = None
        baseaddress_value = request.form.get("baseaddress", "")
        if re.match(r"^0x[0-9a-fA-F]{1,16}$", baseaddress_value):
            form_baseaddress = int(baseaddress_value, 16)
        hasher = BlockHasher()
        blockhash_report = hasher.processBuffer(binary, secure_filename(f.filename), bitness=form_bitness, baseaddress=form_baseaddress)
        report = matcher.match(blockhash_report)
        LOG.info("matching completed.")
        return render_report(report, "report.html")
    # Bug fix: the route also accepts GET, but the original fell through and
    # returned None, which makes Flask raise a 500. Show the landing page
    # (which presumably hosts the upload form -- TODO confirm) instead.
    return render_template('index.html', db_timestamp=matcher.db_timestamp)
@app.route('/api/blocks', methods=['POST'])
def upload_api_file():
    """API endpoint: read a raw binary from the request body, return the match report as JSON."""
    LOG.info("request to /api/blocks")
    if request.method == 'POST':
        binary = request.stream.read()
        # Fix: the original hashed the (potentially large) upload twice --
        # once for the log line and once for the report filename.
        sha256 = hashlib.sha256(binary).hexdigest()
        LOG.info("received binary with sha256: %s", sha256)
        hasher = BlockHasher()
        blockhash_report = hasher.processBuffer(binary, f"sha256:{sha256}")
        report = matcher.match(blockhash_report)
        LOG.info("matching completed.")
        return jsonify(report)
if __name__ == '__main__':
    # start up server as WSGI applet through waitress
    # NOTE: binds to localhost only (port 9001); external access requires a
    # reverse proxy in front of this process.
    serve(app, host="127.0.0.1", port=9001)