Skip to content

Commit 8defd59

Browse files
committed
crud lock object + acquire and release.
1 parent a81db7e commit 8defd59

File tree

2 files changed

+140
-29
lines changed

2 files changed

+140
-29
lines changed

tests/coordination/test_coordination_client.py

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,37 @@ async def test_coordination_lock_full_lifecycle(self, aio_connection):
117117
),
118118
)
119119

120+
lock = client.lock("test_lock", node_path)
121+
122+
create_resp = await lock.create(init_limit=1, init_data=b"init-data")
123+
assert create_resp.status == StatusCode.SUCCESS
124+
125+
describe_resp = await lock.describe()
126+
assert describe_resp.status == StatusCode.SUCCESS
127+
128+
sem = describe_resp.semaphore_description
129+
assert sem.name == "test_lock"
130+
assert sem.data == b"init-data"
131+
assert sem.count == 0
132+
assert sem.ephemeral is False
133+
assert list(sem.owners) == []
134+
assert list(sem.waiters) == []
135+
136+
update_resp = await lock.update(new_data=b"updated-data")
137+
assert update_resp.status == StatusCode.SUCCESS
138+
139+
describe_resp2 = await lock.describe()
140+
assert describe_resp2.status == StatusCode.SUCCESS
141+
142+
sem2 = describe_resp2.semaphore_description
143+
assert sem2.name == "test_lock"
144+
assert sem2.data == b"updated-data"
145+
assert sem2.count == 0
146+
assert sem2.ephemeral is False
147+
assert list(sem2.owners) == []
148+
assert list(sem2.waiters) == []
149+
150+
120151
lock2_started = asyncio.Event()
121152
lock2_acquired = asyncio.Event()
122153

@@ -127,16 +158,23 @@ async def second_lock_task():
127158
await asyncio.sleep(0.5)
128159

129160
async with client.lock("test_lock", node_path) as lock1:
130-
131161
assert lock1._stream is not None
132162
assert lock1._stream.session_id is not None
163+
133164
resp = await lock1.describe()
134165
assert resp.status == StatusCode.SUCCESS
135166

167+
sem_under_lock = resp.semaphore_description
168+
assert sem_under_lock.name == "test_lock"
169+
assert sem_under_lock.data == b"updated-data"
170+
assert sem_under_lock.count == 1
171+
assert sem_under_lock.ephemeral is False
172+
assert len(list(sem_under_lock.owners)) == 1
173+
assert list(sem_under_lock.waiters) == []
174+
136175
t2 = asyncio.create_task(second_lock_task())
137176
await lock2_started.wait()
138177

139-
140178
await asyncio.sleep(0.5)
141179

142180
assert lock1._stream is not None
@@ -148,3 +186,15 @@ async def second_lock_task():
148186
assert lock3._stream is not None
149187
assert lock3._stream.session_id is not None
150188

189+
resp3 = await lock3.describe()
190+
assert resp3.status == StatusCode.SUCCESS
191+
sem3 = resp3.semaphore_description
192+
assert sem3.count == 1
193+
194+
195+
delete_resp = await lock.delete()
196+
assert delete_resp.status == StatusCode.SUCCESS
197+
198+
describe_after_delete = await lock.describe()
199+
assert describe_after_delete.status == StatusCode.NOT_FOUND
200+

ydb/aio/coordination/lock.py

Lines changed: 88 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
ReleaseSemaphore,
88
UpdateSemaphore,
99
DescribeSemaphore,
10+
CreateSemaphore,
11+
DeleteSemaphore,
1012
FromServer,
1113
)
1214
from ydb.aio.coordination.stream import CoordinationStream
@@ -44,6 +46,7 @@ def __init__(
4446

4547
self._wait_timeout: float = self._timeout_millis / 1000.0
4648

49+
4750
def next_req_id(self) -> int:
4851
r = self._next_req_id
4952
self._next_req_id += 1
@@ -66,52 +69,62 @@ async def _ensure_session(self):
6669

6770
self._stream = self._reconnector.get_stream()
6871

69-
async def _wait_for_acquire_response(self):
72+
async def _wait_for_response(self, req_id: int, *, kind: str):
7073
try:
7174
while True:
7275
resp = await asyncio.wait_for(
7376
self._stream._incoming_queue.get(),
7477
timeout=self._wait_timeout,
7578
)
76-
acquire_resp = FromServer.from_proto(resp).acquire_semaphore_result
77-
if acquire_resp and acquire_resp.req_id == self._req_id:
78-
return acquire_resp
79-
except asyncio.TimeoutError:
80-
raise issues.Error(
81-
f"Timeout waiting for lock {self._name} acquisition"
82-
)
79+
fs = FromServer.from_proto(resp)
80+
81+
if kind == "acquire":
82+
r = fs.acquire_semaphore_result
83+
elif kind == "describe":
84+
r = fs.describe_semaphore_result
85+
elif kind == "create":
86+
r = fs.create_semaphore_result
87+
elif kind == "update":
88+
r = fs.update_semaphore_result
89+
elif kind == "delete":
90+
r = fs.delete_semaphore_result
91+
else:
92+
r = None
93+
94+
if r and r.req_id == req_id:
95+
return r
8396

84-
async def _wait_for_describe_response(self, req_id: int):
85-
try:
86-
while True:
87-
resp = await asyncio.wait_for(
88-
self._stream._incoming_queue.get(),
89-
timeout=self._wait_timeout,
90-
)
91-
describe_resp = FromServer.from_proto(resp).describe_semaphore_result
92-
if describe_resp and describe_resp.req_id == req_id:
93-
return describe_resp
9497
except asyncio.TimeoutError:
98+
action = {
99+
"acquire": "acquisition",
100+
"describe": "describe",
101+
"update": "update",
102+
"delete": "delete",
103+
"create": "create"
104+
}.get(kind, "operation")
105+
95106
raise issues.Error(
96-
f"Timeout waiting for lock {self._name} describe"
107+
f"Timeout waiting for lock {self._name} {action}"
97108
)
98109

110+
99111
async def __aenter__(self):
100112
await self._ensure_session()
101113

102-
self._req_id = self.next_req_id()
114+
req_id = self.next_req_id()
115+
self._req_id = req_id
103116

104117
req = AcquireSemaphore(
105-
req_id=self._req_id,
118+
req_id=req_id,
106119
name=self._name,
107120
count=self._count,
108-
ephemeral=True,
121+
ephemeral=False,
109122
timeout_millis=self._timeout_millis,
110123
).to_proto()
111124

112125
await self.send(req)
113126

114-
resp = await self._wait_for_acquire_response()
127+
resp = await self._wait_for_response(req_id, kind="acquire")
115128
if resp.acquired:
116129
return self
117130
else:
@@ -120,21 +133,57 @@ async def __aenter__(self):
120133
async def __aexit__(self, exc_type, exc, tb):
121134
if self._req_id is not None:
122135
try:
123-
req = ReleaseSemaphore(req_id=self._req_id, name=self._name)
136+
req = ReleaseSemaphore(
137+
req_id=self._req_id,
138+
name=self._name,
139+
).to_proto()
124140
await self.send(req)
125141
except issues.Error:
126142
pass
127143

128144
await self._reconnector.stop()
129145
self._stream = None
130146
self._node_path = None
147+
self._req_id = None
131148

132149
async def acquire(self):
133150
return await self.__aenter__()
134151

135152
async def release(self):
136153
await self.__aexit__(None, None, None)
137154

155+
async def create(self, init_limit, init_data):
156+
await self._ensure_session()
157+
158+
req_id = self.next_req_id()
159+
160+
req = CreateSemaphore(
161+
req_id=req_id,
162+
name=self._name,
163+
limit=init_limit,
164+
data=init_data
165+
).to_proto()
166+
167+
await self.send(req)
168+
169+
resp = await self._wait_for_response(req_id, kind="create")
170+
return resp
171+
172+
async def delete(self):
173+
await self._ensure_session()
174+
175+
req_id = self.next_req_id()
176+
177+
req = DeleteSemaphore(
178+
req_id=req_id,
179+
name=self._name,
180+
).to_proto()
181+
182+
await self.send(req)
183+
184+
resp = await self._wait_for_response(req_id, kind="delete")
185+
return resp
186+
138187

139188
async def describe(self):
140189
await self._ensure_session()
@@ -152,8 +201,20 @@ async def describe(self):
152201

153202
await self.send(req)
154203

155-
resp = await self._wait_for_describe_response(req_id)
204+
resp = await self._wait_for_response(req_id, kind="describe")
156205
return resp
157206

158-
async def update(self, *, limit: Optional[int] = None, settings=None):
159-
pass
207+
async def update(self, new_data):
208+
await self._ensure_session()
209+
210+
req_id = self.next_req_id()
211+
req = UpdateSemaphore(
212+
req_id=req_id,
213+
name=self._name,
214+
data=new_data
215+
).to_proto()
216+
217+
await self.send(req)
218+
219+
resp = await self._wait_for_response(req_id, kind="update")
220+
return resp

0 commit comments

Comments
 (0)