1212)
1313
1414import ydb
15- from .errors import DatabaseError
15+ from .errors import Error , DatabaseError
1616from .utils import handle_ydb_errors , AsyncFromSyncIterator
1717
1818
@@ -40,37 +40,37 @@ class Cursor:
4040 def __init__ (
4141 self ,
4242 session_pool : ydb .aio .QuerySessionPool ,
43- session : ydb .aio .QuerySession ,
44- tx_mode : Optional [ydb .aio .QueryTxContext ] = None ,
4543 tx_context : Optional [ydb .aio .QueryTxContext ] = None ,
4644 table_path_prefix : str = "" ,
4745 autocommit : bool = True ,
4846 ):
4947 self .arraysize : int = 1
5048 self ._description : Optional [List [Tuple ]] = None
5149
52- self ._pool = session_pool
53- self ._session = session
54- self ._tx_mode = tx_mode
55- self ._tx_context : ydb .aio .QueryTxContext = tx_context
50+ self ._session_pool = session_pool
51+ self ._tx_context = tx_context
5652 self ._table_path_prefix = table_path_prefix
5753 self ._autocommit = autocommit
5854
5955 self ._stream : Optional [AsyncIterator ] = None
6056 self ._rows : Optional [Iterator [Dict ]] = None
6157
6258 @handle_ydb_errors
63- async def _execute_ddl_query (
59+ async def _execute_generic_query (
6460 self , query : str , parameters : Optional [ParametersType ] = None
6561 ) -> List [ydb .convert .ResultSet ]:
66- return await self ._pool .execute_with_retries (
62+ return await self ._session_pool .execute_with_retries (
6763 query = query , parameters = parameters
6864 )
6965
7066 @handle_ydb_errors
71- async def _execute_dml_query (
67+ async def _execute_transactional_query (
7268 self , query : str , parameters : Optional [ParametersType ] = None
7369 ) -> AsyncIterator :
70+ if self ._tx_context is None :
71+ raise Error (
72+ "Unable to execute tx based queries without transaction."
73+ )
7474 return await self ._tx_context .execute (
7575 query = query ,
7676 parameters = parameters ,
@@ -79,17 +79,17 @@ async def _execute_dml_query(
7979
8080 @handle_ydb_errors
8181 async def execute (
82- self , operation : YdbQuery , parameters : Optional [ParametersType ] = None
82+ self , query : str , parameters : Optional [ParametersType ] = None
8383 ):
84- if operation . is_ddl :
85- result_sets = await self ._execute_ddl_query (
86- query = operation . yql_text , parameters = parameters
84+ if self . _tx_context is not None :
85+ self . _stream = await self ._execute_transactional_query (
86+ query = query , parameters = parameters
8787 )
88- self ._stream = AsyncFromSyncIterator (iter (result_sets ))
8988 else :
90- self . _stream = await self ._execute_dml_query (
91- query = operation . yql_text , parameters = parameters
89+ result_sets = await self ._execute_generic_query (
90+ query = query , parameters = parameters
9291 )
92+ self ._stream = AsyncFromSyncIterator (iter (result_sets ))
9393
9494 if self ._stream is None :
9595 return
@@ -98,7 +98,6 @@ async def execute(
9898 self ._update_result_set (result_set )
9999
100100 def _update_result_set (self , result_set : ydb .convert .ResultSet ):
101- # self._result_set = result_set
102101 self ._update_description (result_set )
103102 self ._rows = self ._rows_iterable (result_set )
104103
@@ -133,8 +132,13 @@ async def fetchone(self):
133132 return next (self ._rows or iter ([]), None )
134133
135134 async def fetchmany (self , size : Optional [int ] = None ):
136- return list (
137- itertools .islice (self ._rows or iter ([]), size or self .arraysize )
135+ return (
136+ list (
137+ itertools .islice (
138+ self ._rows or iter ([]), size or self .arraysize
139+ )
140+ )
141+ or None
138142 )
139143
140144 async def fetchall (self ):
0 commit comments