@get("/jobs", guards=[valid_key_guard, analyst_guard], sync_to_thread=True)
def jobs(request: Request, api_key: str, limit: int = 1) -> list[str]:
    """Return the session IDs of demos that still need analysis.

    Args:
        request: Incoming request; the database engine is read from
            ``request.app.state.engine``.
        api_key: Not used in the body; presumably consumed by the route
            guards (verify against the guard implementations).
        limit: Maximum number of session IDs to return.

    Returns:
        The list produced by ``get_uningested_demos``.
    """
    # The lookup is a blocking database call, so the handler runs in a worker
    # thread (sync_to_thread=True) rather than stalling the event loop.
    return get_uningested_demos(request.app.state.engine, limit)


@post("/ingest", guards=[valid_key_guard, analyst_guard], sync_to_thread=True)
def ingest(request: Request, api_key: str, session_id: str) -> dict[str, bool]:
    """Report analysis as completed and ingest it into the database.

    Args:
        request: Incoming request; the MinIO client and database engine are
            read from ``request.app.state``.
        api_key: Not used in the body; presumably consumed by the route
            guards (verify against the guard implementations).
        session_id: ID of the demo session whose analysis should be ingested.

    Returns:
        ``{"ingested": True}`` when ``ingest_demo`` reports no error.

    Raises:
        HTTPException: With status 500 and the reason in ``detail`` when
            ``ingest_demo`` returns an error string.
    """
    # ingest_demo performs blocking object-store and database I/O, hence
    # sync_to_thread=True on the decorator.
    state = request.app.state
    err = ingest_demo(state.minio_client, state.engine, session_id)
    if err is not None:
        raise HTTPException(detail=f"Internal Error Occurred: {err}", status_code=500)
    return {"ingested": True}
def ingest_demo(minio_client: Minio, engine: Engine, session_id: str):
    """Ingest a demo analysis from an analysis client.

    Reads ``<session_id>.json`` from the ``jsonblobs`` bucket, validates it as
    an ``Analysis``, and writes one ``analysis`` row per (player, algorithm)
    pair holding the number of detections for that pair. Existing ``analysis``
    and ``reviews`` rows for the session are deleted first, and the session is
    marked as ingested afterwards. All database work runs in one transaction.

    Args:
        minio_client: Client used to fetch the analysis blob.
        engine: Synchronous engine for the database holding ``demo_sessions``.
        session_id: ID of the demo session whose analysis is being ingested.

    Returns:
        ``None`` on success, otherwise a short error string describing why
        the analysis was not ingested.
    """
    from collections import Counter

    blob_name = f"{session_id}.json"
    try:
        response = minio_client.get_object("jsonblobs", blob_name)
        try:
            raw_data = response.read()
        finally:
            # Always hand the HTTP connection back to the pool, even if the
            # read fails part-way through.
            response.close()
            response.release_conn()
        data = Analysis.parse_obj(json.loads(raw_data.decode("utf-8")))
    except S3Error as err:
        if err.code == "NoSuchKey":
            return "no analysis data found."
        return "unknown S3 error while looking up analysis data."
    except (ValidationError, json.JSONDecodeError, UnicodeDecodeError):
        # Bytes that are not UTF-8, text that is not JSON, and JSON that does
        # not match the Analysis schema are all "malformed" to the caller.
        return "malformed analysis data."

    # Data preprocessing: number of detections per (player, algorithm) pair.
    algorithm_counts = Counter(
        (detection.player, detection.algorithm) for detection in data.detections
    )

    # Ensure the demo session is not already ingested and is fully closed.
    is_ingested_sql = "SELECT ingested, active, open FROM demo_sessions WHERE session_id = :session_id;"

    # Wipe existing analysis data
    # (we want to be able to reingest a demo if necessary by manually setting ingested = false)
    wipe_analysis_sql = "DELETE FROM analysis WHERE session_id = :session_id;"
    wipe_reviews_sql = "DELETE FROM reviews WHERE session_id = :session_id;"

    # Insert the analysis data
    insert_sql = """\
        INSERT INTO analysis (
            session_id, target_steam_id, algorithm_type, detection_count, created_at
        ) VALUES (
            :session_id, :target_steam_id, :algorithm, :count, :created_at
        );
    """

    # Mark the demo as ingested
    mark_ingested_sql = "UPDATE demo_sessions SET ingested = true WHERE session_id = :session_id;"

    created_at = datetime.now(timezone.utc).isoformat()
    session_params = {"session_id": session_id}
    analysis_rows = [
        {
            "session_id": session_id,
            "target_steam_id": player,
            "algorithm": algorithm,
            "count": count,
            "created_at": created_at,
        }
        for (player, algorithm), count in algorithm_counts.items()
    ]

    with engine.connect() as conn:
        with conn.begin():
            session = conn.execute(sa.text(is_ingested_sql), session_params).one_or_none()

            if session is None:
                error = "demo not found"
            elif session.ingested is True:
                error = "demo already ingested"
            elif session.active is True:
                error = "session is still active"
            elif session.open is True:
                error = "session is still open"
            else:
                error = None

            if error is not None:
                # Nothing has been written yet; end the transaction and bail.
                conn.rollback()
                return error

            conn.execute(sa.text(wipe_analysis_sql), session_params)
            conn.execute(sa.text(wipe_reviews_sql), session_params)

            # A list of parameter sets is sent as a single executemany call.
            # Skipped when there are no detections, because an empty list
            # would run the statement once without its bind parameters.
            if analysis_rows:
                conn.execute(sa.text(insert_sql), analysis_rows)

            conn.execute(sa.text(mark_ingested_sql), session_params)
    return None
def upgrade() -> None:
    """Create the ``verdict`` enum type and the ``analysis`` and ``reviews`` tables."""
    create_verdict_type = """
        CREATE TYPE verdict AS ENUM ('none', 'benign', 'inconclusive', 'confirmed', 'error');
        """
    create_analysis_table = """
        CREATE TABLE analysis (
            session_id varchar REFERENCES demo_sessions,
            target_steam_id varchar,
            algorithm_type varchar,
            detection_count int,
            created_at timestamptz,
            PRIMARY KEY (session_id, target_steam_id, algorithm_type)
        );
        """
    create_reviews_table = """
        CREATE TABLE reviews (
            session_id varchar REFERENCES demo_sessions,
            target_steam_id varchar,
            reviewer_steam_id varchar,
            verdict verdict,
            created_at timestamptz,
            PRIMARY KEY (session_id, target_steam_id, reviewer_steam_id)
        );
        """

    # Order matters: the enum type must exist before the reviews table uses it.
    for statement in (create_verdict_type, create_analysis_table, create_reviews_table):
        op.execute(statement)


def downgrade() -> None:
    """Drop the ``analysis`` and ``reviews`` tables, then the ``verdict`` enum type."""
    drop_tables = """
        DROP TABLE analysis;
        DROP TABLE reviews;
        """
    drop_verdict_type = "DROP TYPE verdict;"

    # The type is dropped last because the reviews table depends on it.
    for statement in (drop_tables, drop_verdict_type):
        op.execute(statement)
b/services/api/Dockerfile index 1a84fd6..b667cb9 100644 --- a/services/api/Dockerfile +++ b/services/api/Dockerfile @@ -11,7 +11,8 @@ COPY . . RUN apt-get update && apt-get install -y --no-install-recommends \ apt-utils \ - postgresql-client + postgresql-client \ + dos2unix RUN pdm sync --prod --no-editable @@ -22,4 +23,6 @@ RUN touch /first_run COPY services/api/start.sh /usr/local/bin/start.sh RUN chmod +x /usr/local/bin/start.sh +RUN dos2unix /usr/local/bin/start.sh + ENTRYPOINT /usr/local/bin/start.sh diff --git a/services/minio/Dockerfile b/services/minio/Dockerfile index f7893a6..989be77 100644 --- a/services/minio/Dockerfile +++ b/services/minio/Dockerfile @@ -12,6 +12,8 @@ EXPOSE 9001 COPY services/minio/start.sh /usr/local/bin/start.sh +RUN tr -d '\r' < /usr/local/bin/start.sh > /usr/local/bin/start_unix.sh && mv /usr/local/bin/start_unix.sh /usr/local/bin/start.sh + RUN chmod +x /usr/local/bin/start.sh ENTRYPOINT /usr/local/bin/start.sh \ No newline at end of file diff --git a/vars.ps1 b/vars.ps1 new file mode 100644 index 0000000..99e7c54 --- /dev/null +++ b/vars.ps1 @@ -0,0 +1,11 @@ +$env:DEVELOPMENT = $true +$env:DEBUG_WAIT_FOR_ATTACH = $true +$env:STEAM_API_KEY = "foo" +$env:POSTGRES_USER = "MEGASCATTERBOMB" +$env:POSTGRES_PASSWORD = "masterbase" +$env:POSTGRES_HOST = "localhost" +$env:POSTGRES_PORT = 8050 +$env:MINIO_HOST = "localhost" +$env:MINIO_PORT = 9000 +$env:MINIO_ACCESS_KEY = "MEGASCATTERBOMB" +$env:MINIO_SECRET_KEY = "masterbase"