11from typing import (
22 Any ,
3+ NamedTuple ,
34 Optional ,
45)
56
67import ydb
78
9+ from .errors import (
10+ # InterfaceError,
11+ InternalError ,
12+ NotSupportedError ,
13+ )
14+
15+
16+ class IsolationLevel :
17+ SERIALIZABLE = "SERIALIZABLE"
18+ ONLINE_READONLY = "ONLINE READONLY"
19+ ONLINE_READONLY_INCONSISTENT = "ONLINE READONLY INCONSISTENT"
20+ STALE_READONLY = "STALE READONLY"
21+ SNAPSHOT_READONLY = "SNAPSHOT READONLY"
22+ AUTOCOMMIT = "AUTOCOMMIT"
23+
824
925class Connection :
1026 def __init__ (
@@ -22,11 +38,15 @@ def __init__(
2238 "ydb_table_path_prefix" , ""
2339 )
2440
25- self .session_pool : ydb .aio .QuerySessionPool = None
41+ self .session_pool : ydb .aio .QuerySessionPool = self .conn_kwargs .pop (
42+ "ydb_session_pool" , None
43+ )
2644 self .session : ydb .aio .QuerySession = None
2745 self .tx_context : Optional [ydb .QueryTxContext ] = None
2846 self .tx_mode : ydb .BaseQueryTxMode = ydb .QuerySerializableReadWrite ()
2947
48+ self .interactive_transaction : bool = False # AUTOCOMMIT
49+
3050 def cursor (self ):
3151 pass
3252
@@ -51,7 +71,59 @@ async def rollback(self):
5171 self .tx_context = None
5272
5373 async def close (self ):
54- pass
74+ await self .rollback ()
75+
76+ def set_isolation_level (self , isolation_level : str ):
77+ class IsolationSettings (NamedTuple ):
78+ ydb_mode : ydb .BaseQueryTxMode
79+ interactive : bool
80+
81+ ydb_isolation_settings_map = {
82+ IsolationLevel .AUTOCOMMIT : IsolationSettings (
83+ ydb .QuerySerializableReadWrite (), interactive = False
84+ ),
85+ IsolationLevel .SERIALIZABLE : IsolationSettings (
86+ ydb .QuerySerializableReadWrite (), interactive = True
87+ ),
88+ IsolationLevel .ONLINE_READONLY : IsolationSettings (
89+ ydb .QueryOnlineReadOnly (), interactive = False
90+ ),
91+ IsolationLevel .ONLINE_READONLY_INCONSISTENT : IsolationSettings (
92+ ydb .QueryOnlineReadOnly ().with_allow_inconsistent_reads (),
93+ interactive = False ,
94+ ),
95+ IsolationLevel .STALE_READONLY : IsolationSettings (
96+ ydb .QueryStaleReadOnly (), interactive = False
97+ ),
98+ IsolationLevel .SNAPSHOT_READONLY : IsolationSettings (
99+ ydb .QuerySnapshotReadOnly (), interactive = True
100+ ),
101+ }
102+ ydb_isolation_settings = ydb_isolation_settings_map [isolation_level ]
103+ if self .tx_context and self .tx_context .tx_id :
104+ raise InternalError (
105+ "Failed to set transaction mode: transaction is already began"
106+ )
107+ self .tx_mode = ydb_isolation_settings .ydb_mode
108+ self .interactive_transaction = ydb_isolation_settings .interactive
109+
110+ def get_isolation_level (self ) -> str :
111+ if self .tx_mode .name == ydb .SerializableReadWrite ().name :
112+ if self .interactive_transaction :
113+ return IsolationLevel .SERIALIZABLE
114+ else :
115+ return IsolationLevel .AUTOCOMMIT
116+ elif self .tx_mode .name == ydb .OnlineReadOnly ().name :
117+ if self .tx_mode .settings .allow_inconsistent_reads :
118+ return IsolationLevel .ONLINE_READONLY_INCONSISTENT
119+ else :
120+ return IsolationLevel .ONLINE_READONLY
121+ elif self .tx_mode .name == ydb .StaleReadOnly ().name :
122+ return IsolationLevel .STALE_READONLY
123+ elif self .tx_mode .name == ydb .SnapshotReadOnly ().name :
124+ return IsolationLevel .SNAPSHOT_READONLY
125+ else :
126+ raise NotSupportedError (f"{ self .tx_mode .name } is not supported" )
55127
56128
57129async def connect () -> Connection :
0 commit comments