diff --git a/.bumpversion.cfg b/.bumpversion.cfg index e0380b6..274b67c 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,7 +1,7 @@ [bumpversion] commit = True tag = True -current_version = 0.1.4 +current_version = 0.2.6 parse = (?P\d+)\.(?P\d+)\.(?P\d+)(\-(?P[a-z]+))? serialize = {major}.{minor}.{patch}-{release} diff --git a/consul/__init__.py b/consul/__init__.py index cabfc76..43a3e56 100644 --- a/consul/__init__.py +++ b/consul/__init__.py @@ -1,4 +1,4 @@ -__version__ = '0.1.4' +__version__ = '0.2.6' from consul.base import ACLDisabled # noqa from consul.base import ACLPermissionDenied # noqa diff --git a/consul/aio.py b/consul/aio.py index e782f62..b22d2bc 100644 --- a/consul/aio.py +++ b/consul/aio.py @@ -5,6 +5,7 @@ import warnings import aiohttp +from aiohttp import ClientTimeout from consul import base @@ -20,10 +21,10 @@ def __init__(self, *args, loop=None, **kwargs): self._session = None self._loop = loop or asyncio.get_event_loop() - async def _request(self, callback, method, uri, data=None, headers=None): + async def _request(self, callback, method, uri, data=None, headers=None, total_timeout=None): connector = aiohttp.TCPConnector(loop=self._loop, verify_ssl=self.verify) - async with aiohttp.ClientSession(connector=connector) as session: + async with aiohttp.ClientSession(connector=connector, timeout=ClientTimeout(total=total_timeout)) as session: self._session = session resp = await session.request(method=method, url=uri, @@ -47,9 +48,9 @@ def __del__(self): ResourceWarning) asyncio.ensure_future(self.close()) - async def get(self, callback, path, params=None, headers=None): + async def get(self, callback, path, params=None, headers=None, total_timeout=None): uri = self.uri(path, params) - return await self._request(callback, 'GET', uri, headers=headers) + return await self._request(callback, 'GET', uri, headers=headers, total_timeout=total_timeout) async def put(self, callback, path, params=None, data='', headers=None): uri = self.uri(path, params) diff --git a/consul/base.py b/consul/base.py index 9d27dbd..c862204 100755 --- a/consul/base.py +++ b/consul/base.py @@ -1,9 +1,13 @@ import abc import base64 import collections +import enum import json import logging import os +import re +import threading +import time import warnings import six @@ -284,6 +288,232 @@ def cb(response): return cb +# +# Convenience to define weight + +class Weight(object): + """ + There object for set weights parameters like this + {'passing': 100, 'warning': 100} + """ + + @classmethod + def weights(cls, passing, warning): + return {'passing': passing, 'warning': warning} + + +class ConsistencyMode(enum.Enum): + """ + Most of the read query endpoints support multiple levels of consistency. + Since no policy will suit all clients' needs, + these consistency modes allow the user to have the ultimate say in how + to balance the trade-offs inherent in a distributed system. + The three read modes are: + + *DEFAULT* - If not specified, the default is strongly consistent in almost all cases. + However, there is a small window in which a new leader may be elected during which + he old leader may service stale values. The trade-off is fast reads but potentially stale values. + The condition resulting in stale reads is hard to trigger, and most clients should + not need to worry about this case. Also, note that this race condition only applies to reads, not writes. + + *CONSISTENT* - This mode is strongly consistent without caveats. + It requires that a leader verify with a quorum of peers that it is still leader. + This introduces an additional round-trip to all server nodes. The trade-off is increased + latency due to an extra round trip. Most clients should not use + this unless they cannot tolerate a stale read. + + *STALE* - This mode allows any server to service the read regardless of whether it is the leader. + This means reads can be arbitrarily stale; however, results are generally + consistent to within 50 milliseconds of the leader. The trade-off is very fast and scalable reads + with a higher likelihood of stale values. Since this mode allows reads without a leader, a cluster + that is unavailable will still be able to respond to queries. + """ + + DEFAULT = 'default' + CONSISTENT = 'consistent' + STALE = 'stale' + + +class ConsulCacheBase(metaclass=abc.ABCMeta): + """ + Base consul cache implements, that support blocking query. + + *cache* is a dict that consist cache values by key. + + *callbacks* is a list of methods, that will be invoked when cache updated + + *watch_seconds* is the maximum duration for the blocking request. + + *index* is the current Consul index, suitable for making subsequent + calls to wait for changes since this query was last run. + """ + + def __init__(self, watch_seconds: str, backoff_delay_seconds: int, caller: str): + self.cache = dict() + self.callbacks = [] + self.caller = caller + self.watch_seconds = watch_seconds + self.backoff_delay_seconds = backoff_delay_seconds + self.index = None + self._running = True + self._cache_thread = threading.Thread( + target=self._update_cache, + name='update_consul_cache_thread', + daemon=True) + + def start(self): + self._cache_thread.start() + + def stop(self): + self._running = False + + def add_listener(self, callback, trigger_current=False): + self.callbacks.append(callback) + log.debug(f'Registered callback: {self.callbacks}') + if trigger_current: + for key, value in self.cache.items(): + callback(key, value) + + @abc.abstractmethod + def _update_cache(cls): + pass + + +class HealthCache(ConsulCacheBase): + """ + Consul health service cache. + + *service* is a service name for getting healths info. + + *passing* specifies that the server should return only nodes + with all checks in the passing state. This can be used to avoid + additional filtering on the client side. + """ + + def __init__(self, + health_client, + watch_seconds: str, + backoff_delay_seconds: int, + service: str, + passing: bool, + dc: str, + caller: str): + super().__init__(watch_seconds, backoff_delay_seconds, caller) + self.service = service + self.health_client = health_client + self.passing = passing + self.dc = dc.lower() + self.index, service_health = health_client.service( + service=self.service, + passing=self.passing, + dc=self.dc, + caller=self.caller + ) + self.cache = {self.service: service_health} + + def _update_cache(self): + while self._running: + try: + params = { + 'service': self.service, + 'passing': self.passing, + 'index': self.index, + 'wait': self.watch_seconds, + 'dc': self.dc, + 'caller': self.caller + } + log.debug(f'Param for health query: {params}') + self.index, values = self.health_client.service(**params) + old_cache = self.cache + self.cache = {self.service: values} + if self.callbacks and self._running: + for key, old_value in old_cache.items(): + new_value = self.cache.get(key, None) + for callback in self.callbacks: + callback(key, new_value) + except Exception as e: + log.error(f'Some problem with update consul cache: {e}. Will retry in {self.backoff_delay_seconds}s') + time.sleep(self.backoff_delay_seconds) + + +class KVCache(ConsulCacheBase): + """ + Consul key-value cache. + + *path* is a key for getting value + + *consistency_mode* sets the consistency mode to use by default for all reads + that support the consistency option. It's still possible to override + this by passing explicitly for a given request. *consistency* can be + either 'default', 'consistent' or 'stale'. + + *total_timeout* is a ttl of HTTP session. Should be more than *watch_seconds* + + *cache_initial_warmup_timeout* is a ttl of HTTP session for initialize cache. + May be None, will use *total_timeout* insted + """ + + def __init__(self, + kv_client, + watch_seconds: str, + backoff_delay_seconds: int, + path: str, + total_timeout: int, + recurse: bool, + consistency_mode: ConsistencyMode, + caller: str, + cache_initial_warmup_timeout=None): + super().__init__(watch_seconds, backoff_delay_seconds, caller) + self.kv_client = kv_client + self.path = path + self.recurse = recurse + self.consistency_mode = consistency_mode.value + self.total_timeout = total_timeout + self.cache_initial_warmup_timeout = cache_initial_warmup_timeout + self.index, kv = kv_client.get( + key=path, + recurse=recurse, + total_timeout=self._get_warmup_timeout(), + caller=self.caller + ) + self.cache = {self.path: kv} + + def _get_warmup_timeout(self): + if self.cache_initial_warmup_timeout: + return self.cache_initial_warmup_timeout + return self.total_timeout + + def get_value(self): + return self.cache.get(self.path, None) + + def _update_cache(self): + while self._running: + try: + params = { + 'key': self.path, + 'index': self.index, + 'wait': self.watch_seconds, + 'total_timeout': self.total_timeout, + 'consistency': self.consistency_mode, + 'recurse': self.recurse, + 'caller': self.caller + } + log.debug(f'Param for kv query: {params}') + self.index, values = self.kv_client.get(**params) + old_cache = self.cache + self.cache = {self.path: values} + if self.callbacks and self._running: + for key, new_value in self.cache.items(): + old_value = old_cache.get(key, None) + if old_value != new_value: + log.debug(f'Value was changed for key={key}. old: {old_value} new: {new_value}') + for callback in self.callbacks: + callback(key, new_value) + except Exception as e: + log.error(f'Some problem with update consul cache: {e}. Will retry in {self.backoff_delay_seconds}s') + time.sleep(self.backoff_delay_seconds) + + class HTTPClient(six.with_metaclass(abc.ABCMeta, object)): def __init__(self, host='127.0.0.1', port=8500, scheme='http', verify=True, cert=None, timeout=None): @@ -302,7 +532,7 @@ def uri(self, path, params=None): return uri @abc.abstractmethod - def get(self, callback, path, params=None, headers=None): + def get(self, callback, path, params=None, headers=None, total_timeout=None): raise NotImplementedError @abc.abstractmethod @@ -1364,12 +1594,14 @@ def register( check=None, token=None, meta=None, + weights=None, # *deprecated* use check parameter script=None, interval=None, ttl=None, http=None, timeout=None, + caller=None, enable_tag_override=False): """ Add a new service to the local agent. There is more @@ -1396,9 +1628,15 @@ def register( *meta* specifies arbitrary KV metadata linked to the service formatted as {k1:v1, k2:v2}. + *weights* specifies weights for the service. + If this field is not provided weights + will default to {"Passing": 1, "Warning": 1}. + *script*, *interval*, *ttl*, *http*, and *timeout* arguments are deprecated. use *check* instead. + *caller* is a name of caller service. + *enable_tag_override* is an optional bool that enable you to modify a service tags from servers(consul agent role server) Default is set to False. @@ -1407,7 +1645,7 @@ def register( for more information https://www.consul.io/docs/agent/services.html """ - + params = [] payload = {} payload['name'] = name @@ -1425,6 +1663,11 @@ def register( payload['meta'] = meta if check: payload['check'] = check + if weights: + payload['weights'] = weights + + if caller: + params.append(('caller', caller)) else: payload.update(Check._compat( @@ -1443,22 +1686,28 @@ def register( CB.bool(), path='/v1/agent/service/register', headers=headers, + params=params, data=json.dumps(payload)) - def deregister(self, service_id, token=None): + def deregister(self, service_id, caller=None, token=None): """ Used to remove a service from the local agent. The agent will take care of deregistering the service with the Catalog. If there is an associated check, that is also deregistered. """ + + params = [] headers = {} token = token or self.agent.token if token: headers['X-Consul-Token'] = token + if caller: + params.append(('caller', caller)) return self.agent.http.put( CB.bool(), path='/v1/agent/service/deregister/%s' % service_id, - headers=headers + headers=headers, + params=params ) def maintenance(self, service_id, enable, reason=None, token=None): @@ -2703,7 +2952,8 @@ def service(self, dc=None, near=None, token=None, - node_meta=None): + node_meta=None, + caller=None): """ Returns a tuple of (*index*, *nodes*) @@ -2731,6 +2981,8 @@ def service(self, *node_meta* is an optional meta data used for filtering, a dictionary formatted as {k1:v1, k2:v2}. + + *caller* is a name of caller service. """ params = [] headers = {} @@ -2755,6 +3007,8 @@ def service(self, for nodemeta_name, nodemeta_value in node_meta.items(): params.append(('node-meta', '{0}:{1}'. format(nodemeta_name, nodemeta_value))) + if caller: + params.append(('caller', caller)) return self.agent.http.get( CB.json(index=True), path='/v1/health/service/%s' % service, @@ -2934,7 +3188,9 @@ def get( consistency=None, keys=False, separator=None, - dc=None): + dc=None, + total_timeout=None, + caller=None): """ Returns a tuple of (*index*, *value[s]*) @@ -2970,6 +3226,8 @@ def get( "Session": "adf4238a-882b-9ddc-4a9d-5b6758e4159e" } + *caller* is a name of caller service. + Note, if the requested key does not exists *(index, None)* is returned. It's then possible to long poll on the index for when the key is created. @@ -2981,6 +3239,10 @@ def get( if index: params.append(('index', index)) if wait: + assert total_timeout, \ + 'total_timeout should be setted' + assert not self._convert_wait_to_seconds(wait) >= total_timeout, \ + f'wait: {wait} should be less than total_timeout: {total_timeout}s' params.append(('wait', wait)) if recurse: params.append(('recurse', '1')) @@ -2997,6 +3259,8 @@ def get( consistency = consistency or self.agent.consistency if consistency in ('consistent', 'stale'): params.append((consistency, '1')) + if caller: + params.append(('caller', caller)) one = False decode = False @@ -3009,7 +3273,18 @@ def get( CB.json(index=True, decode=decode, one=one, map=lambda x: x if x else None), path='/v1/kv/%s' % key, - params=params, headers=headers) + params=params, headers=headers, total_timeout=total_timeout) + + def _convert_wait_to_seconds(self, wait): + unit_to_seconds_multiplier = { + 'ms': 0.001, + 's': 1, + 'm': 60 + } + wait_digit = int(re.search(r'\d+', wait).group()) + multiplier = unit_to_seconds_multiplier[re.search(r'ms|s|m', wait).group()] + + return wait_digit * multiplier def put( self, diff --git a/consul/std.py b/consul/std.py index 224b6fe..feb54ea 100644 --- a/consul/std.py +++ b/consul/std.py @@ -19,7 +19,7 @@ def response(response): response.text, response.content) - def get(self, callback, path, params=None, headers=None): + def get(self, callback, path, params=None, headers=None, total_timeout=None): uri = self.uri(path, params) return callback(self.response( self.session.get(uri, diff --git a/consul/tornado.py b/consul/tornado.py index 8294885..d90e610 100644 --- a/consul/tornado.py +++ b/consul/tornado.py @@ -31,12 +31,13 @@ def _request(self, callback, request): response = e.response raise gen.Return(callback(self.response(response))) - def get(self, callback, path, params=None, headers=None): + def get(self, callback, path, params=None, headers=None, total_timeout=None): uri = self.uri(path, params) request = httpclient.HTTPRequest(uri, method='GET', validate_cert=self.verify, - headers=headers) + headers=headers, + connect_timeout=total_timeout) return self._request(callback, request) def put(self, callback, path, params=None, data='', headers=None): diff --git a/consul/twisted.py b/consul/twisted.py index 900ea75..02c37e3 100644 --- a/consul/twisted.py +++ b/consul/twisted.py @@ -100,13 +100,14 @@ def request(self, callback, method, url, **kwargs): 'Request incomplete: {} {}'.format(method.upper(), url)) @inlineCallbacks - def get(self, callback, path, params=None, headers=None): + def get(self, callback, path, params=None, headers=None, total_timeout=None): uri = self.uri(path, params) response = yield self.request(callback, 'get', uri, params=params, - headers=headers) + headers=headers, + total_timeout=total_timeout) returnValue(response) @inlineCallbacks diff --git a/requirements.txt b/requirements.txt index 5e03435..51fb41d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ requests six>=1.4 +aiohttp == 3.7.4 diff --git a/setup.py b/setup.py index 92dc99a..4452195 100644 --- a/setup.py +++ b/setup.py @@ -42,7 +42,7 @@ def run_tests(self): setup( - name='python-consul2', + name='python-consul2-hh', version=metadata['version'], author='yan.gao', author_email='373251686@qq.com', diff --git a/sonar-project.properties b/sonar-project.properties index 373175e..979d314 100644 --- a/sonar-project.properties +++ b/sonar-project.properties @@ -1,7 +1,7 @@ # Required metadata sonar.projectKey=com.github:poppyred:python-consul2 -sonar.projectName=Python Consul2 -sonar.projectVersion=0.1.4 +sonar.projectName=Python Consul2 HH +sonar.projectVersion=0.2.6 # Comma-separated paths to directories with sources (required) sonar.sources=consul diff --git a/tests/test_base.py b/tests/test_base.py index a80227f..5d815dc 100755 --- a/tests/test_base.py +++ b/tests/test_base.py @@ -11,7 +11,7 @@ Response = consul.base.Response Request = collections.namedtuple( - 'Request', ['method', 'path', 'params', 'data', 'headers']) + 'Request', ['method', 'path', 'params', 'data', 'headers', 'total_timeout']) class HTTPClient(object): @@ -19,14 +19,14 @@ def __init__(self, host=None, port=None, scheme=None, verify=True, cert=None, timeout=None, headers=None): pass - def get(self, callback, path, params=None, headers=None): - return Request('get', path, params, None, headers) + def get(self, callback, path, params=None, headers=None, total_timeout=None): + return Request('get', path, params, None, headers, total_timeout) - def put(self, callback, path, params=None, data='', headers=None): - return Request('put', path, params, data, headers) + def put(self, callback, path, params=None, data='', headers=None, total_timeout=None): + return Request('put', path, params, data, headers, total_timeout) - def delete(self, callback, path, params=None, headers=None): - return Request('delete', path, params, None, headers) + def delete(self, callback, path, params=None, headers=None, total_timeout=None): + return Request('delete', path, params, None, headers, total_timeout) class Consul(consul.base.Consul): diff --git a/tests/test_std.py b/tests/test_std.py index 2ff1099..7475b99 100644 --- a/tests/test_std.py +++ b/tests/test_std.py @@ -8,6 +8,7 @@ import consul import consul.std +from consul.base import Weight Check = consul.Check @@ -30,13 +31,57 @@ def test_kv(self, consul_port): index, data = c.kv.get('foo') assert data['Value'] == six.b('bar') - def test_kv_wait(self, consul_port): + def test_kv_wait_ms(self, consul_port): c = consul.Consul(port=consul_port) assert c.kv.put('foo', 'bar') is True index, data = c.kv.get('foo') - check, data = c.kv.get('foo', index=index, wait='20ms') + check, data = c.kv.get('foo', index=index, wait='20ms', total_timeout=30) assert index == check + def test_kv_wait_s(self, consul_port): + c = consul.Consul(port=consul_port) + assert c.kv.put('foo', 'bar') is True + index, data = c.kv.get('foo') + check, data = c.kv.get('foo', index=index, wait='20s', total_timeout=30) + assert index == check + + def test_kv_wait_m(self, consul_port): + c = consul.Consul(port=consul_port) + assert c.kv.put('foo', 'bar') is True + index, data = c.kv.get('foo') + check, data = c.kv.get('foo', index=index, wait='1m', total_timeout=61) + assert index == check + + def test_kv_wait_more_timeout_ms(self, consul_port): + c = consul.Consul(port=consul_port) + assert c.kv.put('foo', 'bar') is True + index, data = c.kv.get('foo') + pytest.raises( + AssertionError, + c.kv.get, + 'foo', index=index, wait='20ms', total_timeout=0 + ) + + def test_kv_wait_more_timeout_s(self, consul_port): + c = consul.Consul(port=consul_port) + assert c.kv.put('foo', 'bar') is True + index, data = c.kv.get('foo') + pytest.raises( + AssertionError, + c.kv.get, + 'foo', index=index, wait='20s', total_timeout=19 + ) + + def test_kv_wait_more_timeout_m(self, consul_port): + c = consul.Consul(port=consul_port) + assert c.kv.put('foo', 'bar') is True + index, data = c.kv.get('foo') + pytest.raises( + AssertionError, + c.kv.get, + 'foo', index=index, wait='1m', total_timeout=59 + ) + def test_kv_encoding(self, consul_port): c = consul.Consul(port=consul_port) @@ -348,6 +393,26 @@ def test_agent_register_enable_tag_override(self, consul_port): # Cleanup tasks c.agent.check.deregister('foo') + def test_agent_register_enable_weights(self, consul_port): + c = consul.Consul(port=consul_port) + index, nodes = c.health.service("foo1") + assert nodes == [] + + c.agent.service.register('foo', weights=Weight.weights(10, 10)) + assert c.agent.services()['foo']['Weights'] == {"Passing": 10, "Warning": 10} + # Cleanup tasks + c.agent.check.deregister('foo') + + def test_agent_register_disable_weights(self, consul_port): + c = consul.Consul(port=consul_port) + index, nodes = c.health.service("foo1") + assert nodes == [] + + c.agent.service.register('foo') + assert c.agent.services()['foo']['Weights'] == {"Passing": 1, "Warning": 1} + # Cleanup tasks + c.agent.check.deregister('foo') + def test_agent_service_maintenance(self, consul_port): c = consul.Consul(port=consul_port) diff --git a/tests/test_std_token.py b/tests/test_std_token.py index 86b60c9..b5852ad 100644 --- a/tests/test_std_token.py +++ b/tests/test_std_token.py @@ -24,7 +24,7 @@ def test_kv_wait(self, acl_consul): c = consul.Consul(port=acl_consul.port, token=acl_consul.token) assert c.kv.put('foo', 'bar') is True index, data = c.kv.get('foo') - check, data = c.kv.get('foo', index=index, wait='20ms') + check, data = c.kv.get('foo', index=index, wait='20ms', total_timeout=30) assert index == check def test_kv_encoding(self, acl_consul):