Skip to content

Commit a81db7e

Browse files
committed
add describe -> start making crud lock object
1 parent 194942f commit a81db7e

File tree

3 files changed

+93
-6
lines changed

3 files changed

+93
-6
lines changed

tests/coordination/test_coordination_client.py

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import pytest
44

55
import ydb
6-
from ydb import aio
6+
from ydb import aio, StatusCode
77

88
from ydb.coordination import (
99
NodeConfig,
@@ -96,10 +96,10 @@ async def test_coordination_node_lifecycle_async(self, aio_connection):
9696
with pytest.raises(ydb.SchemeError):
9797
await client.describe_node(node_path)
9898

99-
async def test_coordination_lock_context_exclusive(self, aio_connection):
99+
async def test_coordination_lock_full_lifecycle(self, aio_connection):
100100
client = aio.CoordinationClient(aio_connection)
101101

102-
node_path = "/local/test_lock_context_exclusive"
102+
node_path = "/local/test_lock_full_lifecycle"
103103

104104
try:
105105
await client.delete_node(node_path)
@@ -117,8 +117,34 @@ async def test_coordination_lock_context_exclusive(self, aio_connection):
117117
),
118118
)
119119

120+
lock2_started = asyncio.Event()
121+
lock2_acquired = asyncio.Event()
122+
123+
async def second_lock_task():
124+
lock2_started.set()
125+
async with client.lock("test_lock", node_path):
126+
lock2_acquired.set()
127+
await asyncio.sleep(0.5)
128+
120129
async with client.lock("test_lock", node_path) as lock1:
121-
print("Lock1 acquired")
122-
await asyncio.sleep(10)
123-
print("Lock1 still holding after 10 seconds")
130+
131+
assert lock1._stream is not None
132+
assert lock1._stream.session_id is not None
133+
resp = await lock1.describe()
134+
assert resp.status == StatusCode.SUCCESS
135+
136+
t2 = asyncio.create_task(second_lock_task())
137+
await lock2_started.wait()
138+
139+
140+
await asyncio.sleep(0.5)
141+
142+
assert lock1._stream is not None
143+
144+
await asyncio.wait_for(lock2_acquired.wait(), timeout=5)
145+
await asyncio.wait_for(t2, timeout=5)
146+
147+
async with client.lock("test_lock", node_path) as lock3:
148+
assert lock3._stream is not None
149+
assert lock3._stream.session_id is not None
124150

ydb/_grpc/grpcwrapper/ydb_coordination.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,27 @@ def to_proto(self) -> ydb_coordination_pb2.SessionRequest:
161161
req_id=self.req_id, name=self.name
162162
))
163163

164+
@dataclass
165+
class DescribeSemaphore(IToProto):
166+
include_owners: bool
167+
include_waiters: bool
168+
name: str
169+
req_id: int
170+
watch_data: bool
171+
watch_owners: bool
172+
173+
def to_proto(self) -> ydb_coordination_pb2.SessionRequest:
174+
return ydb_coordination_pb2.SessionRequest(
175+
describe_semaphore=ydb_coordination_pb2.SessionRequest.DescribeSemaphore(
176+
include_owners=self.include_owners,
177+
include_waiters=self.include_waiters,
178+
name=self.name,
179+
req_id=self.req_id,
180+
watch_data=self.watch_data,
181+
watch_owners=self.watch_owners
182+
)
183+
)
184+
164185

165186
@dataclass
166187
class FromServer:

ydb/aio/coordination/lock.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
from ydb._grpc.grpcwrapper.ydb_coordination import (
66
AcquireSemaphore,
77
ReleaseSemaphore,
8+
UpdateSemaphore,
9+
DescribeSemaphore,
810
FromServer,
911
)
1012
from ydb.aio.coordination.stream import CoordinationStream
@@ -79,6 +81,21 @@ async def _wait_for_acquire_response(self):
7981
f"Timeout waiting for lock {self._name} acquisition"
8082
)
8183

84+
async def _wait_for_describe_response(self, req_id: int):
85+
try:
86+
while True:
87+
resp = await asyncio.wait_for(
88+
self._stream._incoming_queue.get(),
89+
timeout=self._wait_timeout,
90+
)
91+
describe_resp = FromServer.from_proto(resp).describe_semaphore_result
92+
if describe_resp and describe_resp.req_id == req_id:
93+
return describe_resp
94+
except asyncio.TimeoutError:
95+
raise issues.Error(
96+
f"Timeout waiting for lock {self._name} describe"
97+
)
98+
8299
async def __aenter__(self):
83100
await self._ensure_session()
84101

@@ -117,3 +134,26 @@ async def acquire(self):
117134

118135
async def release(self):
119136
await self.__aexit__(None, None, None)
137+
138+
139+
async def describe(self):
140+
await self._ensure_session()
141+
142+
req_id = self.next_req_id()
143+
144+
req = DescribeSemaphore(
145+
req_id=req_id,
146+
name=self._name,
147+
include_owners=True,
148+
include_waiters=True,
149+
watch_data=False,
150+
watch_owners=False,
151+
).to_proto()
152+
153+
await self.send(req)
154+
155+
resp = await self._wait_for_describe_response(req_id)
156+
return resp
157+
158+
async def update(self, *, limit: Optional[int] = None, settings=None):
159+
pass

0 commit comments

Comments
 (0)