Skip to content

Commit e26bb1a

Browse files
Handle analysis of demos (#93)
Co-authored-by: megascatterbomb <megascatterbomb@gmail.com>
1 parent a99809c commit e26bb1a

File tree

9 files changed

+259
-4
lines changed

9 files changed

+259
-4
lines changed

.github/workflows/integration.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ jobs:
4848
- name: Create the bucket
4949
run: ./mc alias set blobs http://127.0.0.1:9000 MEGASCATTERBOMB masterbase
5050
- name: more minio bs
51-
run: ./mc mb -p blobs/demoblobs
51+
run: ./mc mb -p blobs/demoblobs && ./mc mb -p blobs/analysisblobs
5252
- name: Remove mc client
5353
run: rm -v ./mc
5454

masterbase/app.py

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import logging
44
import os
5+
import sys
56
import time
67
from datetime import datetime, timezone
78
from hmac import compare_digest
@@ -41,6 +42,8 @@
4142
demo_blob_name,
4243
generate_api_key,
4344
generate_uuid4_int,
45+
get_uningested_demos,
46+
ingest_demo,
4447
late_bytes_helper,
4548
list_demos_helper,
4649
provision_api_key,
@@ -55,6 +58,9 @@
5558
from masterbase.registers import shutdown_registers, startup_registers
5659
from masterbase.steam import account_exists, is_limited_account
5760

61+
CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))
62+
sys.path.append(os.path.dirname(CURRENT_DIR))
63+
5864
logger = logging.getLogger(__name__)
5965

6066

@@ -197,6 +203,25 @@ def db_export(request: Request, api_key: str, table: ExportTable) -> Stream:
197203
)
198204

199205

206+
@get("/jobs", guards=[valid_key_guard, analyst_guard], sync_to_thread=False)
207+
def jobs(request: Request, api_key: str, limit: int = 1) -> list[str]:
208+
"""Return a list of demos that need analysis."""
209+
engine = request.app.state.engine
210+
demos = get_uningested_demos(engine, limit)
211+
212+
return demos
213+
214+
215+
@post("/ingest", guards=[valid_key_guard, analyst_guard], sync_to_thread=False)
216+
def ingest(request: Request, api_key: str, session_id: str) -> dict[str, bool]:
217+
"""Report analysis as completed, ingest into database."""
218+
minio_client = request.app.state.minio_client
219+
err = ingest_demo(minio_client, request.app.state.engine, session_id)
220+
if err is None:
221+
return {"ingested": True}
222+
raise HTTPException(detail=f"Internal Error Occured: {err}", status_code=500)
223+
224+
200225
@post("/report", guards=[valid_key_guard])
201226
async def report_player(request: Request, api_key: str, data: ReportBody) -> dict[str, bool]:
202227
"""Add a player report."""
@@ -279,8 +304,8 @@ def provision(request: Request) -> Redirect:
279304
"""
280305
# enforce https on base_url
281306
base_url = str(request.base_url)
282-
dev_mode = os.getenv('DEVELOPMENT', 'false')
283-
proto = "http://" if dev_mode.lower() == 'true' else "https://"
307+
dev_mode = os.getenv("DEVELOPMENT", "false")
308+
proto = "http://" if dev_mode.lower() == "true" else "https://"
284309
base_url = proto + base_url.split("//")[-1]
285310

286311
auth_params = {
@@ -421,6 +446,8 @@ def plain_text_exception_handler(_: Request, exception: Exception) -> Response:
421446
analyst_list_demos,
422447
report_player,
423448
db_export,
449+
jobs,
450+
ingest,
424451
],
425452
exception_handlers={Exception: plain_text_exception_handler},
426453
on_shutdown=shutdown_registers,

masterbase/lib.py

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import hashlib
44
import io
5+
import json
56
import logging
67
import os
78
import secrets
@@ -16,11 +17,13 @@
1617
from litestar import WebSocket
1718
from minio import Minio, S3Error
1819
from minio.datatypes import Object as BlobStat
20+
from pydantic import ValidationError
1921
from sqlalchemy import Engine
2022
from sqlalchemy.exc import NoResultFound
2123
from sqlalchemy.ext.asyncio import AsyncEngine
2224

2325
from masterbase.anomaly import DetectionState
26+
from masterbase.models import Analysis
2427

2528
logger = logging.getLogger(__name__)
2629

@@ -304,6 +307,131 @@ async def check_analyst(engine: AsyncEngine, steam_id: str) -> bool:
304307
return analyst
305308

306309

310+
def get_uningested_demos(engine: Engine, limit: int) -> list[str]:
311+
"""Get a list of uningested demos."""
312+
sql = """
313+
SELECT
314+
session_id
315+
FROM
316+
demo_sessions
317+
WHERE
318+
active = false
319+
AND open = false
320+
AND ingested = false
321+
AND demo_size > 0
322+
AND blob_name IS NOT NULL
323+
ORDER BY
324+
created_at ASC
325+
LIMIT :limit;
326+
"""
327+
params = {"limit": limit}
328+
329+
with engine.connect() as conn:
330+
result = conn.execute(
331+
sa.text(sql),
332+
params,
333+
)
334+
335+
data = result.all()
336+
uningested_demos = [row[0] for row in data]
337+
338+
return uningested_demos
339+
340+
341+
def ingest_demo(minio_client: Minio, engine: Engine, session_id: str):
342+
"""Ingest a demo analysis from an analysis client."""
343+
blob_name = f"{session_id}.json"
344+
try:
345+
raw_data = minio_client.get_object("jsonblobs", blob_name).read()
346+
decoded_data = raw_data.decode("utf-8")
347+
json_data = json.JSONDecoder().decode(decoded_data)
348+
data = Analysis.parse_obj(json_data)
349+
except S3Error as err:
350+
if err.code == "NoSuchKey":
351+
return "no analysis data found."
352+
else:
353+
return "unknown S3 error while looking up analysis data."
354+
except ValidationError:
355+
return "malformed analysis data."
356+
357+
# Data preprocessing
358+
algorithm_counts = {}
359+
for detection in data.detections:
360+
key = (detection.player, detection.algorithm)
361+
if key not in algorithm_counts:
362+
algorithm_counts[key] = 0
363+
algorithm_counts[key] += 1
364+
365+
# ensure the demo session is not already ingested
366+
is_ingested_sql = "SELECT ingested, active, open FROM demo_sessions WHERE session_id = :session_id;"
367+
368+
# Wipe existing analysis data
369+
# (we want to be able to reingest a demo if necessary by manually setting ingested = false)
370+
wipe_analysis_sql = "DELETE FROM analysis WHERE session_id = :session_id;"
371+
wipe_reviews_sql = "DELETE FROM reviews WHERE session_id = :session_id;"
372+
373+
# Insert the analysis data
374+
insert_sql = """\
375+
INSERT INTO analysis (
376+
session_id, target_steam_id, algorithm_type, detection_count, created_at
377+
) VALUES (
378+
:session_id, :target_steam_id, :algorithm, :count, :created_at
379+
);
380+
"""
381+
382+
# Mark the demo as ingested
383+
mark_ingested_sql = "UPDATE demo_sessions SET ingested = true WHERE session_id = :session_id;"
384+
created_at = datetime.now().astimezone(timezone.utc).isoformat()
385+
386+
with engine.connect() as conn:
387+
with conn.begin():
388+
command = conn.execute(
389+
sa.text(is_ingested_sql),
390+
{"session_id": session_id},
391+
)
392+
393+
result = command.one_or_none()
394+
if result is None:
395+
conn.rollback()
396+
return "demo not found"
397+
if result.ingested is True:
398+
conn.rollback()
399+
return "demo already ingested"
400+
if result.active is True:
401+
conn.rollback()
402+
return "session is still active"
403+
if result.open is True:
404+
conn.rollback()
405+
return "session is still open"
406+
407+
conn.execute(
408+
sa.text(wipe_analysis_sql),
409+
{"session_id": session_id},
410+
)
411+
conn.execute(
412+
sa.text(wipe_reviews_sql),
413+
{"session_id": session_id},
414+
)
415+
416+
for key, count in algorithm_counts.items():
417+
conn.execute(
418+
sa.text(insert_sql),
419+
{
420+
"session_id": session_id,
421+
"target_steam_id": key[0],
422+
"algorithm": key[1],
423+
"count": count,
424+
"created_at": created_at,
425+
},
426+
)
427+
428+
conn.execute(
429+
sa.text(mark_ingested_sql),
430+
{"session_id": session_id},
431+
)
432+
return None
433+
434+
307435
async def session_closed(engine: AsyncEngine, session_id: str) -> bool:
308436
"""Determine if a session is active."""
309437
sql = "SELECT active FROM demo_sessions WHERE session_id = :session_id;"

masterbase/models.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
"""Module of pydantic models."""
22

33
from enum import Enum
4+
from typing import Any
45

56
from pydantic import BaseModel
67

@@ -20,6 +21,25 @@ class ReportBody(BaseModel):
2021
reason: ReportReason
2122

2223

24+
class Detection(BaseModel):
25+
"""A single detection from the analysis client."""
26+
27+
tick: int
28+
algorithm: str
29+
player: int
30+
data: Any
31+
32+
33+
class Analysis(BaseModel):
34+
"""The body of the POST /demos endpoint."""
35+
36+
author: str
37+
detections: list[Detection]
38+
duration: int
39+
map: str
40+
server_ip: str
41+
42+
2343
class ExportTable(str, Enum):
2444
"""Tables to be allowed in database exports."""
2545

masterbase/registers.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ def get_minio_connection(app: Litestar) -> Minio:
1616
minio_client = make_minio_client()
1717
if not minio_client.bucket_exists("demoblobs"):
1818
minio_client.make_bucket("demoblobs", "us-east-1")
19+
if not minio_client.bucket_exists("jsonblobs"):
20+
minio_client.make_bucket("jsonblobs", "us-east-1")
1921
app.state.minio_client = minio_client
2022

2123
return cast(Minio, app.state.minio_client)
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
"""analysis
2+
3+
Revision ID: b941ebee3091
4+
Revises: 53d7f00c595e
5+
Create Date: 2024-10-08 13:56:46.796256
6+
7+
"""
8+
from typing import Sequence, Union
9+
10+
from alembic import op
11+
import sqlalchemy as sa
12+
13+
14+
# revision identifiers, used by Alembic.
15+
revision: str = 'b941ebee3091'
16+
down_revision: Union[str, None] = '53d7f00c595e'
17+
branch_labels: Union[str, Sequence[str], None] = None
18+
depends_on: Union[str, Sequence[str], None] = None
19+
20+
21+
def upgrade() -> None:
22+
"""Add analysis and review tables."""
23+
op.execute(
24+
"""
25+
CREATE TYPE verdict AS ENUM ('none', 'benign', 'inconclusive', 'confirmed', 'error');
26+
"""
27+
)
28+
op.execute(
29+
"""
30+
CREATE TABLE analysis (
31+
session_id varchar REFERENCES demo_sessions,
32+
target_steam_id varchar,
33+
algorithm_type varchar,
34+
detection_count int,
35+
created_at timestamptz,
36+
PRIMARY KEY (session_id, target_steam_id, algorithm_type)
37+
);
38+
"""
39+
)
40+
op.execute(
41+
"""
42+
CREATE TABLE reviews (
43+
session_id varchar REFERENCES demo_sessions,
44+
target_steam_id varchar,
45+
reviewer_steam_id varchar,
46+
verdict verdict,
47+
created_at timestamptz,
48+
PRIMARY KEY (session_id, target_steam_id, reviewer_steam_id)
49+
);
50+
"""
51+
)
52+
53+
54+
def downgrade() -> None:
55+
"""Remove analysis and review tables."""
56+
op.execute(
57+
"""
58+
DROP TABLE analysis;
59+
DROP TABLE reviews;
60+
"""
61+
)
62+
op.execute("DROP TYPE verdict;")

services/api/Dockerfile

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@ COPY . .
1111

1212
RUN apt-get update && apt-get install -y --no-install-recommends \
1313
apt-utils \
14-
postgresql-client
14+
postgresql-client \
15+
dos2unix
1516

1617
RUN pdm sync --prod --no-editable
1718

@@ -22,4 +23,6 @@ RUN touch /first_run
2223
COPY services/api/start.sh /usr/local/bin/start.sh
2324
RUN chmod +x /usr/local/bin/start.sh
2425

26+
RUN dos2unix /usr/local/bin/start.sh
27+
2528
ENTRYPOINT /usr/local/bin/start.sh

services/minio/Dockerfile

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ EXPOSE 9001
1212

1313

1414
COPY services/minio/start.sh /usr/local/bin/start.sh
15+
RUN tr -d '\r' < /usr/local/bin/start.sh > /usr/local/bin/start_unix.sh && mv /usr/local/bin/start_unix.sh /usr/local/bin/start.sh
16+
1517
RUN chmod +x /usr/local/bin/start.sh
1618

1719
ENTRYPOINT /usr/local/bin/start.sh

vars.ps1

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
$env:DEVELOPMENT = $true
2+
$env:DEBUG_WAIT_FOR_ATTACH = $true
3+
$env:STEAM_API_KEY = "foo"
4+
$env:POSTGRES_USER = "MEGASCATTERBOMB"
5+
$env:POSTGRES_PASSWORD = "masterbase"
6+
$env:POSTGRES_HOST = "localhost"
7+
$env:POSTGRES_PORT = 8050
8+
$env:MINIO_HOST = "localhost"
9+
$env:MINIO_PORT = 9000
10+
$env:MINIO_ACCESS_KEY = "MEGASCATTERBOMB"
11+
$env:MINIO_SECRET_KEY = "masterbase"

0 commit comments

Comments
 (0)