diff --git a/src/brod_group_coordinator.erl b/src/brod_group_coordinator.erl index e419f6c9..cd7d7e86 100644 --- a/src/brod_group_coordinator.erl +++ b/src/brod_group_coordinator.erl @@ -75,6 +75,13 @@ {lo_cmd_stabilize, AttemptCount, Reason}). -define(INITIAL_MEMBER_ID, <<>>). +-define(CALL_MEMBER(MemberPid, EXPR), + try + EXPR + catch + exit:{noproc, {gen_server, call, [MemberPid | _]}} -> + exit({shutdown, member_down}) + end). -type config() :: brod:group_config(). -type ts() :: erlang:timestamp(). @@ -462,15 +469,6 @@ terminate(Reason, #state{ connection = Connection %%%_* Internal Functions ======================================================= -% Exit the process if MemberPid is set to pid but it's not -% currently alive --spec ensure_member_pid_alive(pid()) -> ok. -ensure_member_pid_alive(MemberPid) -> - case brod_utils:is_pid_alive(MemberPid) of - true -> ok; - false -> exit(member_pid_shutdown) -end. - -spec discover_coordinator(state()) -> {ok, state()}. discover_coordinator(#state{ client = Client , connection = Connection0 @@ -508,8 +506,7 @@ stabilize(#state{ rejoin_delay_seconds = RejoinDelaySeconds log(State0, info, "re-joining group, reason:~p", [Reason]), %% 1. unsubscribe all currently assigned partitions - ok = ensure_member_pid_alive(MemberPid), - ok = MemberModule:assignments_revoked(MemberPid), + ?CALL_MEMBER(MemberPid, MemberModule:assignments_revoked(MemberPid)), %% 2. some brod_group_member implementations may wait for messages %% to finish processing when assignments_revoked is called. @@ -684,9 +681,8 @@ sync_group(#state{ groupId = GroupId %% get my partition assignments Assignment = kpro:find(assignment, RspBody), TopicAssignments = get_topic_assignments(State, Assignment), - ok = ensure_member_pid_alive(MemberPid), - ok = MemberModule:assignments_received(MemberPid, MemberId, - GenerationId, TopicAssignments), + ?CALL_MEMBER(MemberPid, + MemberModule:assignments_received(MemberPid, MemberId, GenerationId, TopicAssignments)), NewState = State#state{is_in_group = true}, log(NewState, info, "assignments received:~s", [format_assignments(TopicAssignments)]), @@ -845,8 +841,7 @@ assign_partitions(State) when ?IS_LEADER(State) -> Assignments = case Strategy =:= callback_implemented of true -> - ok = ensure_member_pid_alive(MemberPid), - MemberModule:assign_partitions(MemberPid, Members, AllPartitions); + ?CALL_MEMBER(MemberPid, MemberModule:assign_partitions(MemberPid, Members, AllPartitions)); false -> do_assign_partitions(Strategy, Members, AllPartitions) end,