From cea17ee3d869d03dd1b0adcf7dea712dd77a49bc Mon Sep 17 00:00:00 2001 From: "xinglong.dxl" Date: Wed, 8 Jan 2020 20:32:19 +0800 Subject: [PATCH 1/2] old_pid_never_close_bug --- src/eetcd_http2_keeper.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/eetcd_http2_keeper.erl b/src/eetcd_http2_keeper.erl index b55da07..1eeb578 100644 --- a/src/eetcd_http2_keeper.erl +++ b/src/eetcd_http2_keeper.erl @@ -144,8 +144,8 @@ connect(State = #state{cluster = Cluster}, RetryN, Errors) -> {ok, http2} -> case whereis(?ETCD_HTTP2_CLIENT) of % sync close old conn if it exist - Pid when is_pid(Pid) -> - gun:close(Pid); + OldPid when is_pid(OldPid) -> + gun:close(OldPid); _ -> ok end, true = register(?ETCD_HTTP2_CLIENT, Pid), From 5baf9c9b9db02fc867f1af5426552dcbc75820cd Mon Sep 17 00:00:00 2001 From: "xinglong.dxl" Date: Thu, 9 Jan 2020 18:15:39 +0800 Subject: [PATCH 2/2] watcher recv packet bug --- src/eetcd_watch_worker.erl | 25 +++++++++++++++++++++++-- test/eetcd_watch_SUITE.erl | 24 ++++++++++++++++++++++-- 2 files changed, 45 insertions(+), 4 deletions(-) diff --git a/src/eetcd_watch_worker.erl b/src/eetcd_watch_worker.erl index 25b4f59..9dec64f 100644 --- a/src/eetcd_watch_worker.erl +++ b/src/eetcd_watch_worker.erl @@ -10,7 +10,7 @@ -include("eetcd.hrl"). --record(state, {stream_ref, client_ref, client_pid, watch_id, callback, ignore_create = true, ignore_cancel = true}). +-record(state, {stream_ref, client_ref, client_pid, watch_id, callback, data, ignore_create = true, ignore_cancel = true}). %%%=================================================================== %%% API @@ -34,6 +34,7 @@ init([Request, Callback, Options]) -> client_ref = ClientRef, client_pid = Pid, callback = Callback, + data = <<>>, ignore_create = proplists:get_value(ignore_create, Options, true), ignore_cancel = proplists:get_value(ignore_cancel, Options, true) }}. @@ -57,7 +58,7 @@ handle_info({gun_response, _Pid, Ref, nofin, 200, _Headers}, State = #state{stre {noreply, State}; handle_info({gun_data, _Pid, Ref, nofin, Data}, State = #state{stream_ref = Ref}) -> - handle_change_event(State, Data); + handle_gun_data(State, Data); handle_info({gun_error, Pid, StreamRef, Reason}, State = #state{callback = Callback, watch_id = WatchId}) -> @@ -132,3 +133,23 @@ run_callback(Callback, Resp) -> info_watcher_owner_terminate(#state{callback = Callback}, Reason) -> run_callback(Callback, {eetcd_watcher_exit, self(), Reason}). + +handle_gun_data(State = #state{data = OldData}, NewData) when byte_size(NewData) + byte_size(OldData) > 5 -> + CurrentData = <>, + <> = CurrentData, + if + byte_size(Binary) < Length -> + %% current data not completion, wait for next gun_data msg + {noreply, State#state{data = CurrentData}}; + true -> + <> = Binary, + NewState = State#state{data = LeftBinary}, + case handle_change_event(NewState, <>) of + {stop, _, _} = Resp -> + Resp; + {noreply, NewState1} -> + handle_gun_data(NewState1, <<>>) + end + end; +handle_gun_data(State = #state{data = OldData}, NewData) -> + {noreply, State#state{data = <>}}. diff --git a/test/eetcd_watch_SUITE.erl b/test/eetcd_watch_SUITE.erl index 6789eb0..38882d2 100644 --- a/test/eetcd_watch_SUITE.erl +++ b/test/eetcd_watch_SUITE.erl @@ -6,7 +6,7 @@ -export([watch_one_key/1, watch_multi_keys/1, watch_with_start_revision/1, watch_with_filters/1, watch_with_create_cancel_event/1, - watch_with_prev_kv/1, watch_with_watch_id/1]). + watch_with_prev_kv/1, watch_with_watch_id/1, watch_with_huge_value/1]). -include("router_pb.hrl"). @@ -16,7 +16,7 @@ suite() -> all() -> [ watch_one_key, watch_multi_keys, watch_with_start_revision, watch_with_filters, - watch_with_prev_kv, watch_with_watch_id, watch_with_create_cancel_event + watch_with_prev_kv, watch_with_watch_id, watch_with_create_cancel_event, watch_with_huge_value ]. groups() -> @@ -211,6 +211,26 @@ watch_with_watch_id(_Config) -> ok = eetcd:unwatch(WatchPid2), ok. +%% watch large value data +watch_with_huge_value(_Config) -> + Key = <<"etcd_key">>, + Pid = self(), + Callback = fun(Res) -> erlang:send(Pid, Res) end, + {ok, WatchPid} = eetcd:watch(#'Etcd.WatchCreateRequest'{key = Key}, Callback), + watch_loop([233333, 1, 13, 99, 122, 1222, 40000, 12345, 67890, 999999, 2, 3, 4, 5, 33, 57, 157, 999, 99999], Key), + + ok = eetcd:unwatch(WatchPid), + ok. + +watch_loop([], _) -> + ok; +watch_loop([Head | Tail], Key) -> + Value = list_to_binary([100 || _ <- lists:seq(1, Head)]), + eetcd_kv:put(#'Etcd.PutRequest'{key = Key, value = Value}), + #'Etcd.WatchResponse'{events = [#'mvccpb.Event'{type = 'PUT', + kv = #'mvccpb.KeyValue'{key = Key, value = Value}}]} = flush(), + watch_loop(Tail, Key). + %% fragment enables splitting large revisions into multiple watch responses. %%watch_with_fragment(_Config) -> %% ok.