-
Notifications
You must be signed in to change notification settings - Fork 102
Expand file tree
/
Copy pathdatabase.py
More file actions
56 lines (44 loc) · 1.59 KB
/
database.py
File metadata and controls
56 lines (44 loc) · 1.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
"""Database connection and session management."""
import logging
from contextlib import asynccontextmanager
from typing import AsyncGenerator
from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorClientSession
logger = logging.getLogger(__name__)
client: AsyncIOMotorClient | None = None
session: AsyncIOMotorClientSession | None = None
@asynccontextmanager
async def get_session(
client: AsyncIOMotorClient,
) -> AsyncGenerator[AsyncIOMotorClientSession | None, None]:
"""Yield a MongoDB session with transaction management."""
session = await client.start_session()
try:
await session.start_transaction() # type: ignore[misc]
yield session
await session.commit_transaction()
except Exception as e:
await session.abort_transaction()
raise e
finally:
await session.end_session()
session = None # type: ignore[assignment]
def get_client() -> AsyncIOMotorClient | None:
"""Get the MongoDB client instance."""
return client
def connect_to_mongo(uri: str) -> None:
"""Connect to MongoDB using the provided URI."""
global client
if client is None:
client = AsyncIOMotorClient(uri)
logger.info("Connected to MongoDB.")
else:
logger.warning("MongoDB client is already connected.")
def close_mongo_connection() -> None:
"""Close the MongoDB connection."""
global client
if client:
client.close()
client = None # ignore: type[assignment]
logger.info("MongoDB connection closed.")
else:
logger.warning("MongoDB client is already closed.")