Skip to content
16 changes: 13 additions & 3 deletions masterbase/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
sys.path.append(os.path.dirname(CURRENT_DIR))

# ruff: noqa: E402
# ruff: noqa: I001
from masterbase.anomaly import DetectionState
from masterbase.guards import (
analyst_guard,
Expand Down Expand Up @@ -65,6 +66,8 @@
from masterbase.registers import shutdown_registers, startup_registers
from masterbase.steam import account_exists, is_limited_account

# ruff: enable

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -127,6 +130,7 @@ def close_session(request: Request, api_key: str) -> dict[str, bool]:

return {"closed_successfully": True}


@post("/close_session", guards=[valid_key_guard, user_in_session_guard], sync_to_thread=False)
def close_with_late_bytes(request: Request, api_key: str, data: LateBytesBody) -> dict[str, bool]:
"""Close a session out. Will find the latest open session for a user.
Expand Down Expand Up @@ -199,8 +203,12 @@ async def demodata(request: Request, api_key: str, session_id: str) -> Stream:
"""Return the demo."""
minio_client = request.app.state.minio_client
blob_name = demo_blob_name(session_id)
file = minio_client.get_object("demoblobs", blob_name)
stat = minio_client.stat_object("demoblobs", blob_name)

try:
file = minio_client.get_object("demoblobs", blob_name)
stat = minio_client.stat_object("demoblobs", blob_name)
except Exception as exc:
raise HTTPException(detail="Demo not found!", status_code=404) from exc

headers = {
"Content-Disposition": f'attachment; filename="{blob_name}"',
Expand Down Expand Up @@ -257,12 +265,14 @@ async def report_player(request: Request, api_key: str, data: ReportBody) -> dic
except IntegrityError:
raise HTTPException(detail=f"Unknown session ID {data.session_id}", status_code=402)


@get("/broadcasts", sync_to_thread=False)
def broadcasts(request: Request) -> list[dict[str, str]]:
"""Return a list of broadcasts."""
engine = request.app.state.engine
return get_broadcasts(engine)


class DemoHandler(WebsocketListener):
"""Custom Websocket Class."""

Expand Down Expand Up @@ -306,7 +316,7 @@ async def on_disconnect(self, socket: WebSocket) -> None: # type: ignore
"""Close handle on disconnect."""
if socket in streaming_sessions:
session_manager = streaming_sessions[socket]
else :
else:
logger.warning("Attempting to disconnect from already disconnected socket!")
return
logger.info(f"Received socket disconnect from session ID: {session_manager.session_id}")
Expand Down
217 changes: 206 additions & 11 deletions masterbase/lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io
import json
import logging
import math
import os
import secrets
import socket
Expand Down Expand Up @@ -318,8 +319,8 @@ def get_uningested_demos(engine: Engine, limit: int) -> list[str]:
active = false
AND open = false
AND ingested = false
AND pruned = false
AND demo_size > 0
AND blob_name IS NOT NULL
ORDER BY
created_at ASC
LIMIT :limit;
Expand All @@ -340,7 +341,7 @@ def get_uningested_demos(engine: Engine, limit: int) -> list[str]:

def ingest_demo(minio_client: Minio, engine: Engine, session_id: str):
"""Ingest a demo analysis from an analysis client."""
blob_name = f"{session_id}.json"
blob_name = json_blob_name(session_id)
try:
raw_data = minio_client.get_object("jsonblobs", blob_name).read()
decoded_data = raw_data.decode("utf-8")
Expand Down Expand Up @@ -551,8 +552,7 @@ def _close_session_with_demo(
end_time = :end_time,
demo_size = :demo_size,
markov_score = :markov_score,
updated_at = :updated_at,
blob_name = :blob_name
updated_at = :updated_at
WHERE
steam_id = :steam_id AND
session_id = :session_id
Expand All @@ -566,7 +566,6 @@ def _close_session_with_demo(
"updated_at": current_time.isoformat(),
"demo_size": size,
"markov_score": markov_score,
"blob_name": demo_blob_name(session_id),
},
).scalar_one()
if late_bytes is not None:
Expand All @@ -590,9 +589,8 @@ def close_session_helper(
minio_client: Minio,
engine: Engine,
steam_id: str,
streaming_sessions:
SocketManagerMapType,
late_bytes: bytes | None
streaming_sessions: SocketManagerMapType,
late_bytes: bytes | None,
) -> str:
"""Properly close a session and return a summary message.

Expand Down Expand Up @@ -655,6 +653,11 @@ def demo_blob_name(session_id: str) -> str:
return f"{session_id}.dem"


def json_blob_name(session_id: str) -> str:
    """Return the object-store key of the JSON analysis blob for *session_id*."""
    return session_id + ".json"


def demo_sink_path(session_id: str) -> str:
"""Format the media path for a demo blob."""
return os.path.join(DEMOS_PATH, demo_blob_name(session_id))
Expand Down Expand Up @@ -863,13 +866,205 @@ def check_is_loser(engine: Engine, steam_id: str) -> bool:

return bool(result)


def get_broadcasts(engine: Engine) -> list[dict[str, str]]:
    """Get the list of broadcasts.

    Args:
        engine: synchronous SQLAlchemy engine for the app database.

    Returns:
        One dict per row of the ``broadcasts`` table, with the
        ``created_at`` column renamed to ``post_date`` for API consumers.
    """
    with engine.connect() as conn:
        # Execute exactly once (a duplicated execute left two round-trips here).
        result = conn.execute(sa.text("SELECT * FROM broadcasts"))
        rows = [row._asdict() for row in result.all()]
    for row in rows:
        # The API contract exposes "post_date"; the table column is "created_at".
        row["post_date"] = row.pop("created_at")
    return rows


# This function is only meant to run on boot!
def cleanup_hung_sessions(engine: Engine) -> None:
    """Remove any sessions that were left open/active after shutdown."""
    logger.info("Checking for hanging sessions...")
    # We have to delete reports first because of the REFERENCES constraint
    statement = sa.text(
        """
                DELETE FROM reports WHERE session_id IN (
                    SELECT session_id FROM demo_sessions
                    WHERE active = true
                    OR open = true
                    OR demo_size IS NULL
                );

                DELETE FROM demo_sessions
                WHERE active = true
                OR open = true
                OR demo_size IS NULL;
                """
    )
    with engine.connect() as conn:
        outcome = conn.execute(statement)
        removed = outcome.rowcount
        conn.commit()
    logger.info("Deleted %d hanging sessions.", removed)


# This function is only meant to run on boot!
def prune_if_necessary(engine: Engine, minio_client: Minio) -> bool:
    """Mark sessions as pruned so the specified amount of free space is available.

    Reads the configured storage cap from ``prune_config`` and, if current
    object-store usage exceeds it, marks the oldest detection-free sessions as
    ``pruned`` until the budget would be met. Blob deletion itself is deferred
    to the pruned-demo cleanup pass that runs after this on boot.

    Returns True if any sessions were marked for pruning, False otherwise.
    """
    logger.info("Checking if we need to prune demos...")
    current_size = get_total_storage_usage(minio_client)

    with engine.connect() as conn:
        max_result = conn.execute(
            sa.text(
                """
                SELECT max_storage_gb FROM prune_config;
                """
            )
        )
        max_size_gb = max_result.scalar_one()
        # A missing or non-positive cap means pruning is disabled.
        if max_size_gb is None or max_size_gb <= 0:
            logger.warning("No storage limit set, enjoy filling your disk!")
            return False
        max_size = max_size_gb * (1024**3)
        total_bytes_to_remove = current_size - max_size
        logger.info("Current size: %d MB; Max size: %d MB", current_size / (1024**2), max_size / (1024**2))
        if total_bytes_to_remove <= 0:
            logger.info("No need to prune.")
            return False

        logger.info("Attempting to prune %d MB", max(0, total_bytes_to_remove / (1024**2)))

        # get the oldest demos that don't have any detections
        # we allow demos that have already been pruned in case we somehow end up in a state
        # where a demo is marked as pruned but its blob remains.
        result = conn.execute(
            sa.text(
                """
                SELECT session_id FROM demo_sessions
                WHERE active = false
                AND open = false
                AND session_id NOT IN (SELECT session_id FROM analysis)
                ORDER BY created_at ASC
                """
            )
        )

        prunable_demos_oldest_first = [row[0] for row in result.all()]

        # Snapshot of what actually exists in the demoblobs bucket, keyed by object name.
        minio_demoblobs_dict = {blob.object_name: blob for blob in minio_client.list_objects("demoblobs")}
        session_ids_to_remove = []
        bytes_saved = 0

        # prune just enough so we're in our space budget
        for session_id in prunable_demos_oldest_first:
            blob = minio_demoblobs_dict.get(demo_blob_name(session_id))
            if blob is None:
                # already pruned, do not count
                continue
            session_ids_to_remove.append(session_id)
            bytes_saved += blob.size
            if bytes_saved >= total_bytes_to_remove:
                break

        if len(session_ids_to_remove) == 0:
            # Over budget but every candidate either has detections or no blob.
            logger.warning("No demos to prune, but we're over the limit!")
            return False

        # mark as pruned
        conn.execute(
            sa.text(
                """
                UPDATE demo_sessions
                SET pruned = true
                WHERE session_id IN :session_ids_to_remove;
                """
            ),
            {"session_ids_to_remove": tuple(session_ids_to_remove)},
        )
        conn.commit()
        logger.info("Marked %d demos for pruning.", len(session_ids_to_remove))
        # pruned demo blobs will be deleted by cleanup_pruned_demos, which runs after this on boot
        return True


# This function is only meant to run on boot!
def cleanup_pruned_demos(engine: Engine, minio_client: Minio) -> None:
    """Remove blobs for pruned or deleted sessions.

    Compares the object store against the database: any demo/json blob whose
    session is absent from the non-pruned set is considered orphaned and is
    deleted, subject to the ``max_prune_ratio`` safety limit in
    ``prune_config``.
    """
    logger.info("Checking for orphaned demos.")
    with engine.connect() as conn:
        result = conn.execute(
            sa.text(
                """
                SELECT session_id FROM demo_sessions WHERE pruned = false;
                """
            )
        )
        ids_in_db = [row[0] for row in result.all()]
        minio_demoblobs_dict = {blob.object_name: blob for blob in minio_client.list_objects("demoblobs")}
        minio_jsonblobs_dict = {blob.object_name: blob for blob in minio_client.list_objects("jsonblobs")}

        # Drop every blob that belongs to a live (non-pruned) session.
        for session_id in ids_in_db:
            minio_demoblobs_dict.pop(demo_blob_name(session_id), None)
            minio_jsonblobs_dict.pop(json_blob_name(session_id), None)

        # dicts now contain only orphaned blobs

        ratio_result = conn.execute(
            sa.text(
                """
                SELECT max_prune_ratio FROM prune_config;
                """
            )
        )
        # If we're gonna wipe more than max_prune_ratio (default 0.05) of the blobs, something is probably very wrong.
        # Setting this to negative will perform a one-time prune regardless of ratio.
        max_prune_ratio = ratio_result.scalar_one()
        if max_prune_ratio >= 0 and len(minio_demoblobs_dict) > len(ids_in_db) * max_prune_ratio:
            # Guard the percentage against division by zero when the DB holds
            # no live sessions (previously this branch crashed on boot).
            orphan_pct = (len(minio_demoblobs_dict) / len(ids_in_db) * 100) if ids_in_db else 100.0
            logger.warning(
                "Too many orphaned demo blobs: %d (%f%%) found, but limit set to %d (%f%%). "
                "Refusing to clean up because something probably broke.",
                len(minio_demoblobs_dict),
                orphan_pct,
                math.floor(len(ids_in_db) * max_prune_ratio),
                max_prune_ratio * 100,
            )
            return

        if max_prune_ratio < 0:
            # A negative ratio is a one-shot override; restore the positive
            # value so the safety check applies again on the next boot.
            max_prune_ratio = abs(max_prune_ratio)
            logger.info("Orphaned demo cleanup forced by config. Setting back to %f", max_prune_ratio)
            conn.execute(
                sa.text(
                    """
                    UPDATE prune_config
                    SET max_prune_ratio = :max_prune_ratio;
                    """
                ),
                {"max_prune_ratio": max_prune_ratio},
            )
            conn.commit()

        for blob in minio_demoblobs_dict.values():
            logger.info("Removing orphaned demo %s", blob.object_name)
            minio_client.remove_object("demoblobs", blob.object_name)
        for blob in minio_jsonblobs_dict.values():
            logger.info("Removing orphaned json %s", blob.object_name)
            minio_client.remove_object("jsonblobs", blob.object_name)


def get_total_storage_usage(minio_client: Minio) -> int:
    """Get the total storage used by all buckets in bytes.

    Args:
        minio_client: MinIO client used to enumerate buckets and objects.

    Returns:
        Total size in bytes of every object across all buckets, or ``-1``
        if the object store could not be queried (sentinel kept for
        existing callers).
    """
    try:
        total_size = 0
        for bucket in minio_client.list_buckets():
            objects = minio_client.list_objects(bucket.name, recursive=True)
            total_size += sum(obj.size for obj in objects)
        return total_size
    except S3Error as exc:
        # Log through the module logger instead of print() so the failure
        # shows up alongside the rest of the app's logs.
        logger.error("Error computing total storage usage: %s", exc)
        return -1
20 changes: 18 additions & 2 deletions masterbase/registers.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,13 @@
from sqlalchemy import Engine, create_engine
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine

from masterbase.lib import make_db_uri, make_minio_client
from masterbase.lib import (
cleanup_hung_sessions,
cleanup_pruned_demos,
make_db_uri,
make_minio_client,
prune_if_necessary,
)


def get_minio_connection(app: Litestar) -> Minio:
Expand Down Expand Up @@ -55,5 +61,15 @@ async def close_async_db_connection(app: Litestar) -> None:
await cast("AsyncEngine", app.state.async_engine).dispose()


startup_registers = (get_db_connection, get_async_db_connection, get_minio_connection)
def boot_cleanup(app: Litestar) -> None:
    """Cleanup the database on boot."""
    state = app.state
    db_engine = state.engine
    blob_client = state.minio_client

    cleanup_hung_sessions(db_engine)
    prune_if_necessary(db_engine, blob_client)
    cleanup_pruned_demos(db_engine, blob_client)


startup_registers = (get_db_connection, get_async_db_connection, get_minio_connection, boot_cleanup)
shutdown_registers = (close_db_connection, close_async_db_connection)
Loading