From 6d890f0a9f23a6dc1b65d626230e4c2dcecaacf9 Mon Sep 17 00:00:00 2001 From: seanmcevoy Date: Fri, 13 Nov 2015 16:44:37 +0000 Subject: [PATCH 1/4] added new timeout handling, better for overloaded connections --- include/riakc.hrl | 2 +- src/riakc_pb_socket.erl | 322 ++++++++++++++++++++++++++++++++++------ 2 files changed, 277 insertions(+), 47 deletions(-) diff --git a/include/riakc.hrl b/include/riakc.hrl index 6f711d7d..a0dfe48b 100644 --- a/include/riakc.hrl +++ b/include/riakc.hrl @@ -24,7 +24,7 @@ -define(PROTO_MAJOR, 1). -define(PROTO_MINOR, 0). -define(DEFAULT_PB_TIMEOUT, 60000). --define(FIRST_RECONNECT_INTERVAL, 100). +-define(FIRST_RECONNECT_INTERVAL, 10). -define(MAX_RECONNECT_INTERVAL, 30000). -type client_option() :: queue_if_disconnected | diff --git a/src/riakc_pb_socket.erl b/src/riakc_pb_socket.erl index 59aca82b..aa0351f1 100644 --- a/src/riakc_pb_socket.erl +++ b/src/riakc_pb_socket.erl @@ -42,6 +42,7 @@ set_options/2, set_options/3, is_connected/1, is_connected/2, ping/1, ping/2, + queue_len/1, get_client_id/1, get_client_id/2, set_client_id/2, set_client_id/3, get_server_info/1, get_server_info/2, @@ -96,6 +97,7 @@ -deprecated({get_index,'_', eventually}). +-type timeout2() :: timeout() | {timeout(), timeout()}. -type ctx() :: any(). -type rpb_req() :: {tunneled, msg_id(), binary()} | atom() | tuple(). -type rpb_resp() :: atom() | tuple(). @@ -154,6 +156,7 @@ transport = gen_tcp :: 'gen_tcp' | 'ssl', active :: #request{} | undefined, % active request queue :: request_queue_t() | undefined, % queue of pending requests + queue_len=0 :: non_neg_integer(), % queue size connects=0 :: non_neg_integer(), % number of successful connects failed=[] :: [connection_failure()], % breakdown of failed connects connect_timeout=infinity :: timeout(), % timeout of TCP connection @@ -244,6 +247,10 @@ ping(Pid) -> ping(Pid, Timeout) -> call_infinity(Pid, {req, rpbpingreq, Timeout}). +%% @doc Check how long the queue of requests is +queue_len(Pid) -> + call_infinity(Pid, {check, queue_len}). + %% @doc Get the client id for this connection %% @equiv get_client_id(Pid, default_timeout(get_client_id_timeout)) -spec get_client_id(pid()) -> {ok, client_id()} | {error, term()}. @@ -289,8 +296,10 @@ get(Pid, Bucket, Key) -> %% @doc Get bucket/key from the server specifying timeout. %% Will return {error, notfound} if the key is not on the server. %% @equiv get(Pid, Bucket, Key, Options, Timeout) --spec get(pid(), bucket(), key(), TimeoutOrOptions::timeout() | get_options()) -> +-spec get(pid(), bucket(), key(), TimeoutOrOptions::timeout2() | get_options()) -> {ok, riakc_obj()} | {error, term()} | unchanged. +get(Pid, Bucket, Key, {T1,T2} = Timeout) when is_integer(T1), is_integer(T2) -> + get(Pid, Bucket, Key, [], Timeout); get(Pid, Bucket, Key, Timeout) when is_integer(Timeout); Timeout =:= infinity -> get(Pid, Bucket, Key, [], Timeout); get(Pid, Bucket, Key, Options) -> @@ -300,7 +309,7 @@ get(Pid, Bucket, Key, Options) -> %% unchanged will be returned when the %% {if_modified, Vclock} option is specified and the %% object is unchanged. --spec get(pid(), bucket(), key(), get_options(), timeout()) -> +-spec get(pid(), bucket(), key(), get_options(), timeout2()) -> {ok, riakc_obj()} | {error, term()} | unchanged. get(Pid, Bucket, Key, Options, Timeout) -> {T, B} = maybe_bucket_type(Bucket), @@ -318,8 +327,10 @@ put(Pid, Obj) -> %% @doc Put the metadata/value in the object under bucket/key with options or timeout. %% @equiv put(Pid, Obj, Options, Timeout) %% @see put/4 --spec put(pid(), riakc_obj(), TimeoutOrOptions::timeout() | put_options()) -> +-spec put(pid(), riakc_obj(), TimeoutOrOptions::timeout2() | put_options()) -> ok | {ok, riakc_obj()} | riakc_obj() | {ok, key()} | {error, term()}. +put(Pid, Obj, {T1,T2} = Timeout) when is_integer(T1), is_integer(T2) -> + put(Pid, Obj, [], Timeout); put(Pid, Obj, Timeout) when is_integer(Timeout); Timeout =:= infinity -> put(Pid, Obj, [], Timeout); put(Pid, Obj, Options) -> @@ -336,7 +347,7 @@ put(Pid, Obj, Options) -> %% `return_body' was specified. %% @throws siblings %% @end --spec put(pid(), riakc_obj(), put_options(), timeout()) -> +-spec put(pid(), riakc_obj(), put_options(), timeout2()) -> ok | {ok, riakc_obj()} | riakc_obj() | {ok, key()} | {error, term()}. put(Pid, Obj, Options, Timeout) -> Content = riak_pb_kv_codec:encode_content({riakc_obj:get_update_metadata(Obj), @@ -357,15 +368,17 @@ delete(Pid, Bucket, Key) -> %% @doc Delete the key/value specifying timeout or options. Note that the rw quorum is deprecated, use r and w. %% @equiv delete(Pid, Bucket, Key, Options, Timeout) --spec delete(pid(), bucket(), key(), TimeoutOrOptions::timeout() | delete_options()) -> +-spec delete(pid(), bucket(), key(), TimeoutOrOptions::timeout2() | delete_options()) -> ok | {error, term()}. +delete(Pid, Bucket, Key, {T1,T2} = Timeout) when is_integer(T1), is_integer(T2) -> + delete(Pid, Bucket, Key, [], Timeout); delete(Pid, Bucket, Key, Timeout) when is_integer(Timeout); Timeout =:= infinity -> delete(Pid, Bucket, Key, [], Timeout); delete(Pid, Bucket, Key, Options) -> delete(Pid, Bucket, Key, Options, default_timeout(delete_timeout)). %% @doc Delete the key/value with options and timeout. Note that the rw quorum is deprecated, use r and w. --spec delete(pid(), bucket(), key(), delete_options(), timeout()) -> ok | {error, term()}. +-spec delete(pid(), bucket(), key(), delete_options(), timeout2()) -> ok | {error, term()}. delete(Pid, Bucket, Key, Options, Timeout) -> {T, B} = maybe_bucket_type(Bucket), Req = delete_options(Options, #rpbdelreq{type = T, bucket = B, key = Key}), @@ -419,7 +432,7 @@ delete_obj(Pid, Obj, Options) -> %% @doc Delete the riak object with options and timeout. %% @equiv delete_vclock(Pid, riakc_obj:bucket(Obj), riakc_obj:key(Obj), riakc_obj:vclock(Obj), Options, Timeout) %% @see delete_vclock/6 --spec delete_obj(pid(), riakc_obj(), delete_options(), timeout()) -> ok | {error, term()}. +-spec delete_obj(pid(), riakc_obj(), delete_options(), timeout2()) -> ok | {error, term()}. delete_obj(Pid, Obj, Options, Timeout) -> delete_vclock(Pid, riakc_obj:bucket(Obj), riakc_obj:key(Obj), riakc_obj:vclock(Obj), Options, Timeout). @@ -1136,7 +1149,7 @@ cs_bucket_fold(Pid, Bucket, Opts) when is_pid(Pid), (is_binary(Bucket) orelse %% @doc Return the default timeout for an operation if none is provided. %% Falls back to the default timeout. --spec default_timeout(timeout_name()) -> timeout(). +-spec default_timeout(timeout_name()) -> timeout2(). default_timeout(OpTimeout) -> case application:get_env(riakc, OpTimeout) of {ok, EnvTimeout} -> @@ -1316,26 +1329,30 @@ handle_call(is_connected, _From, State) -> end; handle_call({set_options, Options}, _From, State) -> {reply, ok, parse_options(Options, State)}; +handle_call({check, queue_len}, _From, #state{queue_len = QueueLen} = State) -> + {reply, QueueLen, State}; handle_call(stop, _From, State) -> - _ = disconnect(State), - {stop, normal, ok, State}. + _ = disconnect(State, true), + {stop, normal, ok, State}; +handle_call(get_state, _From, State) -> + {reply, State, State}. %% @private handle_info({tcp_error, _Socket, Reason}, State) -> error_logger:error_msg("PBC client TCP error for ~p:~p - ~p\n", [State#state.address, State#state.port, Reason]), - disconnect(State); + disconnect(State, true); handle_info({tcp_closed, _Socket}, State) -> - disconnect(State); + disconnect(State, true); handle_info({ssl_error, _Socket, Reason}, State) -> error_logger:error_msg("PBC client SSL error for ~p:~p - ~p\n", [State#state.address, State#state.port, Reason]), - disconnect(State); + disconnect(State, true); handle_info({ssl_closed, _Socket}, State) -> - disconnect(State); + disconnect(State, true); %% Make sure the two Sock's match. If a request timed out, but there was %% a response queued up behind it we do not want to process it. Instead @@ -1374,19 +1391,13 @@ handle_info({Proto, Sock, Data}, State=#state{sock = Sock, active = Active}) ok = ssl:setopts(Sock, [{active, once}]) end, {noreply, NewState}; -handle_info({req_timeout, Ref}, State) -> - case State#state.active of %% - undefined -> - {noreply, remove_queued_request(Ref, State)}; - Active -> - case Ref == Active#request.ref of - true -> %% Matches the current operation - NewState = maybe_reply(on_timeout(State#state.active, State)), - disconnect(NewState#state{active = undefined}); - false -> - {noreply, remove_queued_request(Ref, State)} - end - end; +handle_info({TimeoutTag, Ref}, #state{active = #request{ref = Ref}} = State) + when TimeoutTag == op_timeout; TimeoutTag == req_timeout -> + NewState = maybe_reply(on_timeout(State#state.active, State)), + disconnect(NewState#state{active = undefined}, false); +handle_info({TimeoutTag, Ref}, State) + when TimeoutTag == q_timeout; TimeoutTag == req_timeout -> + {noreply, remove_queued_request(Ref, State)}; handle_info(reconnect, State) -> case connect(State) of {ok, NewState} -> @@ -1394,7 +1405,7 @@ handle_info(reconnect, State) -> {error, Reason} -> %% Update the failed count and reschedule a reconnection NewState = State#state{failed = orddict:update_counter(Reason, 1, State#state.failed)}, - disconnect(NewState) + disconnect(NewState, true) end; handle_info(_, State) -> {noreply, State}. @@ -1982,6 +1993,8 @@ create_req_timer(infinity, _Ref) -> undefined; create_req_timer(undefined, _Ref) -> undefined; +create_req_timer({Msecs,_}, Ref) -> + erlang:send_after(Msecs, self(), {q_timeout, Ref}); create_req_timer(Msecs, Ref) -> erlang:send_after(Msecs, self(), {req_timeout, Ref}). @@ -2089,7 +2102,7 @@ start_auth(State=#state{credentials={User,Pass}, sock=Sock}) -> %% @private %% Disconnect socket if connected -disconnect(State) -> +disconnect(State, DelayReconnect) -> %% Tell any pending requests we've disconnected _ = case State#state.active of undefined -> @@ -2109,12 +2122,15 @@ disconnect(State) -> %% Decide whether to reconnect or exit NewState = State#state{sock = undefined, active = undefined}, - case State#state.auto_reconnect of - true -> + case {State#state.auto_reconnect, DelayReconnect} of + {true, true} -> %% Schedule the reconnect message and return state erlang:send_after(State#state.reconnect_interval, self(), reconnect), {noreply, increase_reconnect_interval(NewState)}; - false -> + {true, false} -> + self() ! reconnect, + {noreply, NewState}; + {false, _} -> {stop, disconnected, NewState} end. @@ -2130,16 +2146,27 @@ increase_reconnect_interval(State) -> %% Send a request to the server and prepare the state for the response %% @private -send_request(Request0, State) when State#state.active =:= undefined -> - {Request, Pkt} = encode_request_message(Request0), + +send_request(#request{ref = Ref, + tref = TRef, + timeout = Timeout} = Request0, State) + when State#state.active =:= undefined -> + {Request1, Pkt} = encode_request_message(Request0), Transport = State#state.transport, case Transport:send(State#state.sock, Pkt) of ok -> - maybe_reply(after_send(Request, State#state{active = Request})); + case Timeout of + {_,Msecs} -> + cancel_req_timer(TRef), + Request2 = Request1#request{tref = erlang:send_after(Msecs, self(), {op_timeout, Ref})}, + maybe_reply(after_send(Request2, State#state{active = Request2})); + _ -> + maybe_reply(after_send(Request1, State#state{active = Request1})) + end; {error, Reason} -> error_logger:warning_msg("Socket error while sending riakc request: ~p.", [Reason]), Transport:close(State#state.sock), - maybe_enqueue_and_reconnect(Request, State#state{sock=undefined}) + maybe_enqueue_and_reconnect(Request1, State#state{sock=undefined}) end. %% Already encoded (for tunneled messages), but must provide Message Id @@ -2164,37 +2191,44 @@ maybe_reconnect(_) -> ok. %% If we can queue while disconnected, do so, otherwise tell the %% caller that the socket was disconnected. enqueue_or_reply_error(Request, #state{queue_if_disconnected=true}=State) -> - queue_request(Request, State); + case Request#request.timeout of + {_,_} -> send_caller({error, timeout}, Request); % we've already used part of the op timeout + _ -> queue_request_head(Request, State) + end; enqueue_or_reply_error(Request, State) -> _ = send_caller({error, disconnected}, Request), State. %% Queue up a request if one is pending %% @private -queue_request(Request, State) -> - State#state{queue = queue:in(Request, State#state.queue)}. +queue_request(Request, State) -> queue_request(Request, State, in). +queue_request_head(Request, State) -> queue_request(Request, State, in_r). +queue_request(Request, #state{queue_len = QLen, queue = Q} = State, Infunc) -> + State#state{queue_len = QLen + 1, queue = queue:Infunc(Request, Q)}. %% Try and dequeue request and send onto the server if one is waiting %% @private -dequeue_request(State) -> +dequeue_request(#state{queue_len = QLen} = State) -> case queue:out(State#state.queue) of {empty, _} -> - State; + State#state{active = undefined}; {{value, Request}, Q2} -> - send_request(Request, State#state{queue = Q2}) + send_request(Request, State#state{active = undefined, + queue_len = QLen - 1, + queue = Q2}) end. %% Remove a queued request by reference - returns same queue if ref not present %% @private -remove_queued_request(Ref, State) -> - L = queue:to_list(State#state.queue), - case lists:keytake(Ref, #request.ref, L) of +remove_queued_request(Ref, #state{queue_len = QLen, queue = Q} = State) -> + case lists:keytake(Ref, #request.ref, queue:to_list(Q)) of false -> % Ref not queued up State; {value, Req, L2} -> {reply, Reply, NewState} = on_timeout(Req, State), _ = send_caller(Reply, Req), - NewState#state{queue = queue:from_list(L2)} + NewState#state{queue_len = QLen - 1, + queue = queue:from_list(L2)} end. %% @private @@ -3921,4 +3955,200 @@ live_node_tests() -> end)} ]. +timeout_no_conn_test() -> + % test req & {q, op} timeouts when there's no connection + + % test requests have to be spawned, so there's some variance in the reponse times + % this makes it difficult to decide on the acceptable tolerance for the pass/fail decision + % so the test is variable, it may fail the first time but try it again + + {ok, Pid} = start_link(test_ip(), 65225, [auto_reconnect, queue_if_disconnected]), + Self = self(), + REQ = fun(Func, TO) -> + erlang:spawn(fun() -> + {T,Info} = (catch timer:tc(?MODULE, Func, + [Pid, <<"qwer">>, <<"qwer">>, [], TO])), + Self ! {self(), {T div 1000, Info}} + end) + end, + + RES = fun(RPid) -> + receive + {RPid, Result} -> Result + after + 3000 -> error + end + end, + + P01 = REQ(get, {150,10}), timer:sleep(1), % sleep to ensure that spawned requests + P02 = REQ(get, {100,10}), timer:sleep(1), % are actually sent in this order + P03 = REQ(get, {150,10}), timer:sleep(1), + P04 = REQ(get, {100,10}), timer:sleep(1), + P05 = REQ(get, {150,10}), timer:sleep(1), + P06 = REQ(get, {100,10}), timer:sleep(1), + P07 = REQ(get, {150,10}), timer:sleep(1), + P08 = REQ(get, {100,10}), timer:sleep(1), + P09 = REQ(get, {150,10}), timer:sleep(1), + P10 = REQ(get, 20), timer:sleep(1), + P11 = REQ(get, 40), timer:sleep(1), + P12 = REQ(get, 60), timer:sleep(1), + P13 = REQ(get, 80), timer:sleep(1), + P14 = REQ(get, 20), timer:sleep(1), + P15 = REQ(get, 100), timer:sleep(250), 0 = queue_len(Pid), + P16 = REQ(get, {20,100}), timer:sleep(1), + P17 = REQ(get, {20,100}), + + {T01, {error, timeout}} = RES(P01), + {T02, {error, timeout}} = RES(P02), + {T03, {error, timeout}} = RES(P03), + {T04, {error, timeout}} = RES(P04), + {T05, {error, timeout}} = RES(P05), + {T06, {error, timeout}} = RES(P06), + {T07, {error, timeout}} = RES(P07), + {T08, {error, timeout}} = RES(P08), + {T09, {error, timeout}} = RES(P09), + {T10, {error, timeout}} = RES(P10), + {T11, {error, timeout}} = RES(P11), + {T12, {error, timeout}} = RES(P12), + {T13, {error, timeout}} = RES(P13), + {T14, {error, timeout}} = RES(P14), + {T15, {error, timeout}} = RES(P15), + {T16, {error, timeout}} = RES(P16), + {T17, {error, timeout}} = RES(P17), + + % All these requests should spend ~150ms in the queue + io:format(user, "150ms TIMES: ~p ~p ~p ~p ~p~n", [T01,T03,T05,T07,T09]), + lists:foreach(fun(T) -> true = T > 145, true = T < 159 end, [T01,T03,T05,T07,T09]), + + % All these requests should spend ~100ms in the queue + io:format(user, "100ms TIMES: ~p ~p ~p ~p~n", [T02,T04,T06,T08]), + lists:foreach(fun(T) -> true = T > 95, true = T < 109 end, [T02,T04,T06,T08]), + + % These test the old timeouts + io:format(user, "TIMES: ~p ~p ~p ~p ~p ~p ~p ~p~n", [T10,T11,T12,T13,T14,T15,T16,T17]), + true = T10 > 18, true = T10 < 25, + true = T11 > 37, true = T11 < 45, + true = T12 > 55, true = T12 < 65, + true = T13 > 73, true = T13 < 85, + true = T14 > 18, true = T14 < 25, + true = T15 > 98, true = T15 < 105, + + % These 2 requests should spend ~20ms in the queue + true = T16 > 18, true = T16 < 25, + true = T17 > 18, true = T17 < 25, + + stop(Pid). + +timeout_conn_test() -> + % test req & {q, op} timeouts when there is a connection that never responds + + % test requests have to be spawned, so there's some variance in the reponse times + % this makes it difficult to decide on the acceptable tolerance for the pass/fail decision + % so the test is variable, it may fail the first time but try it again + + % Set up a dummy socket to send requests on + {ok, DummyServerPid, Port} = dummy_server(), + {ok, Pid} = start("127.0.0.1", Port, [auto_reconnect, queue_if_disconnected]), + erlang:monitor(process, DummyServerPid), + Self = self(), + REQ = fun(Func, TO) -> + erlang:spawn(fun() -> + {T,Info} = (catch timer:tc(?MODULE, Func, + [Pid, <<"qwer">>, <<"qwer">>, [], TO])), + Self ! {self(), {T div 1000, Info}} + end) + end, + + RES = fun(RPid) -> + receive + {RPid, Result} -> Result + after + 3000 -> error + end + end, + + P01 = REQ(get, {100, 20}), timer:sleep(1), % sleep to ensure that spawned requests + P02 = REQ(get, {100, 20}), timer:sleep(1), % are actually sent in this order + P03 = REQ(get, {100, 20}), timer:sleep(1), + P04 = REQ(get, {100, 20}), timer:sleep(1), + P05 = REQ(get, {100, 20}), timer:sleep(1), + P06 = REQ(get, {100, 20}), timer:sleep(1), + P07 = REQ(get, {100, 20}), timer:sleep(1), + P08 = REQ(get, {100, 20}), timer:sleep(1), + P09 = REQ(get, {100, 20}), timer:sleep(1), + P10 = REQ(get, {100, 20}), timer:sleep(1), + P11 = REQ(get, 20), timer:sleep(1), + P12 = REQ(get, 40), timer:sleep(1), + P13 = REQ(get, 60), timer:sleep(1), + P14 = REQ(get, 80), timer:sleep(1), + P15 = REQ(get, 20), timer:sleep(1), + P16 = REQ(get, 100), timer:sleep(200), 0 = queue_len(Pid), + P17 = REQ(get, {20, 100}), timer:sleep(1), + P18 = REQ(get, {20, 100}), + + {T01, {error, timeout}} = RES(P01), + {T02, {error, timeout}} = RES(P02), + {T03, {error, timeout}} = RES(P03), + {T04, {error, timeout}} = RES(P04), + {T05, {error, timeout}} = RES(P05), + {T06, {error, timeout}} = RES(P06), + {T07, {error, timeout}} = RES(P07), + {T08, {error, timeout}} = RES(P08), + {T09, {error, timeout}} = RES(P09), + {T10, {error, timeout}} = RES(P10), + {T11, {error, timeout}} = RES(P11), + {T12, {error, timeout}} = RES(P12), + {T13, {error, timeout}} = RES(P13), + {T14, {error, timeout}} = RES(P14), + {T15, {error, timeout}} = RES(P15), + {T16, {error, timeout}} = RES(P16), + {T17, {error, timeout}} = RES(P17), + {T18, {error, timeout}} = RES(P18), + + io:format(user, "TIMES: ~p ~p ~p ~p ~p ~p ~p ~p ~p ~p ~p ~p ~p ~p ~p ~p ~p ~p~n", + [T01,T02,T03,T04,T05,T06,T07,T08,T09,T10,T11,T12,T13,T14,T15,T16,T17,T18]), + true = T01 > 18, true = T01 < 26, % This one is serviced right away & should timeout after 20ms + true = T02 > 37, true = T02 < 49, % This one is serviced after the 1st one has timed out, ~40ms + true = T03 > 55, true = T03 < 70, % This one is serviced after the 1st two have timed out, ~60ms + true = T04 > 73, true = T04 < 90, % This one is serviced after the 1st three have timed out, ~80ms + true = T05 > 91, true = T05 < 117, % This one might timeout inthe q, or might be serviced + + % All these will timeout in the queue + lists:foreach(fun(T) -> true = T > 97, true = T < 125 end, [T06,T07,T08,T09,T10]), + + % These test for backward compatibility + true = T11 > 17, true = T11 < 25, + true = T12 > 37, true = T12 < 48, + true = T13 > 57, true = T13 < 70, + true = T14 > 77, true = T14 < 90, + true = T15 > 17, true = T15 < 26, + true = T16 > 97, true = T16 < 106, + + true = T17 > 97, true = T17 < 106, % This one will be serviced right away & timeout after ~100ms + true = T18 > 17, true = T18 < 26, % This one will timeout in the queue waiting for the previous one ~20ms + + catch DummyServerPid ! stop, + timer:sleep(10), + receive _Msg -> ok + after 1 -> ok + end, + + stop(Pid). + +dummy_server() -> + {ok, Listen} = gen_tcp:listen(0, [binary, {packet, 4}, {active, true}]), + {ok, Port} = inet:port(Listen), + Pid = spawn(?MODULE, dummy_server, [{Listen, no_conn}]), + {ok, Pid, Port}. + +dummy_server({Listen, no_conn}) -> + {ok, Sock} = gen_tcp:accept(Listen), + dummy_server({Listen, Sock}); +dummy_server({Listen, Sock}) -> + receive + stop -> ok; + {tcp_closed, Sock} -> dummy_server({Listen, no_conn}); + _Data -> dummy_server({Listen, Sock}) % ignore requests, let them timeout + end. + -endif. From e55e01d11001cdee404cf7830d5743e230f0c4e9 Mon Sep 17 00:00:00 2001 From: seanmcevoy Date: Mon, 16 Nov 2015 21:48:12 +0000 Subject: [PATCH 2/4] tweaked timeout tests. --- src/riakc_pb_socket.erl | 135 ++++++++++++++++++++-------------------- 1 file changed, 69 insertions(+), 66 deletions(-) diff --git a/src/riakc_pb_socket.erl b/src/riakc_pb_socket.erl index aa0351f1..7cdecfa5 100644 --- a/src/riakc_pb_socket.erl +++ b/src/riakc_pb_socket.erl @@ -3980,23 +3980,23 @@ timeout_no_conn_test() -> end end, - P01 = REQ(get, {150,10}), timer:sleep(1), % sleep to ensure that spawned requests - P02 = REQ(get, {100,10}), timer:sleep(1), % are actually sent in this order - P03 = REQ(get, {150,10}), timer:sleep(1), - P04 = REQ(get, {100,10}), timer:sleep(1), - P05 = REQ(get, {150,10}), timer:sleep(1), - P06 = REQ(get, {100,10}), timer:sleep(1), - P07 = REQ(get, {150,10}), timer:sleep(1), - P08 = REQ(get, {100,10}), timer:sleep(1), - P09 = REQ(get, {150,10}), timer:sleep(1), - P10 = REQ(get, 20), timer:sleep(1), - P11 = REQ(get, 40), timer:sleep(1), - P12 = REQ(get, 60), timer:sleep(1), - P13 = REQ(get, 80), timer:sleep(1), - P14 = REQ(get, 20), timer:sleep(1), - P15 = REQ(get, 100), timer:sleep(250), 0 = queue_len(Pid), - P16 = REQ(get, {20,100}), timer:sleep(1), - P17 = REQ(get, {20,100}), + P01 = REQ(get, {1500,100}), timer:sleep(1), % sleep to ensure that spawned requests + P02 = REQ(get, {1000,100}), timer:sleep(1), % are actually sent in this order + P03 = REQ(get, {1500,100}), timer:sleep(1), + P04 = REQ(get, {1000,100}), timer:sleep(1), + P05 = REQ(get, {1500,100}), timer:sleep(1), + P06 = REQ(get, {1000,100}), timer:sleep(1), + P07 = REQ(get, {1500,100}), timer:sleep(1), + P08 = REQ(get, {1000,100}), timer:sleep(1), + P09 = REQ(get, {1500,100}), timer:sleep(1), + P10 = REQ(get, 200), timer:sleep(1), + P11 = REQ(get, 400), timer:sleep(1), + P12 = REQ(get, 600), timer:sleep(1), + P13 = REQ(get, 800), timer:sleep(1), + P14 = REQ(get, 200), timer:sleep(1), + P15 = REQ(get, 1000), timer:sleep(2500), 0 = queue_len(Pid), + P16 = REQ(get, {200,1000}), timer:sleep(1), + P17 = REQ(get, {200,1000}), {T01, {error, timeout}} = RES(P01), {T02, {error, timeout}} = RES(P02), @@ -4016,26 +4016,26 @@ timeout_no_conn_test() -> {T16, {error, timeout}} = RES(P16), {T17, {error, timeout}} = RES(P17), - % All these requests should spend ~150ms in the queue + % All these requests should spend ~1500ms in the queue io:format(user, "150ms TIMES: ~p ~p ~p ~p ~p~n", [T01,T03,T05,T07,T09]), - lists:foreach(fun(T) -> true = T > 145, true = T < 159 end, [T01,T03,T05,T07,T09]), + lists:foreach(fun(T) -> true = T > 1490, true = T < 1510 end, [T01,T03,T05,T07,T09]), - % All these requests should spend ~100ms in the queue + % All these requests should spend ~1000ms in the queue io:format(user, "100ms TIMES: ~p ~p ~p ~p~n", [T02,T04,T06,T08]), - lists:foreach(fun(T) -> true = T > 95, true = T < 109 end, [T02,T04,T06,T08]), + lists:foreach(fun(T) -> true = T > 990, true = T < 1010 end, [T02,T04,T06,T08]), % These test the old timeouts io:format(user, "TIMES: ~p ~p ~p ~p ~p ~p ~p ~p~n", [T10,T11,T12,T13,T14,T15,T16,T17]), - true = T10 > 18, true = T10 < 25, - true = T11 > 37, true = T11 < 45, - true = T12 > 55, true = T12 < 65, - true = T13 > 73, true = T13 < 85, - true = T14 > 18, true = T14 < 25, - true = T15 > 98, true = T15 < 105, + true = T10 > 190, true = T10 < 210, + true = T11 > 390, true = T11 < 410, + true = T12 > 590, true = T12 < 610, + true = T13 > 790, true = T13 < 810, + true = T14 > 190, true = T14 < 210, + true = T15 > 990, true = T15 < 1010, - % These 2 requests should spend ~20ms in the queue - true = T16 > 18, true = T16 < 25, - true = T17 > 18, true = T17 < 25, + % These 2 requests should spend ~200ms in the queue + true = T16 > 190, true = T16 < 210, + true = T17 > 190, true = T17 < 210, stop(Pid). @@ -4047,7 +4047,7 @@ timeout_conn_test() -> % so the test is variable, it may fail the first time but try it again % Set up a dummy socket to send requests on - {ok, DummyServerPid, Port} = dummy_server(), + {ok, DummyServerPid, Port} = dummy_server(noreply), {ok, Pid} = start("127.0.0.1", Port, [auto_reconnect, queue_if_disconnected]), erlang:monitor(process, DummyServerPid), Self = self(), @@ -4067,24 +4067,24 @@ timeout_conn_test() -> end end, - P01 = REQ(get, {100, 20}), timer:sleep(1), % sleep to ensure that spawned requests - P02 = REQ(get, {100, 20}), timer:sleep(1), % are actually sent in this order - P03 = REQ(get, {100, 20}), timer:sleep(1), - P04 = REQ(get, {100, 20}), timer:sleep(1), - P05 = REQ(get, {100, 20}), timer:sleep(1), - P06 = REQ(get, {100, 20}), timer:sleep(1), - P07 = REQ(get, {100, 20}), timer:sleep(1), - P08 = REQ(get, {100, 20}), timer:sleep(1), - P09 = REQ(get, {100, 20}), timer:sleep(1), - P10 = REQ(get, {100, 20}), timer:sleep(1), - P11 = REQ(get, 20), timer:sleep(1), - P12 = REQ(get, 40), timer:sleep(1), - P13 = REQ(get, 60), timer:sleep(1), - P14 = REQ(get, 80), timer:sleep(1), - P15 = REQ(get, 20), timer:sleep(1), - P16 = REQ(get, 100), timer:sleep(200), 0 = queue_len(Pid), - P17 = REQ(get, {20, 100}), timer:sleep(1), - P18 = REQ(get, {20, 100}), + P01 = REQ(get, {1000, 200}), timer:sleep(1), % sleep to ensure that spawned requests + P02 = REQ(get, {1000, 200}), timer:sleep(1), % are actually sent in this order + P03 = REQ(get, {1000, 200}), timer:sleep(1), + P04 = REQ(get, {1000, 200}), timer:sleep(1), + P05 = REQ(get, {1000, 200}), timer:sleep(1), + P06 = REQ(get, {1000, 200}), timer:sleep(1), + P07 = REQ(get, {1000, 200}), timer:sleep(1), + P08 = REQ(get, {1000, 200}), timer:sleep(1), + P09 = REQ(get, {1000, 200}), timer:sleep(1), + P10 = REQ(get, {1000, 200}), timer:sleep(1), + P11 = REQ(get, 200), timer:sleep(1), + P12 = REQ(get, 400), timer:sleep(1), + P13 = REQ(get, 600), timer:sleep(1), + P14 = REQ(get, 800), timer:sleep(1), + P15 = REQ(get, 200), timer:sleep(1), + P16 = REQ(get, 1000), timer:sleep(2000), 0 = queue_len(Pid), + P17 = REQ(get, {200, 1000}), timer:sleep(1), + P18 = REQ(get, {200, 1000}), {T01, {error, timeout}} = RES(P01), {T02, {error, timeout}} = RES(P02), @@ -4107,25 +4107,28 @@ timeout_conn_test() -> io:format(user, "TIMES: ~p ~p ~p ~p ~p ~p ~p ~p ~p ~p ~p ~p ~p ~p ~p ~p ~p ~p~n", [T01,T02,T03,T04,T05,T06,T07,T08,T09,T10,T11,T12,T13,T14,T15,T16,T17,T18]), - true = T01 > 18, true = T01 < 26, % This one is serviced right away & should timeout after 20ms - true = T02 > 37, true = T02 < 49, % This one is serviced after the 1st one has timed out, ~40ms - true = T03 > 55, true = T03 < 70, % This one is serviced after the 1st two have timed out, ~60ms - true = T04 > 73, true = T04 < 90, % This one is serviced after the 1st three have timed out, ~80ms - true = T05 > 91, true = T05 < 117, % This one might timeout inthe q, or might be serviced - - % All these will timeout in the queue - lists:foreach(fun(T) -> true = T > 97, true = T < 125 end, [T06,T07,T08,T09,T10]), + true = T01 > 190, true = T01 < 210, % This one is serviced right away & should timeout after 200ms + true = T02 > 390, true = T02 < 410, % This one is serviced after the 1st one has timed out, ~400ms + true = T03 > 590, true = T03 < 610, % This one is serviced after the 1st two have timed out, ~600ms + true = T04 > 790, true = T04 < 810, % This one is serviced after the 1st three have timed out, ~800ms + true = T05 > 990, true = T05 < 1010, % This one is serviced after the 1st four have timed out, ~1000ms + + [HD|TL] = lists:reverse(lists:sort([T06,T07,T08,T09,T10])), + % One will have queued & been serviced, ~1200ms + true = HD > 1190, true = HD < 1210, + % All these will timeout in the queue, ~1000ms + lists:foreach(fun(T) -> true = T > 990, true = T < 1010 end, TL), % These test for backward compatibility - true = T11 > 17, true = T11 < 25, - true = T12 > 37, true = T12 < 48, - true = T13 > 57, true = T13 < 70, - true = T14 > 77, true = T14 < 90, - true = T15 > 17, true = T15 < 26, - true = T16 > 97, true = T16 < 106, - - true = T17 > 97, true = T17 < 106, % This one will be serviced right away & timeout after ~100ms - true = T18 > 17, true = T18 < 26, % This one will timeout in the queue waiting for the previous one ~20ms + true = T11 > 190, true = T11 < 210, + true = T12 > 390, true = T12 < 410, + true = T13 > 590, true = T13 < 610, + true = T14 > 790, true = T14 < 810, + true = T15 > 190, true = T15 < 210, + true = T16 > 990, true = T16 < 1010, + + true = T17 > 990, true = T17 < 1010, % This one will be serviced right away & timeout after ~100ms + true = T18 > 190, true = T18 < 210, % This one will timeout in the queue waiting for the previous one ~20ms catch DummyServerPid ! stop, timer:sleep(10), From c51affbe0254e6bbe1d07c4be64c356fe6beae5f Mon Sep 17 00:00:00 2001 From: seanmcevoy Date: Fri, 20 Nov 2015 15:55:33 +0000 Subject: [PATCH 3/4] added overload demo --- src/riakc_pb_socket.erl | 100 +++++++++++++++++++++++++++++++++++++--- 1 file changed, 93 insertions(+), 7 deletions(-) diff --git a/src/riakc_pb_socket.erl b/src/riakc_pb_socket.erl index 7cdecfa5..2cd9f18e 100644 --- a/src/riakc_pb_socket.erl +++ b/src/riakc_pb_socket.erl @@ -4138,20 +4138,106 @@ timeout_conn_test() -> stop(Pid). -dummy_server() -> +overload_demo_test() -> + {ok, DummyServerPid, Port} = dummy_server({50, <<10>>}), + {ok, Pid} = start("127.0.0.1", Port, [auto_reconnect, queue_if_disconnected]), + timer:sleep(50), + erlang:monitor(process, DummyServerPid), + + Self = self(), + + REQ = fun(Func, TO) -> + erlang:spawn(fun() -> + Info = (catch apply(?MODULE, Func, + [Pid, <<"qwer">>, <<"qwer">>, [], TO])), + Self ! {self(), Info} + end) + end, + + RES = fun(RPid) -> + receive + {RPid, Result} -> Result + after + 4000 -> error + end + end, + + TEST = fun(TO) -> + + PidList = lists:foldl(fun(_,Acc) -> + timer:sleep(45), + [REQ(get, TO) | Acc] + end, [], lists:seq(1,200)), + timer:sleep(100), + + ReplyList = lists:foldl(fun(RPid,Acc) -> + [RES(RPid) | Acc] + end, [], PidList), + + Replies = lists:foldl( + fun({error,Reply},Acc) -> + case lists:keyfind(Reply, 1, Acc) of + false -> [{Reply, 1} | Acc]; + {Reply, C} -> + lists:keyreplace(Reply, 1, Acc, {Reply, C+1}) + end + end, [], ReplyList), + TimeOuts = case lists:keyfind(timeout, 1, Replies) of + {_,TOV} -> TOV; + false -> 0 + end, + NotFounds = case lists:keyfind(notfound, 1, Replies) of + {_,NFV} -> NFV; + false -> 0 + end, + + io:format(user, " With timeout: ~p we got ~p timeouts and ~p replies~n", + [TO, TimeOuts, NotFounds]), + {TimeOuts, NotFounds} + + end, + + {OldTO, OldNF} = TEST(60), + {NewTO, NewNF} = TEST({5,55}), + + % the worst-case response time for both timeout mechanisms is the same, 60ms + % using the old mechanism we typically get 2 or 3 responses + % and the link timesout as long as it remains overloaded + true = OldTO > 190, + true = OldNF < 10, + % but using the new mechanism we get get an almost 66% success rate + % consistent during the overloaded state + true = NewTO < 80, + true = NewNF > 120, + + catch DummyServerPid ! stop, + timer:sleep(10), + receive _Msg -> ok % io:format(user, "MSG: ~p~n", [_Msg]) + after 1 -> ok % io:format(user, "NO MSG: ~p~n", [process_info(DummyServerPid, messages)]) + end, + + stop(Pid). + +dummy_server(Directive) -> {ok, Listen} = gen_tcp:listen(0, [binary, {packet, 4}, {active, true}]), {ok, Port} = inet:port(Listen), - Pid = spawn(?MODULE, dummy_server, [{Listen, no_conn}]), + Pid = spawn(?MODULE, dummy_server_loop, [{Listen, no_conn, Directive}]), {ok, Pid, Port}. -dummy_server({Listen, no_conn}) -> +dummy_server_loop({Listen, no_conn, Directive}) -> {ok, Sock} = gen_tcp:accept(Listen), - dummy_server({Listen, Sock}); -dummy_server({Listen, Sock}) -> + dummy_server_loop({Listen, Sock, Directive}); +dummy_server_loop({Listen, Sock, Directive}) -> receive stop -> ok; - {tcp_closed, Sock} -> dummy_server({Listen, no_conn}); - _Data -> dummy_server({Listen, Sock}) % ignore requests, let them timeout + {tcp_closed, Sock} -> dummy_server_loop({Listen, no_conn, Directive}); + _Data -> + case Directive of + noreply -> dummy_server_loop({Listen, Sock, Directive}); % ignore requests, let them timeout + {SleepMs, Reply} -> + spawn(fun() -> timer:sleep(SleepMs), gen_tcp:send(Sock, Reply) end), + dummy_server_loop({Listen, Sock, Directive}) + end end. -endif. From 785085322916a11feaf7eaecd35b9c66f0b3d2ab Mon Sep 17 00:00:00 2001 From: seanmcevoy Date: Tue, 8 Dec 2015 11:52:59 +0000 Subject: [PATCH 4/4] tweaked timeout test tolerances --- src/riakc_pb_socket.erl | 156 ++++++++++++++++++++-------------------- 1 file changed, 78 insertions(+), 78 deletions(-) diff --git a/src/riakc_pb_socket.erl b/src/riakc_pb_socket.erl index 2cd9f18e..193f8360 100644 --- a/src/riakc_pb_socket.erl +++ b/src/riakc_pb_socket.erl @@ -4018,24 +4018,24 @@ timeout_no_conn_test() -> % All these requests should spend ~1500ms in the queue io:format(user, "150ms TIMES: ~p ~p ~p ~p ~p~n", [T01,T03,T05,T07,T09]), - lists:foreach(fun(T) -> true = T > 1490, true = T < 1510 end, [T01,T03,T05,T07,T09]), + lists:foreach(fun(T) -> true = T > 1490, true = T < 1515 end, [T01,T03,T05,T07,T09]), % All these requests should spend ~1000ms in the queue io:format(user, "100ms TIMES: ~p ~p ~p ~p~n", [T02,T04,T06,T08]), - lists:foreach(fun(T) -> true = T > 990, true = T < 1010 end, [T02,T04,T06,T08]), + lists:foreach(fun(T) -> true = T > 990, true = T < 1015 end, [T02,T04,T06,T08]), % These test the old timeouts io:format(user, "TIMES: ~p ~p ~p ~p ~p ~p ~p ~p~n", [T10,T11,T12,T13,T14,T15,T16,T17]), - true = T10 > 190, true = T10 < 210, - true = T11 > 390, true = T11 < 410, - true = T12 > 590, true = T12 < 610, - true = T13 > 790, true = T13 < 810, - true = T14 > 190, true = T14 < 210, - true = T15 > 990, true = T15 < 1010, + true = T10 > 190, true = T10 < 215, + true = T11 > 390, true = T11 < 415, + true = T12 > 590, true = T12 < 615, + true = T13 > 790, true = T13 < 815, + true = T14 > 190, true = T14 < 215, + true = T15 > 990, true = T15 < 1015, % These 2 requests should spend ~200ms in the queue - true = T16 > 190, true = T16 < 210, - true = T17 > 190, true = T17 < 210, + true = T16 > 190, true = T16 < 215, + true = T17 > 190, true = T17 < 215, stop(Pid). @@ -4105,41 +4105,41 @@ timeout_conn_test() -> {T17, {error, timeout}} = RES(P17), {T18, {error, timeout}} = RES(P18), + catch DummyServerPid ! stop, + timer:sleep(10), + receive _Msg -> ok + after 1 -> ok + end, + io:format(user, "TIMES: ~p ~p ~p ~p ~p ~p ~p ~p ~p ~p ~p ~p ~p ~p ~p ~p ~p ~p~n", [T01,T02,T03,T04,T05,T06,T07,T08,T09,T10,T11,T12,T13,T14,T15,T16,T17,T18]), - true = T01 > 190, true = T01 < 210, % This one is serviced right away & should timeout after 200ms - true = T02 > 390, true = T02 < 410, % This one is serviced after the 1st one has timed out, ~400ms - true = T03 > 590, true = T03 < 610, % This one is serviced after the 1st two have timed out, ~600ms - true = T04 > 790, true = T04 < 810, % This one is serviced after the 1st three have timed out, ~800ms - true = T05 > 990, true = T05 < 1010, % This one is serviced after the 1st four have timed out, ~1000ms + true = T01 > 190, true = T01 < 215, % This one is serviced right away & should timeout after 200ms + true = T02 > 390, true = T02 < 415, % This one is serviced after the 1st one has timed out, ~400ms + true = T03 > 590, true = T03 < 615, % This one is serviced after the 1st two have timed out, ~600ms + true = T04 > 790, true = T04 < 815, % This one is serviced after the 1st three have timed out, ~800ms + true = T05 > 990, true = T05 < 1015, % This one is serviced after the 1st four have timed out, ~1000ms [HD|TL] = lists:reverse(lists:sort([T06,T07,T08,T09,T10])), % One will have queued & been serviced, ~1200ms - true = HD > 1190, true = HD < 1210, + true = HD > 1190, true = HD < 1215, % All these will timeout in the queue, ~1000ms - lists:foreach(fun(T) -> true = T > 990, true = T < 1010 end, TL), + lists:foreach(fun(T) -> true = T > 990, true = T < 1015 end, TL), % These test for backward compatibility - true = T11 > 190, true = T11 < 210, - true = T12 > 390, true = T12 < 410, - true = T13 > 590, true = T13 < 610, - true = T14 > 790, true = T14 < 810, - true = T15 > 190, true = T15 < 210, - true = T16 > 990, true = T16 < 1010, + true = T11 > 190, true = T11 < 215, + true = T12 > 390, true = T12 < 415, + true = T13 > 590, true = T13 < 615, + true = T14 > 790, true = T14 < 815, + true = T15 > 190, true = T15 < 215, + true = T16 > 990, true = T16 < 1015, - true = T17 > 990, true = T17 < 1010, % This one will be serviced right away & timeout after ~100ms - true = T18 > 190, true = T18 < 210, % This one will timeout in the queue waiting for the previous one ~20ms - - catch DummyServerPid ! stop, - timer:sleep(10), - receive _Msg -> ok - after 1 -> ok - end, + true = T17 > 990, true = T17 < 1015, % This one will be serviced right away & timeout after ~100ms + true = T18 > 190, true = T18 < 215, % This one will timeout in the queue waiting for the previous one ~20ms stop(Pid). overload_demo_test() -> - {ok, DummyServerPid, Port} = dummy_server({50, <<10>>}), + {ok, DummyServerPid, Port} = dummy_server({5, <<10>>}), {ok, Pid} = start("127.0.0.1", Port, [auto_reconnect, queue_if_disconnected]), timer:sleep(50), erlang:monitor(process, DummyServerPid), @@ -4164,58 +4164,58 @@ overload_demo_test() -> TEST = fun(TO) -> - PidList = lists:foldl(fun(_,Acc) -> - timer:sleep(45), - [REQ(get, TO) | Acc] - end, [], lists:seq(1,200)), - timer:sleep(100), - - ReplyList = lists:foldl(fun(RPid,Acc) -> - [RES(RPid) | Acc] - end, [], PidList), - - Replies = lists:foldl( - fun({error,Reply},Acc) -> - case lists:keyfind(Reply, 1, Acc) of - false -> [{Reply, 1} | Acc]; - {Reply, C} -> - lists:keyreplace(Reply, 1, Acc, {Reply, C+1}) - end - end, [], ReplyList), - TimeOuts = case lists:keyfind(timeout, 1, Replies) of - {_,TOV} -> TOV; - false -> 0 - end, - NotFounds = case lists:keyfind(notfound, 1, Replies) of - {_,NFV} -> NFV; - false -> 0 - end, - - io:format(user, " With timeout: ~p we got ~p timeouts and ~p replies~n", - [TO, TimeOuts, NotFounds]), - {TimeOuts, NotFounds} - - end, - - {OldTO, OldNF} = TEST(60), - {NewTO, NewNF} = TEST({5,55}), - - % the worst-case response time for both timeout mechanisms is the same, 60ms - % using the old mechanism we typically get 2 or 3 responses - % and the link timesout as long as it remains overloaded - true = OldTO > 190, - true = OldNF < 10, - % but using the new mechanism we get get an almost 66% success rate - % consistent during the overloaded state - true = NewTO < 80, - true = NewNF > 120, - + PidList = lists:foldl(fun(_,Acc) -> + timer:sleep(4), + [REQ(get, TO) | Acc] + end, [], lists:seq(1,200)), + timer:sleep(20), + + ReplyList = lists:foldl(fun(RPid,Acc) -> + [RES(RPid) | Acc] + end, [], PidList), + + Replies = lists:foldl( + fun({error,Reply},Acc) -> + case lists:keyfind(Reply, 1, Acc) of + false -> [{Reply, 1} | Acc]; + {Reply, C} -> + lists:keyreplace(Reply, 1, Acc, {Reply, C+1}) + end + end, [], ReplyList), + TimeOuts = case lists:keyfind(timeout, 1, Replies) of + {_,TOV} -> TOV; + false -> 0 + end, + NotFounds = case lists:keyfind(notfound, 1, Replies) of + {_,NFV} -> NFV; + false -> 0 + end, + + io:format(user, " With timeout: ~p we got ~p timeouts and ~p replies~n", + [TO, TimeOuts, NotFounds]), + {TimeOuts, NotFounds} + + end, + + {OldTO, OldNF} = TEST(9), + {NewTO, NewNF} = TEST({1,8}), + catch DummyServerPid ! stop, timer:sleep(10), receive _Msg -> ok % io:format(user, "MSG: ~p~n", [_Msg]) after 1 -> ok % io:format(user, "NO MSG: ~p~n", [process_info(DummyServerPid, messages)]) end, + % the worst-case response time for both timeout mechanisms is the same, 9ms + % using the old mechanism we typically get ~10 responses + % and the rest timeout as long as the link remains overloaded + true = OldTO > 170, + true = OldNF < 30, + % but using the new mechanism we get get an almost 66% success rate + % consistent during the overloaded state + true = NewTO < 80, + true = NewNF > 120, + stop(Pid). dummy_server(Directive) ->