Merged
2 changes: 1 addition & 1 deletion .github/workflows/integration.yaml
@@ -48,7 +48,7 @@ jobs:
- name: Create the bucket
run: ./mc alias set blobs http://127.0.0.1:9000 MEGASCATTERBOMB masterbase
- name: more minio bs
run: ./mc mb -p blobs/demoblobs
run: ./mc mb -p blobs/demoblobs && ./mc mb -p blobs/analysisblobs
- name: Remove mc client
run: rm -v ./mc

31 changes: 29 additions & 2 deletions masterbase/app.py
@@ -2,6 +2,7 @@

import logging
import os
import sys
import time
from datetime import datetime, timezone
from hmac import compare_digest
@@ -41,6 +42,8 @@
demo_blob_name,
generate_api_key,
generate_uuid4_int,
get_uningested_demos,
ingest_demo,
late_bytes_helper,
list_demos_helper,
provision_api_key,
@@ -55,6 +58,9 @@
from masterbase.registers import shutdown_registers, startup_registers
from masterbase.steam import account_exists, is_limited_account

CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.append(os.path.dirname(CURRENT_DIR))

logger = logging.getLogger(__name__)


@@ -197,6 +203,25 @@ def db_export(request: Request, api_key: str, table: ExportTable) -> Stream:
)


@get("/jobs", guards=[valid_key_guard, analyst_guard], sync_to_thread=False)
def jobs(request: Request, api_key: str, limit: int = 1) -> list[str]:
"""Return a list of demos that need analysis."""
engine = request.app.state.engine
demos = get_uningested_demos(engine, limit)

return demos


@post("/ingest", guards=[valid_key_guard, analyst_guard], sync_to_thread=False)
def ingest(request: Request, api_key: str, session_id: str) -> dict[str, bool]:
"""Report analysis as completed, ingest into database."""
minio_client = request.app.state.minio_client
err = ingest_demo(minio_client, request.app.state.engine, session_id)
if err is None:
return {"ingested": True}
raise HTTPException(detail=f"Internal Error Occurred: {err}", status_code=500)


@post("/report", guards=[valid_key_guard])
async def report_player(request: Request, api_key: str, data: ReportBody) -> dict[str, bool]:
"""Add a player report."""
@@ -279,8 +304,8 @@ def provision(request: Request) -> Redirect:
"""
# enforce https on base_url
base_url = str(request.base_url)
dev_mode = os.getenv('DEVELOPMENT', 'false')
proto = "http://" if dev_mode.lower() == 'true' else "https://"
dev_mode = os.getenv("DEVELOPMENT", "false")
proto = "http://" if dev_mode.lower() == "true" else "https://"
base_url = proto + base_url.split("//")[-1]

auth_params = {
@@ -421,6 +446,8 @@ def plain_text_exception_handler(_: Request, exception: Exception) -> Response:
analyst_list_demos,
report_player,
db_export,
jobs,
ingest,
],
exception_handlers={Exception: plain_text_exception_handler},
on_shutdown=shutdown_registers,
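
For orientation, a minimal sketch of how an analyst client might drive the two new endpoints (/jobs and /ingest). This is not part of the PR: the host and API key are placeholders, passing api_key and session_id as query parameters is an assumption based on the existing guarded handlers, and the analysis-and-upload step in the middle is outside this diff (see the lib.py section below).

import requests

BASE_URL = "https://masterbase.example.invalid"  # placeholder host
API_KEY = "analyst-api-key"                      # placeholder key

# Ask for up to five demos that still need analysis.
jobs = requests.get(f"{BASE_URL}/jobs", params={"api_key": API_KEY, "limit": 5})
jobs.raise_for_status()

for session_id in jobs.json():
    # ... run the analysis and stage <session_id>.json in the jsonblobs bucket ...
    resp = requests.post(
        f"{BASE_URL}/ingest",
        params={"api_key": API_KEY, "session_id": session_id},
    )
    resp.raise_for_status()  # a 500 carries the error string returned by ingest_demo()
    print(session_id, resp.json())  # -> {"ingested": true} on success
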
128 changes: 128 additions & 0 deletions masterbase/lib.py
@@ -2,6 +2,7 @@

import hashlib
import io
import json
import logging
import os
import secrets
@@ -16,11 +17,13 @@
from litestar import WebSocket
from minio import Minio, S3Error
from minio.datatypes import Object as BlobStat
from pydantic import ValidationError
from sqlalchemy import Engine
from sqlalchemy.exc import NoResultFound
from sqlalchemy.ext.asyncio import AsyncEngine

from masterbase.anomaly import DetectionState
from masterbase.models import Analysis

logger = logging.getLogger(__name__)

@@ -304,6 +307,131 @@ async def check_analyst(engine: AsyncEngine, steam_id: str) -> bool:
return analyst


def get_uningested_demos(engine: Engine, limit: int) -> list[str]:
"""Get a list of uningested demos."""
sql = """
SELECT
session_id
FROM
demo_sessions
WHERE
active = false
AND open = false
AND ingested = false
AND demo_size > 0
AND blob_name IS NOT NULL
ORDER BY
created_at ASC
LIMIT :limit;
"""
params = {"limit": limit}

with engine.connect() as conn:
result = conn.execute(
sa.text(sql),
params,
)

data = result.all()
uningested_demos = [row[0] for row in data]

return uningested_demos


def ingest_demo(minio_client: Minio, engine: Engine, session_id: str):
"""Ingest a demo analysis from an analysis client."""
blob_name = f"{session_id}.json"
try:
raw_data = minio_client.get_object("jsonblobs", blob_name).read()
decoded_data = raw_data.decode("utf-8")
json_data = json.JSONDecoder().decode(decoded_data)
data = Analysis.parse_obj(json_data)
except S3Error as err:
if err.code == "NoSuchKey":
return "no analysis data found."
else:
return "unknown S3 error while looking up analysis data."
except ValidationError:
return "malformed analysis data."

# Data preprocessing
algorithm_counts = {}
for detection in data.detections:
key = (detection.player, detection.algorithm)
if key not in algorithm_counts:
algorithm_counts[key] = 0
algorithm_counts[key] += 1

# ensure the demo session is not already ingested
is_ingested_sql = "SELECT ingested, active, open FROM demo_sessions WHERE session_id = :session_id;"

# Wipe existing analysis data
# (we want to be able to reingest a demo if necessary by manually setting ingested = false)
wipe_analysis_sql = "DELETE FROM analysis WHERE session_id = :session_id;"
wipe_reviews_sql = "DELETE FROM reviews WHERE session_id = :session_id;"

# Insert the analysis data
insert_sql = """\
INSERT INTO analysis (
session_id, target_steam_id, algorithm_type, detection_count, created_at
) VALUES (
:session_id, :target_steam_id, :algorithm, :count, :created_at
);
"""

# Mark the demo as ingested
mark_ingested_sql = "UPDATE demo_sessions SET ingested = true WHERE session_id = :session_id;"
created_at = datetime.now().astimezone(timezone.utc).isoformat()

with engine.connect() as conn:
with conn.begin():
command = conn.execute(
sa.text(is_ingested_sql),
{"session_id": session_id},
)

result = command.one_or_none()
if result is None:
conn.rollback()
return "demo not found"
if result.ingested is True:
conn.rollback()
return "demo already ingested"
if result.active is True:
conn.rollback()
return "session is still active"
if result.open is True:
conn.rollback()
return "session is still open"

conn.execute(
sa.text(wipe_analysis_sql),
{"session_id": session_id},
)
conn.execute(
sa.text(wipe_reviews_sql),
{"session_id": session_id},
)

for key, count in algorithm_counts.items():
conn.execute(
sa.text(insert_sql),
{
"session_id": session_id,
"target_steam_id": key[0],
"algorithm": key[1],
"count": count,
"created_at": created_at,
},
)

conn.execute(
sa.text(mark_ingested_sql),
{"session_id": session_id},
)
return None


async def session_closed(engine: AsyncEngine, session_id: str) -> bool:
"""Determine if a session is active."""
sql = "SELECT active FROM demo_sessions WHERE session_id = :session_id;"
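
Note that the upload side is not in this diff: ingest_demo() only expects the analysis result to already exist in the jsonblobs bucket under the object name <session_id>.json; how the JSON is meant to arrive there (possibly a future upload endpoint, as the Analysis docstring below hints) is not shown. A minimal sketch of staging such an object directly with the MinIO client, using the local development endpoint and credentials from vars.ps1 at the end of this PR; the payload here is only a stub, and a fuller example shape is sketched after the models.py diff below.

import io
import json

from minio import Minio

# Local development endpoint and credentials (from vars.ps1); not for production use.
client = Minio("localhost:9000", access_key="MEGASCATTERBOMB", secret_key="masterbase", secure=False)

payload = {  # must validate against masterbase.models.Analysis
    "author": "analysis-client/0.1",
    "detections": [],
    "duration": 1800,
    "map": "pl_upward",
    "server_ip": "192.0.2.10",
}
raw = json.dumps(payload).encode("utf-8")

session_id = "some-session-id"
# ingest_demo() will later read jsonblobs/<session_id>.json and validate it.
client.put_object("jsonblobs", f"{session_id}.json", io.BytesIO(raw), length=len(raw))
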
20 changes: 20 additions & 0 deletions masterbase/models.py
@@ -1,6 +1,7 @@
"""Module of pydantic models."""

from enum import Enum
from typing import Any

from pydantic import BaseModel

@@ -20,6 +21,25 @@ class ReportBody(BaseModel):
reason: ReportReason


class Detection(BaseModel):
"""A single detection from the analysis client."""

tick: int
algorithm: str
player: int
data: Any


class Analysis(BaseModel):
"""The body of the POST /demos endpoint."""

author: str
detections: list[Detection]
duration: int
map: str
server_ip: str


class ExportTable(str, Enum):
"""Tables to be allowed in database exports."""

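
For reference, an illustrative JSON body that the Analysis model above would accept; every value is invented. Given this payload, ingest_demo() in lib.py would collapse the two detections into a single analysis row for that player with algorithm_type "aimsnap" and detection_count 2.

from masterbase.models import Analysis

example = {
    "author": "analysis-client/0.1",
    "detections": [
        {"tick": 1200, "algorithm": "aimsnap", "player": 76561198000000001, "data": {"angle_delta": 42.0}},
        {"tick": 1450, "algorithm": "aimsnap", "player": 76561198000000001, "data": None},
    ],
    "duration": 1800,
    "map": "pl_upward",
    "server_ip": "192.0.2.10",
}

analysis = Analysis.parse_obj(example)  # pydantic v1-style validation, matching ingest_demo()
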
2 changes: 2 additions & 0 deletions masterbase/registers.py
@@ -16,6 +16,8 @@ def get_minio_connection(app: Litestar) -> Minio:
minio_client = make_minio_client()
if not minio_client.bucket_exists("demoblobs"):
minio_client.make_bucket("demoblobs", "us-east-1")
if not minio_client.bucket_exists("jsonblobs"):
minio_client.make_bucket("jsonblobs", "us-east-1")
app.state.minio_client = minio_client

return cast(Minio, app.state.minio_client)
62 changes: 62 additions & 0 deletions migrations/versions/b941ebee3091_analysis.py
@@ -0,0 +1,62 @@
"""analysis

Revision ID: b941ebee3091
Revises: 53d7f00c595e
Create Date: 2024-10-08 13:56:46.796256

"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision: str = 'b941ebee3091'
down_revision: Union[str, None] = '53d7f00c595e'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
"""Add analysis and review tables."""
op.execute(
"""
CREATE TYPE verdict AS ENUM ('none', 'benign', 'inconclusive', 'confirmed', 'error');
"""
)
op.execute(
"""
CREATE TABLE analysis (
session_id varchar REFERENCES demo_sessions,
target_steam_id varchar,
algorithm_type varchar,
detection_count int,
created_at timestamptz,
PRIMARY KEY (session_id, target_steam_id, algorithm_type)
);
"""
)
op.execute(
"""
CREATE TABLE reviews (
session_id varchar REFERENCES demo_sessions,
target_steam_id varchar,
reviewer_steam_id varchar,
verdict verdict,
created_at timestamptz,
PRIMARY KEY (session_id, target_steam_id, reviewer_steam_id)
);
"""
)


def downgrade() -> None:
"""Remove analysis and review tables."""
op.execute(
"""
DROP TABLE analysis;
DROP TABLE reviews;
"""
)
op.execute("DROP TYPE verdict;")
5 changes: 4 additions & 1 deletion services/api/Dockerfile
@@ -11,7 +11,8 @@ COPY . .

RUN apt-get update && apt-get install -y --no-install-recommends \
apt-utils \
postgresql-client
postgresql-client \
dos2unix

RUN pdm sync --prod --no-editable

@@ -22,4 +23,6 @@ RUN touch /first_run
COPY services/api/start.sh /usr/local/bin/start.sh
RUN chmod +x /usr/local/bin/start.sh

RUN dos2unix /usr/local/bin/start.sh

ENTRYPOINT /usr/local/bin/start.sh
2 changes: 2 additions & 0 deletions services/minio/Dockerfile
@@ -12,6 +12,8 @@ EXPOSE 9001


COPY services/minio/start.sh /usr/local/bin/start.sh
RUN tr -d '\r' < /usr/local/bin/start.sh > /usr/local/bin/start_unix.sh && mv /usr/local/bin/start_unix.sh /usr/local/bin/start.sh

RUN chmod +x /usr/local/bin/start.sh

ENTRYPOINT /usr/local/bin/start.sh
11 changes: 11 additions & 0 deletions vars.ps1
@@ -0,0 +1,11 @@
$env:DEVELOPMENT = $true
$env:DEBUG_WAIT_FOR_ATTACH = $true
$env:STEAM_API_KEY = "foo"
$env:POSTGRES_USER = "MEGASCATTERBOMB"
$env:POSTGRES_PASSWORD = "masterbase"
$env:POSTGRES_HOST = "localhost"
$env:POSTGRES_PORT = 8050
$env:MINIO_HOST = "localhost"
$env:MINIO_PORT = 9000
$env:MINIO_ACCESS_KEY = "MEGASCATTERBOMB"
$env:MINIO_SECRET_KEY = "masterbase"