Skip to content

Commit c16e51e

Browse files
committed
first working version of sync and asunc client with lock api
1 parent e7a4357 commit c16e51e

File tree

4 files changed

+132
-81
lines changed

4 files changed

+132
-81
lines changed

ydb/aio/coordination/lock.py

Lines changed: 29 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import asyncio
22
from typing import Optional
33

4-
from ydb import issues
4+
from ydb import issues, StatusCode
55
from ydb._grpc.grpcwrapper.ydb_coordination import (
66
AcquireSemaphore,
77
ReleaseSemaphore,
@@ -130,17 +130,15 @@ async def __aenter__(self):
130130
async def __aexit__(self, exc_type, exc, tb):
131131
if self._req_id is not None:
132132
try:
133-
req = ReleaseSemaphore(req_id=self._req_id, name=self._name)
133+
req = ReleaseSemaphore(
134+
req_id=self._req_id,
135+
name=self._name,
136+
)
134137
await self.send(req)
135138
except issues.Error:
136139
pass
137140

138-
if self._reconnector:
139-
await self._reconnector.stop(flush=True)
140-
141-
self._stream = None
142141
self._req_id = None
143-
self._node_path = None
144142

145143
async def acquire(self):
146144
return await self.__aenter__()
@@ -160,18 +158,31 @@ async def create(self, init_limit, init_data):
160158
resp = await self._wait_for_response(req_id, kind="create")
161159
return CreateSemaphoreResult.from_proto(resp)
162160

163-
async def delete(self):
161+
async def delete(self, wait_empty_timeout: float = 5.0, poll_interval: float = 0.05):
164162
await self._ensure_session()
165163

166-
req_id = self.next_req_id()
167-
168-
req = DeleteSemaphore(
169-
req_id=req_id,
170-
name=self._name,
171-
)
164+
deadline = asyncio.get_running_loop().time() + wait_empty_timeout
165+
while True:
166+
try:
167+
desc = await self.describe()
168+
if desc.count == 0 and not desc.owners:
169+
break
170+
except issues.Error as e:
171+
if getattr(e, "status", None) == StatusCode.NOT_FOUND:
172+
break
173+
raise
174+
175+
now = asyncio.get_running_loop().time()
176+
if now > deadline:
177+
raise issues.Error(
178+
f"Timeout waiting for semaphore '{self._name}' to become empty before delete. "
179+
f"count={desc.count}, owners={list(desc.owners)}"
180+
)
181+
await asyncio.sleep(poll_interval)
172182

183+
req_id = self.next_req_id()
184+
req = DeleteSemaphore(req_id=req_id, name=self._name)
173185
await self.send(req)
174-
175186
resp = await self._wait_for_response(req_id, kind="delete")
176187
return resp
177188

@@ -206,18 +217,17 @@ async def update(self, new_data):
206217
return resp
207218

208219
async def close(self, flush: bool = True):
209-
210220
try:
211221
if self._req_id is not None:
212222
req = ReleaseSemaphore(req_id=self._req_id, name=self._name)
213-
await self.send(req)
223+
if self._stream is not None:
224+
await self.send(req)
214225
except issues.Error:
215226
pass
216227

217-
if self._reconnector:
228+
if self._reconnector is not None:
218229
await self._reconnector.stop(flush)
219230

220231
self._stream = None
221232
self._req_id = None
222233
self._node_path = None
223-

ydb/aio/coordination/reconnector.py

Lines changed: 26 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,6 @@
66

77

88
class CoordinationReconnector:
9-
"""
10-
Простейшая затычка реконнектора:
11-
- не делает автопереподключение
12-
- сразу бросает ошибку, если stream не стартан
13-
- интерфейс start/stop/get_stream/wait_ready полностью рабочий
14-
- не зависает ни на каких таймаутах
15-
"""
16-
179
def __init__(
1810
self,
1911
driver,
@@ -29,18 +21,23 @@ def __init__(
2921
self._task: Optional[asyncio.Task] = None
3022
self._stream: Optional[CoordinationStream] = None
3123

32-
self._ready = asyncio.Event()
24+
self._ready: Optional[asyncio.Event] = None
3325
self._stopped = False
3426
self._first_error: Optional[Exception] = None
3527

3628
def start(self):
37-
"""Запуск реконнектора (фактически старт одного stream)"""
3829
if self._stopped:
3930
return
31+
32+
if self._ready is None:
33+
self._ready = asyncio.Event()
34+
35+
self._first_error = None
36+
4037
if self._task is None or self._task.done():
4138
self._task = asyncio.create_task(self._connection_loop())
4239

43-
async def stop(self, flush: bool = True):
40+
async def stop(self, flush: bool):
4441
self._stopped = True
4542

4643
if self._task:
@@ -50,29 +47,23 @@ async def stop(self, flush: bool = True):
5047
self._task = None
5148

5249
if self._stream:
53-
if flush:
54-
try:
55-
await self._stream.close()
56-
except Exception:
57-
pass
58-
else:
59-
self._stream._closed = True
50+
with contextlib.suppress(Exception):
51+
await self._stream.close()
6052
self._stream = None
6153

62-
# 3. Сбрасываем состояние
63-
self._ready.clear()
64-
self._first_error = None
54+
if self._ready:
55+
self._ready.clear()
6556

6657
async def wait_ready(self):
67-
"""Ждём, пока stream будет готов, или сразу бросаем ошибку"""
6858
if self._first_error:
6959
raise self._first_error
60+
if not self._ready:
61+
raise RuntimeError("Reconnector not started")
7062
await self._ready.wait()
7163
if self._first_error:
7264
raise self._first_error
7365

7466
def get_stream(self) -> CoordinationStream:
75-
"""Получить готовый stream"""
7667
if self._stream is None or self._stream.session_id is None:
7768
raise RuntimeError("Coordination stream is not ready")
7869
return self._stream
@@ -84,19 +75,27 @@ async def _connection_loop(self):
8475
try:
8576
self._stream = CoordinationStream(self._driver)
8677
await self._stream.start_session(self._node_path, self._timeout_millis)
87-
self._ready.set()
78+
if self._ready:
79+
self._ready.set()
8880

8981
if self._stream._background_tasks:
90-
await asyncio.wait(
82+
done, pending = await asyncio.wait(
9183
self._stream._background_tasks,
9284
return_when=asyncio.FIRST_EXCEPTION,
9385
)
9486

87+
for d in done:
88+
if d.cancelled():
89+
continue
90+
exc = d.exception()
91+
if exc:
92+
raise exc
93+
9594
except Exception as exc:
9695
self._first_error = exc
97-
self._ready.clear()
96+
if self._ready:
97+
self._ready.clear()
9898
if self._stream:
9999
with contextlib.suppress(Exception):
100100
await self._stream.close()
101101
self._stream = None
102-

ydb/aio/coordination/stream.py

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
import asyncio
22
import contextlib
3+
import logging
34
from typing import Optional, Set
45

56
import ydb
67
from ydb import issues, _apis
78
from ydb._grpc.grpcwrapper.common_utils import IToProto, GrpcWrapperAsyncIO
89
from ydb._grpc.grpcwrapper.ydb_coordination import FromServer, Ping, SessionStart
910

11+
logger = logging.getLogger(__name__)
12+
1013

1114
class CoordinationStream:
1215
def __init__(self, driver: "ydb.aio.Driver"):
@@ -32,6 +35,7 @@ async def start_session(self, path: str, timeout_millis: int):
3235
self._stream.write(start_msg)
3336

3437
try:
38+
# Wait for SessionStart response
3539
async for resp in self._stream.from_server_grpc:
3640
fs = FromServer.from_proto(resp)
3741
if fs.session_started:
@@ -44,7 +48,9 @@ async def start_session(self, path: str, timeout_millis: int):
4448
except Exception as e:
4549
raise issues.Error(f"Failed to start session: {e}")
4650

47-
self._background_tasks.add(asyncio.create_task(self._reader_loop()))
51+
task = asyncio.get_running_loop().create_task(self._reader_loop())
52+
self._background_tasks.add(task)
53+
logger.debug("CoordinationStream: started reader task %r", task)
4854

4955
async def _reader_loop(self):
5056
try:
@@ -54,13 +60,18 @@ async def _reader_loop(self):
5460

5561
fs = FromServer.from_proto(resp)
5662
if fs.opaque:
57-
self._stream.write(Ping(fs.opaque))
63+
try:
64+
self._stream.write(Ping(fs.opaque))
65+
except Exception:
66+
self._set_first_error(RuntimeError("Failed to write Ping"))
5867
else:
5968
await self._incoming_queue.put(resp)
6069
self._state_changed.set()
6170
except asyncio.CancelledError:
71+
logger.debug("CoordinationStream: reader loop cancelled")
6272
pass
6373
except Exception as exc:
74+
logger.exception("CoordinationStream: reader loop error")
6475
self._set_first_error(exc)
6576

6677
async def send(self, req: IToProto):
@@ -89,26 +100,25 @@ async def close(self):
89100
return
90101
self._closed = True
91102

92-
tasks = list(self._background_tasks)
93-
for task in tasks:
103+
logger.debug("CoordinationStream: closing, cancelling %d background tasks", len(self._background_tasks))
104+
for task in list(self._background_tasks):
94105
task.cancel()
95106

96107
with contextlib.suppress(asyncio.CancelledError):
97-
await asyncio.gather(*tasks, return_exceptions=True)
108+
await asyncio.gather(*self._background_tasks, return_exceptions=True)
98109

99110
self._background_tasks.clear()
100111

101112
if self._stream:
102-
close_coro = getattr(self._stream, "close", None)
103-
if close_coro and asyncio.iscoroutinefunction(close_coro):
104-
await self._stream.close()
105-
else:
113+
try:
106114
self._stream.close()
115+
except Exception:
116+
logger.exception("CoordinationStream: error closing underlying stream")
117+
self._stream = None
107118

108119
self.session_id = None
109-
self._started = False
110120
self._state_changed.set()
111-
self._incoming_queue = asyncio.Queue()
121+
logger.debug("CoordinationStream: closed")
112122

113123
def _set_first_error(self, exc: Exception):
114124
if not self._first_error.done():
@@ -118,6 +128,7 @@ def _set_first_error(self, exc: Exception):
118128
def _get_first_error(self):
119129
if self._first_error.done():
120130
return self._first_error.result()
131+
return None
121132

122133
def _check_error(self):
123134
err = self._get_first_error()

0 commit comments

Comments
 (0)