Skip to content

Commit 92a115c

Browse files
committed
IGNITE-14911 Unify timeouts, add support for datetime.timedelta for expiry_policy - Fixes #44.
1 parent 05413e7 commit 92a115c

File tree

12 files changed

+151
-79
lines changed

12 files changed

+151
-79
lines changed

docs/async_examples.rst

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,20 +63,20 @@ in cache settings dictionary on creation.
6363
.. literalinclude:: ../examples/expiry_policy.py
6464
:language: python
6565
:dedent: 12
66-
:lines: 72-75
66+
:lines: 73-76
6767

6868
.. literalinclude:: ../examples/expiry_policy.py
6969
:language: python
7070
:dedent: 12
71-
:lines: 81-89
71+
:lines: 82-90
7272

7373
Secondly, expiry policy can be set for all cache operations, which are done under decorator. To create it use
7474
:py:meth:`~pyignite.cache.BaseCache.with_expire_policy`
7575

7676
.. literalinclude:: ../examples/expiry_policy.py
7777
:language: python
7878
:dedent: 12
79-
:lines: 96-105
79+
:lines: 97-106
8080

8181
Transactions
8282
------------

docs/examples.rst

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -97,20 +97,20 @@ in cache settings dictionary on creation.
9797
.. literalinclude:: ../examples/expiry_policy.py
9898
:language: python
9999
:dedent: 12
100-
:lines: 31-34
100+
:lines: 32-35
101101

102102
.. literalinclude:: ../examples/expiry_policy.py
103103
:language: python
104104
:dedent: 12
105-
:lines: 40-46
105+
:lines: 41-47
106106

107107
Secondly, expiry policy can be set for all cache operations, which are done under decorator. To create it use
108108
:py:meth:`~pyignite.cache.BaseCache.with_expire_policy`
109109

110110
.. literalinclude:: ../examples/expiry_policy.py
111111
:language: python
112112
:dedent: 12
113-
:lines: 53-60
113+
:lines: 54-61
114114

115115
Scan
116116
====

examples/expiry_policy.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
# limitations under the License.
1515
import asyncio
1616
import time
17+
from datetime import timedelta
1718

1819
from pyignite import Client, AioClient
1920
from pyignite.datatypes import ExpiryPolicy
@@ -30,7 +31,7 @@ def main():
3031
try:
3132
ttl_cache = client.create_cache({
3233
PROP_NAME: 'test',
33-
PROP_EXPIRY_POLICY: ExpiryPolicy(create=1.0)
34+
PROP_EXPIRY_POLICY: ExpiryPolicy(create=timedelta(seconds=1.0))
3435
})
3536
except NotSupportedByClusterError:
3637
print("'ExpiryPolicy' API is not supported by cluster. Finishing...")
@@ -50,7 +51,7 @@ def main():
5051
print("Create simple Cache and set TTL through `with_expire_policy`")
5152
simple_cache = client.create_cache('test')
5253
try:
53-
ttl_cache = simple_cache.with_expire_policy(access=1.0)
54+
ttl_cache = simple_cache.with_expire_policy(access=timedelta(seconds=1.0))
5455
ttl_cache.put(1, 1)
5556
time.sleep(0.5)
5657
print(f"key = {1}, value = {ttl_cache.get(1)}")
@@ -71,7 +72,7 @@ async def async_main():
7172
try:
7273
ttl_cache = await client.create_cache({
7374
PROP_NAME: 'test',
74-
PROP_EXPIRY_POLICY: ExpiryPolicy(create=1.0)
75+
PROP_EXPIRY_POLICY: ExpiryPolicy(create=timedelta(seconds=1.0))
7576
})
7677
except NotSupportedByClusterError:
7778
print("'ExpiryPolicy' API is not supported by cluster. Finishing...")
@@ -93,7 +94,7 @@ async def async_main():
9394
print("Create simple Cache and set TTL through `with_expire_policy`")
9495
simple_cache = await client.create_cache('test')
9596
try:
96-
ttl_cache = simple_cache.with_expire_policy(access=1.0)
97+
ttl_cache = simple_cache.with_expire_policy(access=timedelta(seconds=1.0))
9798
await ttl_cache.put(1, 1)
9899
await asyncio.sleep(0.5)
99100
value = await ttl_cache.get(1)

examples/transactions.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ async def async_example():
6262

6363
# rollback transaction on timeout.
6464
try:
65-
async with client.tx_start(timeout=1.0, label='long-tx') as tx:
65+
async with client.tx_start(timeout=1000, label='long-tx') as tx:
6666
await cache.put(key, 'fail')
6767
await asyncio.sleep(2.0)
6868
await tx.commit()
@@ -114,7 +114,7 @@ def sync_example():
114114

115115
# rollback transaction on timeout.
116116
try:
117-
with client.tx_start(timeout=1.0, label='long-tx') as tx:
117+
with client.tx_start(timeout=1000, label='long-tx') as tx:
118118
cache.put(key, 'fail')
119119
time.sleep(2.0)
120120
tx.commit()

pyignite/aio_client.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -489,15 +489,15 @@ def get_cluster(self) -> 'AioCluster':
489489

490490
def tx_start(self, concurrency: TransactionConcurrency = TransactionConcurrency.PESSIMISTIC,
491491
isolation: TransactionIsolation = TransactionIsolation.REPEATABLE_READ,
492-
timeout: Union[int, float] = 0, label: Optional[str] = None) -> 'AioTransaction':
492+
timeout: int = 0, label: Optional[str] = None) -> 'AioTransaction':
493493
"""
494494
Start async thin client transaction. **Supported only python 3.7+**
495495
496496
:param concurrency: (optional) transaction concurrency, see
497-
:py:class:`~pyignite.datatypes.transactions.TransactionConcurrency`
497+
:py:class:`~pyignite.datatypes.transactions.TransactionConcurrency`,
498498
:param isolation: (optional) transaction isolation level, see
499-
:py:class:`~pyignite.datatypes.transactions.TransactionIsolation`
500-
:param timeout: (optional) transaction timeout in seconds if float, in millis if int
499+
:py:class:`~pyignite.datatypes.transactions.TransactionIsolation`,
500+
:param timeout: (optional) transaction timeout in milliseconds,
501501
:param label: (optional) transaction label.
502502
:return: :py:class:`~pyignite.transaction.AioTransaction` instance.
503503
"""

pyignite/cache.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
15-
15+
import datetime
1616
from typing import Any, Iterable, Optional, Tuple, Union
1717

1818
from .api.tx_api import get_tx_connection
@@ -136,16 +136,16 @@ def cache_id(self) -> int:
136136

137137
def with_expire_policy(
138138
self, expiry_policy: Optional[ExpiryPolicy] = None,
139-
create: Union[int, float] = ExpiryPolicy.UNCHANGED,
140-
update: Union[int, float] = ExpiryPolicy.UNCHANGED,
141-
access: Union[int, float] = ExpiryPolicy.UNCHANGED
139+
create: Union[int, datetime.timedelta] = ExpiryPolicy.UNCHANGED,
140+
update: Union[int, datetime.timedelta] = ExpiryPolicy.UNCHANGED,
141+
access: Union[int, datetime.timedelta] = ExpiryPolicy.UNCHANGED
142142
):
143143
"""
144144
:param expiry_policy: optional :class:`~pyignite.datatypes.expiry_policy.ExpiryPolicy`
145-
object. If it is set, other params will be ignored.
146-
:param create: create TTL in seconds (float) or milliseconds (int),
147-
:param update: Create TTL in seconds (float) or milliseconds (int),
148-
:param access: Create TTL in seconds (float) or milliseconds (int).
145+
object. If it is set, other params will be ignored,
146+
:param create: TTL for create in milliseconds or :py:class:`~time.timedelta`,
147+
:param update: TTL for update in milliseconds or :py:class:`~time.timedelta`,
148+
:param access: TTL for access in milliseconds or :py:class:`~time.timedelta`,
149149
:return: cache decorator with expiry policy set.
150150
"""
151151
if not self.client.protocol_context.is_expiry_policy_supported():

pyignite/client.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -744,15 +744,15 @@ def get_cluster(self) -> 'Cluster':
744744

745745
def tx_start(self, concurrency: TransactionConcurrency = TransactionConcurrency.PESSIMISTIC,
746746
isolation: TransactionIsolation = TransactionIsolation.REPEATABLE_READ,
747-
timeout: Union[int, float] = 0, label: Optional[str] = None) -> 'Transaction':
747+
timeout: int = 0, label: Optional[str] = None) -> 'Transaction':
748748
"""
749749
Start thin client transaction.
750750
751751
:param concurrency: (optional) transaction concurrency, see
752-
:py:class:`~pyignite.datatypes.transactions.TransactionConcurrency`
752+
:py:class:`~pyignite.datatypes.transactions.TransactionConcurrency`,
753753
:param isolation: (optional) transaction isolation level, see
754-
:py:class:`~pyignite.datatypes.transactions.TransactionIsolation`
755-
:param timeout: (optional) transaction timeout in seconds if float, in millis if int
754+
:py:class:`~pyignite.datatypes.transactions.TransactionIsolation`,
755+
:param timeout: (optional) transaction timeout in milliseconds,
756756
:param label: (optional) transaction label.
757757
:return: :py:class:`~pyignite.transaction.Transaction` instance.
758758
"""

pyignite/datatypes/cache_properties.py

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
# limitations under the License.
1515

1616
import ctypes
17+
import math
18+
from typing import Union
1719

1820
from . import ExpiryPolicy
1921
from .prop_codes import *
@@ -137,6 +139,20 @@ async def from_python_async(cls, stream, value):
137139
return cls.from_python(stream, value)
138140

139141

142+
class TimeoutProp(PropBase):
143+
prop_data_class = Long
144+
145+
@classmethod
146+
def from_python(cls, stream, value: int):
147+
if not isinstance(value, int) or value < 0:
148+
raise ValueError(f'Timeout value should be a positive integer, {value} passed instead')
149+
return super().from_python(stream, value)
150+
151+
@classmethod
152+
async def from_python_async(cls, stream, value):
153+
return cls.from_python(stream, value)
154+
155+
140156
class PropName(PropBase):
141157
prop_code = PROP_NAME
142158
prop_data_class = String
@@ -227,9 +243,8 @@ class PropRebalanceDelay(PropBase):
227243
prop_data_class = Long
228244

229245

230-
class PropRebalanceTimeout(PropBase):
246+
class PropRebalanceTimeout(TimeoutProp):
231247
prop_code = PROP_REBALANCE_TIMEOUT
232-
prop_data_class = Long
233248

234249

235250
class PropRebalanceBatchSize(PropBase):
@@ -262,9 +277,8 @@ class PropCacheKeyConfiguration(PropBase):
262277
prop_data_class = CacheKeyConfiguration
263278

264279

265-
class PropDefaultLockTimeout(PropBase):
280+
class PropDefaultLockTimeout(TimeoutProp):
266281
prop_code = PROP_DEFAULT_LOCK_TIMEOUT
267-
prop_data_class = Long
268282

269283

270284
class PropMaxConcurrentAsyncOperation(PropBase):

pyignite/datatypes/expiry_policy.py

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
1515
import ctypes
16+
import math
17+
from datetime import timedelta
1618
from io import SEEK_CUR
1719
from typing import Union
1820

@@ -22,13 +24,16 @@
2224

2325

2426
def _positive(_, attrib, value):
27+
if isinstance(value, timedelta):
28+
value = value.total_seconds() * 1000
29+
2530
if value < 0 and value not in [ExpiryPolicy.UNCHANGED, ExpiryPolicy.ETERNAL]:
2631
raise ValueError(f"'{attrib.name}' value must not be negative")
2732

2833

2934
def _write_duration(stream, value):
30-
if isinstance(value, float):
31-
value = int(value * 1000)
35+
if isinstance(value, timedelta):
36+
value = math.floor(value.total_seconds() * 1000)
3237

3338
stream.write(value.to_bytes(8, byteorder=PROTOCOL_BYTE_ORDER, signed=True))
3439

@@ -44,17 +49,17 @@ class ExpiryPolicy:
4449
#: Set TTL eternal.
4550
ETERNAL = -1
4651

47-
#: Set TTL for create in seconds(float) or millis(int)
48-
create = attr.ib(kw_only=True, default=UNCHANGED,
49-
validator=[attr.validators.instance_of((int, float)), _positive])
52+
#: Set TTL for create in milliseconds or :py:class:`~time.timedelta`
53+
create = attr.ib(kw_only=True, default=UNCHANGED, type=Union[int, timedelta],
54+
validator=[attr.validators.instance_of((int, timedelta)), _positive])
5055

51-
#: Set TTL for update in seconds(float) or millis(int)
52-
update = attr.ib(kw_only=True, default=UNCHANGED, type=Union[int, float],
53-
validator=[attr.validators.instance_of((int, float)), _positive])
56+
#: Set TTL for update in milliseconds or :py:class:`~time.timedelta`
57+
update = attr.ib(kw_only=True, default=UNCHANGED, type=Union[int, timedelta],
58+
validator=[attr.validators.instance_of((int, timedelta)), _positive])
5459

55-
#: Set TTL for access in seconds(float) or millis(int)
56-
access = attr.ib(kw_only=True, default=UNCHANGED, type=Union[int, float],
57-
validator=[attr.validators.instance_of((int, float)), _positive])
60+
#: Set TTL for access in milliseconds or :py:class:`~time.timedelta`
61+
access = attr.ib(kw_only=True, default=UNCHANGED, type=Union[int, timedelta],
62+
validator=[attr.validators.instance_of((int, timedelta)), _positive])
5863

5964
class _CType(ctypes.LittleEndianStructure):
6065
_pack_ = 1

pyignite/transaction.py

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,30 +13,50 @@
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
1515

16-
import math
17-
from typing import Union
16+
from enum import IntEnum
17+
from typing import Union, Type
1818

1919
from pyignite.api.tx_api import tx_end, tx_start, tx_end_async, tx_start_async
2020
from pyignite.datatypes import TransactionIsolation, TransactionConcurrency
2121
from pyignite.exceptions import CacheError
2222
from pyignite.utils import status_to_exception
2323

2424

25-
def _convert_to_millis(timeout: Union[int, float]) -> int:
26-
if isinstance(timeout, float):
27-
return math.floor(timeout * 1000)
28-
return timeout
25+
def _validate_int_enum_param(value: Union[int, IntEnum], cls: Type[IntEnum]):
26+
if value not in cls:
27+
raise ValueError(f'{value} not in {cls}')
28+
return value
2929

3030

31-
class Transaction:
31+
def _validate_timeout(value):
32+
if not isinstance(value, int) or value < 0:
33+
raise ValueError(f'Timeout value should be a positive integer, {value} passed instead')
34+
return value
35+
36+
37+
def _validate_label(value):
38+
if value and not isinstance(value, str):
39+
raise ValueError(f'Label should be str, {type(value)} passed instead')
40+
return value
41+
42+
43+
class _BaseTransaction:
44+
def __init__(self, client, concurrency=TransactionConcurrency.PESSIMISTIC,
45+
isolation=TransactionIsolation.REPEATABLE_READ, timeout=0, label=None):
46+
self.client = client
47+
self.concurrency = _validate_int_enum_param(concurrency, TransactionConcurrency)
48+
self.isolation = _validate_int_enum_param(isolation, TransactionIsolation)
49+
self.timeout = _validate_timeout(timeout)
50+
self.label, self.closed = _validate_label(label), False
51+
52+
53+
class Transaction(_BaseTransaction):
3254
"""
3355
Thin client transaction.
3456
"""
3557
def __init__(self, client, concurrency=TransactionConcurrency.PESSIMISTIC,
3658
isolation=TransactionIsolation.REPEATABLE_READ, timeout=0, label=None):
37-
self.client, self.concurrency = client, concurrency
38-
self.isolation, self.timeout = isolation, _convert_to_millis(timeout)
39-
self.label, self.closed = label, False
59+
super().__init__(client, concurrency, isolation, timeout, label)
4060
self.tx_id = self.__start_tx()
4161

4262
def commit(self) -> None:
@@ -77,15 +97,13 @@ def __end_tx(self, committed):
7797
return tx_end(self.tx_id, committed)
7898

7999

80-
class AioTransaction:
100+
class AioTransaction(_BaseTransaction):
81101
"""
82102
Async thin client transaction.
83103
"""
84104
def __init__(self, client, concurrency=TransactionConcurrency.PESSIMISTIC,
85105
isolation=TransactionIsolation.REPEATABLE_READ, timeout=0, label=None):
86-
self.client, self.concurrency = client, concurrency
87-
self.isolation, self.timeout = isolation, _convert_to_millis(timeout)
88-
self.label, self.closed = label, False
106+
super().__init__(client, concurrency, isolation, timeout, label)
89107

90108
def __await__(self):
91109
return (yield from self.__aenter__().__await__())

0 commit comments

Comments
 (0)