Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

erts: Use processes_iterator/0, processes_next/1 #9182

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion erts/emulator/beam/erl_ptab.c
Original file line number Diff line number Diff line change
Expand Up @@ -1478,7 +1478,12 @@ erts_ptab_processes_next(Process *c_p, ErtsPTab *ptab, Uint first)
Eterm* hp;
Eterm *hp_end;

#ifdef DEBUG
int max_pids = 1;
#else
int max_pids = MAX(ERTS_BIF_REDS_LEFT(c_p), 1);
#endif

int num_pids = 0;
int n = max_pids * ERTS_PTAB_REDS_MULTIPLIER;
limit = MIN(ptab->r.o.max, first+n);
Expand All @@ -1500,7 +1505,7 @@ erts_ptab_processes_next(Process *c_p, ErtsPTab *ptab, Uint first)
return THE_NON_VALUE;
}

need = n * 2;
need = 3 + max_pids * 2;
hp = HAlloc(c_p, need); /* we need two heap words for each id */
hp_end = hp + need;
res = make_list(hp);
Expand All @@ -1524,6 +1529,7 @@ erts_ptab_processes_next(Process *c_p, ErtsPTab *ptab, Uint first)
scanned = (i - first) / ERTS_PTAB_REDS_MULTIPLIER + 1;

res = TUPLE2(hp, make_small(i), res);
hp += 3;
HRelease(c_p, hp_end, hp);

BUMP_REDS(c_p, scanned);
Expand Down
Binary file modified erts/preloaded/ebin/erlang.beam
Binary file not shown.
Binary file modified erts/preloaded/ebin/erts_code_purger.beam
Binary file not shown.
Binary file modified erts/preloaded/ebin/erts_literal_area_collector.beam
Binary file not shown.
Binary file modified erts/preloaded/ebin/init.beam
Binary file not shown.
46 changes: 25 additions & 21 deletions erts/preloaded/src/erts_code_purger.erl
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ do_purge(Mod, Reqs) ->
false ->
{{false, false}, Reqs};
true ->
{DidKill, NewReqs} = check_proc_code(erlang:processes(),
{DidKill, NewReqs} = check_proc_code(erlang:processes_iterator(),
Mod, true, Reqs),
true = erts_internal:purge_module(Mod, complete),
{{true, DidKill}, NewReqs}
Expand All @@ -144,7 +144,7 @@ do_soft_purge(Mod, Reqs) ->
false ->
{true, Reqs};
true ->
{PurgeOp, NewReqs} = check_proc_code(erlang:processes(),
{PurgeOp, NewReqs} = check_proc_code(erlang:processes_iterator(),
Mod, false, Reqs),
{erts_internal:purge_module(Mod, PurgeOp), NewReqs}
end.
Expand Down Expand Up @@ -172,7 +172,7 @@ do_finish_after_on_load(Mod, Keep, Reqs) ->
Reqs;
true ->
{_DidKill, NewReqs} =
check_proc_code(erlang:processes(),
check_proc_code(erlang:processes_iterator(),
Mod, true, Reqs),
true = erts_internal:purge_module(Mod, complete),
NewReqs
Expand All @@ -181,7 +181,7 @@ do_finish_after_on_load(Mod, Keep, Reqs) ->


%%
%% check_proc_code(Pids, Mod, Hard, Preqs) - Send asynchronous
%% check_proc_code(ProcessesIterator, Mod, Hard, Preqs) - Send asynchronous
%% requests to all processes to perform a check_process_code
%% operation. Each process will check their own state and
%% reply with the result. If 'Hard' equals
Expand All @@ -208,7 +208,7 @@ do_finish_after_on_load(Mod, Keep, Reqs) ->
waiting = [],
killed = false}).

check_proc_code(Pids, Mod, Hard, PReqs) ->
check_proc_code(Iter, Mod, Hard, PReqs) ->
Tag = erlang:make_ref(),
OReqLim = erlang:system_info(outstanding_system_requests_limit),
CpcS = #cpc_static{hard = Hard,
Expand All @@ -222,26 +222,26 @@ check_proc_code(Pids, Mod, Hard, PReqs) ->
OReqLim
end,
KS = #cpc_kill{outstanding_limit = KillLimit},
cpc_receive(CpcS, cpc_make_requests(CpcS, KS, 0, Pids), KS, []).
cpc_receive(CpcS, cpc_make_requests(CpcS, KS, 0, Iter), KS, []).

cpc_receive(#cpc_static{hard = true} = CpcS,
{0, []},
{0, none},
#cpc_kill{outstanding = [], waiting = [], killed = Killed},
PReqs) ->
%% No outstanding cpc requests. We did a hard check, so result is
%% whether or not we killed any processes...
cpc_result(CpcS, PReqs, Killed);
cpc_receive(#cpc_static{hard = false} = CpcS, {0, []}, _KillState, PReqs) ->
cpc_receive(#cpc_static{hard = false} = CpcS, {0, none}, _KillState, PReqs) ->
%% No outstanding cpc requests and we did a soft check that succeeded...
cpc_result(CpcS, PReqs, complete);
cpc_receive(#cpc_static{tag = Tag} = CpcS, {NoReq, PidsLeft} = ReqInfo,
cpc_receive(#cpc_static{tag = Tag} = CpcS, {NoReq, Iter} = ReqInfo,
KillState0, PReqs) ->
receive
{check_process_code, {Tag, _Pid}, false} ->
%% Process not referring the module; done with this process...
cpc_receive(CpcS,
cpc_make_requests(CpcS, KillState0,
NoReq-1, PidsLeft),
NoReq-1, Iter),
KillState0,
PReqs);
{check_process_code, {Tag, Pid}, true} ->
Expand All @@ -258,15 +258,15 @@ cpc_receive(#cpc_static{tag = Tag} = CpcS, {NoReq, PidsLeft} = ReqInfo,
KillState1 = cpc_sched_kill(Pid, KillState0),
cpc_receive(CpcS,
cpc_make_requests(CpcS, KillState1,
NoReq-1, PidsLeft),
NoReq-1, Iter),
KillState1,
PReqs)
end;
{'DOWN', MonRef, process, _, _} ->
KillState1 = cpc_handle_down(MonRef, KillState0),
cpc_receive(CpcS,
cpc_make_requests(CpcS, KillState1,
NoReq, PidsLeft),
NoReq, Iter),
KillState1,
PReqs);
PReq when element(1, PReq) == purge;
Expand Down Expand Up @@ -341,16 +341,20 @@ cpc_request(#cpc_static{tag = Tag, module = Mod}, Pid) ->
erts_internal:request_system_task(Pid, normal,
{check_process_code, {Tag, Pid}, Mod}).

cpc_make_requests(#cpc_static{}, #cpc_kill{}, NoCpcReqs, []) ->
{NoCpcReqs, []};
cpc_make_requests(#cpc_static{}, #cpc_kill{}, NoCpcReqs, none) ->
{NoCpcReqs, none};
cpc_make_requests(#cpc_static{oreq_limit = Limit},
#cpc_kill{no_outstanding = NoKillReqs},
NoCpcReqs, Pids) when Limit =< NoCpcReqs + NoKillReqs ->
{NoCpcReqs, Pids};
NoCpcReqs, Iter) when Limit =< NoCpcReqs + NoKillReqs ->
{NoCpcReqs, Iter};
cpc_make_requests(#cpc_static{} = CpcS, #cpc_kill{} = KS,
NoCpcReqs, [Pid|Pids]) ->
cpc_request(CpcS, Pid),
cpc_make_requests(CpcS, KS, NoCpcReqs+1, Pids).
NoCpcReqs, Iter0) ->
case erlang:processes_next(Iter0) of
none -> {NoCpcReqs, none};
{Pid, Iter1} ->
cpc_request(CpcS, Pid),
cpc_make_requests(CpcS, KS, NoCpcReqs+1, Iter1)
end.

change_prio(From, Ref, Prio) ->
try
Expand Down Expand Up @@ -391,7 +395,7 @@ do_test_soft_purge(Mod, From, Ref, Reqs) ->
_ = test_progress(continued, From, Ref, TestRes),
{true, Reqs};
true ->
{PurgeOp, NewReqs} = check_proc_code(erlang:processes(),
{PurgeOp, NewReqs} = check_proc_code(erlang:processes_iterator(),
Mod, false, Reqs),
_ = test_progress(continued, From, Ref, TestRes),
{erts_internal:purge_module(Mod, PurgeOp), NewReqs}
Expand All @@ -405,7 +409,7 @@ do_test_hard_purge(Mod, From, Ref, Reqs) ->
_ = test_progress(continued, From, Ref, TestRes),
{{false, false}, Reqs};
true ->
{DidKill, NewReqs} = check_proc_code(erlang:processes(),
{DidKill, NewReqs} = check_proc_code(erlang:processes_iterator(),
Mod, true, Reqs),
_ = test_progress(continued, From, Ref, TestRes),
true = erts_internal:purge_module(Mod, complete),
Expand Down
63 changes: 36 additions & 27 deletions erts/preloaded/src/erts_literal_area_collector.erl
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
%%
start() ->
process_flag(trap_exit, true),
msg_loop(undefined, {0, []}, 0, []).
msg_loop(undefined, {0, none}, 0, []).

%%
%% The VM will send us a 'copy_literals' message
Expand All @@ -63,16 +63,17 @@ msg_loop(Area, {Ongoing, NeedIReq} = OReqInfo, GcOutstnd, NeedGC) ->
switch_area();

%% Process (_Pid) has completed the request...
{copy_literals, {Area, _ReqType, _Pid}, ok} when Ongoing == 1,
NeedIReq == [] ->
switch_area(); %% Last process completed...
{copy_literals, {Area, init, _Pid}, ok} ->
msg_loop(Area, check_send_copy_req(Area, Ongoing-1, NeedIReq),
GcOutstnd, NeedGC);
case check_send_copy_req(Area, Ongoing-1, NeedIReq) of
{0, none} -> switch_area(); %% Last process completed...
NewOReqInfo -> msg_loop(Area, NewOReqInfo, GcOutstnd, NeedGC)
end;
{copy_literals, {Area, ReqType, _Pid}, ok} when NeedGC == [],
ReqType /= init ->
msg_loop(Area, check_send_copy_req(Area, Ongoing-1, NeedIReq),
GcOutstnd-1, []);
case check_send_copy_req(Area, Ongoing-1, NeedIReq) of
{0, none} -> switch_area(); %% Last process completed...
NewOReqInfo -> msg_loop(Area, NewOReqInfo, GcOutstnd-1, [])
end;
{copy_literals, {Area, ReqType, _Pid}, ok} when ReqType /= init ->
[{GCPid,GCWork} | NewNeedGC] = NeedGC,
send_copy_req(GCPid, Area, GCWork),
Expand Down Expand Up @@ -117,7 +118,7 @@ switch_area() ->
case Res of
false ->
%% No more areas to handle...
msg_loop(undefined, {0, []}, 0, []);
msg_loop(undefined, {0, none}, 0, []);
true ->
%% Send requests to OReqLim processes to copy
%% all live data they have referring to the
Expand All @@ -126,27 +127,35 @@ switch_area() ->
%% processes when responses comes back until
%% all processes have been handled...
Area = make_ref(),
Pids = erlang:processes(),
Iter = erlang:processes_iterator(),
OReqLim = erlang:system_info(outstanding_system_requests_limit),
msg_loop(Area, send_copy_reqs(Pids, Area, OReqLim), 0, [])
msg_loop(Area, send_copy_reqs(Iter, Area, OReqLim), 0, [])
end.

check_send_copy_req(_Area, Ongoing, []) ->
{Ongoing, []};
check_send_copy_req(Area, Ongoing, [Pid|Pids]) ->
send_copy_req(Pid, Area, init),
{Ongoing+1, Pids}.

send_copy_reqs(Ps, Area, OReqLim) ->
send_copy_reqs(Ps, Area, OReqLim, 0).

send_copy_reqs([], _Area, _OReqLim, N) ->
{N, []};
send_copy_reqs(Ps, _Area, OReqLim, N) when N >= OReqLim ->
{N, Ps};
send_copy_reqs([P|Ps], Area, OReqLim, N) ->
send_copy_req(P, Area, init),
send_copy_reqs(Ps, Area, OReqLim, N+1).
check_send_copy_req(_Area, Ongoing, none) ->
{Ongoing, none};
check_send_copy_req(Area, Ongoing, Iter0) ->
case erlang:processes_next(Iter0) of
none ->
{Ongoing, none};
{Pid, Iter1} ->
send_copy_req(Pid, Area, init),
{Ongoing+1, Iter1}
end.

send_copy_reqs(Iter, Area, OReqLim) ->
send_copy_reqs(Iter, Area, OReqLim, 0).

send_copy_reqs(Iter, _Area, OReqLim, N) when N >= OReqLim ->
{N, Iter};
send_copy_reqs(Iter0, Area, OReqLim, N) ->
case erlang:processes_next(Iter0) of
none ->
{N, none};
{Pid, Iter1} ->
send_copy_req(Pid, Area, init),
send_copy_reqs(Iter1, Area, OReqLim, N+1)
end.

send_copy_req(P, Area, How) ->
erts_literal_area_collector:send_copy_request(P, Area, How).
Expand Down
45 changes: 21 additions & 24 deletions erts/preloaded/src/init.erl
Original file line number Diff line number Diff line change
Expand Up @@ -947,7 +947,7 @@ clear_system(Unload,BootPid,State) ->
Logger = get_logger(State#state.kernel),
shutdown_pids(Heart,Logger,BootPid,State),
Unload andalso unload(Heart),
kill_em([Logger]),
exit(Logger,kill),
Unload andalso do_unload([logger_server]).

flush() ->
Expand Down Expand Up @@ -1060,30 +1060,27 @@ resend(_) ->
%%
%% Kill all existing pids in the system (except init and heart).
kill_all_pids(Heart,Logger) ->
case get_pids(Heart,Logger) of
[] ->
ok;
Pids ->
kill_em(Pids),
kill_all_pids(Heart,Logger) % Continue until all are really killed.
Iter = erlang:processes_iterator(),
case kill_pids(Heart, Logger, Iter, false) of
true ->
% Continue until all are really killed.
kill_all_pids(Heart, Logger);
false ->
ok
end.

kill_pids(Heart, Logger, Iter0, MorePids) ->
case erlang:processes_next(Iter0) of
none -> MorePids;
{Pid, Iter1} ->
case erts_internal:is_system_process(Pid) orelse
lists:member(Pid, [Heart, Logger, self()]) of
true -> kill_pids(Heart, Logger, Iter1, MorePids);
false ->
exit(Pid, kill),
kill_pids(Heart, Logger, Iter1, true)
end
end.

%% All except system processes.
get_pids(Heart,Logger) ->
Pids = [P || P <- processes(), not erts_internal:is_system_process(P)],
delete(Heart,Logger,self(),Pids).

delete(Heart,Logger,Init,[Heart|Pids]) -> delete(Heart,Logger,Init,Pids);
delete(Heart,Logger,Init,[Logger|Pids]) -> delete(Heart,Logger,Init,Pids);
delete(Heart,Logger,Init,[Init|Pids]) -> delete(Heart,Logger,Init,Pids);
delete(Heart,Logger,Init,[Pid|Pids]) -> [Pid|delete(Heart,Logger,Init,Pids)];
delete(_,_,_,[]) -> [].

kill_em([Pid|Pids]) ->
exit(Pid,kill),
kill_em(Pids);
kill_em([]) ->
ok.

%%
%% Kill all existing ports in the system (except the heart port),
Expand Down
Loading