Skip to content

Commit 93ebff6

Browse files
committed
scheme&table queries
1 parent b2e7d3c commit 93ebff6

File tree

2 files changed

+100
-0
lines changed

2 files changed

+100
-0
lines changed

ydb_dbapi/connection.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,17 @@
11
from typing import (
22
Any,
3+
List,
34
NamedTuple,
45
Optional,
56
)
7+
import posixpath
68

79
import ydb
10+
from ydb.retries import retry_operation_async
11+
12+
from .utils import (
13+
handle_ydb_errors,
14+
)
815

916
from .errors import (
1017
# InterfaceError,
@@ -38,6 +45,8 @@ def __init__(
3845
"ydb_table_path_prefix", ""
3946
)
4047

48+
self.driver: ydb.aio.Driver = self.conn_kwargs.pop("ydb_driver", None)
49+
4150
self.session_pool: ydb.aio.QuerySessionPool = self.conn_kwargs.pop(
4251
"ydb_session_pool", None
4352
)
@@ -125,6 +134,48 @@ def get_isolation_level(self) -> str:
125134
else:
126135
raise NotSupportedError(f"{self.tx_mode.name} is not supported")
127136

137+
@handle_ydb_errors
138+
async def describe(self, table_path: str) -> ydb.TableSchemeEntry:
139+
abs_table_path = posixpath.join(
140+
self.database, self.table_path_prefix, table_path
141+
)
142+
return self.driver.table_client.describe_table(abs_table_path)
143+
144+
@handle_ydb_errors
145+
async def check_exists(self, table_path: str) -> ydb.SchemeEntry:
146+
abs_table_path = posixpath.join(
147+
self.database, self.table_path_prefix, table_path
148+
)
149+
return await self._check_path_exists(abs_table_path)
150+
151+
@handle_ydb_errors
152+
async def get_table_names(self) -> List[str]:
153+
abs_dir_path = posixpath.join(self.database, self.table_path_prefix)
154+
names = await self._get_table_names(abs_dir_path)
155+
return [posixpath.relpath(path, abs_dir_path) for path in names]
156+
157+
async def _check_path_exists(self, table_path: str) -> ydb.SchemeEntry:
158+
try:
159+
await retry_operation_async(
160+
self.driver.scheme_client.describe_path, table_path
161+
)
162+
return True
163+
except ydb.SchemeError:
164+
return False
165+
166+
async def _get_table_names(self, abs_dir_path: str) -> List[str]:
167+
directory = await retry_operation_async(
168+
self.driver.scheme_client.list_directory, abs_dir_path
169+
)
170+
result = []
171+
for child in directory.children:
172+
child_abs_path = posixpath.join(abs_dir_path, child.name)
173+
if child.is_table():
174+
result.append(child_abs_path)
175+
elif child.is_directory() and not child.name.startswith("."):
176+
result.extend(self.get_table_names(child_abs_path))
177+
return result
178+
128179

129180
async def connect() -> Connection:
130181
return Connection()

ydb_dbapi/utils.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
import functools
2+
import ydb
3+
4+
from .errors import (
5+
DatabaseError,
6+
DataError,
7+
IntegrityError,
8+
InternalError,
9+
NotSupportedError,
10+
OperationalError,
11+
ProgrammingError,
12+
)
13+
14+
15+
def handle_ydb_errors(func):
16+
@functools.wraps(func)
17+
def wrapper(*args, **kwargs):
18+
try:
19+
return func(*args, **kwargs)
20+
except (ydb.issues.AlreadyExists, ydb.issues.PreconditionFailed) as e:
21+
raise IntegrityError(e.message, original_error=e) from e
22+
except (ydb.issues.Unsupported, ydb.issues.Unimplemented) as e:
23+
raise NotSupportedError(e.message, original_error=e) from e
24+
except (ydb.issues.BadRequest, ydb.issues.SchemeError) as e:
25+
raise ProgrammingError(e.message, original_error=e) from e
26+
except (
27+
ydb.issues.TruncatedResponseError,
28+
ydb.issues.ConnectionError,
29+
ydb.issues.Aborted,
30+
ydb.issues.Unavailable,
31+
ydb.issues.Overloaded,
32+
ydb.issues.Undetermined,
33+
ydb.issues.Timeout,
34+
ydb.issues.Cancelled,
35+
ydb.issues.SessionBusy,
36+
ydb.issues.SessionExpired,
37+
ydb.issues.SessionPoolEmpty,
38+
) as e:
39+
raise OperationalError(e.message, original_error=e) from e
40+
except ydb.issues.GenericError as e:
41+
raise DataError(e.message, original_error=e) from e
42+
except ydb.issues.InternalError as e:
43+
raise InternalError(e.message, original_error=e) from e
44+
except ydb.Error as e:
45+
raise DatabaseError(e.message, original_error=e) from e
46+
except Exception as e:
47+
raise DatabaseError("Failed to execute query") from e
48+
49+
return wrapper

0 commit comments

Comments
 (0)