Skip to content

Commit 78dc9c4

Browse files
committed
RDBC-700 Final sync
1 parent 027a1ac commit 78dc9c4

File tree

13 files changed

+212
-45
lines changed

13 files changed

+212
-45
lines changed

ravendb/documents/session/event_args.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from typing import TYPE_CHECKING
1+
from typing import TYPE_CHECKING, Optional
22

33
import requests
44

@@ -263,7 +263,12 @@ def attempt_number(self) -> int:
263263

264264
class FailedRequestEventArgs(EventArgs):
265265
def __init__(
266-
self, database: str, url: str, exception: Exception, request: requests.Request, response: requests.Response
266+
self,
267+
database: str,
268+
url: str,
269+
exception: Exception,
270+
request: Optional[requests.Request],
271+
response: Optional[requests.Response],
267272
):
268273
self.__database = database
269274
self.__url = url

ravendb/documents/subscriptions/worker.py

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,14 @@
3333
SubscriberErrorException,
3434
SubscriptionDoesNotExistException,
3535
AllTopologyNodesDownException,
36+
InvalidNetworkTopologyException,
37+
SubscriptionMessageTypeException,
3638
)
3739
from ravendb.exceptions.raven_exceptions import ClientVersionMismatchException
3840
from ravendb.extensions.json_extensions import JsonExtensions
3941
from ravendb.json.metadata_as_dictionary import MetadataAsDictionary
4042
from ravendb.documents.session.document_session import DocumentSession
41-
from ravendb.documents.subscriptions.options import SubscriptionWorkerOptions
43+
from ravendb.documents.subscriptions.options import SubscriptionWorkerOptions, SubscriptionOpeningStrategy
4244
from ravendb.primitives.exceptions import OperationCancelledException
4345
from ravendb.primitives.misc import CancellationTokenSource
4446
from ravendb.http.request_executor import RequestExecutor
@@ -405,6 +407,8 @@ def _read_server_response_and_get_version(self, url: str, sock: socket) -> int:
405407
# Kindly request the server to drop the connection
406408
self._send_drop_message(reply)
407409
raise RuntimeError(f"Can't connect to database {self._db_name} because: {reply.message}")
410+
if reply.status == TcpConnectionStatus.INVALID_NETWORK_TOPOLOGY:
411+
raise InvalidNetworkTopologyException(f"Failed to connect to url {url} because {reply.message}")
408412

409413
return reply.version
410414

@@ -427,11 +431,16 @@ def _assert_connection_state(self, connection_status: SubscriptionConnectionServ
427431
raise DatabaseDoesNotExistException(f"{self._db_name} does not exists. {connection_status.message}")
428432

429433
if connection_status.type_of_message != SubscriptionConnectionServerMessage.MessageType.CONNECTION_STATUS:
430-
raise RuntimeError(
434+
message = (
431435
f"Server returned illegal type message when expecting connection status, "
432436
f"was: {connection_status.type_of_message}"
433437
)
434438

439+
if connection_status.type_of_message == SubscriptionConnectionServerMessage.MessageType.ERROR:
440+
message += f". Exception: {connection_status.exception}"
441+
442+
raise SubscriptionMessageTypeException(message)
443+
435444
if connection_status.status == SubscriptionConnectionServerMessage.ConnectionStatus.ACCEPTED:
436445
pass
437446
elif connection_status.status == SubscriptionConnectionServerMessage.ConnectionStatus.IN_USE:
@@ -459,6 +468,18 @@ def _assert_connection_state(self, connection_status: SubscriptionConnectionServ
459468
f"{connection_status.exception}"
460469
)
461470
elif connection_status.status == SubscriptionConnectionServerMessage.ConnectionStatus.REDIRECT:
471+
if self._options.strategy == SubscriptionOpeningStrategy.WAIT_FOR_FREE:
472+
if connection_status.data:
473+
register_connection_duration_in_ticks_object = connection_status.data.get(
474+
"RegisterConnectionDurationInTicks"
475+
)
476+
if (
477+
register_connection_duration_in_ticks_object / 10000000
478+
>= self._options.max_erroneous_period.seconds
479+
):
480+
# this worker connection waited for free for more than max erroneous period
481+
self._last_connection_failure = None
482+
462483
data = connection_status.data
463484
redirected_tag = data.get("RedirectedTag")
464485
appropriate_node = None if redirected_tag is None else redirected_tag
@@ -691,7 +712,9 @@ def __run_async() -> None:
691712
self._forced_topology_update_attempts += 1
692713
next_node_index = self._forced_topology_update_attempts % len(cur_topology)
693714
try:
694-
self._redirect_node = cur_topology[next_node_index]
715+
self._redirect_node = req_ex.get_requested_node(
716+
cur_topology[next_node_index].cluster_tag, True
717+
).current_node
695718
self._logger.info(
696719
f"Subscription '{self._options.subscription_name}'. "
697720
f"Will modify redirect node from None to "

ravendb/exceptions/exceptions.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,10 +107,17 @@ class SubscriptionDoesNotExistException(SubscriptionException):
107107

108108

109109
class SubscriptionDoesNotBelongToNodeException(SubscriptionException):
110-
def __init__(self, *args, appropriate_node: Optional[str] = None, reasons: Dict[str, str] = None):
110+
def __init__(
111+
self,
112+
*args,
113+
appropriate_node: Optional[str] = None,
114+
reasons: Dict[str, str] = None,
115+
register_connection_duration_in_ticks: bool = None,
116+
):
111117
super(SubscriptionDoesNotBelongToNodeException, self).__init__(*args)
112118
self.appropriate_node = appropriate_node
113119
self.reasons = reasons
120+
self.register_connection_duration_in_ticks = register_connection_duration_in_ticks
114121

115122

116123
class SubscriptionChangeVectorUpdateConcurrencyException(SubscriptionException):
@@ -119,3 +126,15 @@ class SubscriptionChangeVectorUpdateConcurrencyException(SubscriptionException):
119126

120127
class SubscriberErrorException(Exception):
121128
pass
129+
130+
131+
class InvalidNetworkTopologyException(Exception):
132+
pass
133+
134+
135+
class SubscriptionMessageTypeException(Exception):
136+
pass
137+
138+
139+
class RavenTimeoutException(Exception):
140+
pass

ravendb/exceptions/raven_exceptions.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,3 +31,11 @@ def __init__(self, message):
3131
class ClientVersionMismatchException(RavenException):
3232
def __init__(self, message: Optional[str] = None, cause: Optional[Exception] = None):
3333
super().__init__(message, cause)
34+
35+
36+
class PortInUseException(RavenException):
37+
pass
38+
39+
40+
class IndexCompactionInProgressException(RavenException):
41+
pass

ravendb/http/request_executor.py

Lines changed: 33 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
UnsuccessfulRequestException,
2020
DatabaseDoesNotExistException,
2121
AuthorizationException,
22+
RequestedNodeUnavailableException,
2223
)
2324
from ravendb.documents.operations.configuration import GetClientConfigurationOperation
2425
from ravendb.exceptions.exception_dispatcher import ExceptionDispatcher
@@ -199,9 +200,28 @@ def preferred_node(self) -> CurrentIndexAndNode:
199200
self.__ensure_node_selector()
200201
return self._node_selector.get_preferred_node()
201202

203+
def get_requested_node(self, node_tag: str, throw_if_contains_failures: bool = False) -> CurrentIndexAndNode:
204+
self.__ensure_node_selector()
205+
current_index_and_node = self._node_selector.get_requested_node(node_tag)
206+
207+
if throw_if_contains_failures and not self._node_selector.node_is_available(
208+
current_index_and_node.current_index
209+
):
210+
raise RequestedNodeUnavailableException(
211+
f"Requested node {node_tag} currently unavailable, please try again later."
212+
)
213+
214+
return current_index_and_node
215+
202216
def __on_failed_request_invoke(self, url: str, e: Exception):
203217
for event in self.__on_failed_request:
204-
event(FailedRequestEventArgs(self.__database_name, url, e))
218+
event(FailedRequestEventArgs(self.__database_name, url, e, None, None))
219+
220+
def __on_failed_request_invoke_details(
221+
self, url: str, e: Exception, request: Optional[requests.Request], response: Optional[requests.Response]
222+
) -> None:
223+
for event in self.__on_failed_request:
224+
event(FailedRequestEventArgs(self.__database_name, url, e, request, response))
205225

206226
def __on_succeed_request_invoke(
207227
self, database: str, url: str, response: requests.Response, request: requests.Request, attempt_number: int
@@ -550,27 +570,22 @@ def __refresh_if_needed(self, chosen_node: ServerNode, response: requests.Respon
550570
refresh_topology = response.headers.get(constants.Headers.REFRESH_TOPOLOGY, False)
551571
refresh_client_configuration = response.headers.get(constants.Headers.REFRESH_CLIENT_CONFIGURATION, False)
552572

553-
if refresh_topology or refresh_client_configuration:
554-
server_node = ServerNode(chosen_node.url, self.__database_name)
573+
refresh_task = Future()
574+
refresh_task.set_result(False)
575+
576+
refresh_client_configuration_task = Future()
577+
refresh_client_configuration_task.set_result(None)
555578

556-
update_parameters = UpdateTopologyParameters(server_node)
579+
if refresh_topology:
580+
update_parameters = UpdateTopologyParameters(chosen_node)
557581
update_parameters.timeout_in_ms = 0
558582
update_parameters.debug_tag = "refresh-topology-header"
583+
refresh_task = self.update_topology_async(update_parameters)
559584

560-
if refresh_topology:
561-
topology_task = self.update_topology_async(update_parameters)
562-
else:
563-
topology_task = Future()
564-
topology_task.set_result(False)
565-
566-
if refresh_client_configuration:
567-
client_configuration = self._update_client_configuration_async(server_node)
568-
else:
569-
client_configuration = Future()
570-
client_configuration.set_result(None)
585+
if refresh_client_configuration:
586+
refresh_client_configuration_task = self._update_client_configuration_async(chosen_node)
571587

572-
return [topology_task, client_configuration]
573-
return []
588+
return [refresh_task, refresh_client_configuration_task]
574589

575590
def __send_request_to_server(
576591
self,
@@ -1154,7 +1169,7 @@ def __handle_server_down(
11541169
if index_node_and_etag.current_node in command.failed_nodes:
11551170
return False
11561171

1157-
self.__on_failed_request_invoke(url, e)
1172+
self.__on_failed_request_invoke(url, e, request, response)
11581173

11591174
self.execute(
11601175
index_node_and_etag.current_node, index_node_and_etag.current_index, command, should_retry, session_info

ravendb/http/server_node.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
from enum import Enum
2-
from typing import Optional
2+
from typing import Optional, TYPE_CHECKING
3+
4+
if TYPE_CHECKING:
5+
from ravendb.http.topology import ClusterTopology
36

47

58
class ServerNode:
@@ -44,6 +47,22 @@ def __hash__(self) -> int:
4447
def last_server_version(self) -> str:
4548
return self.__last_server_version
4649

50+
@classmethod
51+
def create_from(cls, topology: "ClusterTopology"):
52+
nodes = []
53+
if topology is None:
54+
return nodes
55+
56+
for key, value in topology.members.items():
57+
server_node = cls(value, cluster_tag=key)
58+
nodes.append(server_node)
59+
60+
for key, value in topology.watchers.items():
61+
server_node = cls(value, cluster_tag=key)
62+
nodes.append(server_node)
63+
64+
return nodes
65+
4766
def should_update_server_version(self) -> bool:
4867
if self.last_server_version is None or self.__last_server_version_check > 100:
4968
return True

ravendb/http/topology.py

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import uuid
77
from abc import abstractmethod
88
from concurrent.futures import ThreadPoolExecutor
9-
from typing import TYPE_CHECKING
9+
from typing import TYPE_CHECKING, Optional
1010
from typing import Union, List, Dict
1111

1212
from ravendb.exceptions.exceptions import (
@@ -77,6 +77,16 @@ def __init__(self, topology: Topology):
7777
self.fastest_records = [0] * len(topology.nodes)
7878
self.fastest: Union[None, int] = None
7979
self.speed_test_mode = 0
80+
self.unlikely_everyone_faulted_choice_index: Optional[int] = 0
81+
82+
@property
83+
def node_when_everyone_marked_as_faulted(self) -> CurrentIndexAndNode:
84+
index = self.unlikely_everyone_faulted_choice_index
85+
self.unlikely_everyone_faulted_choice_index = (self.unlikely_everyone_faulted_choice_index + 1) % len(
86+
self.nodes
87+
)
88+
89+
return CurrentIndexAndNode(index, self.nodes[index])
8090

8191
def __enter__(self):
8292
return self
@@ -94,6 +104,9 @@ def __init__(self, topology: Topology, thread_pool: ThreadPoolExecutor):
94104
def topology(self) -> Topology:
95105
return self.__state.topology
96106

107+
def node_is_available(self, index: int) -> bool:
108+
return self.__state.failures[index] == 0
109+
97110
def on_failed_request(self, node_index: int) -> None:
98111
state = self.__state
99112
if node_index < 0 or node_index >= len(state.failures):
@@ -117,16 +130,10 @@ def on_update_topology(self, topology: Topology, force_update: bool = False) ->
117130

118131
def get_requested_node(self, node_tag: str) -> CurrentIndexAndNode:
119132
state = self.__state
120-
state_failures = state.failures
121133
server_nodes = state.nodes
122-
length = min(len(server_nodes), len(state_failures))
123-
for i in range(length):
134+
for i in range(len(server_nodes)):
124135
if server_nodes[i].cluster_tag == node_tag:
125-
if state_failures[i] == 0 and server_nodes[i].url:
126-
return CurrentIndexAndNode(i, server_nodes[i])
127-
raise RequestedNodeUnavailableException(
128-
f"Requested node {node_tag} is currently unavailable, please try again later."
129-
)
136+
return CurrentIndexAndNode(i, server_nodes[i])
130137

131138
if len(state.nodes) == 0:
132139
raise DatabaseDoesNotExistException("There are no nodes in the topology at all")
@@ -142,7 +149,7 @@ def get_preferred_node_internal(cls, state: NodeSelector.__NodeSelectorState) ->
142149
server_nodes = state.nodes
143150
length = min(len(server_nodes), len(state_failures))
144151
for i in range(length):
145-
if state_failures[0] == 0 and server_nodes[i].url:
152+
if state_failures[0] == 0:
146153
return CurrentIndexAndNode(i, server_nodes[i])
147154
return cls.unlikely_everyone_faulted_choice(state)
148155

@@ -154,11 +161,11 @@ def get_preferred_node_with_topology(self) -> CurrentIndexAndNodeAndEtag:
154161

155162
@staticmethod
156163
def unlikely_everyone_faulted_choice(state: NodeSelector.__NodeSelectorState) -> CurrentIndexAndNode:
157-
# if there are all marked as failed, we'll chose the first
164+
# if there are all marked as failed, we'll chose the next (the one in CurrentNodeIndex)
158165
# one so the user will get an error (or recover :-) )
159166
if len(state.nodes) == 0:
160167
raise DatabaseDoesNotExistException("There are no nodes in the topology at all")
161-
return CurrentIndexAndNode(0, state.nodes[0])
168+
return state.node_when_everyone_marked_as_faulted
162169

163170
def get_node_by_session_id(self, session_id: int) -> CurrentIndexAndNode:
164171
state = self.__state
@@ -289,20 +296,34 @@ def __init__(self, current_index: int, current_node: ServerNode, etag: int):
289296
class NodeStatus:
290297
def __init__(
291298
self,
299+
name: str,
292300
connected: bool,
293301
error_details: str,
294302
last_send: datetime.datetime,
295303
last_reply: datetime.datetime,
296304
last_sent_message: str,
297305
last_matching_index: int,
298306
):
307+
self.name = name
299308
self.connected = connected
300309
self.error_details = error_details
301310
self.last_send = last_send
302311
self.last_reply = last_reply
303312
self.last_sent_message = last_sent_message
304313
self.last_matching_index = last_matching_index
305314

315+
def __str__(self):
316+
return (
317+
"NodeStatus{"
318+
f"name='{self.name}'"
319+
f", connected={self.connected}"
320+
f", errorDetails={self.error_details}"
321+
f", lastSend={self.last_send}"
322+
f", lastReply={self.last_reply}"
323+
f", lastSentMessage={self.last_sent_message}"
324+
"}"
325+
)
326+
306327

307328
class RaftCommand:
308329
@abstractmethod

0 commit comments

Comments
 (0)