Skip to content

Commit

Permalink
Test read_ahead true|false performance
Browse files Browse the repository at this point in the history
  • Loading branch information
RaimoNiskanen committed Sep 5, 2024
1 parent 1d4d203 commit 0e943f4
Showing 1 changed file with 132 additions and 74 deletions.
206 changes: 132 additions & 74 deletions lib/kernel/test/gen_tcp_socket_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,34 +28,41 @@ all() ->
[{group, smoketest}].

groups() ->
[{smoketest, [{group,small}]},
{benchmark, [{group,small}, {group,medium},
{group,large}, {group,huge}]},
[{smoketest, [{group,small}]},
{benchmark, [{group,small}, {group,medium},
{group,large}, {group,huge}]},
%%
{dev, [{group,dev_direct},
{group,dev_inet}, {group,dev_socket}]},
{dev_inet, testcases(dev)},
{dev_socket, testcases(dev)},
{dev_direct, testcases(direct)},
{dev, backend_groups(dev)},
{dev_inet, testcases(dev)},
{dev_socket, testcases(dev)},
{dev_direct, testcases(direct)},
{dev_inet_nra, testcases(dev)}, % _nra: no read_ahead,
{dev_socket_nra, testcases(dev)}, % i.e {read_ahead,false}
{dev_direct_nra, testcases(direct)},
%%
{small, backend_groups()},
{medium, backend_groups()},
{large, backend_groups()},
{huge, backend_groups()},
%%
{inet, testcases(active)},
{socket, testcases(active)},
{direct, testcases(direct)}].

backend_groups() ->
[{group,inet}, {group,socket}, {group,direct}].
{small, backend_groups(normal)},
{medium, backend_groups(normal)},
{large, backend_groups(normal)},
{huge, backend_groups(normal)},
{inet, testcases(active)},
{socket, testcases(active)},
{direct, testcases(direct)},
{inet_nra, testcases(active)},
{socket_nra, testcases(active)},
{direct_nra, testcases(direct)}].

backend_groups(dev) ->
[{group,dev_inet}, {group,dev_socket}, {group,dev_direct},
{group,dev_inet_nra}, {group,dev_socket_nra}, {group,dev_direct_nra}];
backend_groups(normal) ->
[{group,inet}, {group,socket}, {group,direct},
{group,inet_nra}, {group,socket_nra}, {group,direct_nra}].

testcases(active) ->
[active_raw, active_false, active_true,
active_once, active_1, active_5, active_20];
testcases(dev) ->
[active_raw, active_false, active_true,
active_once, active_20];
[active_raw, active_false, active_true, active_once, active_20];
testcases(direct) ->
[socket_raw, socket_packet, socket_packet_buf, socket_packet_cheat].

Expand All @@ -67,6 +74,8 @@ testcases(direct) ->
-define(BUFSIZE_EXP, 17).
-define(BUFSIZE, (1 bsl ?BUFSIZE_EXP)).

-define(DBG(Term), dbg(?FUNCTION_NAME, ?LINE, begin Term end)).

init_per_suite(Config) ->
case socket:is_supported(protocols, tcp) of
true ->
Expand All @@ -85,40 +94,48 @@ end_per_suite(_Config) ->

init_per_group(Nm, Config) ->
case Nm of
%% 'burden' shifts the number of packets left
smoketest -> [{burden,0} | Config];
benchmark -> [{burden,2} | Config]; % 4 times the total size
benchmark -> [{burden,2} | Config]; % 4 times the number of packets
dev ->
init_per_group_size(?BUFSIZE_EXP + 4, 0, [{burden,3} | Config]);
%%
dev -> init_per_group(dev_size, [{burden,3} | Config]);
dev_inet -> init_per_group(inet, Config);
dev_socket -> init_per_group(socket, Config);
dev_direct -> init_per_group(direct, Config);
%%
dev_size ->
init_per_group_size(
?BUFSIZE_EXP + 4, ?NORM_EXP - (?BUFSIZE_EXP + 4), Config);
%% For small packets we cannot have the total size 2^?NORM_EXP
%% since the message passing overhead (latency) starts to dominate
%% so we have to subtract some from the exponent to get
%% roughly the same running time
small -> init_per_group_size(7, ?NORM_EXP - 7 - 3, Config);
medium -> init_per_group_size(10, ?NORM_EXP - 10 - 1, Config);
large -> init_per_group_size(15, ?NORM_EXP - 15, Config);
huge ->
init_per_group_size(
?BUFSIZE_EXP + 2, ?NORM_EXP - (?BUFSIZE_EXP + 2), Config);
small -> init_per_group_size(7, -3, Config);
medium -> init_per_group_size(10, -1, Config);
large -> init_per_group_size(15, 0, Config);
huge -> init_per_group_size(?BUFSIZE_EXP + 2, 0, Config);
%%
inet_nra -> init_per_group_nra(inet, Config);
socket_nra -> init_per_group_nra(socket, Config);
direct_nra -> init_per_group_nra(direct, Config);
dev_inet_nra -> init_per_group_nra(dev_inet, Config);
dev_socket_nra -> init_per_group_nra(dev_socket, Config);
dev_direct_nra -> init_per_group_nra(dev_direct, Config);
_ when Nm =:= inet;
Nm =:= socket;
Nm =:= direct -> [{backend,Nm} | Config]
end.

init_per_group_size(K, L, Config) ->
init_per_group_size(K, Adj, Config) ->
{_, Burden} = proplists:lookup(burden, Config),
M = L + Burden,
%%
%% 2^?NORM_EXP = Number of bytes to transfer, unadjusted
%% 2^(?NORM_EXP + Burden + Adj) = -"-, adjusted
%%
%% 2^K = Mean packet size
%% 2^(K+1) = Max packet size
%% N = 2^M = Number of packets;
%%
M = ?NORM_EXP - K + Burden + Adj,
N = 1 bsl M,
%% 2^K = Mean packet size
%% 2^(K+1) = Max packet size
%% N = Number of packets;
StopTag = spawn_testdata_server(K+1, N),
%%
{MeanSize, SizeSuffix} = size_and_suffix(1 bsl K),
Expand All @@ -129,29 +146,42 @@ init_per_group_size(K, L, Config) ->
[{testdata_server, StopTag},
{testdata_size, {K, M}} | Config].

init_per_group_nra(Name, Config) ->
init_per_group(Name, [{read_ahead,false} | Config]).

end_per_group(Nm, Config) ->
case Nm of
dev -> proplists:delete(burden, end_per_group(dev_size, Config));
dev_inet -> end_per_group(inet, Config);
dev_socket -> end_per_group(socket, Config);
dev_direct -> end_per_group(direct, Config);
dev_inet -> end_per_group(inet, Config);
dev_socket -> end_per_group(socket, Config);
dev_direct -> end_per_group(direct, Config);
inet_nra -> end_per_group_nra(inet, Config);
socket_nra -> end_per_group_nra(socket, Config);
direct_nra -> end_per_group_nra(direct, Config);
dev_inet_nra -> end_per_group_nra(dev_inet, Config);
dev_socket_nra -> end_per_group_nra(dev_socket, Config);
dev_direct_nra -> end_per_group_nra(dev_direct, Config);
%%
dev -> proplists:delete(burden, end_per_group_size(Config));
_ when Nm =:= smoketest;
Nm =:= dev;
Nm =:= benchmark -> proplists:delete(burden, Config);
_ when Nm =:= dev_size;
Nm =:= small;
_ when Nm =:= small;
Nm =:= medium;
Nm =:= large;
Nm =:= huge ->
{_, StopTag} = proplists:lookup(testdata_server, Config),
stop_testdata_server(StopTag),
proplists:delete(
testdata_server, proplists:delete(testdata_size, Config));
Nm =:= huge -> end_per_group_size(Config);
_ when Nm =:= inet;
Nm =:= socket;
Nm =:= direct -> proplists:delete(backend, Config)
Nm =:= direct -> proplists:delete(backend, Config)
end.

end_per_group_size(Config) ->
{_, StopTag} = proplists:lookup(testdata_server, Config),
stop_testdata_server(StopTag),
proplists:delete(
testdata_server, proplists:delete(testdata_size, Config)).

end_per_group_nra(Name, Config) ->
proplists:delete(read_ahead, end_per_group(Name, Config)).


size_and_suffix(P) ->
size_and_suffix(P, 1, ["", "K", "M", "T", "Z"]).
Expand Down Expand Up @@ -202,13 +232,15 @@ tc2active(TC) ->
end.

xfer(Config, TC) when is_list(Config) ->
{_, Backend} = proplists:lookup(backend, Config),
{_, BindAddr} = proplists:lookup(bind_addr, Config),
{_, TestdataSize} = proplists:lookup(testdata_size, Config),
run_xfer(TC, Backend, BindAddr, TestdataSize, testdata()).
Params =
#{ Key => Value ||
{Key, Value} <- Config,
lists:member(
Key, [backend, read_ahead, bind_addr, testdata_size]) },
run_xfer(TC, Params, testdata()).

run_xfer(
TC, Backend, BindAddr, {K, _} = TestdataSize,
TC, #{ bind_addr := BindAddr } = Params,
#{iovecs := Iovecs,
packet_sizes := PacketSizes,
total_size := TotalSize}) ->
Expand All @@ -220,15 +252,13 @@ run_xfer(
fun () ->
try
%% Send iovecs efficiently
{ok, L} =
gen_tcp:listen(
0, [{ifaddr,BindAddr}, {sndbuf, 2 bsl K}]),
{ok, L} = gen_tcp:listen(0, [{ifaddr,BindAddr}]),
{ok, {IP,Port}} = inet:sockname(L),
Sockaddr =
#{family => inet, addr => IP, port => Port},
Sockaddr = #{family => inet, addr => IP, port => Port},
Parent ! {Tag, Sockaddr},
{ok, A} = gen_tcp:accept(L),
ok = gen_tcp:close(L),
ok = inet:setopts(A, [{sndbuf, ?BUFSIZE}]),
send_loop(A, Iovecs),
ok = gen_tcp:close(A)
catch Class : Reason : Stacktrace ->
Expand All @@ -241,10 +271,9 @@ run_xfer(
receive
{Tag, Sockaddr} ->
ct:pal("try connect to ~p"
"~n Backend: ~p"
"~n TC: ~p"
"~n", [Sockaddr, Backend, TC]),
C = case connect(Backend, Sockaddr, TC) of
"~n", [Sockaddr, TC]),
C = case connect(Params, Sockaddr, TC) of
{ok, CSock} ->
CSock;
{error, eaddrnotavail = CReason} ->
Expand All @@ -257,7 +286,7 @@ run_xfer(
assert({ok, TotalSize}, recv_loop(C, PacketSizes, TC)),
T2 = erlang:monotonic_time(),
T = erlang:convert_time_unit(T2 - T1, native, millisecond),
report_MByte_s(Backend, TestdataSize, TC, TotalSize, T)
report_MByte_s(Params, TC, TotalSize, T)
catch Class : Reason : Stacktrace ->
ct:pal(
"Receiver crash [~w] ~w : ~p~n ~p~n",
Expand All @@ -270,7 +299,7 @@ run_xfer(
%%
erlang:raise(Class, Reason, Stacktrace)
after
close(Backend, C),
close(Params, C),
receive {'DOWN',Mref,_,_,_} -> ok end
end;
{'DOWN',Mref,_,_,Reason} ->
Expand All @@ -284,7 +313,7 @@ send_loop(S, [Iovec | Iovecs]) ->
send_loop(S, Iovecs).


connect(direct, Sockaddr, _) ->
connect(#{ backend := direct }, Sockaddr, _) ->
{ok, S} = socket:open(?DOMAIN, stream),
ok = socket:bind(S, any),
ok = socket:setopt(S, {socket,rcvbuf}, ?BUFSIZE),
Expand All @@ -296,29 +325,45 @@ connect(direct, Sockaddr, _) ->
Error ->
Error
end;
connect(Backend, Sockaddr, active_raw) -> % {active,true}, {packet,raw}
connect(#{ backend := Backend } = Params, Sockaddr, active_raw) ->
%% {active,true}, {packet,raw}
Opts =
[{inet_backend,Backend}, binary, {active,true},
?DOMAIN, {recbuf,?BUFSIZE}],
?DOMAIN, {recbuf,?BUFSIZE}] ++
case Params of
#{ read_ahead := false } -> [{read_ahead,false}];
#{} -> []
end,
io:format("gen_tcp:connect(~p, ~p).~n", [Sockaddr, Opts]),
gen_tcp:connect(Sockaddr, Opts);
connect(Backend, Sockaddr, TC) ->
connect(#{ backend := Backend } = Params, Sockaddr, TC) ->
Opts =
[{inet_backend,Backend}, binary, {active,tc2active(TC)}, {packet,4},
?DOMAIN, {recbuf,?BUFSIZE}],
?DOMAIN, {recbuf,?BUFSIZE}] ++
case Params of
#{ read_ahead := false } -> [{read_ahead,false}];
#{} -> []
end,
io:format("gen_tcp:connect(~p, ~p).~n", [Sockaddr, Opts]),
gen_tcp:connect(Sockaddr, Opts).


close(direct, S) ->
close(#{ backend := direct }, S) ->
socket:close(S);
close(_, S) ->
close(#{ backend := _}, S) ->
gen_tcp:close(S).


report_MByte_s(Backend, {K, M}, TC, Size, Time) ->
report_MByte_s(
#{ backend := Backend, testdata_size := {K, M} } = Params, TC, Size, Time) ->
ct:log("Size: ~w. Time: ~w.", [Size, Time]),
Name = io_lib:format("~w 2^(~w+~w)-~w", [Backend, K, M, TC]),
ReadAhead =
case Params of
#{ read_ahead := false } -> nra;
#{} -> ra
end,
Name =
io_lib:format("~w-~w 2^(~w+~w)-~w", [Backend, ReadAhead, K, M, TC]),
{Value, Suffix} = size_and_suffix(Size * 1000 / Time),
report(Name, Value, Suffix++"Byte/s").

Expand Down Expand Up @@ -665,3 +710,16 @@ term_to_string(Term) ->
-compile({inline, [assert/2]}).
assert(X, X) -> ok;
assert(X, Y) -> error({assert, X, Y}).


-ifdef(undefined).
ts() -> ts(erlang:system_info(start_time)).

ts(TS) ->
erlang:convert_time_unit(st() - TS, native, microsecond).

st() -> erlang:monotonic_time().

dbg(Function, Line, Term) ->
erlang:display({self(), {?MODULE, Function, Line}, Term}).
-endif.

0 comments on commit 0e943f4

Please sign in to comment.