Skip to content

Commit

Permalink
erts: Use processes_iterator/0, processes_next/1
Browse files Browse the repository at this point in the history
  • Loading branch information
lucioleKi committed Jan 27, 2025
1 parent d72f66f commit b6ff680
Show file tree
Hide file tree
Showing 7 changed files with 89 additions and 73 deletions.
8 changes: 7 additions & 1 deletion erts/emulator/beam/erl_ptab.c
Original file line number Diff line number Diff line change
Expand Up @@ -1478,7 +1478,12 @@ erts_ptab_processes_next(Process *c_p, ErtsPTab *ptab, Uint first)
Eterm* hp;
Eterm *hp_end;

#ifdef DEBUG
int max_pids = 1;
#else
int max_pids = MAX(ERTS_BIF_REDS_LEFT(c_p), 1);
#endif

int num_pids = 0;
int n = max_pids * ERTS_PTAB_REDS_MULTIPLIER;
limit = MIN(ptab->r.o.max, first+n);
Expand All @@ -1500,7 +1505,7 @@ erts_ptab_processes_next(Process *c_p, ErtsPTab *ptab, Uint first)
return THE_NON_VALUE;
}

need = n * 2;
need = 3 + max_pids * 2;
hp = HAlloc(c_p, need); /* we need two heap words for each id */
hp_end = hp + need;
res = make_list(hp);
Expand All @@ -1524,6 +1529,7 @@ erts_ptab_processes_next(Process *c_p, ErtsPTab *ptab, Uint first)
scanned = (i - first) / ERTS_PTAB_REDS_MULTIPLIER + 1;

res = TUPLE2(hp, make_small(i), res);
hp += 3;
HRelease(c_p, hp_end, hp);

BUMP_REDS(c_p, scanned);
Expand Down
Binary file modified erts/preloaded/ebin/erts_code_purger.beam
Binary file not shown.
Binary file modified erts/preloaded/ebin/erts_literal_area_collector.beam
Binary file not shown.
Binary file modified erts/preloaded/ebin/init.beam
Binary file not shown.
46 changes: 25 additions & 21 deletions erts/preloaded/src/erts_code_purger.erl
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ do_purge(Mod, Reqs) ->
false ->
{{false, false}, Reqs};
true ->
{DidKill, NewReqs} = check_proc_code(erlang:processes(),
{DidKill, NewReqs} = check_proc_code(erlang:processes_iterator(),
Mod, true, Reqs),
true = erts_internal:purge_module(Mod, complete),
{{true, DidKill}, NewReqs}
Expand All @@ -144,7 +144,7 @@ do_soft_purge(Mod, Reqs) ->
false ->
{true, Reqs};
true ->
{PurgeOp, NewReqs} = check_proc_code(erlang:processes(),
{PurgeOp, NewReqs} = check_proc_code(erlang:processes_iterator(),
Mod, false, Reqs),
{erts_internal:purge_module(Mod, PurgeOp), NewReqs}
end.
Expand Down Expand Up @@ -172,7 +172,7 @@ do_finish_after_on_load(Mod, Keep, Reqs) ->
Reqs;
true ->
{_DidKill, NewReqs} =
check_proc_code(erlang:processes(),
check_proc_code(erlang:processes_iterator(),
Mod, true, Reqs),
true = erts_internal:purge_module(Mod, complete),
NewReqs
Expand All @@ -181,7 +181,7 @@ do_finish_after_on_load(Mod, Keep, Reqs) ->


%%
%% check_proc_code(Pids, Mod, Hard, Preqs) - Send asynchronous
%% check_proc_code(ProcessesIterator, Mod, Hard, Preqs) - Send asynchronous
%% requests to all processes to perform a check_process_code
%% operation. Each process will check their own state and
%% reply with the result. If 'Hard' equals
Expand All @@ -208,7 +208,7 @@ do_finish_after_on_load(Mod, Keep, Reqs) ->
waiting = [],
killed = false}).

check_proc_code(Pids, Mod, Hard, PReqs) ->
check_proc_code(Iter, Mod, Hard, PReqs) ->
Tag = erlang:make_ref(),
OReqLim = erlang:system_info(outstanding_system_requests_limit),
CpcS = #cpc_static{hard = Hard,
Expand All @@ -222,26 +222,26 @@ check_proc_code(Pids, Mod, Hard, PReqs) ->
OReqLim
end,
KS = #cpc_kill{outstanding_limit = KillLimit},
cpc_receive(CpcS, cpc_make_requests(CpcS, KS, 0, Pids), KS, []).
cpc_receive(CpcS, cpc_make_requests(CpcS, KS, 0, Iter), KS, []).

cpc_receive(#cpc_static{hard = true} = CpcS,
{0, []},
{0, none},
#cpc_kill{outstanding = [], waiting = [], killed = Killed},
PReqs) ->
%% No outstanding cpc requests. We did a hard check, so result is
%% whether or not we killed any processes...
cpc_result(CpcS, PReqs, Killed);
cpc_receive(#cpc_static{hard = false} = CpcS, {0, []}, _KillState, PReqs) ->
cpc_receive(#cpc_static{hard = false} = CpcS, {0, none}, _KillState, PReqs) ->
%% No outstanding cpc requests and we did a soft check that succeeded...
cpc_result(CpcS, PReqs, complete);
cpc_receive(#cpc_static{tag = Tag} = CpcS, {NoReq, PidsLeft} = ReqInfo,
cpc_receive(#cpc_static{tag = Tag} = CpcS, {NoReq, Iter} = ReqInfo,
KillState0, PReqs) ->
receive
{check_process_code, {Tag, _Pid}, false} ->
%% Process not referring the module; done with this process...
cpc_receive(CpcS,
cpc_make_requests(CpcS, KillState0,
NoReq-1, PidsLeft),
NoReq-1, Iter),
KillState0,
PReqs);
{check_process_code, {Tag, Pid}, true} ->
Expand All @@ -258,15 +258,15 @@ cpc_receive(#cpc_static{tag = Tag} = CpcS, {NoReq, PidsLeft} = ReqInfo,
KillState1 = cpc_sched_kill(Pid, KillState0),
cpc_receive(CpcS,
cpc_make_requests(CpcS, KillState1,
NoReq-1, PidsLeft),
NoReq-1, Iter),
KillState1,
PReqs)
end;
{'DOWN', MonRef, process, _, _} ->
KillState1 = cpc_handle_down(MonRef, KillState0),
cpc_receive(CpcS,
cpc_make_requests(CpcS, KillState1,
NoReq, PidsLeft),
NoReq, Iter),
KillState1,
PReqs);
PReq when element(1, PReq) == purge;
Expand Down Expand Up @@ -341,16 +341,20 @@ cpc_request(#cpc_static{tag = Tag, module = Mod}, Pid) ->
erts_internal:request_system_task(Pid, normal,
{check_process_code, {Tag, Pid}, Mod}).

cpc_make_requests(#cpc_static{}, #cpc_kill{}, NoCpcReqs, []) ->
{NoCpcReqs, []};
cpc_make_requests(#cpc_static{}, #cpc_kill{}, NoCpcReqs, none) ->
{NoCpcReqs, none};
cpc_make_requests(#cpc_static{oreq_limit = Limit},
#cpc_kill{no_outstanding = NoKillReqs},
NoCpcReqs, Pids) when Limit =< NoCpcReqs + NoKillReqs ->
{NoCpcReqs, Pids};
NoCpcReqs, Iter) when Limit =< NoCpcReqs + NoKillReqs ->
{NoCpcReqs, Iter};
cpc_make_requests(#cpc_static{} = CpcS, #cpc_kill{} = KS,
NoCpcReqs, [Pid|Pids]) ->
cpc_request(CpcS, Pid),
cpc_make_requests(CpcS, KS, NoCpcReqs+1, Pids).
NoCpcReqs, Iter0) ->
case erlang:processes_next(Iter0) of
none -> {NoCpcReqs, none};
{Pid, Iter1} ->
cpc_request(CpcS, Pid),
cpc_make_requests(CpcS, KS, NoCpcReqs+1, Iter1)
end.

change_prio(From, Ref, Prio) ->
try
Expand Down Expand Up @@ -391,7 +395,7 @@ do_test_soft_purge(Mod, From, Ref, Reqs) ->
_ = test_progress(continued, From, Ref, TestRes),
{true, Reqs};
true ->
{PurgeOp, NewReqs} = check_proc_code(erlang:processes(),
{PurgeOp, NewReqs} = check_proc_code(erlang:processes_iterator(),
Mod, false, Reqs),
_ = test_progress(continued, From, Ref, TestRes),
{erts_internal:purge_module(Mod, PurgeOp), NewReqs}
Expand All @@ -405,7 +409,7 @@ do_test_hard_purge(Mod, From, Ref, Reqs) ->
_ = test_progress(continued, From, Ref, TestRes),
{{false, false}, Reqs};
true ->
{DidKill, NewReqs} = check_proc_code(erlang:processes(),
{DidKill, NewReqs} = check_proc_code(erlang:processes_iterator(),
Mod, true, Reqs),
_ = test_progress(continued, From, Ref, TestRes),
true = erts_internal:purge_module(Mod, complete),
Expand Down
63 changes: 36 additions & 27 deletions erts/preloaded/src/erts_literal_area_collector.erl
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
%%
start() ->
process_flag(trap_exit, true),
msg_loop(undefined, {0, []}, 0, []).
msg_loop(undefined, {0, none}, 0, []).

%%
%% The VM will send us a 'copy_literals' message
Expand All @@ -63,16 +63,17 @@ msg_loop(Area, {Ongoing, NeedIReq} = OReqInfo, GcOutstnd, NeedGC) ->
switch_area();

%% Process (_Pid) has completed the request...
{copy_literals, {Area, _ReqType, _Pid}, ok} when Ongoing == 1,
NeedIReq == [] ->
switch_area(); %% Last process completed...
{copy_literals, {Area, init, _Pid}, ok} ->
msg_loop(Area, check_send_copy_req(Area, Ongoing-1, NeedIReq),
GcOutstnd, NeedGC);
case check_send_copy_req(Area, Ongoing-1, NeedIReq) of
{0, none} -> switch_area(); %% Last process completed...
NewOReqInfo -> msg_loop(Area, NewOReqInfo, GcOutstnd, NeedGC)
end;
{copy_literals, {Area, ReqType, _Pid}, ok} when NeedGC == [],
ReqType /= init ->
msg_loop(Area, check_send_copy_req(Area, Ongoing-1, NeedIReq),
GcOutstnd-1, []);
case check_send_copy_req(Area, Ongoing-1, NeedIReq) of
{0, none} -> switch_area(); %% Last process completed...
NewOReqInfo -> msg_loop(Area, NewOReqInfo, GcOutstnd-1, [])
end;
{copy_literals, {Area, ReqType, _Pid}, ok} when ReqType /= init ->
[{GCPid,GCWork} | NewNeedGC] = NeedGC,
send_copy_req(GCPid, Area, GCWork),
Expand Down Expand Up @@ -117,7 +118,7 @@ switch_area() ->
case Res of
false ->
%% No more areas to handle...
msg_loop(undefined, {0, []}, 0, []);
msg_loop(undefined, {0, none}, 0, []);
true ->
%% Send requests to OReqLim processes to copy
%% all live data they have referring to the
Expand All @@ -126,27 +127,35 @@ switch_area() ->
%% processes when responses comes back until
%% all processes have been handled...
Area = make_ref(),
Pids = erlang:processes(),
Iter = erlang:processes_iterator(),
OReqLim = erlang:system_info(outstanding_system_requests_limit),
msg_loop(Area, send_copy_reqs(Pids, Area, OReqLim), 0, [])
msg_loop(Area, send_copy_reqs(Iter, Area, OReqLim), 0, [])
end.

check_send_copy_req(_Area, Ongoing, []) ->
{Ongoing, []};
check_send_copy_req(Area, Ongoing, [Pid|Pids]) ->
send_copy_req(Pid, Area, init),
{Ongoing+1, Pids}.

send_copy_reqs(Ps, Area, OReqLim) ->
send_copy_reqs(Ps, Area, OReqLim, 0).

send_copy_reqs([], _Area, _OReqLim, N) ->
{N, []};
send_copy_reqs(Ps, _Area, OReqLim, N) when N >= OReqLim ->
{N, Ps};
send_copy_reqs([P|Ps], Area, OReqLim, N) ->
send_copy_req(P, Area, init),
send_copy_reqs(Ps, Area, OReqLim, N+1).
check_send_copy_req(_Area, Ongoing, none) ->
{Ongoing, none};
check_send_copy_req(Area, Ongoing, Iter0) ->
case erlang:processes_next(Iter0) of
none ->
{Ongoing, none};
{Pid, Iter1} ->
send_copy_req(Pid, Area, init),
{Ongoing+1, Iter1}
end.

send_copy_reqs(Iter, Area, OReqLim) ->
send_copy_reqs(Iter, Area, OReqLim, 0).

send_copy_reqs(Iter, _Area, OReqLim, N) when N >= OReqLim ->
{N, Iter};
send_copy_reqs(Iter0, Area, OReqLim, N) ->
case erlang:processes_next(Iter0) of
none ->
{N, none};
{Pid, Iter1} ->
send_copy_req(Pid, Area, init),
send_copy_reqs(Iter1, Area, OReqLim, N+1)
end.

send_copy_req(P, Area, How) ->
erts_literal_area_collector:send_copy_request(P, Area, How).
Expand Down
45 changes: 21 additions & 24 deletions erts/preloaded/src/init.erl
Original file line number Diff line number Diff line change
Expand Up @@ -952,7 +952,7 @@ clear_system(Unload,BootPid,State) ->
Logger = get_logger(State#state.kernel),
shutdown_pids(Heart,Logger,BootPid,State),
Unload andalso unload(Heart),
kill_em([Logger]),
exit(Logger,kill),
Unload andalso do_unload([logger_server]).

flush() ->
Expand Down Expand Up @@ -1065,30 +1065,27 @@ resend(_) ->
%%
%% Kill all existing pids in the system (except init and heart).
kill_all_pids(Heart,Logger) ->
case get_pids(Heart,Logger) of
[] ->
ok;
Pids ->
kill_em(Pids),
kill_all_pids(Heart,Logger) % Continue until all are really killed.
Iter = erlang:processes_iterator(),
case kill_pids(Heart, Logger, Iter, false) of
true ->
% Continue until all are really killed.
kill_all_pids(Heart, Logger);
false ->
ok
end.

kill_pids(Heart, Logger, Iter0, MorePids) ->
case erlang:processes_next(Iter0) of
none -> MorePids;
{Pid, Iter1} ->
case erts_internal:is_system_process(Pid) orelse
lists:member(Pid, [Heart, Logger, self()]) of
true -> kill_pids(Heart, Logger, Iter1, MorePids);
false ->
exit(Pid, kill),
kill_pids(Heart, Logger, Iter1, true)
end
end.

%% All except system processes.
get_pids(Heart,Logger) ->
Pids = [P || P <- processes(), not erts_internal:is_system_process(P)],
delete(Heart,Logger,self(),Pids).

delete(Heart,Logger,Init,[Heart|Pids]) -> delete(Heart,Logger,Init,Pids);
delete(Heart,Logger,Init,[Logger|Pids]) -> delete(Heart,Logger,Init,Pids);
delete(Heart,Logger,Init,[Init|Pids]) -> delete(Heart,Logger,Init,Pids);
delete(Heart,Logger,Init,[Pid|Pids]) -> [Pid|delete(Heart,Logger,Init,Pids)];
delete(_,_,_,[]) -> [].

kill_em([Pid|Pids]) ->
exit(Pid,kill),
kill_em(Pids);
kill_em([]) ->
ok.

%%
%% Kill all existing ports in the system (except the heart port),
Expand Down

0 comments on commit b6ff680

Please sign in to comment.