Skip to content

Commit

Permalink
Merge pull request #11 from enjolras1205/master
Browse files Browse the repository at this point in the history
fix watcher recv packet bug
  • Loading branch information
zhongwencool authored Jan 9, 2020
2 parents 2e9997d + 5baf9c9 commit 038000e
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 6 deletions.
4 changes: 2 additions & 2 deletions src/eetcd_http2_keeper.erl
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,8 @@ connect(State = #state{cluster = Cluster}, RetryN, Errors) ->
{ok, http2} ->
case whereis(?ETCD_HTTP2_CLIENT) of
% sync close old conn if it exist
Pid when is_pid(Pid) ->
gun:close(Pid);
OldPid when is_pid(OldPid) ->
gun:close(OldPid);
_ -> ok
end,
true = register(?ETCD_HTTP2_CLIENT, Pid),
Expand Down
25 changes: 23 additions & 2 deletions src/eetcd_watch_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

-include("eetcd.hrl").

-record(state, {stream_ref, client_ref, client_pid, watch_id, callback, ignore_create = true, ignore_cancel = true}).
-record(state, {stream_ref, client_ref, client_pid, watch_id, callback, data, ignore_create = true, ignore_cancel = true}).

%%%===================================================================
%%% API
Expand All @@ -34,6 +34,7 @@ init([Request, Callback, Options]) ->
client_ref = ClientRef,
client_pid = Pid,
callback = Callback,
data = <<>>,
ignore_create = proplists:get_value(ignore_create, Options, true),
ignore_cancel = proplists:get_value(ignore_cancel, Options, true)
}}.
Expand All @@ -57,7 +58,7 @@ handle_info({gun_response, _Pid, Ref, nofin, 200, _Headers}, State = #state{stre
{noreply, State};

handle_info({gun_data, _Pid, Ref, nofin, Data}, State = #state{stream_ref = Ref}) ->
handle_change_event(State, Data);
handle_gun_data(State, Data);

handle_info({gun_error, Pid, StreamRef, Reason}, State =
#state{callback = Callback, watch_id = WatchId}) ->
Expand Down Expand Up @@ -132,3 +133,23 @@ run_callback(Callback, Resp) ->

info_watcher_owner_terminate(#state{callback = Callback}, Reason) ->
run_callback(Callback, {eetcd_watcher_exit, self(), Reason}).

handle_gun_data(State = #state{data = OldData}, NewData) when byte_size(NewData) + byte_size(OldData) > 5 ->
CurrentData = <<OldData/binary, NewData/binary>>,
<<Compact:8, Length:32, Binary/binary>> = CurrentData,
if
byte_size(Binary) < Length ->
%% current data not completion, wait for next gun_data msg
{noreply, State#state{data = CurrentData}};
true ->
<<OnePacket:Length/binary, LeftBinary/binary>> = Binary,
NewState = State#state{data = LeftBinary},
case handle_change_event(NewState, <<Compact:8, Length:32, OnePacket/binary>>) of
{stop, _, _} = Resp ->
Resp;
{noreply, NewState1} ->
handle_gun_data(NewState1, <<>>)
end
end;
handle_gun_data(State = #state{data = OldData}, NewData) ->
{noreply, State#state{data = <<OldData/binary, NewData/binary>>}}.
24 changes: 22 additions & 2 deletions test/eetcd_watch_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

-export([watch_one_key/1, watch_multi_keys/1,
watch_with_start_revision/1, watch_with_filters/1, watch_with_create_cancel_event/1,
watch_with_prev_kv/1, watch_with_watch_id/1]).
watch_with_prev_kv/1, watch_with_watch_id/1, watch_with_huge_value/1]).

-include("router_pb.hrl").

Expand All @@ -16,7 +16,7 @@ suite() ->
all() ->
[
watch_one_key, watch_multi_keys, watch_with_start_revision, watch_with_filters,
watch_with_prev_kv, watch_with_watch_id, watch_with_create_cancel_event
watch_with_prev_kv, watch_with_watch_id, watch_with_create_cancel_event, watch_with_huge_value
].

groups() ->
Expand Down Expand Up @@ -211,6 +211,26 @@ watch_with_watch_id(_Config) ->
ok = eetcd:unwatch(WatchPid2),
ok.

%% watch large value data
watch_with_huge_value(_Config) ->
Key = <<"etcd_key">>,
Pid = self(),
Callback = fun(Res) -> erlang:send(Pid, Res) end,
{ok, WatchPid} = eetcd:watch(#'Etcd.WatchCreateRequest'{key = Key}, Callback),
watch_loop([233333, 1, 13, 99, 122, 1222, 40000, 12345, 67890, 999999, 2, 3, 4, 5, 33, 57, 157, 999, 99999], Key),

ok = eetcd:unwatch(WatchPid),
ok.

watch_loop([], _) ->
ok;
watch_loop([Head | Tail], Key) ->
Value = list_to_binary([100 || _ <- lists:seq(1, Head)]),
eetcd_kv:put(#'Etcd.PutRequest'{key = Key, value = Value}),
#'Etcd.WatchResponse'{events = [#'mvccpb.Event'{type = 'PUT',
kv = #'mvccpb.KeyValue'{key = Key, value = Value}}]} = flush(),
watch_loop(Tail, Key).

%% fragment enables splitting large revisions into multiple watch responses.
%%watch_with_fragment(_Config) ->
%% ok.
Expand Down

0 comments on commit 038000e

Please sign in to comment.