Skip to content

Commit e7a4357

Browse files
committed
rewrite stream with wrapper plus sync client
1 parent 45fc213 commit e7a4357

File tree

8 files changed

+355
-212
lines changed

8 files changed

+355
-212
lines changed

tests/coordination/test_coordination_client.py

Lines changed: 97 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
import asyncio
2+
import threading
3+
import time
24

35
import pytest
46

57
import ydb
6-
from ydb import aio, StatusCode
8+
from ydb import aio, StatusCode, logger
79

810
from ydb.coordination import (
911
NodeConfig,
@@ -155,8 +157,6 @@ async def second_lock_task():
155157
await asyncio.sleep(0.5)
156158

157159
async with client.lock("test_lock", node_path) as lock1:
158-
assert lock1._stream is not None
159-
assert lock1._stream.session_id is not None
160160

161161
resp: DescribeLockResult = await lock1.describe()
162162
assert resp.status == StatusCode.SUCCESS
@@ -172,14 +172,10 @@ async def second_lock_task():
172172

173173
await asyncio.sleep(0.5)
174174

175-
assert lock1._stream is not None
176-
177175
await asyncio.wait_for(lock2_acquired.wait(), timeout=5)
178176
await asyncio.wait_for(t2, timeout=5)
179177

180178
async with client.lock("test_lock", node_path) as lock3:
181-
assert lock3._stream is not None
182-
assert lock3._stream.session_id is not None
183179

184180
resp3: DescribeLockResult = await lock3.describe()
185181
assert resp3.status == StatusCode.SUCCESS
@@ -190,3 +186,97 @@ async def second_lock_task():
190186

191187
describe_after_delete: DescribeLockResult = await lock.describe()
192188
assert describe_after_delete.status == StatusCode.NOT_FOUND
189+
190+
def test_coordination_lock_full_lifecycle_sync(self, driver_sync):
191+
client = CoordinationClient(driver_sync)
192+
node_path = "/local/test_lock_full_lifecycle"
193+
194+
# --- cleanup ---
195+
try:
196+
client.delete_node(node_path)
197+
except ydb.SchemeError:
198+
pass
199+
200+
client.create_node(
201+
node_path,
202+
NodeConfig(
203+
session_grace_period_millis=1000,
204+
attach_consistency_mode=ConsistencyMode.STRICT,
205+
read_consistency_mode=ConsistencyMode.STRICT,
206+
rate_limiter_counters_mode=RateLimiterCountersMode.UNSET,
207+
self_check_period_millis=0,
208+
),
209+
)
210+
211+
lock = client.lock("test_lock", node_path)
212+
213+
# --- create/update/describe ---
214+
create_resp: CreateSemaphoreResult = lock.create(init_limit=1, init_data=b"init-data")
215+
assert create_resp.status == StatusCode.SUCCESS
216+
217+
describe_resp: DescribeLockResult = lock.describe()
218+
assert describe_resp.status == StatusCode.SUCCESS
219+
assert describe_resp.data == b"init-data"
220+
221+
update_resp = lock.update(new_data=b"updated-data")
222+
assert update_resp.status == StatusCode.SUCCESS
223+
assert lock.describe().data == b"updated-data"
224+
225+
# --- threading coordination ---
226+
lock2_ready = threading.Event()
227+
lock2_acquired = threading.Event()
228+
thread_exc = {"err": None}
229+
230+
def second_lock_task():
231+
try:
232+
lock2_ready.set() # сигнал, что поток готов
233+
with client.lock("test_lock", node_path):
234+
lock2_acquired.set() # сигнал, что захватил lock
235+
logger.info("Second thread acquired lock")
236+
except Exception as e:
237+
logger.exception("second_lock_task failed")
238+
thread_exc["err"] = e
239+
240+
t2 = threading.Thread(target=second_lock_task)
241+
242+
# --- main thread acquires first lock ---
243+
with client.lock("test_lock", node_path) as lock1:
244+
resp = lock1.describe()
245+
assert resp.status == StatusCode.SUCCESS
246+
assert resp.count == 1
247+
248+
# запускаем второй поток
249+
t2.start()
250+
started = lock2_ready.wait(timeout=2.0)
251+
assert started, "Second thread did not signal readiness to acquire lock"
252+
253+
# --- main thread released lock, второй поток должен захватить ---
254+
acquired = lock2_acquired.wait(timeout=10.0) # увеличенный таймаут
255+
t2.join(timeout=5.0)
256+
257+
if not acquired:
258+
if thread_exc["err"]:
259+
raise AssertionError(f"Second thread raised exception: {thread_exc['err']!r}") from thread_exc["err"]
260+
else:
261+
raise AssertionError(
262+
"Second thread did not acquire the lock in time. Check logs for details."
263+
)
264+
265+
assert not t2.is_alive(), "Second thread did not finish after acquiring lock"
266+
267+
# --- проверяем, что lock можно снова взять в главном потоке ---
268+
with client.lock("test_lock", node_path) as lock3:
269+
resp3: DescribeLockResult = lock3.describe()
270+
assert resp3.status == StatusCode.SUCCESS
271+
assert resp3.count == 1
272+
273+
# --- cleanup ---
274+
delete_resp = lock.delete()
275+
assert delete_resp.status == StatusCode.SUCCESS
276+
# небольшая пауза для удаления на сервере
277+
time.sleep(0.1)
278+
describe_after_delete: DescribeLockResult = lock.describe()
279+
assert describe_after_delete.status == StatusCode.NOT_FOUND
280+
281+
282+
Lines changed: 51 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,21 @@
11
import typing
22
from dataclasses import dataclass
33

4-
54
if typing.TYPE_CHECKING:
65
from ..v4.protos import ydb_coordination_pb2
76
else:
87
from ..common.protos import ydb_coordination_pb2
98

109
from .common_utils import IToProto
1110

11+
# ---------- CRUD для узлов ----------
1212

1313
@dataclass
1414
class CreateNodeRequest(IToProto):
1515
path: str
1616
config: typing.Any
1717

18-
def to_proto(self) -> ydb_coordination_pb2.CreateNodeRequest:
18+
def to_proto(self) -> "ydb_coordination_pb2.CreateNodeRequest":
1919
cfg_proto = self.config.to_proto() if self.config else None
2020
return ydb_coordination_pb2.CreateNodeRequest(
2121
path=self.path,
@@ -28,7 +28,7 @@ class AlterNodeRequest(IToProto):
2828
path: str
2929
config: typing.Any
3030

31-
def to_proto(self) -> ydb_coordination_pb2.AlterNodeRequest:
31+
def to_proto(self) -> "ydb_coordination_pb2.AlterNodeRequest":
3232
cfg_proto = self.config.to_proto() if self.config else None
3333
return ydb_coordination_pb2.AlterNodeRequest(
3434
path=self.path,
@@ -40,7 +40,7 @@ def to_proto(self) -> ydb_coordination_pb2.AlterNodeRequest:
4040
class DescribeNodeRequest(IToProto):
4141
path: str
4242

43-
def to_proto(self) -> ydb_coordination_pb2.DescribeNodeRequest:
43+
def to_proto(self) -> "ydb_coordination_pb2.DescribeNodeRequest":
4444
return ydb_coordination_pb2.DescribeNodeRequest(
4545
path=self.path,
4646
)
@@ -50,11 +50,12 @@ def to_proto(self) -> ydb_coordination_pb2.DescribeNodeRequest:
5050
class DropNodeRequest(IToProto):
5151
path: str
5252

53-
def to_proto(self) -> ydb_coordination_pb2.DropNodeRequest:
53+
def to_proto(self) -> "ydb_coordination_pb2.DropNodeRequest":
5454
return ydb_coordination_pb2.DropNodeRequest(
5555
path=self.path,
5656
)
5757

58+
# ---------- Сессии и семафоры ----------
5859

5960
@dataclass
6061
class SessionStart(IToProto):
@@ -65,7 +66,7 @@ class SessionStart(IToProto):
6566
seq_no: int = 0
6667
protection_key: bytes = b""
6768

68-
def to_proto(self) -> ydb_coordination_pb2.SessionRequest:
69+
def to_proto(self) -> "ydb_coordination_pb2.SessionRequest":
6970
return ydb_coordination_pb2.SessionRequest(
7071
session_start=ydb_coordination_pb2.SessionRequest.SessionStart(
7172
path=self.path,
@@ -80,15 +81,17 @@ def to_proto(self) -> ydb_coordination_pb2.SessionRequest:
8081

8182
@dataclass
8283
class SessionStop(IToProto):
83-
def to_proto(self) -> ydb_coordination_pb2.SessionRequest:
84-
return ydb_coordination_pb2.SessionRequest(session_stop=ydb_coordination_pb2.SessionRequest.SessionStop())
84+
def to_proto(self) -> "ydb_coordination_pb2.SessionRequest":
85+
return ydb_coordination_pb2.SessionRequest(
86+
session_stop=ydb_coordination_pb2.SessionRequest.SessionStop()
87+
)
8588

8689

8790
@dataclass
8891
class Ping(IToProto):
8992
opaque: int = 0
9093

91-
def to_proto(self) -> ydb_coordination_pb2.SessionRequest:
94+
def to_proto(self) -> "ydb_coordination_pb2.SessionRequest":
9295
return ydb_coordination_pb2.SessionRequest(
9396
ping=ydb_coordination_pb2.SessionRequest.PingPong(opaque=self.opaque)
9497
)
@@ -101,42 +104,14 @@ class CreateSemaphore(IToProto):
101104
limit: int
102105
data: bytes = b""
103106

104-
def to_proto(self) -> ydb_coordination_pb2.SessionRequest:
107+
def to_proto(self) -> "ydb_coordination_pb2.SessionRequest":
105108
return ydb_coordination_pb2.SessionRequest(
106109
create_semaphore=ydb_coordination_pb2.SessionRequest.CreateSemaphore(
107110
req_id=self.req_id, name=self.name, limit=self.limit, data=self.data
108111
)
109112
)
110113

111114

112-
@dataclass
113-
class UpdateSemaphore(IToProto):
114-
name: str
115-
req_id: int
116-
data: bytes
117-
118-
def to_proto(self) -> ydb_coordination_pb2.SessionRequest:
119-
return ydb_coordination_pb2.SessionRequest(
120-
update_semaphore=ydb_coordination_pb2.SessionRequest.UpdateSemaphore(
121-
req_id=self.req_id, name=self.name, data=self.data
122-
)
123-
)
124-
125-
126-
@dataclass
127-
class DeleteSemaphore(IToProto):
128-
name: str
129-
req_id: int
130-
force: bool = False
131-
132-
def to_proto(self) -> ydb_coordination_pb2.SessionRequest:
133-
return ydb_coordination_pb2.SessionRequest(
134-
delete_semaphore=ydb_coordination_pb2.SessionRequest.DeleteSemaphore(
135-
req_id=self.req_id, name=self.name, force=self.force
136-
)
137-
)
138-
139-
140115
@dataclass
141116
class AcquireSemaphore(IToProto):
142117
name: str
@@ -146,7 +121,7 @@ class AcquireSemaphore(IToProto):
146121
data: bytes = b""
147122
ephemeral: bool = False
148123

149-
def to_proto(self) -> ydb_coordination_pb2.SessionRequest:
124+
def to_proto(self) -> "ydb_coordination_pb2.SessionRequest":
150125
return ydb_coordination_pb2.SessionRequest(
151126
acquire_semaphore=ydb_coordination_pb2.SessionRequest.AcquireSemaphore(
152127
req_id=self.req_id,
@@ -164,9 +139,11 @@ class ReleaseSemaphore(IToProto):
164139
name: str
165140
req_id: int
166141

167-
def to_proto(self) -> ydb_coordination_pb2.SessionRequest:
142+
def to_proto(self) -> "ydb_coordination_pb2.SessionRequest":
168143
return ydb_coordination_pb2.SessionRequest(
169-
release_semaphore=ydb_coordination_pb2.SessionRequest.ReleaseSemaphore(req_id=self.req_id, name=self.name)
144+
release_semaphore=ydb_coordination_pb2.SessionRequest.ReleaseSemaphore(
145+
req_id=self.req_id, name=self.name
146+
)
170147
)
171148

172149

@@ -179,7 +156,7 @@ class DescribeSemaphore(IToProto):
179156
watch_data: bool
180157
watch_owners: bool
181158

182-
def to_proto(self) -> ydb_coordination_pb2.SessionRequest:
159+
def to_proto(self) -> "ydb_coordination_pb2.SessionRequest":
183160
return ydb_coordination_pb2.SessionRequest(
184161
describe_semaphore=ydb_coordination_pb2.SessionRequest.DescribeSemaphore(
185162
include_owners=self.include_owners,
@@ -191,20 +168,47 @@ def to_proto(self) -> ydb_coordination_pb2.SessionRequest:
191168
)
192169
)
193170

171+
@dataclass
172+
class UpdateSemaphore(IToProto):
173+
name: str
174+
req_id: int
175+
data: bytes
176+
177+
def to_proto(self) -> "ydb_coordination_pb2.SessionRequest":
178+
return ydb_coordination_pb2.SessionRequest(
179+
update_semaphore=ydb_coordination_pb2.SessionRequest.UpdateSemaphore(
180+
req_id=self.req_id, name=self.name, data=self.data
181+
)
182+
)
183+
184+
185+
@dataclass
186+
class DeleteSemaphore(IToProto):
187+
name: str
188+
req_id: int
189+
force: bool = False
190+
191+
def to_proto(self) -> "ydb_coordination_pb2.SessionRequest":
192+
return ydb_coordination_pb2.SessionRequest(
193+
delete_semaphore=ydb_coordination_pb2.SessionRequest.DeleteSemaphore(
194+
req_id=self.req_id, name=self.name, force=self.force
195+
)
196+
)
197+
194198

195199
@dataclass
196200
class FromServer:
197-
raw: ydb_coordination_pb2.SessionResponse
201+
raw: "ydb_coordination_pb2.SessionResponse"
198202

199203
@staticmethod
200-
def from_proto(resp: ydb_coordination_pb2.SessionResponse) -> "FromServer":
204+
def from_proto(resp: "ydb_coordination_pb2.SessionResponse") -> "FromServer":
201205
return FromServer(raw=resp)
202206

203207
def __getattr__(self, name: str):
204208
return getattr(self.raw, name)
205209

206210
@property
207-
def session_started(self) -> typing.Optional[ydb_coordination_pb2.SessionResponse.SessionStarted]:
211+
def session_started(self) -> typing.Optional["ydb_coordination_pb2.SessionResponse.SessionStarted"]:
208212
s = self.raw.session_started
209213
return s if s.session_id else None
210214

@@ -215,23 +219,9 @@ def opaque(self) -> typing.Optional[int]:
215219
return None
216220

217221
@property
218-
def acquire_semaphore_result(self) -> typing.Optional[ydb_coordination_pb2.SessionResponse.AcquireSemaphoreResult]:
222+
def acquire_semaphore_result(self):
219223
return self.raw.acquire_semaphore_result if self.raw.HasField("acquire_semaphore_result") else None
220224

221225
@property
222-
def create_semaphore_result(self) -> typing.Optional[ydb_coordination_pb2.SessionResponse.CreateSemaphoreResult]:
226+
def create_semaphore_result(self):
223227
return self.raw.create_semaphore_result if self.raw.HasField("create_semaphore_result") else None
224-
225-
@property
226-
def delete_semaphore_result(self) -> typing.Optional[ydb_coordination_pb2.SessionResponse.DeleteSemaphoreResult]:
227-
return self.raw.delete_semaphore_result if self.raw.HasField("delete_semaphore_result") else None
228-
229-
@property
230-
def update_semaphore_result(self) -> typing.Optional[ydb_coordination_pb2.SessionResponse.UpdateSemaphoreResult]:
231-
return self.raw.update_semaphore_result if self.raw.HasField("update_semaphore_result") else None
232-
233-
@property
234-
def describe_semaphore_result(
235-
self,
236-
) -> typing.Optional[ydb_coordination_pb2.SessionResponse.DescribeSemaphoreResult]:
237-
return self.raw.describe_semaphore_result if self.raw.HasField("describe_semaphore_result") else None

0 commit comments

Comments
 (0)