Skip to content

Commit f9dacef

Browse files
authored
Merge pull request #2 from ydb-platform/fix_tx_modes
Fix tx modes
2 parents 5f50b2c + d6850b2 commit f9dacef

File tree

4 files changed

+112
-55
lines changed

4 files changed

+112
-55
lines changed

tests/test_connections.py

Lines changed: 15 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -26,48 +26,39 @@ def _test_isolation_level_read_only(
2626
isolation_level: str,
2727
read_only: bool,
2828
) -> None:
29-
connection.set_isolation_level("AUTOCOMMIT")
3029
cursor = connection.cursor()
3130
with suppress(dbapi.DatabaseError):
32-
maybe_await(cursor.execute("DROP TABLE foo"))
33-
34-
cursor = connection.cursor()
35-
maybe_await(cursor.execute(
31+
maybe_await(cursor.execute_scheme("DROP TABLE foo"))
32+
maybe_await(cursor.execute_scheme(
3633
"CREATE TABLE foo(id Int64 NOT NULL, PRIMARY KEY (id))"
3734
))
3835

3936
connection.set_isolation_level(isolation_level)
4037
cursor = connection.cursor()
41-
4238
query = "UPSERT INTO foo(id) VALUES (1)"
4339
if read_only:
4440
with pytest.raises(dbapi.DatabaseError):
4541
maybe_await(cursor.execute(query))
46-
4742
else:
4843
maybe_await(cursor.execute(query))
4944

5045
maybe_await(connection.rollback())
5146

52-
connection.set_isolation_level("AUTOCOMMIT")
53-
54-
cursor = connection.cursor()
55-
56-
maybe_await(cursor.execute("DROP TABLE foo"))
47+
maybe_await(cursor.execute_scheme("DROP TABLE foo"))
5748

5849
def _test_connection(self, connection: dbapi.Connection) -> None:
5950
maybe_await(connection.commit())
6051
maybe_await(connection.rollback())
6152

6253
cur = connection.cursor()
6354
with suppress(dbapi.DatabaseError):
64-
maybe_await(cur.execute("DROP TABLE foo"))
55+
maybe_await(cur.execute_scheme("DROP TABLE foo"))
6556

6657
assert not maybe_await(connection.check_exists("/local/foo"))
6758
with pytest.raises(dbapi.ProgrammingError):
6859
maybe_await(connection.describe("/local/foo"))
6960

70-
maybe_await(cur.execute(
61+
maybe_await(cur.execute_scheme(
7162
"CREATE TABLE foo(id Int64 NOT NULL, PRIMARY KEY (id))"
7263
))
7364

@@ -77,17 +68,17 @@ def _test_connection(self, connection: dbapi.Connection) -> None:
7768
assert col.name == "id"
7869
assert col.type == ydb.PrimitiveType.Int64
7970

80-
maybe_await(cur.execute("DROP TABLE foo"))
71+
maybe_await(cur.execute_scheme("DROP TABLE foo"))
8172
maybe_await(cur.close())
8273

8374
def _test_cursor_raw_query(self, connection: dbapi.Connection) -> None:
8475
cur = connection.cursor()
8576
assert cur
8677

8778
with suppress(dbapi.DatabaseError):
88-
maybe_await(cur.execute("DROP TABLE test"))
79+
maybe_await(cur.execute_scheme("DROP TABLE test"))
8980

90-
maybe_await(cur.execute(
81+
maybe_await(cur.execute_scheme(
9182
"CREATE TABLE test(id Int64 NOT NULL, text Utf8, PRIMARY KEY (id))"
9283
))
9384

@@ -112,7 +103,7 @@ def _test_cursor_raw_query(self, connection: dbapi.Connection) -> None:
112103
},
113104
))
114105

115-
maybe_await(cur.execute("DROP TABLE test"))
106+
maybe_await(cur.execute_scheme("DROP TABLE test"))
116107

117108
maybe_await(cur.close())
118109

@@ -130,7 +121,7 @@ def _test_errors(
130121
cur = connection.cursor()
131122

132123
with suppress(dbapi.DatabaseError):
133-
maybe_await(cur.execute("DROP TABLE test"))
124+
maybe_await(cur.execute_scheme("DROP TABLE test"))
134125

135126
with pytest.raises(dbapi.DataError):
136127
maybe_await(cur.execute("SELECT 18446744073709551616"))
@@ -144,7 +135,7 @@ def _test_errors(
144135
with pytest.raises(dbapi.ProgrammingError):
145136
maybe_await(cur.execute("SELECT * FROM test"))
146137

147-
maybe_await(cur.execute(
138+
maybe_await(cur.execute_scheme(
148139
"CREATE TABLE test(id Int64, PRIMARY KEY (id))"
149140
))
150141

@@ -153,7 +144,7 @@ def _test_errors(
153144
with pytest.raises(dbapi.IntegrityError):
154145
maybe_await(cur.execute("INSERT INTO test(id) VALUES(1)"))
155146

156-
maybe_await(cur.execute("DROP TABLE test"))
147+
maybe_await(cur.execute_scheme("DROP TABLE test"))
157148
maybe_await(cur.close())
158149

159150

@@ -211,7 +202,9 @@ def connect() -> dbapi.AsyncConnection:
211202
try:
212203
yield conn
213204
finally:
214-
await greenlet_spawn(conn.close)
205+
def close() -> None:
206+
maybe_await(conn.close())
207+
await greenlet_spawn(close)
215208

216209
@pytest.mark.asyncio
217210
@pytest.mark.parametrize(

tests/test_cursors.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ def _test_cursor_fetch_all_multiple_result_sets(
140140
class TestCursor(BaseCursorTestSuit):
141141
@pytest.fixture
142142
def sync_cursor(self, session_sync: ydb.QuerySession) -> Generator[Cursor]:
143-
cursor = Cursor(session_sync)
143+
cursor = Cursor(session_sync, ydb.QuerySerializableReadWrite())
144144
yield cursor
145145
cursor.close()
146146

@@ -175,7 +175,7 @@ class TestAsyncCursor(BaseCursorTestSuit):
175175
async def async_cursor(
176176
self, session: ydb.aio.QuerySession
177177
) -> AsyncGenerator[Cursor]:
178-
cursor = AsyncCursor(session)
178+
cursor = AsyncCursor(session, ydb.QuerySerializableReadWrite())
179179
yield cursor
180180
await greenlet_spawn(cursor.close)
181181

ydb_dbapi/connections.py

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -42,14 +42,14 @@ class _IsolationSettings(NamedTuple):
4242
ydb.QuerySerializableReadWrite(), interactive=True
4343
),
4444
IsolationLevel.ONLINE_READONLY: _IsolationSettings(
45-
ydb.QueryOnlineReadOnly(), interactive=True
45+
ydb.QueryOnlineReadOnly(), interactive=False
4646
),
4747
IsolationLevel.ONLINE_READONLY_INCONSISTENT: _IsolationSettings(
4848
ydb.QueryOnlineReadOnly().with_allow_inconsistent_reads(),
49-
interactive=True,
49+
interactive=False,
5050
),
5151
IsolationLevel.STALE_READONLY: _IsolationSettings(
52-
ydb.QueryStaleReadOnly(), interactive=True
52+
ydb.QueryStaleReadOnly(), interactive=False
5353
),
5454
IsolationLevel.SNAPSHOT_READONLY: _IsolationSettings(
5555
ydb.QuerySnapshotReadOnly(), interactive=True
@@ -78,10 +78,11 @@ def __init__(
7878

7979
self.connection_kwargs: dict = kwargs
8080

81-
self._tx_mode: ydb.BaseQueryTxMode = ydb.QuerySerializableReadWrite()
81+
self._shared_session_pool: bool = False
82+
8283
self._tx_context: TxContext | AsyncTxContext | None = None
84+
self._tx_mode: ydb.BaseQueryTxMode = ydb.QuerySerializableReadWrite()
8385
self.interactive_transaction: bool = False
84-
self._shared_session_pool: bool = False
8586

8687
if ydb_session_pool is not None:
8788
self._shared_session_pool = True
@@ -99,11 +100,13 @@ def __init__(
99100
self._session: ydb.QuerySession | ydb.aio.QuerySession | None = None
100101

101102
def set_isolation_level(self, isolation_level: IsolationLevel) -> None:
102-
ydb_isolation_settings = _ydb_isolation_settings_map[isolation_level]
103103
if self._tx_context and self._tx_context.tx_id:
104104
raise InternalError(
105105
"Failed to set transaction mode: transaction is already began"
106106
)
107+
108+
ydb_isolation_settings = _ydb_isolation_settings_map[isolation_level]
109+
107110
self._tx_mode = ydb_isolation_settings.ydb_mode
108111
self.interactive_transaction = ydb_isolation_settings.interactive
109112

@@ -113,7 +116,7 @@ def get_isolation_level(self) -> str:
113116
return IsolationLevel.SERIALIZABLE
114117
return IsolationLevel.AUTOCOMMIT
115118
if self._tx_mode.name == ydb.QueryOnlineReadOnly().name:
116-
if self._tx_mode.settings.allow_inconsistent_reads:
119+
if self._tx_mode.allow_inconsistent_reads:
117120
return IsolationLevel.ONLINE_READONLY_INCONSISTENT
118121
return IsolationLevel.ONLINE_READONLY
119122
if self._tx_mode.name == ydb.QueryStaleReadOnly().name:
@@ -123,6 +126,12 @@ def get_isolation_level(self) -> str:
123126
msg = f"{self._tx_mode.name} is not supported"
124127
raise NotSupportedError(msg)
125128

129+
def _maybe_init_tx(
130+
self, session: ydb.QuerySession | ydb.aio.QuerySession
131+
) -> None:
132+
if self._tx_context is None and self.interactive_transaction:
133+
self._tx_context = session.transaction(self._tx_mode)
134+
126135

127136
class Connection(BaseConnection):
128137
_driver_cls = ydb.Driver
@@ -154,15 +163,12 @@ def cursor(self) -> Cursor:
154163
if self._session is None:
155164
raise RuntimeError("Connection is not ready, use wait_ready.")
156165

157-
if self.interactive_transaction:
158-
self._tx_context = self._session.transaction(self._tx_mode)
159-
else:
160-
self._tx_context = None
166+
self._maybe_init_tx(self._session)
161167

162168
self._current_cursor = self._cursor_cls(
163169
session=self._session,
170+
tx_mode=self._tx_mode,
164171
tx_context=self._tx_context,
165-
autocommit=(not self.interactive_transaction),
166172
)
167173
return self._current_cursor
168174

@@ -181,16 +187,19 @@ def wait_ready(self, timeout: int = 10) -> None:
181187

182188
self._session = self._session_pool.acquire()
183189

190+
@handle_ydb_errors
184191
def commit(self) -> None:
185192
if self._tx_context and self._tx_context.tx_id:
186193
self._tx_context.commit()
187194
self._tx_context = None
188195

196+
@handle_ydb_errors
189197
def rollback(self) -> None:
190198
if self._tx_context and self._tx_context.tx_id:
191199
self._tx_context.rollback()
192200
self._tx_context = None
193201

202+
@handle_ydb_errors
194203
def close(self) -> None:
195204
self.rollback()
196205

@@ -281,15 +290,12 @@ def cursor(self) -> AsyncCursor:
281290
if self._session is None:
282291
raise RuntimeError("Connection is not ready, use wait_ready.")
283292

284-
if self.interactive_transaction:
285-
self._tx_context = self._session.transaction(self._tx_mode)
286-
else:
287-
self._tx_context = None
293+
self._maybe_init_tx(self._session)
288294

289295
self._current_cursor = self._cursor_cls(
290296
session=self._session,
297+
tx_mode=self._tx_mode,
291298
tx_context=self._tx_context,
292-
autocommit=(not self.interactive_transaction),
293299
)
294300
return self._current_cursor
295301

@@ -308,16 +314,19 @@ async def wait_ready(self, timeout: int = 10) -> None:
308314

309315
self._session = await self._session_pool.acquire()
310316

317+
@handle_ydb_errors
311318
async def commit(self) -> None:
312319
if self._tx_context and self._tx_context.tx_id:
313320
await self._tx_context.commit()
314321
self._tx_context = None
315322

323+
@handle_ydb_errors
316324
async def rollback(self) -> None:
317325
if self._tx_context and self._tx_context.tx_id:
318326
await self._tx_context.rollback()
319327
self._tx_context = None
320328

329+
@handle_ydb_errors
321330
async def close(self) -> None:
322331
await self.rollback()
323332

0 commit comments

Comments
 (0)