11import dataclasses
2+ import typing
23from typing import (
34 Any ,
45 Dict ,
1011
1112import ydb
1213
14+ if typing .TYPE_CHECKING :
15+ from ydb .aio .query .base import AsyncResponseContextIterator
16+
1317
1418ParametersType = Dict [
1519 str ,
@@ -32,9 +36,10 @@ def __init__(
3236 self ,
3337 session_pool : ydb .aio .QuerySessionPool ,
3438 session : ydb .aio .QuerySession ,
35- tx_mode : ydb .BaseQueryTxMode ,
36- tx_context : Optional [ ydb .QueryTxContext ] = None ,
39+ tx_mode : ydb .aio . QueryTxContext ,
40+ tx_context : ydb .aio . QueryTxContext ,
3741 table_path_prefix : str = "" ,
42+ autocommit : bool = True ,
3843 ):
3944 self .arraysize = 1
4045 self ._description = None
@@ -44,18 +49,37 @@ def __init__(
4449 self ._tx_mode = tx_mode
4550 self ._tx_context = tx_context
4651 self ._table_path_prefix = table_path_prefix
52+ self ._autocommit = autocommit
4753
4854 async def _execute_ddl_query (
49- self , query : YdbQuery , parameters : Optional [ParametersType ] = None
55+ self , query : str , parameters : Optional [ParametersType ] = None
5056 ) -> List [ydb .convert .ResultSet ]:
5157 return await self ._pool .execute_with_retries (
5258 query = query , parameters = parameters
5359 )
5460
61+ async def _execute_dml_query (
62+ self , query : str , parameters : Optional [ParametersType ] = None
63+ ) -> AsyncResponseContextIterator :
64+ return await self ._tx_context .execute (
65+ query = query ,
66+ parameters = parameters ,
67+ commit_tx = self ._autocommit ,
68+ )
69+
5570 async def execute (
5671 self , operation : YdbQuery , parameters : Optional [ParametersType ] = None
5772 ):
58- pass
73+ if operation .is_ddl :
74+ result_sets = await self ._execute_ddl_query (
75+ query = operation .yql_text , parameters = parameters
76+ )
77+ else :
78+ result_sets_stream = await self ._execute_dml_query (
79+ query = operation .yql_text , parameters = parameters
80+ )
81+
82+ return result_sets or result_sets_stream
5983
6084 async def executemany (self ):
6185 pass
0 commit comments