Skip to content

Commit

Permalink
catch noproc instead of doing alive check
Browse files Browse the repository at this point in the history
the alive check does not work because a race condition still exists.
  • Loading branch information
fmcgeough committed Dec 9, 2024
1 parent 90e9aca commit 18c0dab
Showing 1 changed file with 11 additions and 16 deletions.
27 changes: 11 additions & 16 deletions src/brod_group_coordinator.erl
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,13 @@
{lo_cmd_stabilize, AttemptCount, Reason}).

-define(INITIAL_MEMBER_ID, <<>>).
-define(CALL_MEMBER(MemberPid, EXPR),
try
EXPR
catch
exit:{noproc, {gen_server, call, [MemberPid | _]}} ->
exit({shutdown, member_down})
end).

-type config() :: brod:group_config().
-type ts() :: erlang:timestamp().
Expand Down Expand Up @@ -462,15 +469,6 @@ terminate(Reason, #state{ connection = Connection

%%%_* Internal Functions =======================================================

% Exit the process if MemberPid is set to pid but it's not
% currently alive
-spec ensure_member_pid_alive(pid()) -> ok.
ensure_member_pid_alive(MemberPid) ->
case brod_utils:is_pid_alive(MemberPid) of
true -> ok;
false -> exit(member_pid_shutdown)
end.

-spec discover_coordinator(state()) -> {ok, state()}.
discover_coordinator(#state{ client = Client
, connection = Connection0
Expand Down Expand Up @@ -508,8 +506,7 @@ stabilize(#state{ rejoin_delay_seconds = RejoinDelaySeconds
log(State0, info, "re-joining group, reason:~p", [Reason]),

%% 1. unsubscribe all currently assigned partitions
ok = ensure_member_pid_alive(MemberPid),
ok = MemberModule:assignments_revoked(MemberPid),
?CALL_MEMBER(MemberPid, MemberModule:assignments_revoked(MemberPid)),

%% 2. some brod_group_member implementations may wait for messages
%% to finish processing when assignments_revoked is called.
Expand Down Expand Up @@ -684,9 +681,8 @@ sync_group(#state{ groupId = GroupId
%% get my partition assignments
Assignment = kpro:find(assignment, RspBody),
TopicAssignments = get_topic_assignments(State, Assignment),
ok = ensure_member_pid_alive(MemberPid),
ok = MemberModule:assignments_received(MemberPid, MemberId,
GenerationId, TopicAssignments),
?CALL_MEMBER(MemberPid,
MemberModule:assignments_received(MemberPid, MemberId, GenerationId, TopicAssignments)),
NewState = State#state{is_in_group = true},
log(NewState, info, "assignments received:~s",
[format_assignments(TopicAssignments)]),
Expand Down Expand Up @@ -845,8 +841,7 @@ assign_partitions(State) when ?IS_LEADER(State) ->
Assignments =
case Strategy =:= callback_implemented of
true ->
ok = ensure_member_pid_alive(MemberPid),
MemberModule:assign_partitions(MemberPid, Members, AllPartitions);
?CALL_MEMBER(MemberPid, MemberModule:assign_partitions(MemberPid, Members, AllPartitions));
false ->
do_assign_partitions(Strategy, Members, AllPartitions)
end,
Expand Down

0 comments on commit 18c0dab

Please sign in to comment.