From 13d4a31011b23bed448bbcecf1033a729677d244 Mon Sep 17 00:00:00 2001 From: Raimo Niskanen Date: Tue, 20 Aug 2024 18:36:35 +0200 Subject: [PATCH 1/8] Re-implement gen_tcp_socket to not read ahead --- lib/kernel/src/gen_tcp_socket.erl | 352 ++++++++++++++++++++++++++++-- 1 file changed, 330 insertions(+), 22 deletions(-) diff --git a/lib/kernel/src/gen_tcp_socket.erl b/lib/kernel/src/gen_tcp_socket.erl index 6f208145a30c..26d266fcb9d5 100644 --- a/lib/kernel/src/gen_tcp_socket.erl +++ b/lib/kernel/src/gen_tcp_socket.erl @@ -1350,7 +1350,8 @@ server_read_opts() -> header => 0, deliver => term, start_opts => [], % Just to make it settable - line_delimiter => $\n}, + line_delimiter => $\n, + read_ahead => false}, server_read_write_opts()). -compile({inline, [server_write_opts/0]}). server_write_opts() -> @@ -1955,16 +1956,23 @@ handle_event(Type, Content, #connect{} = State, P_D) -> %% State: #connect{} %% ------- -%% State: 'connected' +%% State: 'connected' | #recv{} handle_event( {call, From}, {recv, Length, Timeout}, State, {P, D}) -> %% ?DBG([recv, {length, Length}, {timeout, Timeout}, {state, State}]), case State of 'connected' -> - handle_recv( - P, D#{recv_length => Length, recv_from => From}, - [{{timeout, recv}, Timeout, recv}]); + Packet = maps:get(packet, D), + if + Packet =/= raw, Packet =/= 0, 0 < Length -> + %% Nonzero Length not allowed in packet mode (non-raw) + {keep_state_and_data, [{reply, From, {error, einval}}]}; + true -> + handle_recv( + P, D#{recv_length => Length, recv_from => From}, + [{{timeout, recv}, Timeout, recv}]) + end; #recv{} -> case maps:get(active, D) of false -> @@ -1977,7 +1985,7 @@ handle_event( end end; -%% State: 'connected' +%% State: 'connected' | #recv{} %% ------- %% State: #recv{} @@ -2000,16 +2008,7 @@ handle_event( #recv{info = ?completion_info(CompletionRef)} = _State, {#params{socket = Socket} = P, D}) -> %% ?DBG(['completion msg', {socket, Socket}, {ref, CompletionRef}]), - case CompletionStatus of - {ok, <>} -> - D_1 = D#{buffer := buffer(Data, maps:get(buffer, D))}, - handle_recv(P, D_1, []); - {error, {Reason, <>}} -> - D_1 = D#{buffer := buffer(Data, maps:get(buffer, D))}, - handle_recv_error(P, D_1, [], Reason); - {error, Reason} -> - handle_recv_error(P, D, [], Reason) - end; + handle_recv(P, D, [], CompletionStatus); handle_event( info, ?socket_abort(Socket, CompletionRef, Reason), @@ -2259,6 +2258,294 @@ handle_connected(P, D, ActionsR) -> end. +handle_recv(P, D, ActionsR) -> + handle_recv(P, D, ActionsR, recv). + +handle_recv( + P, #{packet := Packet, recv_length := Length} = D, ActionsR, CS) -> + if + Packet =:= raw; + Packet =:= 0 -> + %% Length is 0 meaning "what's available" + %% or > 0 meaning exactly that many bytes + handle_recv_raw(P, D, ActionsR, Length, CS); + true -> + handle_recv_packet(P, D, ActionsR, CS) + end. + +handle_recv_raw(P, #{buffer := Buffer} = D, ActionsR, Length, CS) -> + handle_recv_raw(P, D, ActionsR, Length, Buffer, CS). + +handle_recv_raw(P, D, ActionsR, Length, Buffer, recv) -> + Size = iolist_size(Buffer), + if + 0 < Length, Length =< Size -> + %% We have more buffered than requested + %% + {Data, NewBuffer} = + split_binary(condense_buffer(Buffer), Length), + handle_recv_deliver(P, D#{buffer := NewBuffer}, [], Data); + + Length == 0, 0 < Size -> + %% We have some buffered and "what's available" requested + handle_recv_deliver( + P, D#{buffer := <<>>}, ActionsR, condense_buffer(Buffer)); + + true -> + %% Less buffered than requested + %% or empty buffer and "what's available" requested + %% + %% i.e Length == 0, Size == 0; + %% 0 < Length, Size < Length + %% In both cases this works: + N = Length - Size, + case socket_recv(P#params.socket, N) of + {ok, <>} -> + handle_recv_deliver( + P, D#{buffer := <<>>}, ActionsR, + condense_buffer(Data, Buffer)); + + {select, {?select_info(_) = SelectInfo, Data}} -> + if + 0 < Length -> + %% Need to wait for the rest of the data + {next_state, + #recv{info = SelectInfo}, + {P, D#{buffer := buffer(Data, Buffer)}}, + reverse(ActionsR)}; + true -> %% Length == 0, Size == 0 + %% We take what we just got + %% and cancel the async recv + Socket = P#params.socket, + case socket:cancel(Socket, SelectInfo) of + ok -> + handle_recv_deliver( + P, D, ActionsR, Data); + {error, Reason} -> + handle_recv_error( + P, D, ActionsR, Reason, Data) + end + end; + + {select, ?select_info(_) = SelectInfo} -> + %% ?DBG(['recv select']), + {next_state, + #recv{info = SelectInfo}, {P, D}, reverse(ActionsR)}; + + {completion, ?completion_info(_) = CompletionInfo} -> + %% ?DBG(['recv completion']), + {next_state, + #recv{info = CompletionInfo}, {P, D}, reverse(ActionsR)}; + + {error, {Reason, <>}} -> + %% ?DBG({'recv error', Reason, byte_size(Data)}), + if + 0 < Length -> + %% We didn't get all data we requested + handle_recv_error( + P, D#{buffer := buffer(Data, Buffer)}, + ActionsR, Reason); + true -> + %% Deliver what we got, then error + handle_recv_error(P, D, ActionsR, Reason, Data) + end; + {error, Reason} -> + %% ?DBG({'recv error', Reason}), + handle_recv_error(P, D, ActionsR, Reason) + end + end; +handle_recv_raw(P, D, ActionsR, Length, Buffer, CompletionStatus) -> + case CompletionStatus of + {ok, <>} -> + handle_recv_raw( + P, D, ActionsR, Length, buffer(Data, Buffer), recv); + {error, {Reason, <>}} -> + if + 0 < Length -> + %% We didn't get all data we requested + handle_recv_error( + P, D#{buffer := buffer(Data, Buffer)}, + ActionsR, Reason); + true -> + %% Deliver "what's available", then error + handle_recv_error(P, D, ActionsR, Reason, Data) + end; + {error, Reason} -> + handle_recv_error(P, D#{buffer := Buffer}, [], Reason) + end. + + +handle_recv_packet( + P, #{recv_length := Length, buffer := Buffer} = D, ActionsR, recv) -> + if + 0 < Length -> + %% We know how much we need for a packet + handle_recv_more(P, D, ActionsR, Length, Buffer); + true -> + handle_recv_decode(P, D, ActionsR, condense_buffer(Buffer)) + end; +handle_recv_packet(P, #{buffer := Buffer} = D, ActionsR, CompletionStatus) -> + case CompletionStatus of + {ok, <>} -> + handle_recv_decode( + P, D, ActionsR, condense_buffer(Data, Buffer)); + {error, {Reason, <>}} -> + handle_recv_error_decode( + P, D, ActionsR, Reason, condense_buffer(Data, Buffer)); + {error, Reason} -> + handle_recv_error(P, D, ActionsR, Reason) + end. + +handle_recv_more(P, D, ActionsR, Length, Buffer) -> + Size = iolist_size(Buffer), + if + Length =< Size -> + %% We have more buffered than we need + %% + handle_recv_decode(P, D, ActionsR, condense_buffer(Buffer)); + true -> + N = Length - Size, + case socket_recv(P#params.socket, N) of + {ok, <>} -> + handle_recv_decode( + P, D, ActionsR, condense_buffer(Data, Buffer)); + + {select, {?select_info(_) = SelectInfo, Data}} -> + %% Need to wait for the rest of the data + {next_state, + #recv{info = SelectInfo}, + {P, D#{buffer := buffer(Data, Buffer)}}, + reverse(ActionsR)}; + + {select, ?select_info(_) = SelectInfo} -> + %% ?DBG(['recv select']), + {next_state, + #recv{info = SelectInfo}, + {P, D#{buffer := Buffer}}, + reverse(ActionsR)}; + + {completion, ?completion_info(_) = CompletionInfo} -> + %% ?DBG(['recv completion']), + {next_state, + #recv{info = CompletionInfo}, + {P, D#{buffer := Buffer}}, + reverse(ActionsR)}; + + {error, {Reason, <>}} -> + %% ?DBG({'recv error', Reason, byte_size(Data)}), + handle_recv_error_decode( + P, D, ActionsR, Reason, condense_buffer(Data, Buffer)); + {error, Reason} -> + %% ?DBG({'recv error', Reason}), + handle_recv_error(P, D, ActionsR, Reason) + end + end. + +handle_recv_decode(P, D, ActionsR, Data) -> + %% ?DBG({}), + case decode_packet(D, Data) of + {D_1, ok, Decoded} -> + handle_recv_deliver(P, D_1, ActionsR, Decoded); + {D_1, more, Length} -> + handle_recv_more(P, D_1, ActionsR, Length, Data); + {D_1, error, invalid} -> + handle_recv_error(P, D_1, ActionsR, emsgsize); + {D_1, error, Reason} -> + handle_recv_error(P, D_1, ActionsR, Reason) + end. + +handle_recv_error_decode(P, D, ActionsR, Reason, Data) -> + case decode_packet(D, Data) of + {D_1, ok, Decoded} -> + handle_recv_error(P, D_1, ActionsR, Reason, Decoded); + {D_1, error, invalid} -> + handle_recv_error(P, D_1, ActionsR, emsgsize); + {D_1, _, _} -> + handle_recv_error(P, D_1, ActionsR, Reason) + end. + +decode_packet( + #{packet := (PacketType = line), + line_delimiter := LineDelimiter, + packet_size := PacketSize} = D, + Data) -> + %% + decode_packet( + D, Data, PacketType, + [{packet_size, PacketSize}, + {line_delimiter, LineDelimiter}, + {line_length, PacketSize}]); +decode_packet( + #{packet := http, + recv_httph := true, + packet_size := PacketSize} = D, + Data) -> + %% + decode_packet(D, Data, httph, [{packet_size, PacketSize}]); +decode_packet( + #{packet := http_bin, + recv_httph := true, + packet_size := PacketSize} = D, + Data) -> + %% + decode_packet(D, Data, httph_bin, [{packet_size, PacketSize}]); +decode_packet( + #{packet := PacketType, + packet_size := PacketSize} = D, + Data) -> + %% + decode_packet(D, Data, PacketType, [{packet_size, PacketSize}]). + +decode_packet(D, Data, PacketType, Options) -> + case + erlang:decode_packet(PacketType, Data, Options) + of + {ok, Decoded, Rest} -> + %% ?DBG({ok, PacketType, byte_size(Decoded)}), + {D#{buffer := Rest}, ok, Decoded}; + Other -> + decode_packet_common(D, Data, PacketType, Other) + end. + +decode_packet_common(D, Data, PacketType, Other) -> + case Other of + {more, undefined} -> + Length = packet_header_length(PacketType, Data), + {D, more, Length}; + {more, Length} -> + {D#{recv_length := Length}, more, Length}; + {error, Reason} -> + {D, error, Reason} + end. + +packet_header_length(PacketType, Data) -> + case PacketType of + raw -> error(badarg, [PacketType, Data]); + 0 -> error(badarg, [PacketType, Data]); + 1 -> 1; + 2 -> 2; + 4 -> 4; + cdr -> 12; + sunrm -> 4; + fcgi -> 8; + tpkt -> 4; + ssl -> 5; + ssl_tls -> 5; + %% For these variable length headers/footers we guess one more + %% since this function is only called if Data is too short, + %% which can be very inefficient, but we consider it to be + %% misuse to combine read_ahead=false with such header types + asn1 -> + max(2, iolist_size(Data) + 1); + _ -> % http, line, etc + iolist_size(Data) + 1 + end. + + + + + +-ifdef(undefined). handle_recv( P, #{packet := Packet, recv_length := Length, buffer := Buffer} = D, ActionsR) @@ -2409,14 +2696,14 @@ decode_packet(D, Other) -> {error, Reason} -> {D, error, Reason} end. +-endif. + + handle_recv_deliver(P, D, ActionsR, Data) -> handle_connected(P, recv_data_deliver(P, D, ActionsR, Data)). -handle_recv_error(P, {D, ActionsR}, Reason) -> - handle_recv_error(P, D, ActionsR, Reason). -%% handle_recv_error(P, D, ActionsR, Reason) -> handle_recv_error(P, D, ActionsR, Reason, undefined). %% @@ -2545,8 +2832,18 @@ handle_active(P, D, State, ActionsR) -> case State of 'connected' -> handle_connected(P, D, reverse(ActionsR)); + #recv{info = Info} -> + case D of + #{active := false} -> + %% Cancel recv in progress + _ = socket_cancel(P#params.socket, Info), + {next_state, 'connected', + {P, D}, reverse(ActionsR)}; + #{active := _} -> + {keep_state, {P, D}, reverse(ActionsR)} + end; _ -> - {next_state, State, {P, D}, reverse(ActionsR)} + {keep_state, {P, D}, reverse(ActionsR)} end. %% ------------------------------------------------------------------------- @@ -2652,14 +2949,23 @@ next_packet(D, Packet, Data, Active) -> buffer(Data, <<>>) -> Data; buffer(Data, Buffer) -> - [Data | Buffer]. + if + is_binary(Buffer) -> + [Data, Buffer]; + is_list(Buffer) -> + [Data | Buffer] + end. %% Condense buffer into a Binary -compile({inline, [condense_buffer/1]}). condense_buffer(Bin) when is_binary(Bin) -> Bin; condense_buffer([Bin]) when is_binary(Bin) -> Bin; condense_buffer(Buffer) -> - iolist_to_binary(reverse_improper(Buffer, [])). + iolist_to_binary(reverse(Buffer)). + +condense_buffer(Data, Buffer) -> + condense_buffer(buffer(Data, Buffer)). + deliver_data(Data, Mode, Header, Packet) -> if @@ -3117,11 +3423,13 @@ reverse([A], L) -> [A | L]; reverse([A, B], L) -> [B, A | L]; reverse(L1, L2) -> lists:reverse(L1, L2). +-ifdef(undefined). %% Reverse but allow improper list reverse_improper([H | T], Acc) -> reverse_improper(T, [H | Acc]); reverse_improper([], Acc) -> Acc; reverse_improper(T, Acc) -> [T | Acc]. +-endif. is_map_keys([], #{}) -> false; From a4104972575a8fb48e232fb9d745db42f82fdf94 Mon Sep 17 00:00:00 2001 From: Raimo Niskanen Date: Wed, 21 Aug 2024 16:53:01 +0200 Subject: [PATCH 2/8] Implement read_ahead option for gen_tcp_socket --- lib/kernel/src/gen_tcp_socket.erl | 187 +++++++++++++++++++----------- 1 file changed, 122 insertions(+), 65 deletions(-) diff --git a/lib/kernel/src/gen_tcp_socket.erl b/lib/kernel/src/gen_tcp_socket.erl index 26d266fcb9d5..b8abab3b4597 100644 --- a/lib/kernel/src/gen_tcp_socket.erl +++ b/lib/kernel/src/gen_tcp_socket.erl @@ -2277,16 +2277,15 @@ handle_recv_raw(P, #{buffer := Buffer} = D, ActionsR, Length, CS) -> handle_recv_raw(P, D, ActionsR, Length, Buffer, CS). handle_recv_raw(P, D, ActionsR, Length, Buffer, recv) -> - Size = iolist_size(Buffer), + BufferSize = iolist_size(Buffer), if - 0 < Length, Length =< Size -> + 0 < Length, Length =< BufferSize -> %% We have more buffered than requested - %% {Data, NewBuffer} = split_binary(condense_buffer(Buffer), Length), handle_recv_deliver(P, D#{buffer := NewBuffer}, [], Data); - Length == 0, 0 < Size -> + Length == 0, 0 < BufferSize -> %% We have some buffered and "what's available" requested handle_recv_deliver( P, D#{buffer := <<>>}, ActionsR, condense_buffer(Buffer)); @@ -2295,36 +2294,37 @@ handle_recv_raw(P, D, ActionsR, Length, Buffer, recv) -> %% Less buffered than requested %% or empty buffer and "what's available" requested %% - %% i.e Length == 0, Size == 0; - %% 0 < Length, Size < Length + %% i.e Length == 0, BufferSize == 0; + %% 0 < Length, BufferSize < Length %% In both cases this works: - N = Length - Size, - case socket_recv(P#params.socket, N) of + N = Length - BufferSize, % How much to recv + case socket_recv(P#params.socket, read_size(D, N)) of {ok, <>} -> - handle_recv_deliver( - P, D#{buffer := <<>>}, ActionsR, - condense_buffer(Data, Buffer)); + handle_recv_raw( + P, D, ActionsR, Length, buffer(Data, Buffer), recv); - {select, {?select_info(_) = SelectInfo, Data}} -> + {select, {?select_info(_) = SelectInfo, <>}} -> if - 0 < Length -> - %% Need to wait for the rest of the data - {next_state, - #recv{info = SelectInfo}, - {P, D#{buffer := buffer(Data, Buffer)}}, - reverse(ActionsR)}; - true -> %% Length == 0, Size == 0 - %% We take what we just got - %% and cancel the async recv + Length =< BufferSize + byte_size(Data) -> + %% Enough data; cancel the async recv + %% and use what we have Socket = P#params.socket, case socket:cancel(Socket, SelectInfo) of ok -> - handle_recv_deliver( - P, D, ActionsR, Data); + handle_recv_raw( + P, D, ActionsR, Length, + buffer(Data, Buffer), recv); {error, Reason} -> - handle_recv_error( - P, D, ActionsR, Reason, Data) - end + handle_recv_raw_error_deliver( + P, D, ActionsR, Length, + Reason, condense_buffer(Data, Buffer)) + end; + true -> + %% Need to wait for the rest of the data + {next_state, + #recv{info = SelectInfo}, + {P, D#{buffer := buffer(Data, Buffer)}}, + reverse(ActionsR)} end; {select, ?select_info(_) = SelectInfo} -> @@ -2340,15 +2340,17 @@ handle_recv_raw(P, D, ActionsR, Length, Buffer, recv) -> {error, {Reason, <>}} -> %% ?DBG({'recv error', Reason, byte_size(Data)}), if - 0 < Length -> - %% We didn't get all data we requested + Length < BufferSize + byte_size(Data) -> + %% Enough data + handle_recv_raw_error_deliver( + P, D, ActionsR, Length, + Reason, condense_buffer(Data, Buffer)); + true -> handle_recv_error( P, D#{buffer := buffer(Data, Buffer)}, - ActionsR, Reason); - true -> - %% Deliver what we got, then error - handle_recv_error(P, D, ActionsR, Reason, Data) + ActionsR, Reason) end; + {error, Reason} -> %% ?DBG({'recv error', Reason}), handle_recv_error(P, D, ActionsR, Reason) @@ -2368,54 +2370,90 @@ handle_recv_raw(P, D, ActionsR, Length, Buffer, CompletionStatus) -> ActionsR, Reason); true -> %% Deliver "what's available", then error - handle_recv_error(P, D, ActionsR, Reason, Data) + handle_recv_error( + P, recv_data_deliver(P, D, ActionsR, Data), Reason) end; {error, Reason} -> handle_recv_error(P, D#{buffer := Buffer}, [], Reason) end. +handle_recv_raw_error_deliver(P, D, ActionsR, Length, Reason, Data) -> + if + 0 < Length -> % Request length + {NewData, NewBuffer} = split_binary(Data, Length), + handle_recv_error( + P, + recv_data_deliver( + P, D#{buffer := NewBuffer}, ActionsR, NewData), + Reason); + Data =/= <<>> -> % Length == 0 + handle_recv_error( + P, recv_data_deliver(P, D#{buffer := <<>>}, ActionsR, Data), + Reason); + Data =:= <<>> -> % Length == 0 + handle_recv_error(P, D#{buffer := Data}, ActionsR, Reason) + end. + handle_recv_packet( P, #{recv_length := Length, buffer := Buffer} = D, ActionsR, recv) -> if 0 < Length -> %% We know how much we need for a packet - handle_recv_more(P, D, ActionsR, Length, Buffer); + handle_recv_packet_more(P, D, ActionsR, Length, Buffer); true -> - handle_recv_decode(P, D, ActionsR, condense_buffer(Buffer)) + handle_recv_packet_decode(P, D, ActionsR, condense_buffer(Buffer)) end; handle_recv_packet(P, #{buffer := Buffer} = D, ActionsR, CompletionStatus) -> case CompletionStatus of {ok, <>} -> - handle_recv_decode( + handle_recv_packet_decode( P, D, ActionsR, condense_buffer(Data, Buffer)); {error, {Reason, <>}} -> - handle_recv_error_decode( + handle_recv_packet_error_decode( P, D, ActionsR, Reason, condense_buffer(Data, Buffer)); {error, Reason} -> handle_recv_error(P, D, ActionsR, Reason) end. -handle_recv_more(P, D, ActionsR, Length, Buffer) -> - Size = iolist_size(Buffer), +handle_recv_packet_more(P, D, ActionsR, Length, Buffer) -> + BufferSize = iolist_size(Buffer), if - Length =< Size -> + Length =< BufferSize -> %% We have more buffered than we need %% - handle_recv_decode(P, D, ActionsR, condense_buffer(Buffer)); + handle_recv_packet_decode( + P, D, ActionsR, condense_buffer(Buffer)); true -> - N = Length - Size, - case socket_recv(P#params.socket, N) of + N = Length - BufferSize, + case socket_recv(P#params.socket, read_size(D, N)) of {ok, <>} -> - handle_recv_decode( - P, D, ActionsR, condense_buffer(Data, Buffer)); + handle_recv_packet_more( + P, D, ActionsR, Length, buffer(Data, Buffer)); {select, {?select_info(_) = SelectInfo, Data}} -> - %% Need to wait for the rest of the data - {next_state, - #recv{info = SelectInfo}, - {P, D#{buffer := buffer(Data, Buffer)}}, - reverse(ActionsR)}; + if + Length =< BufferSize + byte_size(Data) -> + %% Enough data; cancel the async recv + %% and use what we have + Socket = P#params.socket, + case socket:cancel(Socket, SelectInfo) of + ok -> + handle_recv_packet_more( + P, D, ActionsR, Length, + buffer(Data, Buffer)); + {error, Reason} -> + handle_recv_packet_error_decode( + P, D, ActionsR, Reason, + condense_buffer(Data, Buffer)) + end; + true -> + %% Need to wait for the rest of the data + {next_state, + #recv{info = SelectInfo}, + {P, D#{buffer := buffer(Data, Buffer)}}, + reverse(ActionsR)} + end; {select, ?select_info(_) = SelectInfo} -> %% ?DBG(['recv select']), @@ -2433,31 +2471,41 @@ handle_recv_more(P, D, ActionsR, Length, Buffer) -> {error, {Reason, <>}} -> %% ?DBG({'recv error', Reason, byte_size(Data)}), - handle_recv_error_decode( - P, D, ActionsR, Reason, condense_buffer(Data, Buffer)); + if + Length < BufferSize + byte_size(Data) -> + %% Enough data + handle_recv_packet_error_decode( + P, D, ActionsR, Reason, + condense_buffer(Data, Buffer)); + true -> + handle_recv_error( + P, D#{buffer := buffer(Data, Buffer)}, + ActionsR, Reason) + end; {error, Reason} -> %% ?DBG({'recv error', Reason}), handle_recv_error(P, D, ActionsR, Reason) end end. -handle_recv_decode(P, D, ActionsR, Data) -> +handle_recv_packet_decode(P, D, ActionsR, Data) -> %% ?DBG({}), case decode_packet(D, Data) of {D_1, ok, Decoded} -> handle_recv_deliver(P, D_1, ActionsR, Decoded); {D_1, more, Length} -> - handle_recv_more(P, D_1, ActionsR, Length, Data); + handle_recv_packet_more(P, D_1, ActionsR, Length, Data); {D_1, error, invalid} -> handle_recv_error(P, D_1, ActionsR, emsgsize); {D_1, error, Reason} -> handle_recv_error(P, D_1, ActionsR, Reason) end. -handle_recv_error_decode(P, D, ActionsR, Reason, Data) -> +handle_recv_packet_error_decode(P, D, ActionsR, Reason, Data) -> case decode_packet(D, Data) of {D_1, ok, Decoded} -> - handle_recv_error(P, D_1, ActionsR, Reason, Decoded); + handle_recv_error( + P, recv_data_deliver(P, D_1, ActionsR, Decoded), Reason); {D_1, error, invalid} -> handle_recv_error(P, D_1, ActionsR, emsgsize); {D_1, _, _} -> @@ -2497,18 +2545,10 @@ decode_packet( decode_packet(D, Data, PacketType, [{packet_size, PacketSize}]). decode_packet(D, Data, PacketType, Options) -> - case - erlang:decode_packet(PacketType, Data, Options) - of + case erlang:decode_packet(PacketType, Data, Options) of {ok, Decoded, Rest} -> %% ?DBG({ok, PacketType, byte_size(Decoded)}), {D#{buffer := Rest}, ok, Decoded}; - Other -> - decode_packet_common(D, Data, PacketType, Other) - end. - -decode_packet_common(D, Data, PacketType, Other) -> - case Other of {more, undefined} -> Length = packet_header_length(PacketType, Data), {D, more, Length}; @@ -2542,6 +2582,20 @@ packet_header_length(PacketType, Data) -> end. +%% How much to read given read_ahead, {otp,rcvbuf} and request size N +read_size(D, N) -> + case + N == 0 % "What's available" requested + orelse + (maps:get(read_ahead, D) % Read ahead configured + andalso + %% Request smaller than rcvbuf + N < maps:get({otp,rcvbuf}, D, ?RECV_BUFFER_SIZE_DEFAULT)) + of + true -> 0; + false -> N + end. + @@ -2704,6 +2758,9 @@ handle_recv_deliver(P, D, ActionsR, Data) -> handle_connected(P, recv_data_deliver(P, D, ActionsR, Data)). +handle_recv_error(P, {D, ActionsR}, Reason) -> + handle_recv_error(P, D, ActionsR, Reason). + handle_recv_error(P, D, ActionsR, Reason) -> handle_recv_error(P, D, ActionsR, Reason, undefined). %% From 85b57f96d56275ed499bd7f62b309df7ede01d22 Mon Sep 17 00:00:00 2001 From: Raimo Niskanen Date: Fri, 23 Aug 2024 17:32:30 +0200 Subject: [PATCH 3/8] Honour `{read_ahead,true}` and make it default again --- lib/kernel/src/gen_tcp_socket.erl | 526 ++++++++---------------------- 1 file changed, 139 insertions(+), 387 deletions(-) diff --git a/lib/kernel/src/gen_tcp_socket.erl b/lib/kernel/src/gen_tcp_socket.erl index b8abab3b4597..3998f9a5b803 100644 --- a/lib/kernel/src/gen_tcp_socket.erl +++ b/lib/kernel/src/gen_tcp_socket.erl @@ -2261,257 +2261,169 @@ handle_connected(P, D, ActionsR) -> handle_recv(P, D, ActionsR) -> handle_recv(P, D, ActionsR, recv). -handle_recv( - P, #{packet := Packet, recv_length := Length} = D, ActionsR, CS) -> - if - Packet =:= raw; - Packet =:= 0 -> - %% Length is 0 meaning "what's available" - %% or > 0 meaning exactly that many bytes - handle_recv_raw(P, D, ActionsR, Length, CS); - true -> - handle_recv_packet(P, D, ActionsR, CS) - end. - -handle_recv_raw(P, #{buffer := Buffer} = D, ActionsR, Length, CS) -> - handle_recv_raw(P, D, ActionsR, Length, Buffer, CS). - -handle_recv_raw(P, D, ActionsR, Length, Buffer, recv) -> +%% CS is: +%% recv -> no async recv in progress - either a new +%% socket passive mode recv request, an +%% active mode socket reached 'connected', +%% or a select message was received +%% so it is time for a retry +%% {recv, Reason} -> a socket operation has failed, but we will handle +%% the buffer content before returning the error +%% CompletionStatus -> the result of an async recv operation +%% +handle_recv(P, #{buffer := Buffer} = D, ActionsR, CS) -> BufferSize = iolist_size(Buffer), - if - 0 < Length, Length =< BufferSize -> - %% We have more buffered than requested - {Data, NewBuffer} = - split_binary(condense_buffer(Buffer), Length), - handle_recv_deliver(P, D#{buffer := NewBuffer}, [], Data); - - Length == 0, 0 < BufferSize -> - %% We have some buffered and "what's available" requested - handle_recv_deliver( - P, D#{buffer := <<>>}, ActionsR, condense_buffer(Buffer)); - - true -> - %% Less buffered than requested - %% or empty buffer and "what's available" requested - %% - %% i.e Length == 0, BufferSize == 0; - %% 0 < Length, BufferSize < Length - %% In both cases this works: - N = Length - BufferSize, % How much to recv - case socket_recv(P#params.socket, read_size(D, N)) of - {ok, <>} -> - handle_recv_raw( - P, D, ActionsR, Length, buffer(Data, Buffer), recv); - - {select, {?select_info(_) = SelectInfo, <>}} -> - if - Length =< BufferSize + byte_size(Data) -> - %% Enough data; cancel the async recv - %% and use what we have - Socket = P#params.socket, - case socket:cancel(Socket, SelectInfo) of - ok -> - handle_recv_raw( - P, D, ActionsR, Length, - buffer(Data, Buffer), recv); - {error, Reason} -> - handle_recv_raw_error_deliver( - P, D, ActionsR, Length, - Reason, condense_buffer(Data, Buffer)) - end; - true -> - %% Need to wait for the rest of the data - {next_state, - #recv{info = SelectInfo}, - {P, D#{buffer := buffer(Data, Buffer)}}, - reverse(ActionsR)} - end; - - {select, ?select_info(_) = SelectInfo} -> - %% ?DBG(['recv select']), - {next_state, - #recv{info = SelectInfo}, {P, D}, reverse(ActionsR)}; + case CS of + recv -> + handle_recv(P, D, ActionsR, Buffer, BufferSize, CS); + {recv, _} -> + handle_recv(P, D, ActionsR, Buffer, BufferSize, CS); - {completion, ?completion_info(_) = CompletionInfo} -> - %% ?DBG(['recv completion']), - {next_state, - #recv{info = CompletionInfo}, {P, D}, reverse(ActionsR)}; - - {error, {Reason, <>}} -> - %% ?DBG({'recv error', Reason, byte_size(Data)}), - if - Length < BufferSize + byte_size(Data) -> - %% Enough data - handle_recv_raw_error_deliver( - P, D, ActionsR, Length, - Reason, condense_buffer(Data, Buffer)); - true -> - handle_recv_error( - P, D#{buffer := buffer(Data, Buffer)}, - ActionsR, Reason) - end; - - {error, Reason} -> - %% ?DBG({'recv error', Reason}), - handle_recv_error(P, D, ActionsR, Reason) - end - end; -handle_recv_raw(P, D, ActionsR, Length, Buffer, CompletionStatus) -> - case CompletionStatus of + %% CompletionStatus {ok, <>} -> - handle_recv_raw( - P, D, ActionsR, Length, buffer(Data, Buffer), recv); + handle_recv( + P, D, ActionsR, buffer(Data, Buffer), + BufferSize + byte_size(Data), recv); {error, {Reason, <>}} -> - if - 0 < Length -> - %% We didn't get all data we requested - handle_recv_error( - P, D#{buffer := buffer(Data, Buffer)}, - ActionsR, Reason); - true -> - %% Deliver "what's available", then error - handle_recv_error( - P, recv_data_deliver(P, D, ActionsR, Data), Reason) - end; - {error, Reason} -> - handle_recv_error(P, D#{buffer := Buffer}, [], Reason) - end. - -handle_recv_raw_error_deliver(P, D, ActionsR, Length, Reason, Data) -> - if - 0 < Length -> % Request length - {NewData, NewBuffer} = split_binary(Data, Length), - handle_recv_error( - P, - recv_data_deliver( - P, D#{buffer := NewBuffer}, ActionsR, NewData), - Reason); - Data =/= <<>> -> % Length == 0 - handle_recv_error( - P, recv_data_deliver(P, D#{buffer := <<>>}, ActionsR, Data), - Reason); - Data =:= <<>> -> % Length == 0 - handle_recv_error(P, D#{buffer := Data}, ActionsR, Reason) - end. - - -handle_recv_packet( - P, #{recv_length := Length, buffer := Buffer} = D, ActionsR, recv) -> - if - 0 < Length -> - %% We know how much we need for a packet - handle_recv_packet_more(P, D, ActionsR, Length, Buffer); - true -> - handle_recv_packet_decode(P, D, ActionsR, condense_buffer(Buffer)) - end; -handle_recv_packet(P, #{buffer := Buffer} = D, ActionsR, CompletionStatus) -> - case CompletionStatus of - {ok, <>} -> - handle_recv_packet_decode( - P, D, ActionsR, condense_buffer(Data, Buffer)); - {error, {Reason, <>}} -> - handle_recv_packet_error_decode( - P, D, ActionsR, Reason, condense_buffer(Data, Buffer)); + handle_recv( + P, D, ActionsR, buffer(Data, Buffer), + BufferSize + byte_size(Data), {recv, Reason}); {error, Reason} -> handle_recv_error(P, D, ActionsR, Reason) end. -handle_recv_packet_more(P, D, ActionsR, Length, Buffer) -> - BufferSize = iolist_size(Buffer), +handle_recv( + P, #{packet := Packet, recv_length := Length} = D, + ActionsR, Buffer, BufferSize, CS) -> if - Length =< BufferSize -> - %% We have more buffered than we need - %% - handle_recv_packet_decode( - P, D, ActionsR, condense_buffer(Buffer)); - true -> - N = Length - BufferSize, - case socket_recv(P#params.socket, read_size(D, N)) of - {ok, <>} -> - handle_recv_packet_more( - P, D, ActionsR, Length, buffer(Data, Buffer)); - - {select, {?select_info(_) = SelectInfo, Data}} -> - if - Length =< BufferSize + byte_size(Data) -> - %% Enough data; cancel the async recv - %% and use what we have - Socket = P#params.socket, - case socket:cancel(Socket, SelectInfo) of - ok -> - handle_recv_packet_more( - P, D, ActionsR, Length, - buffer(Data, Buffer)); - {error, Reason} -> - handle_recv_packet_error_decode( - P, D, ActionsR, Reason, - condense_buffer(Data, Buffer)) - end; - true -> - %% Need to wait for the rest of the data - {next_state, - #recv{info = SelectInfo}, - {P, D#{buffer := buffer(Data, Buffer)}}, - reverse(ActionsR)} - end; + BufferSize < Length; + Length == 0, BufferSize == 0 -> + case CS of + recv -> + handle_recv_more( + P, D, ActionsR, Buffer, BufferSize, Length); + {recv, Reason} -> + handle_recv_error(P, D, ActionsR, Reason) + end; - {select, ?select_info(_) = SelectInfo} -> - %% ?DBG(['recv select']), - {next_state, - #recv{info = SelectInfo}, - {P, D#{buffer := Buffer}}, - reverse(ActionsR)}; + Packet =:= raw; + Packet =:= 0 -> + Data = condense_buffer(Buffer), + {Data_1, Buffer_1} = + if + 0 < Length -> %% Length =< BufferSize + split_binary(Data, Length); + true -> %% Length == 0, 0 < BufferSize + {Data, <<>>} + end, + D_1 = D#{buffer := Buffer_1}, + case CS of + recv -> + handle_connected( + P, recv_data_deliver(P, D_1, ActionsR, Data_1)); + {recv, Reason} -> + handle_recv_error( + P, recv_data_deliver(P, D_1, ActionsR, Data_1), Reason) + end; - {completion, ?completion_info(_) = CompletionInfo} -> - %% ?DBG(['recv completion']), - {next_state, - #recv{info = CompletionInfo}, - {P, D#{buffer := Buffer}}, - reverse(ActionsR)}; - - {error, {Reason, <>}} -> - %% ?DBG({'recv error', Reason, byte_size(Data)}), - if - Length < BufferSize + byte_size(Data) -> - %% Enough data - handle_recv_packet_error_decode( - P, D, ActionsR, Reason, - condense_buffer(Data, Buffer)); - true -> - handle_recv_error( - P, D#{buffer := buffer(Data, Buffer)}, - ActionsR, Reason) - end; - {error, Reason} -> - %% ?DBG({'recv error', Reason}), - handle_recv_error(P, D, ActionsR, Reason) - end + true -> + Data = condense_buffer(Buffer), + handle_recv_packet(P, D, ActionsR, Data, BufferSize, CS) end. -handle_recv_packet_decode(P, D, ActionsR, Data) -> - %% ?DBG({}), +handle_recv_packet(P, D, ActionsR, Data, Size, recv) -> case decode_packet(D, Data) of {D_1, ok, Decoded} -> - handle_recv_deliver(P, D_1, ActionsR, Decoded); + handle_connected( + P, recv_data_deliver(P, D_1, ActionsR, Decoded)); {D_1, more, Length} -> - handle_recv_packet_more(P, D_1, ActionsR, Length, Data); + handle_recv_more( + P, D_1, ActionsR, Data, Size, Length); {D_1, error, invalid} -> handle_recv_error(P, D_1, ActionsR, emsgsize); {D_1, error, Reason} -> handle_recv_error(P, D_1, ActionsR, Reason) - end. - -handle_recv_packet_error_decode(P, D, ActionsR, Reason, Data) -> + end; +handle_recv_packet( + P, D, ActionsR, Data, _Size, {recv, Reason}) -> case decode_packet(D, Data) of {D_1, ok, Decoded} -> handle_recv_error( - P, recv_data_deliver(P, D_1, ActionsR, Decoded), Reason); - {D_1, error, invalid} -> - handle_recv_error(P, D_1, ActionsR, emsgsize); - {D_1, _, _} -> + P, recv_data_deliver(P, D_1, ActionsR, Decoded), + Reason); + {D_1, more, _} -> + handle_recv_error(P, D_1, ActionsR, Reason); + {D_1, error, _} -> handle_recv_error(P, D_1, ActionsR, Reason) end. + +handle_recv_more( + P, D, ActionsR, Buffer, BufferSize, Length) -> + %% Less buffered than requested + %% or nothing buffered and "what's available"|unknown requested + %% + %% I.e 0 < BufferSize, BufferSize < Length; + %% BufferSize == 0, Length == 0 -> + %% In both cases this works: + N = Length - BufferSize, % How much to recv + case socket_recv(P#params.socket, read_size(D, N)) of + {ok, <>} -> + handle_recv( + P, D, ActionsR, buffer(Data, Buffer), + BufferSize + byte_size(Data), recv); + + {select, {?select_info(_) = SelectInfo, <>}} -> + Buffer_1 = buffer(Data, Buffer), + BufferSize_1 = BufferSize + byte_size(Data), + if + Length =< BufferSize_1 -> + %% Enough data; cancel the async recv + %% and use what we have + Socket = P#params.socket, + case socket:cancel(Socket, SelectInfo) of + ok -> + handle_recv( + P, D, ActionsR, Buffer_1, BufferSize_1, recv); + {error, Reason} -> + handle_recv( + P, D, ActionsR, Buffer_1, BufferSize_1, + {recv, Reason}) + end; + true -> + %% Need to wait for the rest of the data + {next_state, + #recv{info = SelectInfo}, + {P,D#{buffer := Buffer_1}}, + reverse(ActionsR)} + end; + + {select, ?select_info(_) = SelectInfo} -> + %% ?DBG(['recv select']), + {next_state, + #recv{info = SelectInfo}, + {P, D#{buffer := Buffer}}, + reverse(ActionsR)}; + + {completion, ?completion_info(_) = CompletionInfo} -> + %% ?DBG(['recv completion']), + {next_state, + #recv{info = CompletionInfo}, + {P, D#{buffer := Buffer}}, + reverse(ActionsR)}; + + {error, {Reason, <>}} -> + %% ?DBG({'recv error', Reason, byte_size(Data)}), + handle_recv( + P, D, ActionsR, buffer(Data, Buffer), + BufferSize + byte_size(Data), {recv, Reason}); + + {error, Reason} -> + %% ?DBG({'recv error', Reason}), + handle_recv_error(P, D, ActionsR, Reason) + end. + + decode_packet( #{packet := (PacketType = line), line_delimiter := LineDelimiter, @@ -2597,167 +2509,6 @@ read_size(D, N) -> end. - - --ifdef(undefined). -handle_recv( - P, #{packet := Packet, recv_length := Length, buffer := Buffer} = D, - ActionsR) - when Packet =:= raw; - Packet =:= 0 -> - Size = iolist_size(Buffer), - if - 0 < Length, Length =< Size -> - %% Deliver part of buffered data - {Data, NewBuffer} = - split_binary(condense_buffer(Buffer), Length), - handle_recv_deliver( - P, D#{buffer := NewBuffer}, [], Data); - 0 < Length -> - %% Need to receive more data - handle_recv_more(P, D, Size - Length, ActionsR); - 0 < Size -> % Length == 0 - %% Deliver all buffered data - Data = condense_buffer(Buffer), - handle_recv_deliver(P, D#{buffer := <<>>}, ActionsR, Data); - true -> % Length == 0, Size == 0 - %% Need to receive more data - handle_recv_more(P, D, Length, ActionsR) - end; -handle_recv(P, D, ActionsR) -> - handle_recv_packet(P, D, ActionsR). - -handle_recv_more(P, #{buffer := Buffer} = D, Length, ActionsR) -> - Size = maps:get({otp,rcvbuf}, D, ?RECV_BUFFER_SIZE_DEFAULT), - case - socket_recv( - P#params.socket, - if - Size < Length -> Length; - true -> 0 - end) - of - {ok, <>} -> - %% ?DBG({socket_recv, Length, byte_size(Data)}), - handle_recv(P, D#{buffer := [Data | Buffer]}, ActionsR); - {select, {?select_info(_) = SelectInfo, <>}} -> - %% ?DBG({socket_recv, Length, - %% {select_info, SelectInfo, byte_size(Data)}}), - {next_state, - #recv{info = SelectInfo}, - {P, D#{buffer := [Data | Buffer]}}, - reverse(ActionsR)}; - {select, ?select_info(_) = SelectInfo} -> - %% ?DBG({socket_recv, Length, {select_info, SelectInfo}}), - {next_state, - #recv{info = SelectInfo}, - {P, D}, - reverse(ActionsR)}; - {completion, ?completion_info(_) = CompletionInfo} -> - %% ?DBG({socket_recv, Length, {completion_info, CompletionInfo}}), - {next_state, - #recv{info = CompletionInfo}, - {P, D}, - reverse(ActionsR)}; - {error, {Reason, <>}} -> - %% ?DBG({socket_recv, Length, {error, Reason, byte_size(Data)}}), - handle_recv_error_packet( - P, D#{buffer := [Data | Buffer]}, ActionsR, Reason); - {error, Reason} -> - %% ?DBG({socket_recv, Length, {error, Reason}}), - handle_recv_error(P, D, ActionsR, Reason) - end. - -handle_recv_packet(P, D, ActionsR) -> - %% ?DBG({}), - case decode_packet(D) of - {D_1, ok, Decoded} -> - handle_recv_deliver(P, D_1, ActionsR, Decoded); - {D_1, more, Missing} -> - handle_recv_more(P, D_1, Missing, ActionsR); - {D_1, error, invalid} -> - handle_recv_error(P, D_1, ActionsR, emsgsize); - {D_1, error, Reason} -> - handle_recv_error(P, D_1, ActionsR, Reason) - end. - -handle_recv_error_packet(P, D, ActionsR, Reason) -> - case decode_packet(D) of - {D_1, ok, Decoded} -> - handle_recv_error( - P, recv_data_deliver(P, D_1, ActionsR, Decoded), - Reason); - {D_1, error, invalid} -> - handle_recv_error(P, D_1, ActionsR, emsgsize); - {D_1, _, _} -> - handle_recv_error(P, D_1, ActionsR, Reason) - end. - -decode_packet( - #{packet := (PacketType = line), - line_delimiter := LineDelimiter, - packet_size := PacketSize, - buffer := Buffer} = D) -> - %% - decode_packet( - D, PacketType, Buffer, - [{packet_size, PacketSize}, - {line_delimiter, LineDelimiter}, - {line_length, PacketSize}]); -decode_packet( - #{packet := http, - recv_httph := true, - packet_size := PacketSize, - buffer := Buffer} = D) -> - %% - decode_packet(D, httph, Buffer, [{packet_size, PacketSize}]); -decode_packet( - #{packet := http_bin, - recv_httph := true, - packet_size := PacketSize, - buffer := Buffer} = D) -> - %% - decode_packet(D, httph_bin, Buffer, [{packet_size, PacketSize}]); -decode_packet( - #{packet := PacketType, - packet_size := PacketSize, - buffer := Buffer} = D) -> - %% - decode_packet(D, PacketType, Buffer, [{packet_size, PacketSize}]). -%% -decode_packet(D, PacketType, Buffer, Options) -> - CondensedBuffer = condense_buffer(Buffer), - case - erlang:decode_packet(PacketType, CondensedBuffer, Options) - of - {ok, Decoded, Rest} -> - %% ?DBG({ok, PacketType, byte_size(Decoded)}), - {D#{buffer := Rest}, ok, Decoded}; - Other when is_binary(Buffer) -> - %% ?DBG({Other, PacketType, byte_size(CondensedBuffer)}), - decode_packet(D, Other); - Other when is_list(Buffer) -> - %% ?DBG({Other, PacketType, byte_size(CondensedBuffer)}), - decode_packet(D#{buffer := CondensedBuffer}, Other) - end. -%% -decode_packet(D, Other) -> - case Other of - {more, undefined} -> - {D, more, 0}; - {more, Length} -> - {D, more, Length - byte_size(maps:get(buffer, D))}; - {error, Reason} -> - {D, error, Reason} - end. --endif. - - - -handle_recv_deliver(P, D, ActionsR, Data) -> - handle_connected(P, recv_data_deliver(P, D, ActionsR, Data)). - - handle_recv_error(P, {D, ActionsR}, Reason) -> handle_recv_error(P, D, ActionsR, Reason). @@ -3020,9 +2771,10 @@ condense_buffer([Bin]) when is_binary(Bin) -> Bin; condense_buffer(Buffer) -> iolist_to_binary(reverse(Buffer)). +-ifdef(undefined). condense_buffer(Data, Buffer) -> condense_buffer(buffer(Data, Buffer)). - +-endif. deliver_data(Data, Mode, Header, Packet) -> if From b6a3b732dbc59a944b61196dd507b22147a1e078 Mon Sep 17 00:00:00 2001 From: Raimo Niskanen Date: Thu, 29 Aug 2024 17:40:51 +0200 Subject: [PATCH 4/8] Try to recv peek the header - failed (Linux bug?) I saw a strange behaviour of the MGS_PEEK flag on Linux (Ubuntu 22.04 LTS). When using MSG_PEEK through `socket` to try to avoid concatenating the header and body binaries, instead peek the header then receive the whole packet, this happened (header 2, incoming <<0,1,65>>, but one character at the time, through `gen_tcp_echo_SUITE`'s slow echo server: * recv 0 -> select ... * recv 0 -> <<0>> We are missing one byte for the header and try to peek it * recv 1 peek -> timeout * recv 1 -> select ... New attempt to peek the header * recv 1 peek -> <<1>> We conclude the length field is 1 and we are missing 2 bytes * recv 2 -> select ... Here we don't save the state that we want 2 bytes so we start over with the <<0>> we have and, again, peek for the rest of the header. * recv 1 peek -> <<1,0>> ??? we asked for 1 byte, what is the <<0>>? Here the packet decode succeeds surprisingly since we were supposed to only have the header, so we fall back to the generic path which receives one byte at the time. * recv 1 -> <<65>> ??? where did the <<1>> go? * recv 65 -> select ... We are lost and think the length field is 65= recv N peek means `socket:recv(_, N, [peek], 0)` (timeout 0). We didn't store data from a peek, just use it to figure out the packet length for a recv without peek. --- lib/kernel/src/gen_tcp_socket.erl | 143 +++++++++++++++++++++--------- 1 file changed, 101 insertions(+), 42 deletions(-) diff --git a/lib/kernel/src/gen_tcp_socket.erl b/lib/kernel/src/gen_tcp_socket.erl index 3998f9a5b803..0e87b1fe1556 100644 --- a/lib/kernel/src/gen_tcp_socket.erl +++ b/lib/kernel/src/gen_tcp_socket.erl @@ -887,11 +887,15 @@ socket_send_error(Result) -> end. --compile({inline, [socket_recv/2]}). +-compile({inline, [socket_recv/2, socket_recv_peek/2]}). socket_recv(Socket, Length) -> Result = socket:recv(Socket, Length, [], nowait), %% ?DBG({Socket, Length, Result}), Result. +socket_recv_peek(Socket, Length) -> + Result = socket:recv(Socket, Length, [peek], 0), + %% ?DBG({Socket, Length, Result}), + Result. -compile({inline, [socket_close/1]}). socket_close(Socket) -> @@ -2273,6 +2277,7 @@ handle_recv(P, D, ActionsR) -> %% handle_recv(P, #{buffer := Buffer} = D, ActionsR, CS) -> BufferSize = iolist_size(Buffer), + %% ?DBG(CS), case CS of recv -> handle_recv(P, D, ActionsR, Buffer, BufferSize, CS); @@ -2293,8 +2298,9 @@ handle_recv(P, #{buffer := Buffer} = D, ActionsR, CS) -> end. handle_recv( - P, #{packet := Packet, recv_length := Length} = D, + P, #{packet := PacketType, recv_length := Length} = D, ActionsR, Buffer, BufferSize, CS) -> + %% ?DBG({D, Buffer, BufferSize, CS}), if BufferSize < Length; Length == 0, BufferSize == 0 -> @@ -2306,8 +2312,8 @@ handle_recv( handle_recv_error(P, D, ActionsR, Reason) end; - Packet =:= raw; - Packet =:= 0 -> + PacketType =:= raw; + PacketType =:= 0 -> Data = condense_buffer(Buffer), {Data_1, Buffer_1} = if @@ -2327,39 +2333,96 @@ handle_recv( end; true -> - Data = condense_buffer(Buffer), - handle_recv_packet(P, D, ActionsR, Data, BufferSize, CS) + MinHdrLen = packet_header_length(PacketType), + if + BufferSize < MinHdrLen -> + handle_recv_packet_peek( + P, D, ActionsR, Buffer, BufferSize, MinHdrLen, CS); + true -> + handle_recv_packet( + P, D, ActionsR, Buffer, BufferSize, CS) + end end. -handle_recv_packet(P, D, ActionsR, Data, Size, recv) -> +handle_recv_packet(P, D, ActionsR, Buffer, BufferSize, recv = _CS) -> + %% ?DBG({Buffer, BufferSize, _CS}), + Data = condense_buffer(Buffer), case decode_packet(D, Data) of - {D_1, ok, Decoded} -> + {ok, Decoded, Rest} -> + D_1 = D#{buffer := Rest}, handle_connected( P, recv_data_deliver(P, D_1, ActionsR, Decoded)); - {D_1, more, Length} -> + {more, undefined} -> + %% Odd bad case - try one byte more + %% + %% Can be very inefficient, but the only way to not read ahead + %% for example for packet=line, so to combine such packet types + %% with read_ahead=false would be considered as misuse + handle_recv_more( + P, D, ActionsR, Data, BufferSize, BufferSize + 1); + {more, Length} -> handle_recv_more( - P, D_1, ActionsR, Data, Size, Length); - {D_1, error, invalid} -> - handle_recv_error(P, D_1, ActionsR, emsgsize); - {D_1, error, Reason} -> - handle_recv_error(P, D_1, ActionsR, Reason) + P, D, ActionsR, Data, BufferSize, Length); + {error, invalid} -> + handle_recv_error(P, D, ActionsR, emsgsize); + {error, Reason} -> + handle_recv_error(P, D, ActionsR, Reason) end; handle_recv_packet( - P, D, ActionsR, Data, _Size, {recv, Reason}) -> + P, D, ActionsR, Buffer, _BufferSize, {recv, Reason} = _CS) -> + %% ?DBG({Buffer, _BufferSize, _CS}), + Data = condense_buffer(Buffer), case decode_packet(D, Data) of - {D_1, ok, Decoded} -> + {ok, Decoded, Rest} -> + D_1 = D#{buffer := Rest}, handle_recv_error( P, recv_data_deliver(P, D_1, ActionsR, Decoded), Reason); - {D_1, more, _} -> - handle_recv_error(P, D_1, ActionsR, Reason); - {D_1, error, _} -> - handle_recv_error(P, D_1, ActionsR, Reason) + {more, _} -> + handle_recv_error(P, D, ActionsR, Reason); + {error, _} -> + handle_recv_error(P, D, ActionsR, Reason) end. +handle_recv_packet_peek( + P, D, ActionsR, Buffer, BufferSize, MinHdrLen, recv = _CS) -> + %% ?DBG({Buffer, BufferSize, MinHdrLen, _CS}), + case socket_recv_peek(P#params.socket, MinHdrLen-BufferSize) of + {ok, <>} -> + Header = condense_buffer(Data, Buffer), + case decode_packet(D, Header) of + {more, undefined} -> + %% Odd bad case - try one byte more, see above + handle_recv_more( + P, D, ActionsR, Buffer, BufferSize, MinHdrLen + 1); + {more, PacketLength} -> + handle_recv_more( + P, D, ActionsR, Buffer, BufferSize, PacketLength); + _ -> + %% Fall back to generic path + handle_recv_packet( + P, D, ActionsR, Buffer, BufferSize, recv) + end; + + %% Fallbacks when not enough data could be peeked + {error, {timeout, _}} -> + handle_recv_packet(P, D, ActionsR, Buffer, BufferSize, recv); + {error, timeout} -> + handle_recv_packet(P, D, ActionsR, Buffer, BufferSize, recv); + + {error, {Reason, _}} -> + handle_recv_error(P, D, ActionsR, Reason); + {error, Reason} -> + handle_recv_error(P, D, ActionsR, Reason) + end; +handle_recv_packet_peek( + P, D, ActionsR, Buffer, BufferSize, _MinHdrLen, {recv, _} = CS) -> + %% ?DBG({Buffer, BufferSize, _MinHdrLen, CS}), + handle_recv_packet(P, D, ActionsR, Buffer, BufferSize, CS). handle_recv_more( P, D, ActionsR, Buffer, BufferSize, Length) -> + %% ?DBG({Buffer, BufferSize, Length}), %% Less buffered than requested %% or nothing buffered and "what's available"|unknown requested %% @@ -2427,35 +2490,36 @@ handle_recv_more( decode_packet( #{packet := (PacketType = line), line_delimiter := LineDelimiter, - packet_size := PacketSize} = D, + packet_size := PacketSize}, Data) -> %% - decode_packet( - D, Data, PacketType, + erlang:decode_packet( + PacketType, Data, [{packet_size, PacketSize}, {line_delimiter, LineDelimiter}, {line_length, PacketSize}]); decode_packet( #{packet := http, recv_httph := true, - packet_size := PacketSize} = D, + packet_size := PacketSize}, Data) -> %% - decode_packet(D, Data, httph, [{packet_size, PacketSize}]); + erlang:decode_packet(httph, Data, [{packet_size, PacketSize}]); decode_packet( #{packet := http_bin, recv_httph := true, - packet_size := PacketSize} = D, + packet_size := PacketSize}, Data) -> %% - decode_packet(D, Data, httph_bin, [{packet_size, PacketSize}]); + erlang:decode_packet(httph_bin, Data, [{packet_size, PacketSize}]); decode_packet( #{packet := PacketType, - packet_size := PacketSize} = D, + packet_size := PacketSize}, Data) -> %% - decode_packet(D, Data, PacketType, [{packet_size, PacketSize}]). + erlang:decode_packet(PacketType, Data, [{packet_size, PacketSize}]). +-ifdef(undefined). decode_packet(D, Data, PacketType, Options) -> case erlang:decode_packet(PacketType, Data, Options) of {ok, Decoded, Rest} -> @@ -2469,11 +2533,13 @@ decode_packet(D, Data, PacketType, Options) -> {error, Reason} -> {D, error, Reason} end. +-endif. -packet_header_length(PacketType, Data) -> +-compile({inline, [packet_header_length/1]}). +packet_header_length(PacketType) -> case PacketType of - raw -> error(badarg, [PacketType, Data]); - 0 -> error(badarg, [PacketType, Data]); + raw -> error(badarg, [PacketType]); + 0 -> error(badarg, [PacketType]); 1 -> 1; 2 -> 2; 4 -> 4; @@ -2483,14 +2549,8 @@ packet_header_length(PacketType, Data) -> tpkt -> 4; ssl -> 5; ssl_tls -> 5; - %% For these variable length headers/footers we guess one more - %% since this function is only called if Data is too short, - %% which can be very inefficient, but we consider it to be - %% misuse to combine read_ahead=false with such header types - asn1 -> - max(2, iolist_size(Data) + 1); - _ -> % http, line, etc - iolist_size(Data) + 1 + asn1 -> 2; + _ -> 1 end. @@ -2671,6 +2731,7 @@ recv_data_deliver( #{mode := Mode, header := Header, deliver := Deliver, packet := Packet} = D, ActionsR, Data) -> + %% ?DBG(Data), %% %% ?DBG([{owner, Owner}, %% {mode, Mode}, @@ -2771,10 +2832,8 @@ condense_buffer([Bin]) when is_binary(Bin) -> Bin; condense_buffer(Buffer) -> iolist_to_binary(reverse(Buffer)). --ifdef(undefined). condense_buffer(Data, Buffer) -> condense_buffer(buffer(Data, Buffer)). --endif. deliver_data(Data, Mode, Header, Packet) -> if From 74790e3854f4ca610c6128cdac797a601f12b4e4 Mon Sep 17 00:00:00 2001 From: Raimo Niskanen Date: Fri, 30 Aug 2024 15:50:49 +0200 Subject: [PATCH 5/8] Don't try to recv peek the header --- lib/kernel/src/gen_tcp_socket.erl | 48 ++++--------------------------- 1 file changed, 5 insertions(+), 43 deletions(-) diff --git a/lib/kernel/src/gen_tcp_socket.erl b/lib/kernel/src/gen_tcp_socket.erl index 0e87b1fe1556..cea9740bcbd6 100644 --- a/lib/kernel/src/gen_tcp_socket.erl +++ b/lib/kernel/src/gen_tcp_socket.erl @@ -887,15 +887,11 @@ socket_send_error(Result) -> end. --compile({inline, [socket_recv/2, socket_recv_peek/2]}). +-compile({inline, [socket_recv/2]}). socket_recv(Socket, Length) -> Result = socket:recv(Socket, Length, [], nowait), %% ?DBG({Socket, Length, Result}), Result. -socket_recv_peek(Socket, Length) -> - Result = socket:recv(Socket, Length, [peek], 0), - %% ?DBG({Socket, Length, Result}), - Result. -compile({inline, [socket_close/1]}). socket_close(Socket) -> @@ -2336,8 +2332,8 @@ handle_recv( MinHdrLen = packet_header_length(PacketType), if BufferSize < MinHdrLen -> - handle_recv_packet_peek( - P, D, ActionsR, Buffer, BufferSize, MinHdrLen, CS); + handle_recv_more( + P, D, ActionsR, Buffer, BufferSize, MinHdrLen); true -> handle_recv_packet( P, D, ActionsR, Buffer, BufferSize, CS) @@ -2384,42 +2380,6 @@ handle_recv_packet( handle_recv_error(P, D, ActionsR, Reason) end. -handle_recv_packet_peek( - P, D, ActionsR, Buffer, BufferSize, MinHdrLen, recv = _CS) -> - %% ?DBG({Buffer, BufferSize, MinHdrLen, _CS}), - case socket_recv_peek(P#params.socket, MinHdrLen-BufferSize) of - {ok, <>} -> - Header = condense_buffer(Data, Buffer), - case decode_packet(D, Header) of - {more, undefined} -> - %% Odd bad case - try one byte more, see above - handle_recv_more( - P, D, ActionsR, Buffer, BufferSize, MinHdrLen + 1); - {more, PacketLength} -> - handle_recv_more( - P, D, ActionsR, Buffer, BufferSize, PacketLength); - _ -> - %% Fall back to generic path - handle_recv_packet( - P, D, ActionsR, Buffer, BufferSize, recv) - end; - - %% Fallbacks when not enough data could be peeked - {error, {timeout, _}} -> - handle_recv_packet(P, D, ActionsR, Buffer, BufferSize, recv); - {error, timeout} -> - handle_recv_packet(P, D, ActionsR, Buffer, BufferSize, recv); - - {error, {Reason, _}} -> - handle_recv_error(P, D, ActionsR, Reason); - {error, Reason} -> - handle_recv_error(P, D, ActionsR, Reason) - end; -handle_recv_packet_peek( - P, D, ActionsR, Buffer, BufferSize, _MinHdrLen, {recv, _} = CS) -> - %% ?DBG({Buffer, BufferSize, _MinHdrLen, CS}), - handle_recv_packet(P, D, ActionsR, Buffer, BufferSize, CS). - handle_recv_more( P, D, ActionsR, Buffer, BufferSize, Length) -> %% ?DBG({Buffer, BufferSize, Length}), @@ -2832,8 +2792,10 @@ condense_buffer([Bin]) when is_binary(Bin) -> Bin; condense_buffer(Buffer) -> iolist_to_binary(reverse(Buffer)). +-ifdef(undefined). condense_buffer(Data, Buffer) -> condense_buffer(buffer(Data, Buffer)). +-endif. deliver_data(Data, Mode, Header, Packet) -> if From 525e8590e7835d34b99824e1ec2d6471173d265a Mon Sep 17 00:00:00 2001 From: Raimo Niskanen Date: Fri, 30 Aug 2024 15:52:19 +0200 Subject: [PATCH 6/8] Test read_ahead = false --- lib/kernel/test/gen_tcp_echo_SUITE.erl | 229 ++++++++++++++++--------- lib/kernel/test/kernel_test_lib.erl | 35 ++-- 2 files changed, 167 insertions(+), 97 deletions(-) diff --git a/lib/kernel/test/gen_tcp_echo_SUITE.erl b/lib/kernel/test/gen_tcp_echo_SUITE.erl index de399cf4a440..f6d9fcd11bb5 100644 --- a/lib/kernel/test/gen_tcp_echo_SUITE.erl +++ b/lib/kernel/test/gen_tcp_echo_SUITE.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 1997-2021. All Rights Reserved. +%% Copyright Ericsson AB 1997-2024. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -35,17 +35,35 @@ -define(LINE_LENGTH, 1023). % (default value of gen_tcp option 'recbuf') - 1 suite() -> - [{ct_hooks,[ts_install_cth]}, - {timetrap,{minutes,5}}]. + [{ct_hooks, [ts_install_cth]}, + {timetrap, {minutes,1}}]. + +all() -> + case kernel_test_lib:test_inet_backends() of + true -> + [{group, inet_backend_inet}, + {group, inet_backend_socket}]; + false -> + [{group, inet_backend_default}] + end. -all() -> +groups() -> + [{inet_backend_default, [{group, read_ahead}, {group, no_read_ahead}]}, + {inet_backend_socket, [{group, read_ahead}, {group, no_read_ahead}]}, + {inet_backend_inet, [{group, read_ahead}, {group, no_read_ahead}]}, + %% + {read_ahead, [{group, no_delay_send}, {group, delay_send}]}, + {no_read_ahead, [{group, no_delay_send}, {group, delay_send}]}, + %% + {no_delay_send, testcases()}, + {delay_send, testcases()}]. + +testcases() -> [active_echo, passive_echo, active_once_echo, slow_active_echo, slow_passive_echo, limit_active_echo, limit_passive_echo, large_limit_active_echo, large_limit_passive_echo]. -groups() -> - []. init_per_suite(Config) -> Config. @@ -53,140 +71,183 @@ init_per_suite(Config) -> end_per_suite(_Config) -> ok. -init_per_group(_GroupName, Config) -> - Config. -end_per_group(_GroupName, Config) -> - Config. +init_per_group(Name, Config) -> + case Name of + inet_backend_default -> + kernel_test_lib:config_inet_backend(Config, default); + inet_backend_socket -> + init_per_group_inet_backend(socket, Config); + inet_backend_inet -> + init_per_group_inet_backend(inet, Config); + %% + no_read_ahead -> [{read_ahead, false} | Config]; + delay_send -> [{delay_send, true} | Config]; + _ -> Config + end. +init_per_group_inet_backend(Backend, Config) -> + case kernel_test_lib:explicit_inet_backend() of + true -> + %% Contradicting kernel variables - skip group + {skip, "explicit inet backend"}; + false -> + kernel_test_lib:config_inet_backend(Config, Backend) + end. + +end_per_group(Name, Config) -> + case Name of + no_read_ahead -> lists:keydelete(read_ahead, 1, Config); + delay_send -> lists:keydelete(delay_send, 1, Config); + _ -> Config + end. -init_per_testcase(_Func, Config) -> + +init_per_testcase(Name, Config) -> + _ = + case Name of + slow_active_echo -> ct:timetrap({minutes, 5}); + slow_passive_echo -> ct:timetrap({minutes, 5}); + _ -> ok + end, Config. -end_per_testcase(_Func, _Config) -> +end_per_testcase(_Name, _Config) -> ok. + +sockopts(Config) -> + [Option || + {Name, _} = Option <- Config, + lists:member(Name, [read_ahead, delay_send])]. + + %% Test sending packets of various sizes and various packet types %% to the echo port and receiving them again (socket in active mode). active_echo(Config) when is_list(Config) -> - echo_test([], fun active_echo/4, [{echo, fun echo_server/0}]). + echo_test(Config, [], fun active_echo/4, [{echo, fun echo_server/0}]). %% Test sending packets of various sizes and various packet types %% to the echo port and receiving them again (socket in passive mode). passive_echo(Config) when is_list(Config) -> - echo_test([{active, false}], fun passive_echo/4, - [{echo, fun echo_server/0}]). + echo_test( + Config, [{active, false}], fun passive_echo/4, + [{echo, fun echo_server/0}]). %% Test sending packets of various sizes and various packet types %% to the echo port and receiving them again (socket in active once mode). active_once_echo(Config) when is_list(Config) -> - echo_test([{active, once}], fun active_once_echo/4, - [{echo, fun echo_server/0}]). + echo_test( + Config, [{active, once}], fun active_once_echo/4, + [{echo, fun echo_server/0}]). %% Test sending packets of various sizes and various packet types %% to the echo port and receiving them again (socket in active mode). %% The echo server is a special one that delays between every character. slow_active_echo(Config) when is_list(Config) -> - echo_test([], fun active_echo/4, - [slow_echo, {echo, fun slow_echo_server/0}]). + echo_test( + Config, [], fun active_echo/4, + [slow_echo, {echo, fun slow_echo_server/0}]). %% Test sending packets of various sizes and various packet types %% to an echo server and receiving them again (socket in passive mode). %% The echo server is a special one that delays between every character. slow_passive_echo(Config) when is_list(Config) -> - echo_test([{active, false}], fun passive_echo/4, - [slow_echo, {echo, fun slow_echo_server/0}]). + echo_test( + Config, [{active, false}], fun passive_echo/4, + [slow_echo, {echo, fun slow_echo_server/0}]). %% Test sending packets of various sizes and various packet types %% to the echo port and receiving them again (socket in active mode) %% with packet_size limitation. limit_active_echo(Config) when is_list(Config) -> - echo_test([{packet_size, 10}], - fun active_echo/4, - [{packet_size, 10}, {echo, fun echo_server/0}]). + echo_test( + Config, [{packet_size, 10}], fun active_echo/4, + [{packet_size, 10}, {echo, fun echo_server/0}]). %% Test sending packets of various sizes and various packet types %% to the echo port and receiving them again (socket in passive mode) %% with packet_size limitation. limit_passive_echo(Config) when is_list(Config) -> - echo_test([{packet_size, 10},{active, false}], - fun passive_echo/4, - [{packet_size, 10}, {echo, fun echo_server/0}]). + echo_test( + Config, [{packet_size, 10},{active, false}], fun passive_echo/4, + [{packet_size, 10}, {echo, fun echo_server/0}]). %% Test sending packets of various sizes and various packet types %% to the echo port and receiving them again (socket in active mode) %% with large packet_size limitation. large_limit_active_echo(Config) when is_list(Config) -> - echo_test([{packet_size, 10}], - fun active_echo/4, - [{packet_size, (1 bsl 32)-1}, - {echo, fun echo_server/0}]). + echo_test( + Config, [{packet_size, 10}], fun active_echo/4, + [{packet_size, (1 bsl 32)-1}, {echo, fun echo_server/0}]). %% Test sending packets of various sizes and various packet types %% to the echo port and receiving them again (socket in passive mode) %% with large packet_size limitation. large_limit_passive_echo(Config) when is_list(Config) -> - echo_test([{packet_size, 10},{active, false}], - fun passive_echo/4, - [{packet_size, (1 bsl 32) -1}, - {echo, fun echo_server/0}]). - -echo_test(SockOpts, EchoFun, Config0) -> - echo_test_1(SockOpts, EchoFun, Config0), - io:format("\nrepeating test with {delay_send,true}"), - echo_test_1([{delay_send,true}|SockOpts], EchoFun, Config0). - -echo_test_1(SockOpts, EchoFun, Config0) -> - EchoSrvFun = proplists:get_value(echo, Config0), + echo_test( + Config, [{packet_size, 10},{active, false}], fun passive_echo/4, + [{packet_size, (1 bsl 32) -1}, {echo, fun echo_server/0}]). + +echo_test(Config, SockOpts_0, EchoFun, EchoOpts_0) -> + SockOpts = SockOpts_0 ++ sockopts(Config), + ct:log("SockOpts = ~p.", [SockOpts]), + EchoSrvFun = proplists:get_value(echo, EchoOpts_0), {ok, EchoPort} = EchoSrvFun(), - Config = [{echo_port, EchoPort}|Config0], - - echo_packet([{packet, 1}|SockOpts], EchoFun, Config), - echo_packet([{packet, 2}|SockOpts], EchoFun, Config), - echo_packet([{packet, 4}|SockOpts], EchoFun, Config), - echo_packet([{packet, sunrm}|SockOpts], EchoFun, Config), - echo_packet([{packet, cdr}|SockOpts], EchoFun, - [{type, {cdr, big}}|Config]), - echo_packet([{packet, cdr}|SockOpts], EchoFun, - [{type, {cdr, little}}|Config]), + EchoOpts = [{echo_port, EchoPort}|EchoOpts_0], + + echo_packet(Config, [{packet, 1}|SockOpts], EchoFun, EchoOpts), + echo_packet(Config, [{packet, 2}|SockOpts], EchoFun, EchoOpts), + echo_packet(Config, [{packet, 4}|SockOpts], EchoFun, EchoOpts), + echo_packet(Config, [{packet, sunrm}|SockOpts], EchoFun, EchoOpts), + echo_packet(Config, [{packet, cdr}|SockOpts], EchoFun, + [{type, {cdr, big}}|EchoOpts]), + echo_packet(Config, [{packet, cdr}|SockOpts], EchoFun, + [{type, {cdr, little}}|EchoOpts]), case lists:keymember(packet_size, 1, SockOpts) of false -> %% This is cheating, we should test that packet_size %% also works for line and http. - echo_packet([{packet, line}|SockOpts], EchoFun, Config), - echo_packet([{packet, http}|SockOpts], EchoFun, Config), - echo_packet([{packet, http_bin}|SockOpts], EchoFun, Config); + echo_packet( + Config, [{packet, line}|SockOpts], EchoFun, EchoOpts), + echo_packet( + Config, [{packet, http}|SockOpts], EchoFun, EchoOpts), + echo_packet( + Config, [{packet, http_bin}|SockOpts], EchoFun, EchoOpts); true -> ok end, - echo_packet([{packet, tpkt}|SockOpts], EchoFun, Config), + echo_packet(Config, [{packet, tpkt}|SockOpts], EchoFun, EchoOpts), ShortTag = [16#E0], LongTag = [16#1F, 16#83, 16#27], - echo_packet([{packet, asn1}|SockOpts], EchoFun, - [{type, {asn1, short, ShortTag}}|Config]), - echo_packet([{packet, asn1}|SockOpts], EchoFun, - [{type, {asn1, long, ShortTag}}|Config]), - echo_packet([{packet, asn1}|SockOpts], EchoFun, - [{type, {asn1, short, LongTag}}|Config]), - echo_packet([{packet, asn1}|SockOpts], EchoFun, - [{type, {asn1, long, LongTag}}|Config]), + echo_packet(Config, [{packet, asn1}|SockOpts], EchoFun, + [{type, {asn1, short, ShortTag}}|EchoOpts]), + echo_packet(Config, [{packet, asn1}|SockOpts], EchoFun, + [{type, {asn1, long, ShortTag}}|EchoOpts]), + echo_packet(Config, [{packet, asn1}|SockOpts], EchoFun, + [{type, {asn1, short, LongTag}}|EchoOpts]), + echo_packet(Config, [{packet, asn1}|SockOpts], EchoFun, + [{type, {asn1, long, LongTag}}|EchoOpts]), ok. -echo_packet(SockOpts, EchoFun, Opts) -> - Type = case lists:keysearch(type, 1, Opts) of - {value, {type, T}} -> +echo_packet(Config, SockOpts, EchoFun, EchoOpts) -> + Type = case lists:keyfind(type, 1, EchoOpts) of + {_, T} -> T; - _ -> - {value, {packet, T}} = lists:keysearch(packet, 1, SockOpts), + false -> + {_, T} = lists:keyfind(packet, 1, SockOpts), T end, %% Connect to the echo server. - EchoPort = proplists:get_value(echo_port, Opts), - {ok, Echo} = gen_tcp:connect(localhost, EchoPort, SockOpts), + EchoPort = proplists:get_value(echo_port, EchoOpts), + {ok, Echo} = + kernel_test_lib:connect(Config, localhost, EchoPort, SockOpts), + + ct:pal("Echo socket: ~w", [Echo]), - SlowEcho = lists:member(slow_echo, Opts), + SlowEcho = lists:member(slow_echo, EchoOpts), case Type of http -> @@ -194,7 +255,7 @@ echo_packet(SockOpts, EchoFun, Opts) -> http_bin -> echo_packet_http(Echo, Type, EchoFun); _ -> - echo_packet0(Echo, Type, EchoFun, SlowEcho, Opts) + echo_packet0(Echo, Type, EchoFun, SlowEcho, EchoOpts) end. echo_packet_http(Echo, Type, EchoFun) -> @@ -205,11 +266,11 @@ echo_packet_http(Echo, Type, EchoFun) -> P2 = http_response(), EchoFun(Echo, Type, P2, http_reply(P2, Type)). -echo_packet0(Echo, Type, EchoFun, SlowEcho, Opts) -> +echo_packet0(Echo, Type, EchoFun, SlowEcho, EchoOpts) -> PacketSize = - case lists:keysearch(packet_size, 1, Opts) of - {value,{packet_size,Sz}} when Sz < 10 -> Sz; - {value,{packet_size,_}} -> 10; + case lists:keyfind(packet_size, 1, EchoOpts) of + {_,Sz} when Sz < 10 -> Sz; + {_,_} -> 10; false -> 0 end, ct:log("echo_packet0[~w] ~p", [self(), PacketSize]), @@ -265,7 +326,7 @@ echo_packet1(EchoSock, Type, EchoFun, Size) -> false -> ok; Packet -> - io:format("Type ~p, size ~p, time ~p", + ct:log("Type ~p, size ~p, time ~p", [Type, Size, time()]), case EchoFun(EchoSock, Type, Packet, [Packet]) of ok -> @@ -278,7 +339,7 @@ echo_packet1(EchoSock, Type, EchoFun, Size) -> {error, emsgsize} -> case Size of {N, Max} when N > Max -> - io:format(" Blocked!"); + ct:log(" Blocked!"); _ -> ct:fail( {packet_blocked, Size}) @@ -301,7 +362,7 @@ active_recv(Sock, Type, [PacketEcho|Tail]) -> _ -> tcp end, receive Recv->Recv end, - %%io:format("Active received: ~p\n",[Recv]), + %%ct:log("Active received: ~p\n",[Recv]), case Recv of {Tag, Sock, PacketEcho} -> active_recv(Sock, Type, Tail); @@ -323,12 +384,12 @@ passive_recv(_, []) -> ok; passive_recv(Sock, [PacketEcho | Tail]) -> Recv = gen_tcp:recv(Sock, 0), - %%io:format("Passive received: ~p\n",[Recv]), + %%ct:log("Passive received: ~p\n",[Recv]), case Recv of {ok, PacketEcho} -> passive_recv(Sock, Tail); {ok, Bad} -> - io:format("Expected: ~p\nGot: ~p\n",[PacketEcho,Bad]), + ct:log("Expected: ~p\nGot: ~p\n",[PacketEcho,Bad]), ct:fail({wrong_data, Bad, PacketEcho}); {error,PacketEcho} -> passive_recv(Sock, Tail); % expected error @@ -575,7 +636,7 @@ http_reply(Bin, Type) -> http_bin -> httph_bin end, Ret = lists:reverse(http_reply(Rest,[Line],HType)), - io:format("HTTP: ~p\n",[Ret]), + ct:log("HTTP: ~p\n",[Ret]), Ret. http_reply(<<>>, Acc, _) -> diff --git a/lib/kernel/test/kernel_test_lib.erl b/lib/kernel/test/kernel_test_lib.erl index bb0c19292d5d..079aeefe33a4 100644 --- a/lib/kernel/test/kernel_test_lib.erl +++ b/lib/kernel/test/kernel_test_lib.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2020-2023. All Rights Reserved. +%% Copyright Ericsson AB 2020-2024. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -31,7 +31,8 @@ inet_backend_opts/1, explicit_inet_backend/0, test_inet_backends/0, - which_inet_backend/1]). + which_inet_backend/1, + config_inet_backend/2]). -export([start_node/2, start_node/3, stop_node/1]). -export([f/2, @@ -2630,16 +2631,16 @@ open(Config, Port, Opts) -> inet_backend_opts(Config) when is_list(Config) -> - case lists:keysearch(socket_create_opts, 1, Config) of - {value, {socket_create_opts, InetBackendOpts}} -> + case lists:keyfind(socket_create_opts, 1, Config) of + {_, InetBackendOpts} -> InetBackendOpts; false -> [] end. is_socket_backend(Config) when is_list(Config) -> - case lists:keysearch(socket_create_opts, 1, Config) of - {value, {socket_create_opts, [{inet_backend, socket}]}} -> + case lists:keyfind(socket_create_opts, 1, Config) of + {_, [{inet_backend, socket}]} -> true; _ -> false @@ -2649,8 +2650,8 @@ is_socket_backend(Config) when is_list(Config) -> explicit_inet_backend() -> case application:get_all_env(kernel) of Env when is_list(Env) -> - case lists:keysearch(inet_backend, 1, Env) of - {value, {inet_backend, _}} -> + case lists:keyfind(inet_backend, 1, Env) of + {_, _} -> true; _ -> false @@ -2663,8 +2664,8 @@ explicit_inet_backend() -> test_inet_backends() -> case application:get_all_env(kernel) of Env when is_list(Env) -> - case lists:keysearch(test_inet_backends, 1, Env) of - {value, {test_inet_backends, true}} -> + case lists:keyfind(test_inet_backends, 1, Env) of + {_, true} -> true; _ -> false @@ -2674,13 +2675,21 @@ test_inet_backends() -> end. which_inet_backend(Config) -> - case lists:keysearch(socket_create_opts, 1, Config) of - {value, {socket_create_opts, [{inet_backend, Backend}]}} -> + case lists:keyfind(socket_create_opts, 1, Config) of + {_, [{inet_backend, Backend}]} -> Backend; _ -> default end. - + +config_inet_backend(Config, Backend) -> + if + Backend =:= default -> + []; + Backend =:= socket; + Backend =:= inet -> + [{socket_create_opts, [{inet_backend, Backend}]}] + end ++ lists:keydelete(socket_create_opts, 1, Config). proxy_call(F, Timeout, Default) From 1d4d2038518c062b3089d9fa1c60345d9869eb66 Mon Sep 17 00:00:00 2001 From: Raimo Niskanen Date: Mon, 2 Sep 2024 17:18:56 +0200 Subject: [PATCH 7/8] Document option `read_ahead` --- lib/kernel/src/inet.erl | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/lib/kernel/src/inet.erl b/lib/kernel/src/inet.erl index 0628385b9702..155bf4759bd4 100644 --- a/lib/kernel/src/inet.erl +++ b/lib/kernel/src/inet.erl @@ -1300,6 +1300,27 @@ The following options are available: - **`{raw, Protocol, OptionNum, ValueBin}`** - See below. +- **`{read_ahead, Boolean}`** [](){: #option-read_ahead } - + If set to `false` avoids reading ahead from the OS socket layer. + The default for this option is `true` which speeds up packet header parsing. + Setting `false` has a performance penalty because the packet header + has to be read first, to know exactly how many bytes to read for the body, + which roughly doubles the number of read operations. + + The use of this option is essential for example before switching to kTLS + which activates OS socket layer encryption and decryption by setting + special (raw) socket options. So if the Erlang socket layer has read ahead, + it has read bytes that was for the OS socket layer to decrypt, + which makes packet decryption derail for the connection. + + > #### Warning {: .warning } + > + > For packet modes that doesn't have the packet length at a fixed location + > in a packet header, such as `line` or `asn1`, not reading ahead + > can become very inefficient since sometimes the only way to accomplish + > this is to read one byte at the time until the length + > or packet end is found. + - **`{read_packets, Integer}` (UDP sockets)** [](){: #option-read_packets } - Sets the maximum number of UDP packets to read without intervention from the socket when data is available. When this many packets From 0e943f491e3b499a86a141fb1d13e1150c13319c Mon Sep 17 00:00:00 2001 From: Raimo Niskanen Date: Tue, 3 Sep 2024 17:31:44 +0200 Subject: [PATCH 8/8] Test read_ahead true|false performance --- lib/kernel/test/gen_tcp_socket_SUITE.erl | 206 +++++++++++++++-------- 1 file changed, 132 insertions(+), 74 deletions(-) diff --git a/lib/kernel/test/gen_tcp_socket_SUITE.erl b/lib/kernel/test/gen_tcp_socket_SUITE.erl index 8f7062ca36ec..baf1468203db 100644 --- a/lib/kernel/test/gen_tcp_socket_SUITE.erl +++ b/lib/kernel/test/gen_tcp_socket_SUITE.erl @@ -28,34 +28,41 @@ all() -> [{group, smoketest}]. groups() -> - [{smoketest, [{group,small}]}, - {benchmark, [{group,small}, {group,medium}, - {group,large}, {group,huge}]}, + [{smoketest, [{group,small}]}, + {benchmark, [{group,small}, {group,medium}, + {group,large}, {group,huge}]}, %% - {dev, [{group,dev_direct}, - {group,dev_inet}, {group,dev_socket}]}, - {dev_inet, testcases(dev)}, - {dev_socket, testcases(dev)}, - {dev_direct, testcases(direct)}, + {dev, backend_groups(dev)}, + {dev_inet, testcases(dev)}, + {dev_socket, testcases(dev)}, + {dev_direct, testcases(direct)}, + {dev_inet_nra, testcases(dev)}, % _nra: no read_ahead, + {dev_socket_nra, testcases(dev)}, % i.e {read_ahead,false} + {dev_direct_nra, testcases(direct)}, %% - {small, backend_groups()}, - {medium, backend_groups()}, - {large, backend_groups()}, - {huge, backend_groups()}, - %% - {inet, testcases(active)}, - {socket, testcases(active)}, - {direct, testcases(direct)}]. - -backend_groups() -> - [{group,inet}, {group,socket}, {group,direct}]. + {small, backend_groups(normal)}, + {medium, backend_groups(normal)}, + {large, backend_groups(normal)}, + {huge, backend_groups(normal)}, + {inet, testcases(active)}, + {socket, testcases(active)}, + {direct, testcases(direct)}, + {inet_nra, testcases(active)}, + {socket_nra, testcases(active)}, + {direct_nra, testcases(direct)}]. + +backend_groups(dev) -> + [{group,dev_inet}, {group,dev_socket}, {group,dev_direct}, + {group,dev_inet_nra}, {group,dev_socket_nra}, {group,dev_direct_nra}]; +backend_groups(normal) -> + [{group,inet}, {group,socket}, {group,direct}, + {group,inet_nra}, {group,socket_nra}, {group,direct_nra}]. testcases(active) -> [active_raw, active_false, active_true, active_once, active_1, active_5, active_20]; testcases(dev) -> - [active_raw, active_false, active_true, - active_once, active_20]; + [active_raw, active_false, active_true, active_once, active_20]; testcases(direct) -> [socket_raw, socket_packet, socket_packet_buf, socket_packet_cheat]. @@ -67,6 +74,8 @@ testcases(direct) -> -define(BUFSIZE_EXP, 17). -define(BUFSIZE, (1 bsl ?BUFSIZE_EXP)). +-define(DBG(Term), dbg(?FUNCTION_NAME, ?LINE, begin Term end)). + init_per_suite(Config) -> case socket:is_supported(protocols, tcp) of true -> @@ -85,40 +94,48 @@ end_per_suite(_Config) -> init_per_group(Nm, Config) -> case Nm of + %% 'burden' shifts the number of packets left smoketest -> [{burden,0} | Config]; - benchmark -> [{burden,2} | Config]; % 4 times the total size + benchmark -> [{burden,2} | Config]; % 4 times the number of packets + dev -> + init_per_group_size(?BUFSIZE_EXP + 4, 0, [{burden,3} | Config]); %% - dev -> init_per_group(dev_size, [{burden,3} | Config]); dev_inet -> init_per_group(inet, Config); dev_socket -> init_per_group(socket, Config); dev_direct -> init_per_group(direct, Config); %% - dev_size -> - init_per_group_size( - ?BUFSIZE_EXP + 4, ?NORM_EXP - (?BUFSIZE_EXP + 4), Config); %% For small packets we cannot have the total size 2^?NORM_EXP %% since the message passing overhead (latency) starts to dominate %% so we have to subtract some from the exponent to get %% roughly the same running time - small -> init_per_group_size(7, ?NORM_EXP - 7 - 3, Config); - medium -> init_per_group_size(10, ?NORM_EXP - 10 - 1, Config); - large -> init_per_group_size(15, ?NORM_EXP - 15, Config); - huge -> - init_per_group_size( - ?BUFSIZE_EXP + 2, ?NORM_EXP - (?BUFSIZE_EXP + 2), Config); + small -> init_per_group_size(7, -3, Config); + medium -> init_per_group_size(10, -1, Config); + large -> init_per_group_size(15, 0, Config); + huge -> init_per_group_size(?BUFSIZE_EXP + 2, 0, Config); %% + inet_nra -> init_per_group_nra(inet, Config); + socket_nra -> init_per_group_nra(socket, Config); + direct_nra -> init_per_group_nra(direct, Config); + dev_inet_nra -> init_per_group_nra(dev_inet, Config); + dev_socket_nra -> init_per_group_nra(dev_socket, Config); + dev_direct_nra -> init_per_group_nra(dev_direct, Config); _ when Nm =:= inet; Nm =:= socket; Nm =:= direct -> [{backend,Nm} | Config] end. -init_per_group_size(K, L, Config) -> +init_per_group_size(K, Adj, Config) -> {_, Burden} = proplists:lookup(burden, Config), - M = L + Burden, + %% + %% 2^?NORM_EXP = Number of bytes to transfer, unadjusted + %% 2^(?NORM_EXP + Burden + Adj) = -"-, adjusted + %% + %% 2^K = Mean packet size + %% 2^(K+1) = Max packet size + %% N = 2^M = Number of packets; + %% + M = ?NORM_EXP - K + Burden + Adj, N = 1 bsl M, - %% 2^K = Mean packet size - %% 2^(K+1) = Max packet size - %% N = Number of packets; StopTag = spawn_testdata_server(K+1, N), %% {MeanSize, SizeSuffix} = size_and_suffix(1 bsl K), @@ -129,29 +146,42 @@ init_per_group_size(K, L, Config) -> [{testdata_server, StopTag}, {testdata_size, {K, M}} | Config]. +init_per_group_nra(Name, Config) -> + init_per_group(Name, [{read_ahead,false} | Config]). + end_per_group(Nm, Config) -> case Nm of - dev -> proplists:delete(burden, end_per_group(dev_size, Config)); - dev_inet -> end_per_group(inet, Config); - dev_socket -> end_per_group(socket, Config); - dev_direct -> end_per_group(direct, Config); + dev_inet -> end_per_group(inet, Config); + dev_socket -> end_per_group(socket, Config); + dev_direct -> end_per_group(direct, Config); + inet_nra -> end_per_group_nra(inet, Config); + socket_nra -> end_per_group_nra(socket, Config); + direct_nra -> end_per_group_nra(direct, Config); + dev_inet_nra -> end_per_group_nra(dev_inet, Config); + dev_socket_nra -> end_per_group_nra(dev_socket, Config); + dev_direct_nra -> end_per_group_nra(dev_direct, Config); + %% + dev -> proplists:delete(burden, end_per_group_size(Config)); _ when Nm =:= smoketest; - Nm =:= dev; Nm =:= benchmark -> proplists:delete(burden, Config); - _ when Nm =:= dev_size; - Nm =:= small; + _ when Nm =:= small; Nm =:= medium; Nm =:= large; - Nm =:= huge -> - {_, StopTag} = proplists:lookup(testdata_server, Config), - stop_testdata_server(StopTag), - proplists:delete( - testdata_server, proplists:delete(testdata_size, Config)); + Nm =:= huge -> end_per_group_size(Config); _ when Nm =:= inet; Nm =:= socket; - Nm =:= direct -> proplists:delete(backend, Config) + Nm =:= direct -> proplists:delete(backend, Config) end. +end_per_group_size(Config) -> + {_, StopTag} = proplists:lookup(testdata_server, Config), + stop_testdata_server(StopTag), + proplists:delete( + testdata_server, proplists:delete(testdata_size, Config)). + +end_per_group_nra(Name, Config) -> + proplists:delete(read_ahead, end_per_group(Name, Config)). + size_and_suffix(P) -> size_and_suffix(P, 1, ["", "K", "M", "T", "Z"]). @@ -202,13 +232,15 @@ tc2active(TC) -> end. xfer(Config, TC) when is_list(Config) -> - {_, Backend} = proplists:lookup(backend, Config), - {_, BindAddr} = proplists:lookup(bind_addr, Config), - {_, TestdataSize} = proplists:lookup(testdata_size, Config), - run_xfer(TC, Backend, BindAddr, TestdataSize, testdata()). + Params = + #{ Key => Value || + {Key, Value} <- Config, + lists:member( + Key, [backend, read_ahead, bind_addr, testdata_size]) }, + run_xfer(TC, Params, testdata()). run_xfer( - TC, Backend, BindAddr, {K, _} = TestdataSize, + TC, #{ bind_addr := BindAddr } = Params, #{iovecs := Iovecs, packet_sizes := PacketSizes, total_size := TotalSize}) -> @@ -220,15 +252,13 @@ run_xfer( fun () -> try %% Send iovecs efficiently - {ok, L} = - gen_tcp:listen( - 0, [{ifaddr,BindAddr}, {sndbuf, 2 bsl K}]), + {ok, L} = gen_tcp:listen(0, [{ifaddr,BindAddr}]), {ok, {IP,Port}} = inet:sockname(L), - Sockaddr = - #{family => inet, addr => IP, port => Port}, + Sockaddr = #{family => inet, addr => IP, port => Port}, Parent ! {Tag, Sockaddr}, {ok, A} = gen_tcp:accept(L), ok = gen_tcp:close(L), + ok = inet:setopts(A, [{sndbuf, ?BUFSIZE}]), send_loop(A, Iovecs), ok = gen_tcp:close(A) catch Class : Reason : Stacktrace -> @@ -241,10 +271,9 @@ run_xfer( receive {Tag, Sockaddr} -> ct:pal("try connect to ~p" - "~n Backend: ~p" "~n TC: ~p" - "~n", [Sockaddr, Backend, TC]), - C = case connect(Backend, Sockaddr, TC) of + "~n", [Sockaddr, TC]), + C = case connect(Params, Sockaddr, TC) of {ok, CSock} -> CSock; {error, eaddrnotavail = CReason} -> @@ -257,7 +286,7 @@ run_xfer( assert({ok, TotalSize}, recv_loop(C, PacketSizes, TC)), T2 = erlang:monotonic_time(), T = erlang:convert_time_unit(T2 - T1, native, millisecond), - report_MByte_s(Backend, TestdataSize, TC, TotalSize, T) + report_MByte_s(Params, TC, TotalSize, T) catch Class : Reason : Stacktrace -> ct:pal( "Receiver crash [~w] ~w : ~p~n ~p~n", @@ -270,7 +299,7 @@ run_xfer( %% erlang:raise(Class, Reason, Stacktrace) after - close(Backend, C), + close(Params, C), receive {'DOWN',Mref,_,_,_} -> ok end end; {'DOWN',Mref,_,_,Reason} -> @@ -284,7 +313,7 @@ send_loop(S, [Iovec | Iovecs]) -> send_loop(S, Iovecs). -connect(direct, Sockaddr, _) -> +connect(#{ backend := direct }, Sockaddr, _) -> {ok, S} = socket:open(?DOMAIN, stream), ok = socket:bind(S, any), ok = socket:setopt(S, {socket,rcvbuf}, ?BUFSIZE), @@ -296,29 +325,45 @@ connect(direct, Sockaddr, _) -> Error -> Error end; -connect(Backend, Sockaddr, active_raw) -> % {active,true}, {packet,raw} +connect(#{ backend := Backend } = Params, Sockaddr, active_raw) -> + %% {active,true}, {packet,raw} Opts = [{inet_backend,Backend}, binary, {active,true}, - ?DOMAIN, {recbuf,?BUFSIZE}], + ?DOMAIN, {recbuf,?BUFSIZE}] ++ + case Params of + #{ read_ahead := false } -> [{read_ahead,false}]; + #{} -> [] + end, io:format("gen_tcp:connect(~p, ~p).~n", [Sockaddr, Opts]), gen_tcp:connect(Sockaddr, Opts); -connect(Backend, Sockaddr, TC) -> +connect(#{ backend := Backend } = Params, Sockaddr, TC) -> Opts = [{inet_backend,Backend}, binary, {active,tc2active(TC)}, {packet,4}, - ?DOMAIN, {recbuf,?BUFSIZE}], + ?DOMAIN, {recbuf,?BUFSIZE}] ++ + case Params of + #{ read_ahead := false } -> [{read_ahead,false}]; + #{} -> [] + end, io:format("gen_tcp:connect(~p, ~p).~n", [Sockaddr, Opts]), gen_tcp:connect(Sockaddr, Opts). -close(direct, S) -> +close(#{ backend := direct }, S) -> socket:close(S); -close(_, S) -> +close(#{ backend := _}, S) -> gen_tcp:close(S). -report_MByte_s(Backend, {K, M}, TC, Size, Time) -> +report_MByte_s( + #{ backend := Backend, testdata_size := {K, M} } = Params, TC, Size, Time) -> ct:log("Size: ~w. Time: ~w.", [Size, Time]), - Name = io_lib:format("~w 2^(~w+~w)-~w", [Backend, K, M, TC]), + ReadAhead = + case Params of + #{ read_ahead := false } -> nra; + #{} -> ra + end, + Name = + io_lib:format("~w-~w 2^(~w+~w)-~w", [Backend, ReadAhead, K, M, TC]), {Value, Suffix} = size_and_suffix(Size * 1000 / Time), report(Name, Value, Suffix++"Byte/s"). @@ -665,3 +710,16 @@ term_to_string(Term) -> -compile({inline, [assert/2]}). assert(X, X) -> ok; assert(X, Y) -> error({assert, X, Y}). + + +-ifdef(undefined). +ts() -> ts(erlang:system_info(start_time)). + +ts(TS) -> + erlang:convert_time_unit(st() - TS, native, microsecond). + +st() -> erlang:monotonic_time(). + +dbg(Function, Line, Term) -> + erlang:display({self(), {?MODULE, Function, Line}, Term}). +-endif.