Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 34 additions & 5 deletions masterbase/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
demo_blob_name,
generate_api_key,
generate_uuid4_int,
get_broadcasts,
get_uningested_demos,
ingest_demo,
late_bytes_helper,
Expand Down Expand Up @@ -77,7 +78,7 @@ def landing() -> Redirect:
return Redirect(path="https://github.com/MegaAntiCheat/client-backend")


@get("/session_id", guards=[valid_key_guard, user_in_session_guard, valid_session_guard], sync_to_thread=False)
@get("/session_id", guards=[valid_key_guard, user_not_in_session_guard, valid_session_guard], sync_to_thread=False)
def session_id(
request: Request,
api_key: str,
Expand Down Expand Up @@ -110,7 +111,7 @@ def session_id(
return {"session_id": _session_id}


@get("/close_session", guards=[valid_key_guard, user_not_in_session_guard], sync_to_thread=False)
@get("/close_session", guards=[valid_key_guard, user_in_session_guard], sync_to_thread=False)
def close_session(request: Request, api_key: str) -> dict[str, bool]:
"""Close a session out. Will find the latest open session for a user.

Expand All @@ -119,15 +120,32 @@ def close_session(request: Request, api_key: str) -> dict[str, bool]:
"""
minio_client = request.app.state.minio_client
engine = request.app.state.engine
steam_id = steam_id_from_api_key(engine, api_key)

msg = close_session_helper(minio_client, engine, steam_id, streaming_sessions, None)
logger.info(msg)

return {"closed_successfully": True}

@post("/close_session", guards=[valid_key_guard, user_in_session_guard], sync_to_thread=False)
def close_with_late_bytes(request: Request, api_key: str, data: LateBytesBody) -> dict[str, bool]:
"""Close a session out. Will find the latest open session for a user.

Returns:
{"closed_successfully": True}
"""
minio_client = request.app.state.minio_client
engine = request.app.state.engine
late_bytes = bytes.fromhex(data.late_bytes)
steam_id = steam_id_from_api_key(engine, api_key)
msg = close_session_helper(minio_client, engine, steam_id, streaming_sessions)

msg = close_session_helper(minio_client, engine, steam_id, streaming_sessions, late_bytes)
logger.info(msg)

return {"closed_successfully": True}


@post("/late_bytes", guards=[valid_key_guard, user_not_in_session_guard], sync_to_thread=False)
@post("/late_bytes", guards=[valid_key_guard, user_in_session_guard], sync_to_thread=False)
def late_bytes(request: Request, api_key: str, data: LateBytesBody) -> dict[str, bool]:
"""Add late bytes to a closed demo session.

Expand Down Expand Up @@ -239,6 +257,11 @@ async def report_player(request: Request, api_key: str, data: ReportBody) -> dic
except IntegrityError:
raise HTTPException(detail=f"Unknown session ID {data.session_id}", status_code=402)

@get("/broadcasts", sync_to_thread=False)
def broadcasts(request: Request) -> list[dict[str, str]]:
"""Return a list of broadcasts."""
engine = request.app.state.engine
return get_broadcasts(engine)

class DemoHandler(WebsocketListener):
"""Custom Websocket Class."""
Expand Down Expand Up @@ -281,7 +304,11 @@ async def on_accept(self, socket: WebSocket, api_key: str, session_id: str) -> N

async def on_disconnect(self, socket: WebSocket) -> None: # type: ignore
"""Close handle on disconnect."""
session_manager = streaming_sessions[socket]
if socket in streaming_sessions:
session_manager = streaming_sessions[socket]
else :
logger.warning("Attempting to disconnect from already disconnected socket!")
return
logger.info(f"Received socket disconnect from session ID: {session_manager.session_id}")
session_manager.disconnect()
await set_open_false(socket.app.state.async_engine, session_manager.session_id)
Expand Down Expand Up @@ -440,6 +467,7 @@ def plain_text_exception_handler(_: Request, exception: Exception) -> Response:
landing,
session_id,
close_session,
close_with_late_bytes,
DemoHandler,
provision,
provision_handler,
Expand All @@ -450,6 +478,7 @@ def plain_text_exception_handler(_: Request, exception: Exception) -> Response:
report_player,
db_export,
jobs,
broadcasts,
ingest,
],
exception_handlers={Exception: plain_text_exception_handler},
Expand Down
6 changes: 3 additions & 3 deletions masterbase/guards.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ async def analyst_guard(connection: ASGIConnection, _: BaseRouteHandler) -> None
raise NotAuthorizedException()


async def user_in_session_guard(connection: ASGIConnection, _: BaseRouteHandler) -> None:
async def user_not_in_session_guard(connection: ASGIConnection, _: BaseRouteHandler) -> None:
"""Assert that the user is not currently in a session."""
async_engine = connection.app.state.async_engine

Expand All @@ -59,8 +59,8 @@ async def user_in_session_guard(connection: ASGIConnection, _: BaseRouteHandler)
)


async def user_not_in_session_guard(connection: ASGIConnection, _: BaseRouteHandler) -> None:
"""Assert that the user is not currently in a session."""
async def user_in_session_guard(connection: ASGIConnection, _: BaseRouteHandler) -> None:
"""Assert that the user is currently in a session."""
async_engine = connection.app.state.async_engine

api_key = connection.query_params["api_key"]
Expand Down
22 changes: 21 additions & 1 deletion masterbase/lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,12 @@ def _close_session_with_demo(


def close_session_helper(
minio_client: Minio, engine: Engine, steam_id: str, streaming_sessions: SocketManagerMapType
minio_client: Minio,
engine: Engine,
steam_id: str,
streaming_sessions:
SocketManagerMapType,
late_bytes: bytes | None
) -> str:
"""Properly close a session and return a summary message.

Expand Down Expand Up @@ -618,6 +623,10 @@ def close_session_helper(
msg = "No active session found, closing anyway."
else:
if os.path.exists(session_manager.demo_path):
if late_bytes is not None:
late_bytes_msg = late_bytes_helper(engine, steam_id, late_bytes, current_time)
if late_bytes_msg is not None:
return late_bytes_msg
_close_session_with_demo(
minio_client,
engine,
Expand Down Expand Up @@ -853,3 +862,14 @@ def check_is_loser(engine: Engine, steam_id: str) -> bool:
).scalar_one_or_none()

return bool(result)

def get_broadcasts(engine: Engine) -> list[dict[str, str]]:
"""Get the list of broadcasts."""
with engine.connect() as conn:
result = conn.execute(
sa.text("SELECT * FROM broadcasts")
)
rows = [row._asdict() for row in result.all()]
for row in rows:
row["post_date"] = row.pop("created_at")
return rows
41 changes: 41 additions & 0 deletions migrations/versions/f51cab87d3fd_broadcasts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
"""broadcasts

Revision ID: f51cab87d3fd
Revises: b941ebee3091
Create Date: 2025-02-10 17:03:28.325372

"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision: str = 'f51cab87d3fd'
down_revision: Union[str, None] = 'b941ebee3091'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
"""Add Broadcasts table"""
op.execute(
"""
CREATE TABLE broadcasts (
message varchar,
importance varchar,
created_at timestamptz,
PRIMARY KEY (message)
);
"""
)


def downgrade() -> None:
"""Delete broadcasts table"""
op.execute(
"""
DROP TABLE broadcasts;
"""
)