Skip to content

Commit

Permalink
fix(reconnect): handle more event where could do reconnect
Browse files Browse the repository at this point in the history
  • Loading branch information
qzhuyan committed Nov 17, 2023
1 parent 0bb6f36 commit 49afd0f
Showing 1 changed file with 15 additions and 4 deletions.
19 changes: 15 additions & 4 deletions src/emqtt.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1122,7 +1122,7 @@ connected({call, From}, clientid, #state{clientid = ClientId}) ->
connected({call, From}, {subscribe, Properties, Topics}, State) ->
connected({call, From}, {subscribe, default_via(State), Properties, Topics}, State);
connected({call, From}, SubReq = {subscribe, Via0, Properties, Topics},
State = #state{last_packet_id = PacketId, subscriptions = Subscriptions}) ->
State = #state{reconnect = Re, last_packet_id = PacketId, subscriptions = Subscriptions}) ->
{Via, State1} = maybe_new_stream(Via0, State),
case send(Via, ?SUBSCRIBE_PACKET(PacketId, Properties, Topics), State1) of
{ok, NewState} ->
Expand All @@ -1132,6 +1132,8 @@ connected({call, From}, SubReq = {subscribe, Via0, Properties, Topics},
maps:put(Topic, Opts, Acc)
end, Subscriptions, Topics),
{keep_state, ensure_ack_timer(add_call(Call,NewState#state{subscriptions = Subscriptions1}))};
Error = {error, Reason} when ?NEED_RECONNECT(Re) ->
next_reconnect(State, [{reply, From, Error}]);
Error = {error, Reason} ->
{stop_and_reply, Reason, [{reply, From, Error}]}
end;
Expand Down Expand Up @@ -1435,9 +1437,13 @@ handle_event(info, {Closed, _Sock}, connected, #state{ reconnect = Re} = State)
when ?SOCK_CLOSED(Closed) andalso ?NEED_RECONNECT(Re) ->
next_reconnect(State#state{socket = undefined});

handle_event(info, {Closed, _Sock}, waiting_for_connack, #state{ reconnect = Re} = State)
when ?SOCK_CLOSED(Closed) andalso ?NEED_RECONNECT(Re) ->
next_reconnect(State#state{socket = undefined});

handle_event(info, {Closed, Sock}, StateName, State)
when Closed =:= tcp_closed; Closed =:= ssl_closed ->
?LOG(debug, "socket_closed", #{event => Closed, state => StateName, sock => Sock,
?LOG(error, "socket_closed", #{event => Closed, state => StateName, sock => Sock,
inuse => State#state.socket}, State),
{stop, {shutdown, Closed}, State};

Expand Down Expand Up @@ -2087,23 +2093,28 @@ reason_code_name(16#A1) -> subscription_identifiers_not_supported;
reason_code_name(16#A2) -> wildcard_subscriptions_not_supported;
reason_code_name(_Code) -> unknown_error.


next_reconnect(State) ->
next_reconnect(State, []).
next_reconnect(#state{retry_timer = RetryTimer,
keepalive_timer = KeepAliveTimer,
sock_opts = OldSockOpts,
reconnect = Reconnect,
reconnect_timeout = Timeout,
ack_timer = AckTimer,
socket = OldSocket,
conn_mod = ConnMod
} = State) ->
} = State, Actions) ->
ok = close_socket(ConnMod, OldSocket),
ok = cancel_timer(RetryTimer),
ok = cancel_timer(KeepAliveTimer),
ok = cancel_timer(AckTimer),
{next_state, reconnect, State#state{socket = undefined,
sock_opts = proplists:delete(handle, OldSockOpts),
retry_timer = undefined,
keepalive_timer = undefined
},
{state_timeout, Timeout, Reconnect}}.
[{state_timeout, Timeout, Reconnect} | Actions]}.

close_socket(_, undefined) ->
ok;
Expand Down

0 comments on commit 49afd0f

Please sign in to comment.