diff --git a/include/riakc.hrl b/include/riakc.hrl
index 6f711d7d..a0dfe48b 100644
--- a/include/riakc.hrl
+++ b/include/riakc.hrl
@@ -24,7 +24,7 @@
-define(PROTO_MAJOR, 1).
-define(PROTO_MINOR, 0).
-define(DEFAULT_PB_TIMEOUT, 60000).
--define(FIRST_RECONNECT_INTERVAL, 100).
+-define(FIRST_RECONNECT_INTERVAL, 10).
-define(MAX_RECONNECT_INTERVAL, 30000).
-type client_option() :: queue_if_disconnected |
diff --git a/src/riakc_pb_socket.erl b/src/riakc_pb_socket.erl
index 59aca82b..193f8360 100644
--- a/src/riakc_pb_socket.erl
+++ b/src/riakc_pb_socket.erl
@@ -42,6 +42,7 @@
set_options/2, set_options/3,
is_connected/1, is_connected/2,
ping/1, ping/2,
+ queue_len/1,
get_client_id/1, get_client_id/2,
set_client_id/2, set_client_id/3,
get_server_info/1, get_server_info/2,
@@ -96,6 +97,7 @@
-deprecated({get_index,'_', eventually}).
+-type timeout2() :: timeout() | {timeout(), timeout()}. % a single timeout, or {QueueTimeout, OpTimeout}: the first bounds the wait in the request queue, the second the wait after the request is sent
-type ctx() :: any().
-type rpb_req() :: {tunneled, msg_id(), binary()} | atom() | tuple().
-type rpb_resp() :: atom() | tuple().
@@ -154,6 +156,7 @@
transport = gen_tcp :: 'gen_tcp' | 'ssl',
active :: #request{} | undefined, % active request
queue :: request_queue_t() | undefined, % queue of pending requests
+ queue_len=0 :: non_neg_integer(), % queue size
connects=0 :: non_neg_integer(), % number of successful connects
failed=[] :: [connection_failure()], % breakdown of failed connects
connect_timeout=infinity :: timeout(), % timeout of TCP connection
@@ -244,6 +247,10 @@ ping(Pid) ->
ping(Pid, Timeout) ->
call_infinity(Pid, {req, rpbpingreq, Timeout}).
+%% @doc Return the number of requests currently held in this client's request queue.
+-spec queue_len(pid()) -> non_neg_integer().
+queue_len(Pid) -> call_infinity(Pid, {check, queue_len}).
+
%% @doc Get the client id for this connection
%% @equiv get_client_id(Pid, default_timeout(get_client_id_timeout))
-spec get_client_id(pid()) -> {ok, client_id()} | {error, term()}.
@@ -289,8 +296,10 @@ get(Pid, Bucket, Key) ->
%% @doc Get bucket/key from the server specifying timeout.
%% Will return {error, notfound} if the key is not on the server.
%% @equiv get(Pid, Bucket, Key, Options, Timeout)
--spec get(pid(), bucket(), key(), TimeoutOrOptions::timeout() | get_options()) ->
+-spec get(pid(), bucket(), key(), TimeoutOrOptions::timeout2() | get_options()) ->
{ok, riakc_obj()} | {error, term()} | unchanged.
+get(Pid, Bucket, Key, {T1,T2} = Timeout) when is_integer(T1), is_integer(T2) ->
+ get(Pid, Bucket, Key, [], Timeout);
get(Pid, Bucket, Key, Timeout) when is_integer(Timeout); Timeout =:= infinity ->
get(Pid, Bucket, Key, [], Timeout);
get(Pid, Bucket, Key, Options) ->
@@ -300,7 +309,7 @@ get(Pid, Bucket, Key, Options) ->
%% unchanged will be returned when the
%% {if_modified, Vclock} option is specified and the
%% object is unchanged.
--spec get(pid(), bucket(), key(), get_options(), timeout()) ->
+-spec get(pid(), bucket(), key(), get_options(), timeout2()) ->
{ok, riakc_obj()} | {error, term()} | unchanged.
get(Pid, Bucket, Key, Options, Timeout) ->
{T, B} = maybe_bucket_type(Bucket),
@@ -318,8 +327,10 @@ put(Pid, Obj) ->
%% @doc Put the metadata/value in the object under bucket/key with options or timeout.
%% @equiv put(Pid, Obj, Options, Timeout)
%% @see put/4
--spec put(pid(), riakc_obj(), TimeoutOrOptions::timeout() | put_options()) ->
+-spec put(pid(), riakc_obj(), TimeoutOrOptions::timeout2() | put_options()) ->
ok | {ok, riakc_obj()} | riakc_obj() | {ok, key()} | {error, term()}.
+put(Pid, Obj, {T1,T2} = Timeout) when is_integer(T1), is_integer(T2) ->
+ put(Pid, Obj, [], Timeout);
put(Pid, Obj, Timeout) when is_integer(Timeout); Timeout =:= infinity ->
put(Pid, Obj, [], Timeout);
put(Pid, Obj, Options) ->
@@ -336,7 +347,7 @@ put(Pid, Obj, Options) ->
%% `return_body' was specified.
%% @throws siblings
%% @end
--spec put(pid(), riakc_obj(), put_options(), timeout()) ->
+-spec put(pid(), riakc_obj(), put_options(), timeout2()) ->
ok | {ok, riakc_obj()} | riakc_obj() | {ok, key()} | {error, term()}.
put(Pid, Obj, Options, Timeout) ->
Content = riak_pb_kv_codec:encode_content({riakc_obj:get_update_metadata(Obj),
@@ -357,15 +368,17 @@ delete(Pid, Bucket, Key) ->
%% @doc Delete the key/value specifying timeout or options. Note that the rw quorum is deprecated, use r and w.
%% @equiv delete(Pid, Bucket, Key, Options, Timeout)
--spec delete(pid(), bucket(), key(), TimeoutOrOptions::timeout() | delete_options()) ->
+-spec delete(pid(), bucket(), key(), TimeoutOrOptions::timeout2() | delete_options()) ->
ok | {error, term()}.
+delete(Pid, Bucket, Key, {T1,T2} = Timeout) when is_integer(T1), is_integer(T2) ->
+ delete(Pid, Bucket, Key, [], Timeout);
delete(Pid, Bucket, Key, Timeout) when is_integer(Timeout); Timeout =:= infinity ->
delete(Pid, Bucket, Key, [], Timeout);
delete(Pid, Bucket, Key, Options) ->
delete(Pid, Bucket, Key, Options, default_timeout(delete_timeout)).
%% @doc Delete the key/value with options and timeout. Note that the rw quorum is deprecated, use r and w.
--spec delete(pid(), bucket(), key(), delete_options(), timeout()) -> ok | {error, term()}.
+-spec delete(pid(), bucket(), key(), delete_options(), timeout2()) -> ok | {error, term()}.
delete(Pid, Bucket, Key, Options, Timeout) ->
{T, B} = maybe_bucket_type(Bucket),
Req = delete_options(Options, #rpbdelreq{type = T, bucket = B, key = Key}),
@@ -419,7 +432,7 @@ delete_obj(Pid, Obj, Options) ->
%% @doc Delete the riak object with options and timeout.
%% @equiv delete_vclock(Pid, riakc_obj:bucket(Obj), riakc_obj:key(Obj), riakc_obj:vclock(Obj), Options, Timeout)
%% @see delete_vclock/6
--spec delete_obj(pid(), riakc_obj(), delete_options(), timeout()) -> ok | {error, term()}.
+-spec delete_obj(pid(), riakc_obj(), delete_options(), timeout2()) -> ok | {error, term()}.
delete_obj(Pid, Obj, Options, Timeout) ->
delete_vclock(Pid, riakc_obj:bucket(Obj), riakc_obj:key(Obj),
riakc_obj:vclock(Obj), Options, Timeout).
@@ -1136,7 +1149,7 @@ cs_bucket_fold(Pid, Bucket, Opts) when is_pid(Pid), (is_binary(Bucket) orelse
%% @doc Return the default timeout for an operation if none is provided.
%% Falls back to the default timeout.
--spec default_timeout(timeout_name()) -> timeout().
+-spec default_timeout(timeout_name()) -> timeout2().
default_timeout(OpTimeout) ->
case application:get_env(riakc, OpTimeout) of
{ok, EnvTimeout} ->
@@ -1316,26 +1329,30 @@ handle_call(is_connected, _From, State) ->
end;
handle_call({set_options, Options}, _From, State) ->
{reply, ok, parse_options(Options, State)};
+handle_call({check, queue_len}, _From, #state{queue_len = QueueLen} = State) ->
+ {reply, QueueLen, State};
handle_call(stop, _From, State) ->
- _ = disconnect(State),
- {stop, normal, ok, State}.
+ _ = disconnect(State, true),
+ {stop, normal, ok, State};
+handle_call(get_state, _From, State) ->
+ {reply, State, State}.
%% @private
handle_info({tcp_error, _Socket, Reason}, State) ->
error_logger:error_msg("PBC client TCP error for ~p:~p - ~p\n",
[State#state.address, State#state.port, Reason]),
- disconnect(State);
+ disconnect(State, true);
handle_info({tcp_closed, _Socket}, State) ->
- disconnect(State);
+ disconnect(State, true);
handle_info({ssl_error, _Socket, Reason}, State) ->
error_logger:error_msg("PBC client SSL error for ~p:~p - ~p\n",
[State#state.address, State#state.port, Reason]),
- disconnect(State);
+ disconnect(State, true);
handle_info({ssl_closed, _Socket}, State) ->
- disconnect(State);
+ disconnect(State, true);
%% Make sure the two Sock's match. If a request timed out, but there was
%% a response queued up behind it we do not want to process it. Instead
@@ -1374,19 +1391,13 @@ handle_info({Proto, Sock, Data}, State=#state{sock = Sock, active = Active})
ok = ssl:setopts(Sock, [{active, once}])
end,
{noreply, NewState};
-handle_info({req_timeout, Ref}, State) ->
- case State#state.active of %%
- undefined ->
- {noreply, remove_queued_request(Ref, State)};
- Active ->
- case Ref == Active#request.ref of
- true -> %% Matches the current operation
- NewState = maybe_reply(on_timeout(State#state.active, State)),
- disconnect(NewState#state{active = undefined});
- false ->
- {noreply, remove_queued_request(Ref, State)}
- end
- end;
+handle_info({TimeoutTag, Ref}, #state{active = #request{ref = Ref}} = State)
+ when TimeoutTag == op_timeout; TimeoutTag == req_timeout ->
+ NewState = maybe_reply(on_timeout(State#state.active, State)),
+ disconnect(NewState#state{active = undefined}, false);
+handle_info({TimeoutTag, Ref}, State)
+ when TimeoutTag == q_timeout; TimeoutTag == req_timeout ->
+ {noreply, remove_queued_request(Ref, State)};
handle_info(reconnect, State) ->
case connect(State) of
{ok, NewState} ->
@@ -1394,7 +1405,7 @@ handle_info(reconnect, State) ->
{error, Reason} ->
%% Update the failed count and reschedule a reconnection
NewState = State#state{failed = orddict:update_counter(Reason, 1, State#state.failed)},
- disconnect(NewState)
+ disconnect(NewState, true)
end;
handle_info(_, State) ->
{noreply, State}.
@@ -1982,6 +1993,8 @@ create_req_timer(infinity, _Ref) ->
undefined;
create_req_timer(undefined, _Ref) ->
undefined;
+create_req_timer({Msecs,_}, Ref) ->
+ erlang:send_after(Msecs, self(), {q_timeout, Ref});
create_req_timer(Msecs, Ref) ->
erlang:send_after(Msecs, self(), {req_timeout, Ref}).
@@ -2089,7 +2102,7 @@ start_auth(State=#state{credentials={User,Pass}, sock=Sock}) ->
%% @private
%% Disconnect socket if connected
-disconnect(State) ->
+disconnect(State, DelayReconnect) ->
%% Tell any pending requests we've disconnected
_ = case State#state.active of
undefined ->
@@ -2109,12 +2122,15 @@ disconnect(State) ->
%% Decide whether to reconnect or exit
NewState = State#state{sock = undefined, active = undefined},
- case State#state.auto_reconnect of
- true ->
+ case {State#state.auto_reconnect, DelayReconnect} of
+ {true, true} ->
%% Schedule the reconnect message and return state
erlang:send_after(State#state.reconnect_interval, self(), reconnect),
{noreply, increase_reconnect_interval(NewState)};
- false ->
+ {true, false} ->
+ self() ! reconnect,
+ {noreply, NewState};
+ {false, _} ->
{stop, disconnected, NewState}
end.
@@ -2130,16 +2146,27 @@ increase_reconnect_interval(State) ->
%% Send a request to the server and prepare the state for the response
%% @private
-send_request(Request0, State) when State#state.active =:= undefined ->
- {Request, Pkt} = encode_request_message(Request0),
+%% For a {QueueTimeout, OpTimeout} request, a successful send ends the queue
+%% phase: the queue timer is cancelled and an op_timeout timer is armed instead.
+send_request(#request{ref = Ref,
+                      tref = TRef,
+                      timeout = Timeout} = Request0, State)
+  when State#state.active =:= undefined ->
+    {Request1, Pkt} = encode_request_message(Request0),
Transport = State#state.transport,
case Transport:send(State#state.sock, Pkt) of
ok ->
- maybe_reply(after_send(Request, State#state{active = Request}));
+            Request2 =
+                case Timeout of
+                    {_, infinity} ->
+                        %% timeout2() allows an unbounded op phase, but
+                        %% erlang:send_after/3 rejects 'infinity': arm no timer.
+                        cancel_req_timer(TRef),
+                        Request1#request{tref = undefined};
+                    {_, Msecs} ->
+                        cancel_req_timer(TRef),
+                        OpTRef = erlang:send_after(Msecs, self(), {op_timeout, Ref}),
+                        Request1#request{tref = OpTRef};
+                    _ ->
+                        %% single-value timeout: any existing timer is left untouched
+                        Request1
+                end,
+            maybe_reply(after_send(Request2, State#state{active = Request2}));
{error, Reason} ->
error_logger:warning_msg("Socket error while sending riakc request: ~p.", [Reason]),
Transport:close(State#state.sock),
- maybe_enqueue_and_reconnect(Request, State#state{sock=undefined})
+            maybe_enqueue_and_reconnect(Request1, State#state{sock=undefined})
end.
%% Already encoded (for tunneled messages), but must provide Message Id
@@ -2164,37 +2191,44 @@ maybe_reconnect(_) -> ok.
%% If we can queue while disconnected, do so, otherwise tell the
%% caller that the socket was disconnected.
enqueue_or_reply_error(Request, #state{queue_if_disconnected=true}=State) ->
- queue_request(Request, State);
+    case Request#request.timeout of
+        {_,_} ->
+            %% We've already used part of the op timeout, so fail the request
+            %% now rather than re-queueing it. As in the clause below, the
+            %% result of send_caller/2 is discarded and the state is returned.
+            _ = send_caller({error, timeout}, Request),
+            State;
+        _ ->
+            queue_request_head(Request, State)
+    end;
enqueue_or_reply_error(Request, State) ->
_ = send_caller({error, disconnected}, Request),
State.
%% Queue up a request if one is pending
%% @private
-queue_request(Request, State) ->
- State#state{queue = queue:in(Request, State#state.queue)}.
+%% Append a request at the tail of the queue.
+queue_request(Request, State) -> queue_request(Request, State, in).
+%% Put a request at the head of the queue, so queue:out/1 returns it next.
+queue_request_head(Request, State) -> queue_request(Request, State, in_r).
+%% Insert at the tail (in) or the head (in_r) and keep queue_len in step.
+%% The queue function is called statically so xref and Dialyzer can see it.
+queue_request(Request, #state{queue_len = QLen, queue = Q} = State, in) ->
+    State#state{queue_len = QLen + 1, queue = queue:in(Request, Q)};
+queue_request(Request, #state{queue_len = QLen, queue = Q} = State, in_r) ->
+    State#state{queue_len = QLen + 1, queue = queue:in_r(Request, Q)}.
%% Try and dequeue request and send onto the server if one is waiting
%% @private
-dequeue_request(State) ->
+dequeue_request(#state{queue_len = QLen} = State) ->
case queue:out(State#state.queue) of
{empty, _} ->
- State;
+ State#state{active = undefined};
{{value, Request}, Q2} ->
- send_request(Request, State#state{queue = Q2})
+ send_request(Request, State#state{active = undefined,
+ queue_len = QLen - 1,
+ queue = Q2})
end.
%% Remove a queued request by reference - returns same queue if ref not present
%% @private
-remove_queued_request(Ref, State) ->
- L = queue:to_list(State#state.queue),
- case lists:keytake(Ref, #request.ref, L) of
+remove_queued_request(Ref, #state{queue_len = QLen, queue = Q} = State) ->
+ case lists:keytake(Ref, #request.ref, queue:to_list(Q)) of
false -> % Ref not queued up
State;
{value, Req, L2} ->
{reply, Reply, NewState} = on_timeout(Req, State),
_ = send_caller(Reply, Req),
- NewState#state{queue = queue:from_list(L2)}
+ NewState#state{queue_len = QLen - 1,
+ queue = queue:from_list(L2)}
end.
%% @private
@@ -3921,4 +3955,289 @@ live_node_tests() ->
end)}
].
+timeout_no_conn_test() ->
+ % test req & {q, op} timeouts when there's no connection
+
+ % test requests have to be spawned, so there's some variance in the response times
+ % this makes it difficult to decide on the acceptable tolerance for the pass/fail decision
+ % so the test is variable, it may fail the first time but try it again
+
+ {ok, Pid} = start_link(test_ip(), 65225, [auto_reconnect, queue_if_disconnected]),
+ Self = self(),
+ REQ = fun(Func, TO) ->
+ erlang:spawn(fun() ->
+ {T,Info} = (catch timer:tc(?MODULE, Func,
+ [Pid, <<"qwer">>, <<"qwer">>, [], TO])),
+ Self ! {self(), {T div 1000, Info}}
+ end)
+ end,
+
+ RES = fun(RPid) ->
+ receive
+ {RPid, Result} -> Result
+ after
+ 3000 -> error
+ end
+ end,
+
+ P01 = REQ(get, {1500,100}), timer:sleep(1), % sleep to ensure that spawned requests
+ P02 = REQ(get, {1000,100}), timer:sleep(1), % are actually sent in this order
+ P03 = REQ(get, {1500,100}), timer:sleep(1),
+ P04 = REQ(get, {1000,100}), timer:sleep(1),
+ P05 = REQ(get, {1500,100}), timer:sleep(1),
+ P06 = REQ(get, {1000,100}), timer:sleep(1),
+ P07 = REQ(get, {1500,100}), timer:sleep(1),
+ P08 = REQ(get, {1000,100}), timer:sleep(1),
+ P09 = REQ(get, {1500,100}), timer:sleep(1),
+ P10 = REQ(get, 200), timer:sleep(1),
+ P11 = REQ(get, 400), timer:sleep(1),
+ P12 = REQ(get, 600), timer:sleep(1),
+ P13 = REQ(get, 800), timer:sleep(1),
+ P14 = REQ(get, 200), timer:sleep(1),
+ P15 = REQ(get, 1000), timer:sleep(2500), 0 = queue_len(Pid),
+ P16 = REQ(get, {200,1000}), timer:sleep(1),
+ P17 = REQ(get, {200,1000}),
+
+ {T01, {error, timeout}} = RES(P01),
+ {T02, {error, timeout}} = RES(P02),
+ {T03, {error, timeout}} = RES(P03),
+ {T04, {error, timeout}} = RES(P04),
+ {T05, {error, timeout}} = RES(P05),
+ {T06, {error, timeout}} = RES(P06),
+ {T07, {error, timeout}} = RES(P07),
+ {T08, {error, timeout}} = RES(P08),
+ {T09, {error, timeout}} = RES(P09),
+ {T10, {error, timeout}} = RES(P10),
+ {T11, {error, timeout}} = RES(P11),
+ {T12, {error, timeout}} = RES(P12),
+ {T13, {error, timeout}} = RES(P13),
+ {T14, {error, timeout}} = RES(P14),
+ {T15, {error, timeout}} = RES(P15),
+ {T16, {error, timeout}} = RES(P16),
+ {T17, {error, timeout}} = RES(P17),
+
+ % All these requests should spend ~1500ms in the queue
+ io:format(user, "150ms TIMES: ~p ~p ~p ~p ~p~n", [T01,T03,T05,T07,T09]),
+ lists:foreach(fun(T) -> true = T > 1490, true = T < 1515 end, [T01,T03,T05,T07,T09]),
+
+ % All these requests should spend ~1000ms in the queue
+ io:format(user, "100ms TIMES: ~p ~p ~p ~p~n", [T02,T04,T06,T08]),
+ lists:foreach(fun(T) -> true = T > 990, true = T < 1015 end, [T02,T04,T06,T08]),
+
+ % These test the old timeouts
+ io:format(user, "TIMES: ~p ~p ~p ~p ~p ~p ~p ~p~n", [T10,T11,T12,T13,T14,T15,T16,T17]),
+ true = T10 > 190, true = T10 < 215,
+ true = T11 > 390, true = T11 < 415,
+ true = T12 > 590, true = T12 < 615,
+ true = T13 > 790, true = T13 < 815,
+ true = T14 > 190, true = T14 < 215,
+ true = T15 > 990, true = T15 < 1015,
+
+ % These 2 requests should spend ~200ms in the queue
+ true = T16 > 190, true = T16 < 215,
+ true = T17 > 190, true = T17 < 215,
+
+ stop(Pid).
+
+timeout_conn_test() ->
+ % test req & {q, op} timeouts when there is a connection that never responds
+
+ % test requests have to be spawned, so there's some variance in the response times
+ % this makes it difficult to decide on the acceptable tolerance for the pass/fail decision
+ % so the test is variable, it may fail the first time but try it again
+
+ % Set up a dummy socket to send requests on
+ {ok, DummyServerPid, Port} = dummy_server(noreply),
+ {ok, Pid} = start("127.0.0.1", Port, [auto_reconnect, queue_if_disconnected]),
+ erlang:monitor(process, DummyServerPid),
+ Self = self(),
+ REQ = fun(Func, TO) ->
+ erlang:spawn(fun() ->
+ {T,Info} = (catch timer:tc(?MODULE, Func,
+ [Pid, <<"qwer">>, <<"qwer">>, [], TO])),
+ Self ! {self(), {T div 1000, Info}}
+ end)
+ end,
+
+ RES = fun(RPid) ->
+ receive
+ {RPid, Result} -> Result
+ after
+ 3000 -> error
+ end
+ end,
+
+ P01 = REQ(get, {1000, 200}), timer:sleep(1), % sleep to ensure that spawned requests
+ P02 = REQ(get, {1000, 200}), timer:sleep(1), % are actually sent in this order
+ P03 = REQ(get, {1000, 200}), timer:sleep(1),
+ P04 = REQ(get, {1000, 200}), timer:sleep(1),
+ P05 = REQ(get, {1000, 200}), timer:sleep(1),
+ P06 = REQ(get, {1000, 200}), timer:sleep(1),
+ P07 = REQ(get, {1000, 200}), timer:sleep(1),
+ P08 = REQ(get, {1000, 200}), timer:sleep(1),
+ P09 = REQ(get, {1000, 200}), timer:sleep(1),
+ P10 = REQ(get, {1000, 200}), timer:sleep(1),
+ P11 = REQ(get, 200), timer:sleep(1),
+ P12 = REQ(get, 400), timer:sleep(1),
+ P13 = REQ(get, 600), timer:sleep(1),
+ P14 = REQ(get, 800), timer:sleep(1),
+ P15 = REQ(get, 200), timer:sleep(1),
+ P16 = REQ(get, 1000), timer:sleep(2000), 0 = queue_len(Pid),
+ P17 = REQ(get, {200, 1000}), timer:sleep(1),
+ P18 = REQ(get, {200, 1000}),
+
+ {T01, {error, timeout}} = RES(P01),
+ {T02, {error, timeout}} = RES(P02),
+ {T03, {error, timeout}} = RES(P03),
+ {T04, {error, timeout}} = RES(P04),
+ {T05, {error, timeout}} = RES(P05),
+ {T06, {error, timeout}} = RES(P06),
+ {T07, {error, timeout}} = RES(P07),
+ {T08, {error, timeout}} = RES(P08),
+ {T09, {error, timeout}} = RES(P09),
+ {T10, {error, timeout}} = RES(P10),
+ {T11, {error, timeout}} = RES(P11),
+ {T12, {error, timeout}} = RES(P12),
+ {T13, {error, timeout}} = RES(P13),
+ {T14, {error, timeout}} = RES(P14),
+ {T15, {error, timeout}} = RES(P15),
+ {T16, {error, timeout}} = RES(P16),
+ {T17, {error, timeout}} = RES(P17),
+ {T18, {error, timeout}} = RES(P18),
+
+ catch DummyServerPid ! stop,
+ timer:sleep(10),
+ receive _Msg -> ok
+ after 1 -> ok
+ end,
+
+ io:format(user, "TIMES: ~p ~p ~p ~p ~p ~p ~p ~p ~p ~p ~p ~p ~p ~p ~p ~p ~p ~p~n",
+ [T01,T02,T03,T04,T05,T06,T07,T08,T09,T10,T11,T12,T13,T14,T15,T16,T17,T18]),
+ true = T01 > 190, true = T01 < 215, % This one is serviced right away & should timeout after 200ms
+ true = T02 > 390, true = T02 < 415, % This one is serviced after the 1st one has timed out, ~400ms
+ true = T03 > 590, true = T03 < 615, % This one is serviced after the 1st two have timed out, ~600ms
+ true = T04 > 790, true = T04 < 815, % This one is serviced after the 1st three have timed out, ~800ms
+ true = T05 > 990, true = T05 < 1015, % This one is serviced after the 1st four have timed out, ~1000ms
+
+ [HD|TL] = lists:reverse(lists:sort([T06,T07,T08,T09,T10])),
+ % One will have queued & been serviced, ~1200ms
+ true = HD > 1190, true = HD < 1215,
+ % All these will timeout in the queue, ~1000ms
+ lists:foreach(fun(T) -> true = T > 990, true = T < 1015 end, TL),
+
+ % These test for backward compatibility
+ true = T11 > 190, true = T11 < 215,
+ true = T12 > 390, true = T12 < 415,
+ true = T13 > 590, true = T13 < 615,
+ true = T14 > 790, true = T14 < 815,
+ true = T15 > 190, true = T15 < 215,
+ true = T16 > 990, true = T16 < 1015,
+
+ true = T17 > 990, true = T17 < 1015, % This one will be serviced right away & timeout after ~1000ms
+ true = T18 > 190, true = T18 < 215, % This one will timeout in the queue waiting for the previous one ~200ms
+
+ stop(Pid).
+
+overload_demo_test() ->
+ {ok, DummyServerPid, Port} = dummy_server({5, <<10>>}),
+ {ok, Pid} = start("127.0.0.1", Port, [auto_reconnect, queue_if_disconnected]),
+ timer:sleep(50),
+ erlang:monitor(process, DummyServerPid),
+
+ Self = self(),
+
+ REQ = fun(Func, TO) ->
+ erlang:spawn(fun() ->
+ Info = (catch apply(?MODULE, Func,
+ [Pid, <<"qwer">>, <<"qwer">>, [], TO])),
+ Self ! {self(), Info}
+ end)
+ end,
+
+ RES = fun(RPid) ->
+ receive
+ {RPid, Result} -> Result
+ after
+ 4000 -> error
+ end
+ end,
+
+ TEST = fun(TO) ->
+
+ PidList = lists:foldl(fun(_,Acc) ->
+ timer:sleep(4),
+ [REQ(get, TO) | Acc]
+ end, [], lists:seq(1,200)),
+ timer:sleep(20),
+
+ ReplyList = lists:foldl(fun(RPid,Acc) ->
+ [RES(RPid) | Acc]
+ end, [], PidList),
+
+ Replies = lists:foldl(
+ fun({error,Reply},Acc) ->
+ case lists:keyfind(Reply, 1, Acc) of
+ false -> [{Reply, 1} | Acc];
+ {Reply, C} ->
+ lists:keyreplace(Reply, 1, Acc, {Reply, C+1})
+ end
+ end, [], ReplyList),
+ TimeOuts = case lists:keyfind(timeout, 1, Replies) of
+ {_,TOV} -> TOV;
+ false -> 0
+ end,
+ NotFounds = case lists:keyfind(notfound, 1, Replies) of
+ {_,NFV} -> NFV;
+ false -> 0
+ end,
+
+ io:format(user, " With timeout: ~p we got ~p timeouts and ~p replies~n",
+ [TO, TimeOuts, NotFounds]),
+ {TimeOuts, NotFounds}
+
+ end,
+
+ {OldTO, OldNF} = TEST(9),
+ {NewTO, NewNF} = TEST({1,8}),
+
+ catch DummyServerPid ! stop,
+ timer:sleep(10),
+ receive _Msg -> ok % io:format(user, "MSG: ~p~n", [_Msg])
+ after 1 -> ok % io:format(user, "NO MSG: ~p~n", [process_info(DummyServerPid, messages)])
+ end,
+
+ % the worst-case response time for both timeout mechanisms is the same, 9ms
+ % using the old mechanism we typically get ~10 responses
+ % and the rest timeout as long as the link remains overloaded
+ true = OldTO > 170,
+ true = OldNF < 30,
+ % but using the new mechanism we get an almost 66% success rate
+ % consistent during the overloaded state
+ true = NewTO < 80,
+ true = NewNF > 120,
+
+ stop(Pid).
+
+%% Test helper: open a listening socket on an OS-assigned port and spawn the
+%% dummy server loop on it. Returns {ok, LoopPid, Port}.
+dummy_server(Directive) ->
+    ListenOpts = [binary, {packet, 4}, {active, true}],
+    {ok, LSock} = gen_tcp:listen(0, ListenOpts),
+    {ok, Port} = inet:port(LSock),
+    LoopArgs = [{LSock, no_conn, Directive}],
+    {ok, spawn(?MODULE, dummy_server_loop, LoopArgs), Port}.
+
+%% Test helper loop. With no client connected it blocks in accept; once a
+%% client is connected it handles one mailbox message per iteration.
+dummy_server_loop({LSock, no_conn, Directive}) ->
+    {ok, Client} = gen_tcp:accept(LSock),
+    dummy_server_loop({LSock, Client, Directive});
+dummy_server_loop({LSock, Client, Directive} = Loop) ->
+    receive
+        stop ->
+            ok;
+        {tcp_closed, Client} ->
+            %% client went away: go back to accepting the next connection
+            dummy_server_loop({LSock, no_conn, Directive});
+        _Data ->
+            case Directive of
+                noreply ->
+                    %% ignore requests, let them timeout
+                    ok;
+                {SleepMs, Reply} ->
+                    %% reply from a helper process so the delay does not block this loop
+                    spawn(fun() -> timer:sleep(SleepMs), gen_tcp:send(Client, Reply) end)
+            end,
+            dummy_server_loop(Loop)
+    end.
+
-endif.