From a379506bc2d25857766d74a7422189e20cbabeec Mon Sep 17 00:00:00 2001 From: Ivan Dyachkov Date: Wed, 24 Jan 2024 18:58:45 +0100 Subject: [PATCH] feat: support random bytes in payload --- src/emqtt_bench.erl | 78 +++++++++++++++++++++++++-------------------- 1 file changed, 44 insertions(+), 34 deletions(-) diff --git a/src/emqtt_bench.erl b/src/emqtt_bench.erl index 1746497..7607cdd 100644 --- a/src/emqtt_bench.erl +++ b/src/emqtt_bench.erl @@ -78,7 +78,13 @@ {size, $s, "size", {integer, 256}, "payload size"}, {message, $m, "message", string, - "set the message content for publish"}, + "Set the message content for publish. " + "Either a literal message content, or path to a file with payload template " + "specified via 'template://'. " + "Available variables: %TIMESTAMP%, %TIMESTAMPMS%, %TIMESTAMPUS%, %TIMESTAMPNS%, %UNIQUE%, %RANDOM%. " + "When using 'template://', --size option does not have effect except for when %RANDOM% placeholder " + "is used." + }, {qos, $q, "qos", {integer, 0}, "subscribe qos"}, {qoe, $Q, "qoe", {boolean, false}, @@ -348,6 +354,7 @@ main(pub, Opts) -> undefined end, start(pub, [ {payload, Payload} + , {payload_size, Size} , {limit_fun, MsgLimit} , {publish_signal_pid, PublishSignalPid} | Opts]); @@ -819,39 +826,41 @@ publish(Client, Opts) -> ok = ensure_publish_begin_time(), Flags = [{qos, proplists:get_value(qos, Opts)}, {retain, proplists:get_value(retain, Opts)}], - Payload0 = proplists:get_value(payload, Opts), - Payload = case Payload0 of - {template, Bin} -> - Now = os:system_time(nanosecond), - TsNS = integer_to_binary(Now), - TsUS = integer_to_binary(erlang:convert_time_unit(Now, nanosecond, microsecond)), - TsMS = integer_to_binary(erlang:convert_time_unit(Now, nanosecond, millisecond)), - Unique = integer_to_binary(erlang:unique_integer()), - Substitutions = - #{ <<"%TIMESTAMP%">> => TsMS - , <<"%TIMESTAMPMS%">> => TsMS - , <<"%TIMESTAMPUS%">> => TsUS - , <<"%TIMESTAMPNS%">> => TsNS - , <<"%UNIQUE%">> => Unique - }, - maps:fold( - fun(Placeholder, Val, Acc) -> binary:replace(Acc, Placeholder, Val) end, - Bin, - Substitutions); - _ -> - Payload0 - end, - %% prefix dynamic headers. - NewPayload = case proplists:get_value(payload_hdrs, Opts, []) of - [] -> Payload; - PayloadHdrs -> - with_payload_headers(PayloadHdrs, Payload) - end, - case emqtt:publish(Client, topic_opt(Opts), NewPayload, Flags) of - ok -> ok; - {ok, _} -> ok; - {error, Reason} -> {error, Reason} - end. + Size = proplists:get_value(payload_size, Opts), + Payload0 = proplists:get_value(payload, Opts), + Payload = case Payload0 of + {template, Bin} -> + Now = os:system_time(nanosecond), + TsNS = integer_to_binary(Now), + TsUS = integer_to_binary(erlang:convert_time_unit(Now, nanosecond, microsecond)), + TsMS = integer_to_binary(erlang:convert_time_unit(Now, nanosecond, millisecond)), + Unique = integer_to_binary(erlang:unique_integer()), + Substitutions = + #{ <<"%TIMESTAMP%">> => TsMS + , <<"%TIMESTAMPMS%">> => TsMS + , <<"%TIMESTAMPUS%">> => TsUS + , <<"%TIMESTAMPNS%">> => TsNS + , <<"%UNIQUE%">> => Unique + , <<"%RANDOM%">> => rand:bytes(Size) + }, + maps:fold( + fun(Placeholder, Val, Acc) -> binary:replace(Acc, Placeholder, Val) end, + Bin, + Substitutions); + _ -> + Payload0 + end, + %% prefix dynamic headers. + NewPayload = case proplists:get_value(payload_hdrs, Opts, []) of + [] -> Payload; + PayloadHdrs -> + with_payload_headers(PayloadHdrs, Payload) + end, + case emqtt:publish(Client, topic_opt(Opts), NewPayload, Flags) of + ok -> ok; + {ok, _} -> ok; + {error, Reason} -> {error, Reason} + end. session_property_opts(Opts) -> case session_property_opts(Opts, #{}) of @@ -1052,6 +1061,7 @@ loop_opts(Opts) -> lists:filter(fun({K,__V}) -> lists:member(K, [ interval_of_msg , payload + , payload_size , payload_hdrs , qos , retain