Skip to content

Commit 53d547b

Browse files
authored
Merge branch 'coordination-service-impementation' into coordination-lock
2 parents 478a375 + 8defd59 commit 53d547b

File tree

10 files changed

+830
-4
lines changed

10 files changed

+830
-4
lines changed

tests/coordination/test_coordination_client.py

Lines changed: 106 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1+
import asyncio
2+
13
import pytest
24

35
import ydb
4-
from ydb import aio
6+
from ydb import aio, StatusCode
57

68
from ydb.coordination import (
79
NodeConfig,
@@ -93,3 +95,106 @@ async def test_coordination_node_lifecycle_async(self, aio_connection):
9395

9496
with pytest.raises(ydb.SchemeError):
9597
await client.describe_node(node_path)
98+
99+
async def test_coordination_lock_full_lifecycle(self, aio_connection):
100+
client = aio.CoordinationClient(aio_connection)
101+
102+
node_path = "/local/test_lock_full_lifecycle"
103+
104+
try:
105+
await client.delete_node(node_path)
106+
except ydb.SchemeError:
107+
pass
108+
109+
await client.create_node(
110+
node_path,
111+
NodeConfig(
112+
session_grace_period_millis=1000,
113+
attach_consistency_mode=ConsistencyMode.STRICT,
114+
read_consistency_mode=ConsistencyMode.STRICT,
115+
rate_limiter_counters_mode=RateLimiterCountersMode.UNSET,
116+
self_check_period_millis=0,
117+
),
118+
)
119+
120+
lock = client.lock("test_lock", node_path)
121+
122+
create_resp = await lock.create(init_limit=1, init_data=b"init-data")
123+
assert create_resp.status == StatusCode.SUCCESS
124+
125+
describe_resp = await lock.describe()
126+
assert describe_resp.status == StatusCode.SUCCESS
127+
128+
sem = describe_resp.semaphore_description
129+
assert sem.name == "test_lock"
130+
assert sem.data == b"init-data"
131+
assert sem.count == 0
132+
assert sem.ephemeral is False
133+
assert list(sem.owners) == []
134+
assert list(sem.waiters) == []
135+
136+
update_resp = await lock.update(new_data=b"updated-data")
137+
assert update_resp.status == StatusCode.SUCCESS
138+
139+
describe_resp2 = await lock.describe()
140+
assert describe_resp2.status == StatusCode.SUCCESS
141+
142+
sem2 = describe_resp2.semaphore_description
143+
assert sem2.name == "test_lock"
144+
assert sem2.data == b"updated-data"
145+
assert sem2.count == 0
146+
assert sem2.ephemeral is False
147+
assert list(sem2.owners) == []
148+
assert list(sem2.waiters) == []
149+
150+
151+
lock2_started = asyncio.Event()
152+
lock2_acquired = asyncio.Event()
153+
154+
async def second_lock_task():
155+
lock2_started.set()
156+
async with client.lock("test_lock", node_path):
157+
lock2_acquired.set()
158+
await asyncio.sleep(0.5)
159+
160+
async with client.lock("test_lock", node_path) as lock1:
161+
assert lock1._stream is not None
162+
assert lock1._stream.session_id is not None
163+
164+
resp = await lock1.describe()
165+
assert resp.status == StatusCode.SUCCESS
166+
167+
sem_under_lock = resp.semaphore_description
168+
assert sem_under_lock.name == "test_lock"
169+
assert sem_under_lock.data == b"updated-data"
170+
assert sem_under_lock.count == 1
171+
assert sem_under_lock.ephemeral is False
172+
assert len(list(sem_under_lock.owners)) == 1
173+
assert list(sem_under_lock.waiters) == []
174+
175+
t2 = asyncio.create_task(second_lock_task())
176+
await lock2_started.wait()
177+
178+
await asyncio.sleep(0.5)
179+
180+
assert lock1._stream is not None
181+
182+
await asyncio.wait_for(lock2_acquired.wait(), timeout=5)
183+
await asyncio.wait_for(t2, timeout=5)
184+
185+
async with client.lock("test_lock", node_path) as lock3:
186+
assert lock3._stream is not None
187+
assert lock3._stream.session_id is not None
188+
189+
resp3 = await lock3.describe()
190+
assert resp3.status == StatusCode.SUCCESS
191+
sem3 = resp3.semaphore_description
192+
assert sem3.count == 1
193+
194+
195+
delete_resp = await lock.delete()
196+
assert delete_resp.status == StatusCode.SUCCESS
197+
198+
describe_after_delete = await lock.describe()
199+
assert describe_after_delete.status == StatusCode.NOT_FOUND
200+

ydb/_apis.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,9 +143,9 @@ class QueryService(object):
143143

144144
class CoordinationService(object):
145145
Stub = ydb_coordination_v1_pb2_grpc.CoordinationServiceStub
146-
147-
Session = "Session"
148146
CreateNode = "CreateNode"
149147
AlterNode = "AlterNode"
150148
DropNode = "DropNode"
151149
DescribeNode = "DescribeNode"
150+
SessionRequest = "SessionRequest"
151+
Session = "Session"

ydb/_grpc/grpcwrapper/ydb_coordination.py

Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,3 +55,201 @@ def to_proto(self) -> ydb_coordination_pb2.DropNodeRequest:
5555
return ydb_coordination_pb2.DropNodeRequest(
5656
path=self.path,
5757
)
58+
59+
60+
@dataclass
61+
class SessionStart(IToProto):
62+
path: str
63+
timeout_millis: int
64+
description: str = ""
65+
session_id: int = 0
66+
seq_no: int = 0
67+
protection_key: bytes = b""
68+
69+
def to_proto(self) -> ydb_coordination_pb2.SessionRequest:
70+
return ydb_coordination_pb2.SessionRequest(session_start=ydb_coordination_pb2.SessionRequest.SessionStart(
71+
path=self.path,
72+
session_id=self.session_id,
73+
timeout_millis=self.timeout_millis,
74+
description=self.description,
75+
seq_no=self.seq_no,
76+
protection_key=self.protection_key,
77+
))
78+
79+
80+
@dataclass
81+
class SessionStop(IToProto):
82+
def to_proto(self) -> ydb_coordination_pb2.SessionRequest:
83+
return ydb_coordination_pb2.SessionRequest(session_stop=ydb_coordination_pb2.SessionRequest.SessionStop())
84+
85+
86+
@dataclass
87+
class Ping(IToProto):
88+
opaque: int = 0
89+
90+
def to_proto(self) -> ydb_coordination_pb2.SessionRequest:
91+
return ydb_coordination_pb2.SessionRequest(
92+
ping=ydb_coordination_pb2.SessionRequest.PingPong(opaque=self.opaque))
93+
94+
95+
@dataclass
96+
class CreateSemaphore(IToProto):
97+
name: str
98+
req_id: int
99+
limit: int
100+
data: bytes = b""
101+
102+
def to_proto(self) -> ydb_coordination_pb2.SessionRequest:
103+
return ydb_coordination_pb2.SessionRequest(create_semaphore=ydb_coordination_pb2.SessionRequest.CreateSemaphore(
104+
req_id=self.req_id, name=self.name, limit=self.limit, data=self.data
105+
))
106+
107+
108+
@dataclass
109+
class UpdateSemaphore(IToProto):
110+
name: str
111+
req_id: int
112+
data: bytes
113+
114+
def to_proto(self) -> ydb_coordination_pb2.SessionRequest:
115+
return ydb_coordination_pb2.SessionRequest(update_semaphore=ydb_coordination_pb2.SessionRequest.UpdateSemaphore(
116+
req_id=self.req_id, name=self.name, data=self.data
117+
))
118+
119+
120+
@dataclass
121+
class DeleteSemaphore(IToProto):
122+
name: str
123+
req_id: int
124+
force: bool = False
125+
126+
def to_proto(self) -> ydb_coordination_pb2.SessionRequest:
127+
return ydb_coordination_pb2.SessionRequest(delete_semaphore=ydb_coordination_pb2.SessionRequest.DeleteSemaphore(
128+
req_id=self.req_id, name=self.name, force=self.force
129+
))
130+
131+
132+
@dataclass
133+
class AcquireSemaphore(IToProto):
134+
name: str
135+
req_id: int
136+
count: int = 1
137+
timeout_millis: int = 0
138+
data: bytes = b""
139+
ephemeral: bool = False
140+
141+
def to_proto(self) -> ydb_coordination_pb2.SessionRequest:
142+
return ydb_coordination_pb2.SessionRequest(
143+
acquire_semaphore=ydb_coordination_pb2.SessionRequest.AcquireSemaphore(
144+
req_id=self.req_id,
145+
name=self.name,
146+
timeout_millis=self.timeout_millis,
147+
count=self.count,
148+
data=self.data,
149+
ephemeral=self.ephemeral,
150+
))
151+
152+
153+
@dataclass
154+
class ReleaseSemaphore(IToProto):
155+
name: str
156+
req_id: int
157+
158+
def to_proto(self) -> ydb_coordination_pb2.SessionRequest:
159+
return ydb_coordination_pb2.SessionRequest(
160+
release_semaphore=ydb_coordination_pb2.SessionRequest.ReleaseSemaphore(
161+
req_id=self.req_id, name=self.name
162+
))
163+
164+
@dataclass
165+
class DescribeSemaphore(IToProto):
166+
include_owners: bool
167+
include_waiters: bool
168+
name: str
169+
req_id: int
170+
watch_data: bool
171+
watch_owners: bool
172+
173+
def to_proto(self) -> ydb_coordination_pb2.SessionRequest:
174+
return ydb_coordination_pb2.SessionRequest(
175+
describe_semaphore=ydb_coordination_pb2.SessionRequest.DescribeSemaphore(
176+
include_owners=self.include_owners,
177+
include_waiters=self.include_waiters,
178+
name=self.name,
179+
req_id=self.req_id,
180+
watch_data=self.watch_data,
181+
watch_owners=self.watch_owners
182+
)
183+
)
184+
185+
186+
@dataclass
187+
class FromServer:
188+
raw: ydb_coordination_pb2.SessionResponse
189+
190+
@staticmethod
191+
def from_proto(resp: ydb_coordination_pb2.SessionResponse) -> "FromServer":
192+
return FromServer(raw=resp)
193+
194+
def __getattr__(self, name: str):
195+
return getattr(self.raw, name)
196+
197+
@property
198+
def status(self) -> typing.Optional[int]:
199+
return getattr(self.raw, "status", None)
200+
201+
@property
202+
def session_started(self) -> typing.Optional[ydb_coordination_pb2.SessionResponse.SessionStarted]:
203+
x = getattr(self.raw, "session_started", None)
204+
return x if getattr(x, "session_id", 0) else None
205+
206+
@property
207+
def session_stopped(self) -> typing.Optional[ydb_coordination_pb2.SessionResponse.SessionStopped]:
208+
return getattr(self.raw, "session_stopped", None) or None
209+
210+
@property
211+
def failure(self) -> typing.Optional[ydb_coordination_pb2.SessionResponse.Failure]:
212+
return getattr(self.raw, "failure", None) or None
213+
214+
@property
215+
def pong(self) -> typing.Optional[ydb_coordination_pb2.SessionResponse.PingPong]:
216+
return getattr(self.raw, "pong", None) or None
217+
218+
@property
219+
def ping(self) -> typing.Optional[ydb_coordination_pb2.SessionResponse.PingPong]:
220+
return getattr(self.raw, "ping", None) or None
221+
222+
@property
223+
def opaque(self) -> typing.Optional[int]:
224+
if getattr(self.raw, "ping") is not None:
225+
return getattr(getattr(self.raw, "ping"), "opaque", None)
226+
227+
@property
228+
def acquire_semaphore_result(
229+
self,
230+
) -> typing.Optional[ydb_coordination_pb2.SessionResponse.AcquireSemaphoreResult]:
231+
return getattr(self.raw, "acquire_semaphore_result", None) or None
232+
233+
@property
234+
def create_semaphore_result(
235+
self,
236+
) -> typing.Optional[ydb_coordination_pb2.SessionResponse.CreateSemaphoreResult]:
237+
return getattr(self.raw, "create_semaphore_result", None) or None
238+
239+
@property
240+
def delete_semaphore_result(
241+
self,
242+
) -> typing.Optional[ydb_coordination_pb2.SessionResponse.DeleteSemaphoreResult]:
243+
return getattr(self.raw, "delete_semaphore_result", None) or None
244+
245+
@property
246+
def update_semaphore_result(
247+
self,
248+
) -> typing.Optional[ydb_coordination_pb2.SessionResponse.UpdateSemaphoreResult]:
249+
return getattr(self.raw, "update_semaphore_result", None) or None
250+
251+
@property
252+
def describe_semaphore_result(
253+
self,
254+
) -> typing.Optional[ydb_coordination_pb2.SessionResponse.DescribeSemaphoreResult]:
255+
return getattr(self.raw, "describe_semaphore_result", None) or None

ydb/_grpc/grpcwrapper/ydb_coordination_public_types.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,3 +55,31 @@ def from_proto(msg: ydb_coordination_pb2.DescribeNodeResponse) -> "NodeConfig":
5555
result = ydb_coordination_pb2.DescribeNodeResult()
5656
msg.operation.result.Unpack(result)
5757
return NodeConfig.from_proto(result.config)
58+
59+
60+
@dataclass
61+
class AcquireSemaphoreResult:
62+
req_id: int
63+
acquired: bool
64+
status: int
65+
66+
@staticmethod
67+
def from_proto(msg: ydb_coordination_pb2.SessionResponse.AcquireSemaphoreResult) -> "AcquireSemaphoreResult":
68+
return AcquireSemaphoreResult(
69+
req_id=msg.req_id,
70+
acquired=msg.acquired,
71+
status=msg.status,
72+
)
73+
74+
75+
@dataclass
76+
class CreateSemaphoreResult:
77+
req_id: int
78+
status: int
79+
80+
@staticmethod
81+
def from_proto(msg: ydb_coordination_pb2.SessionResponse.CreateSemaphoreResult) -> "CreateSemaphoreResult":
82+
return CreateSemaphoreResult(
83+
req_id=msg.req_id,
84+
status=msg.status,
85+
)

ydb/aio/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
from .driver import Driver # noqa
22
from .table import SessionPool, retry_operation # noqa
33
from .query import QuerySessionPool, QuerySession, QueryTxContext # noqa
4-
from .coordination_client import CoordinationClient # noqa
4+
from .coordination import CoordinationClient # noqa

ydb/aio/coordination/__init__.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
__all__ = [
2+
"CoordinationClient",
3+
]
4+
5+
from .client import CoordinationClient

0 commit comments

Comments
 (0)