From 86a6680e8b517a89762d9f3a5360204774351db5 Mon Sep 17 00:00:00 2001 From: William Yang Date: Mon, 23 Dec 2024 23:36:55 +0100 Subject: [PATCH] feat: enhanced qoe reporting - qoe also tracks `conn` and `pub` - write client qoe info to disklog (--qoelog logfile) - dump to csv from qoelog file --- src/emqtt_bench.erl | 457 ++++++++++++++++++++++---------------------- 1 file changed, 224 insertions(+), 233 deletions(-) diff --git a/src/emqtt_bench.erl b/src/emqtt_bench.erl index a201834..889c4bd 100644 --- a/src/emqtt_bench.erl +++ b/src/emqtt_bench.erl @@ -18,6 +18,7 @@ -include_lib("emqtt/include/emqtt.hrl"). -define(shared_padding_tab, emqtt_bench_shared_payload). +-define(QoELog, qoe_dlog). -export([ main/1 , main/2 @@ -46,11 +47,13 @@ "If --prefix is provided, the prefix is added otherwise " "client ID is the assigned sequence number."). --define(PUB_OPTS, +%% common opts for conn, pub and sub. +-define(COMMON_OPTS, [{help, undefined, "help", boolean, "help information"}, {dist, $d, "dist", boolean, "enable distribution port"}, + %% == Traffic related == {host, $h, "host", {string, "localhost"}, "mqtt server hostname or comma-separated hostnames"}, {port, $p, "port", {integer, 1883}, @@ -59,15 +62,75 @@ "mqtt protocol version: 3 | 4 | 5"}, {count, $c, "count", {integer, 200}, "max count of clients"}, - {startnumber, $n, "startnumber", {integer, 0}, ?STARTNUMBER_DESC}, + {conn_rate, $R, "connrate", {integer, 0}, "connection rate(/s), default: 0, fallback to use --interval"}, {interval, $i, "interval", {integer, 10}, "interval of connecting to the broker"}, - {interval_of_msg, $I, "interval_of_msg", {integer, 1000}, - "interval of publishing message(ms)"}, + {ifaddr, undefined, "ifaddr", string, + "local ipaddress or interface address"}, + {prefix, undefined, "prefix", string, ?PREFIX_DESC}, + {shortids, $s, "shortids", {boolean, false}, ?SHORTIDS_DESC}, + {startnumber, $n, "startnumber", {integer, 0}, ?STARTNUMBER_DESC}, + {num_retry_connect, undefined, "num-retry-connect", {integer, 0}, + "number of times to retry estabilishing a connection before giving up"}, + {conn_rate, $R, "connrate", {integer, 0}, "connection rate(/s), default: 0, fallback to use --interval"}, + {reconnect, undefined, "reconnect", {integer, 0}, + "max retries of reconnects. 0: disabled"}, + %% == Transport: TCP, TLS, QUIC, WS == + {ssl, $S, "ssl", {boolean, false}, + "ssl socket for connecting to server"}, + {cacertfile, undefined, "cacertfile", string, + "CA certificate for server verification"}, + {certfile, undefined, "certfile", string, + "client certificate for authentication, if required by server"}, + {keyfile, undefined, "keyfile", string, + "client private key for authentication, if required by server"}, + {quic, undefined, "quic", {string, "false"}, + "QUIC transport"}, + {ws, undefined, "ws", {boolean, false}, + "websocket transport"}, + {nst_dets_file, undefined, "load-qst", string, "load quic session tickets from dets file"}, + %% == MQTT layer == {username, $u, "username", string, "username for connecting to server"}, {password, $P, "password", string, "password for connecting to server"}, + {keepalive, $k, "keepalive", {integer, 300}, + "keep alive in seconds"}, + {clean, $C, "clean", {boolean, true}, + "clean session"}, + {expiry, $x, "session-expiry", {integer, 0}, + "Set 'Session-Expiry' for persistent sessions (seconds)"}, + %%% == Perf related == + {lowmem, $l, "lowmem", boolean, "enable low mem mode, but use more CPU"}, + {force_major_gc_interval, undefined, "force-major-gc-interval", {integer, 0}, + "interval in milliseconds in which a major GC will be forced on the " + "bench processes. a value of 0 means disabled (default). this only " + "takes effect when used together with --lowmem."}, + %%% == Monitoring & Loggings == + {qoe, $Q, "qoe", {atom, false}, + "set 'true' to enable QoE tracking. set 'dump' to dump QoE disklog to csv then exit"}, + {qoelog, undefined, "qoelog", {string, ""}, + "Write QoE event logs to the QoE disklog file for post processing"}, + {prometheus, undefined, "prometheus", undefined, + "Enable metrics collection via Prometheus. Usually used with --restapi to enable scraping endpoint." + }, + {restapi, undefined, "restapi", {string, disabled}, + "Enable REST API for monitoring and control. For now only serves /metrics. " + "Can be set to IP:Port to listen on a specific IP and Port, or just Port " + "to listen on all interfaces on that port." + }, + {log_to, undefined, "log_to", {atom, console}, + "Control where the log output goes. " + "console: directly to the console " + "null: quietly, don't output any logs." + } + ] + ). + +-define(PUB_OPTS, ?COMMON_OPTS ++ + [ + {interval_of_msg, $I, "interval_of_msg", {integer, 1000}, + "interval of publishing message(ms)"}, {topic, $t, "topic", string, "topic subscribe, support %u, %c, %i, %s variables"}, {payload_hdrs, undefined, "payload-hdrs", {string, ""}, @@ -90,38 +153,10 @@ "json file defining topics and payloads"}, {qos, $q, "qos", {integer, 0}, "subscribe qos"}, - {qoe, $Q, "qoe", {boolean, false}, - "Enable QoE tracking"}, {retain, $r, "retain", {boolean, false}, "retain message"}, - {keepalive, $k, "keepalive", {integer, 300}, - "keep alive in seconds"}, - {clean, $C, "clean", {boolean, true}, - "clean start"}, - {reconnect, undefined, "reconnect", {integer, 0}, - "max retries of reconnects. 0: disabled"}, - {expiry, $x, "session-expiry", {integer, 0}, - "Set 'Session-Expiry' for persistent sessions (seconds)"}, {limit, $L, "limit", {integer, 0}, "The max message count to publish, 0 means unlimited"}, - {ssl, $S, "ssl", {boolean, false}, - "ssl socket for connecting to server"}, - {cacertfile, undefined, "cacertfile", string, - "CA certificate for server verification"}, - {certfile, undefined, "certfile", string, - "client certificate for authentication, if required by server"}, - {keyfile, undefined, "keyfile", string, - "client private key for authentication, if required by server"}, - {ws, undefined, "ws", {boolean, false}, - "websocket transport"}, - {quic, undefined, "quic", {string, "false"}, - "QUIC transport"}, - {nst_dets_file, undefined, "load-qst", string, "load quic session tickets from dets file"}, - {ifaddr, undefined, "ifaddr", string, - "One or multiple (comma-separated) source IP addresses"}, - {prefix, undefined, "prefix", string, ?PREFIX_DESC}, - {shortids, $s, "shortids", {boolean, false}, ?SHORTIDS_DESC}, - {lowmem, $l, "lowmem", boolean, "enable low mem mode, but use more CPU"}, {inflight, $F,"inflight", {integer, 1}, "maximum inflight messages for QoS 1 an 2, value 0 for 'infinity'"}, {wait_before_publishing, $w, "wait-before-publishing", {boolean, false}, @@ -133,49 +168,15 @@ {min_random_wait, undefined,"min-random-wait", {integer, 0}, "minimum randomized period in ms that each publisher will wait before " "starting to publish (uniform distribution)"}, - {num_retry_connect, undefined, "num-retry-connect", {integer, 0}, - "number of times to retry estabilishing a connection before giving up"}, - {conn_rate, $R, "connrate", {integer, 0}, "connection rate(/s), default: 0, fallback to use --interval"}, - {force_major_gc_interval, undefined, "force-major-gc-interval", {integer, 0}, - "interval in milliseconds in which a major GC will be forced on the " - "bench processes. a value of 0 means disabled (default). this only " - "takes effect when used together with --lowmem."}, - {log_to, undefined, "log_to", {atom, console}, - "Control where the log output goes. " - "console: directly to the console " - "null: quietly, don't output any logs." - }, {retry_interval, 0, "retry-interval", {integer, 0}, "Publisher's resend interval (in seconds) if the expected " "acknowledgement for a inflight packet is not " "received within this interval. Default value 0 means no resend." - }, - {prometheus, undefined, "prometheus", undefined, - "Enable metrics collection via Prometheus. Usually used with --restapi to enable scraping endpoint." - }, - {restapi, undefined, "restapi", {string, disabled}, - "Enable REST API for monitoring and control. For now only serves /metrics. " - "Can be set to IP:Port to listen on a specific IP and Port, or just Port " - "to listen on all interfaces on that port." } ]). --define(SUB_OPTS, - [{help, undefined, "help", boolean, - "help information"}, - {dist, $d, "dist", boolean, - "enable distribution port"}, - {host, $h, "host", {string, "localhost"}, - "mqtt server hostname or comma-separated hostnames"}, - {port, $p, "port", {integer, 1883}, - "mqtt server port number"}, - {version, $V, "version", {integer, 5}, - "mqtt protocol version: 3 | 4 | 5"}, - {count, $c, "count", {integer, 200}, - "max count of clients"}, - {startnumber, $n, "startnumber", {integer, 0}, ?STARTNUMBER_DESC}, - {interval, $i, "interval", {integer, 10}, - "interval of connecting to the broker"}, +-define(SUB_OPTS, ?COMMON_OPTS ++ + [ {topic, $t, "topic", string, "topic subscribe, support %u, %c, %i variables"}, {payload_hdrs, undefined, "payload-hdrs", {string, []}, @@ -185,128 +186,10 @@ "ts: publish latency counting." }, {qos, $q, "qos", {integer, 0}, - "subscribe qos"}, - {qoe, $Q, "qoe", {boolean, false}, - "Enable QoE tracking"}, - {username, $u, "username", string, - "username for connecting to server"}, - {password, $P, "password", string, - "password for connecting to server"}, - {keepalive, $k, "keepalive", {integer, 300}, - "keep alive in seconds"}, - {clean, $C, "clean", {boolean, true}, - "clean start"}, - {reconnect, undefined, "reconnect", {integer, 0}, - "max retries of reconnects. 0: disabled"}, - {expiry, $x, "session-expiry", {integer, 0}, - "Set 'Session-Expiry' for persistent sessions (seconds)"}, - {ssl, $S, "ssl", {boolean, false}, - "ssl socket for connecting to server"}, - {cacertfile, undefined, "cacertfile", string, - "CA certificate for server verification"}, - {certfile, undefined, "certfile", string, - "client certificate for authentication, if required by server"}, - {keyfile, undefined, "keyfile", string, - "client private key for authentication, if required by server"}, - {ws, undefined, "ws", {boolean, false}, - "websocket transport"}, - {quic, undefined, "quic", {string, "false"}, - "QUIC transport"}, - {nst_dets_file, undefined, "load-qst", string, "load quic session tickets from dets file"}, - {ifaddr, undefined, "ifaddr", string, - "local ipaddress or interface address"}, - {prefix, undefined, "prefix", string, ?PREFIX_DESC}, - {shortids, $s, "shortids", {boolean, false}, ?SHORTIDS_DESC}, - {lowmem, $l, "lowmem", boolean, "enable low mem mode, but use more CPU"}, - {num_retry_connect, undefined, "num-retry-connect", {integer, 0}, - "number of times to retry estabilishing a connection before giving up"}, - {conn_rate, $R, "connrate", {integer, 0}, "connection rate(/s), default: 0, fallback to use --interval"}, - {force_major_gc_interval, undefined, "force-major-gc-interval", {integer, 0}, - "interval in milliseconds in which a major GC will be forced on the " - "bench processes. a value of 0 means disabled (default). this only " - "takes effect when used together with --lowmem."}, - {log_to, undefined, "log_to", {atom, console}, - "Control where the log output goes. " - "console: directly to the console " - "null: quietly, don't output any logs." - }, - {prometheus, undefined, "prometheus", undefined, - "Enable metrics collection via Prometheus. Usually used with --restapi to enable scraping endpoint." - }, - {restapi, undefined, "restapi", {string, disabled}, - "Enable REST API for monitoring and control. For now only serves /metrics. " - "Can be set to IP:Port to listen on a specific IP and Port, or just Port " - "to listen on all interfaces on that port." - } + "subscribe qos"} ]). --define(CONN_OPTS, [ - {help, undefined, "help", boolean, - "help information"}, - {dist, $d, "dist", boolean, - "enable distribution port"}, - {host, $h, "host", {string, "localhost"}, - "mqtt server hostname or comma-separated hostnames"}, - {port, $p, "port", {integer, 1883}, - "mqtt server port number"}, - {version, $V, "version", {integer, 5}, - "mqtt protocol version: 3 | 4 | 5"}, - {count, $c, "count", {integer, 200}, - "max count of clients"}, - {startnumber, $n, "startnumber", {integer, 0}, ?STARTNUMBER_DESC}, - {qoe, $Q, "qoe", {boolean, false}, - "Enable QoE tracking"}, - {interval, $i, "interval", {integer, 10}, - "interval of connecting to the broker"}, - {username, $u, "username", string, - "username for connecting to server"}, - {password, $P, "password", string, - "password for connecting to server"}, - {keepalive, $k, "keepalive", {integer, 300}, - "keep alive in seconds"}, - {clean, $C, "clean", {boolean, true}, - "clean session"}, - {reconnect, undefined, "reconnect", {integer, 0}, - "max retries of reconnects. 0: disabled"}, - {expiry, $x, "session-expiry", {integer, 0}, - "Set 'Session-Expiry' for persistent sessions (seconds)"}, - {ssl, $S, "ssl", {boolean, false}, - "ssl socket for connecting to server"}, - {cacertfile, undefined, "cacertfile", string, - "CA certificate for server verification"}, - {certfile, undefined, "certfile", string, - "client certificate for authentication, if required by server"}, - {keyfile, undefined, "keyfile", string, - "client private key for authentication, if required by server"}, - {quic, undefined, "quic", {string, "false"}, - "QUIC transport"}, - {nst_dets_file, undefined, "load-qst", string, "load quic session tickets from dets file"}, - {ifaddr, undefined, "ifaddr", string, - "local ipaddress or interface address"}, - {prefix, undefined, "prefix", string, ?PREFIX_DESC}, - {shortids, $s, "shortids", {boolean, false}, ?SHORTIDS_DESC}, - {lowmem, $l, "lowmem", boolean, "enable low mem mode, but use more CPU"}, - {num_retry_connect, undefined, "num-retry-connect", {integer, 0}, - "number of times to retry estabilishing a connection before giving up"}, - {conn_rate, $R, "connrate", {integer, 0}, "connection rate(/s), default: 0, fallback to use --interval"}, - {force_major_gc_interval, undefined, "force-major-gc-interval", {integer, 0}, - "interval in milliseconds in which a major GC will be forced on the " - "bench processes. a value of 0 means disabled (default). this only " - "takes effect when used together with --lowmem."}, - {log_to, undefined, "log_to", {atom, console}, - "Control where the log output goes. " - "console: directly to the console " - "null: quietly, don't output any logs." - }, - {prometheus, undefined, "prometheus", undefined, - "Enable metrics collection via Prometheus. Usually used with --restapi to enable scraping endpoint." - }, - {restapi, undefined, "restapi", {string, disabled}, - "Enable REST API for monitoring and control. For now only serves /metrics. " - "Can be set to IP:Port to listen on a specific IP and Port, or just Port " - "to listen on all interfaces on that port." - } - ]). +-define(CONN_OPTS, ?COMMON_OPTS). -define(cnt_map, cnt_map). -define(hdr_cnt64, "cnt64"). @@ -474,11 +357,11 @@ prepare(PubSub, Opts) -> ok end, prepare_quicer(Opts), + init_qoe(Opts), application:ensure_all_started(emqtt_bench). init() -> process_flag(trap_exit, true), - ets:new(qoe_store, [named_table, public, ordered_set]), Now = erlang:monotonic_time(millisecond), Counters = counters(), CRef = counters:new(length(Counters)+1, [write_concurrency]), @@ -500,7 +383,7 @@ main_loop(Uptime, Count) -> return_print("publish complete~n", []); stats -> print_stats(Uptime), - maybe_print_qoe(Count), + maybe_sum_qoe(Count), maybe_dump_nst_dets(Count), garbage_collect(), main_loop(Uptime, Count); @@ -514,28 +397,43 @@ maybe_dump_nst_dets(Count)-> andalso undefined =/= dets:info(dets_quic_nsts) andalso ets:to_dets(quic_clients_nsts, dets_quic_nsts). -maybe_print_qoe(Count) -> +maybe_sum_qoe(Count) -> %% latency statistic for %% - handshake %% - conn %% - sub - case ets:info(qoe_store, size) of - Count -> - do_print_qoe(ets:tab2list(qoe_store)); - _ -> - skip + case get(is_qoe_dlog) of + true -> + Data = ets:tab2list(qoe_store), + maybe_dlog_qoe(Data), + lists:foreach(fun({ClientId, _}) -> + ets:delete(qoe_store, ClientId) + end, Data); + false -> + case Count == ets:info(qoe_store, size) of + true -> + do_print_qoe(Data = ets:tab2list(qoe_store)), + %% clear to print once + lists:foreach(fun({ClientId, _}) -> + ets:delete(qoe_store, ClientId) + end, Data); + false -> + skip + end end. + do_print_qoe([]) -> skip; do_print_qoe(Data) -> - {H, C, S} = lists:unzip3([ V || {_C, V} <-Data]), + {H, C, S} = lists:foldl(fun({_Client, {_, H1, C1, S1}}, {H, C, S}) -> + {[H1 | H], [C1 | C], [S1 | S]} + end, {[], [], []}, Data), lists:foreach( - fun({_Name, 0})-> skip; + fun({_Name, []})-> skip; ({Name, X}) -> io:format("~p, avg: ~pms, P95: ~pms, Max: ~pms ~n", [Name, lists:sum(X)/length(X), p95(X), lists:max(X)]) - end, [{handshake, H}, {connect, C}, {subscribe, S}]), - lists:foreach(fun({Client, _}) -> ets:delete(qoe_store, Client) end, Data). + end, [{handshake, H}, {connect, C}, {subscribe, S}]). print_stats(Uptime) -> [print_stats(Uptime, Cnt) || @@ -689,20 +587,24 @@ connect(Parent, N, PubSub, Opts) -> inc_counter(Prometheus, connect_succ), Res = case PubSub of - conn -> ok; + conn -> + ok = maybe_update_client_qoe(Client, Opts), + ok; sub -> subscribe(Client, N, AllOpts); - pub -> case MRef of - undefined when TopicPayloadRend == undefined -> - erlang:send_after(RandomPubWaitMS, self(), publish); - undefined -> - maps:foreach(fun(TopicName, #{name := TopicName, interval_ms := DelayMs}) -> - erlang:send_after(RandomPubWaitMS + DelayMs, self(), {publish, TopicName}) - end, TopicPayloadRend); - _ -> - %% send `publish' only when all publishers - %% are in place. - ok - end + pub -> + ok = maybe_update_client_qoe(Client, Opts), + case MRef of + undefined when TopicPayloadRend == undefined -> + erlang:send_after(RandomPubWaitMS, self(), publish); + undefined -> + maps:foreach(fun(TopicName, #{name := TopicName, interval_ms := DelayMs}) -> + erlang:send_after(RandomPubWaitMS + DelayMs, self(), {publish, TopicName}) + end, TopicPayloadRend); + _ -> + %% send `publish' only when all publishers + %% are in place. + ok + end end, case Res of {error, _SubscribeError} -> @@ -905,30 +807,12 @@ pub_limit_fun_init(N) when is_integer(N), N > 0 -> end. subscribe(Client, N, Opts) -> - Prometheus = lists:member(prometheus, Opts), Qos = proplists:get_value(qos, Opts), Res = emqtt:subscribe(Client, [{Topic, Qos} || Topic <- topics_opt(Opts)]), case Res of {ok, _, _} -> - case proplists:get_value(qoe, emqtt:info(Client), false) of - false -> - ok; - #{ initialized := StartTs - , handshaked := HSTs - , connected := ConnTs - , subscribed := SubTs - } -> - ElapsedHandshake = HSTs - StartTs, - ElapsedConn = ConnTs - StartTs, - ElapsedSub = SubTs - StartTs, - histogram_observe(Prometheus, mqtt_client_handshake_duration, ElapsedHandshake), - histogram_observe(Prometheus, mqtt_client_connect_duration, ElapsedConn), - histogram_observe(Prometheus, mqtt_client_subscribe_duration, ElapsedSub), - true = ets:insert(qoe_store, {proplists:get_value(client_id, Opts), - {ElapsedHandshake, ElapsedConn, ElapsedSub} - }), - ok - end; + maybe_update_client_qoe(Client, Opts), + ok; {error, Reason}-> io:format("client(~w): subscribe error - ~p~n", [N, Reason]) end, @@ -1644,3 +1528,110 @@ prepare_quicer(Opts) -> _ -> ok end. + +maybe_update_client_qoe(Client, Opts) -> + ClientInfo = emqtt:info(Client), + case proplists:get_value(qoe, ClientInfo, false) of + false -> + ok; + QoEMap when is_map(QoEMap) -> + Prometheus = lists:member(prometheus, Opts), + ClientId = proplists:get_value(clientid, ClientInfo), + qoe_store_insert(Prometheus, ClientId, QoEMap) + end. + +-define(invalid_elapsed, -1). +qoe_store_insert(Prometheus, + ClientId, + #{ initialized := StartTs + , handshaked := HSTs + , connected := ConnTs + } = QoE) -> + ElapsedHandshake = HSTs - StartTs, + ElapsedConn = ConnTs - StartTs, + ElapsedSub = case maps:get(subscribed, QoE, undefined) of + undefined -> + %% invalid + ?invalid_elapsed; + SubTs -> + SubTs - StartTs + end, + histogram_observe(Prometheus, mqtt_client_handshake_duration, ElapsedHandshake), + histogram_observe(Prometheus, mqtt_client_connect_duration, ElapsedConn), + histogram_observe(Prometheus, mqtt_client_subscribe_duration, ElapsedSub), + Term = {ClientId, {StartTs, ElapsedHandshake, ElapsedConn, ElapsedSub}}, + true = ets:insert(qoe_store, Term), + ok. + +init_qoe(Opts) -> + put(is_qoe_dlog, false), + case proplists:get_value(qoe, Opts) of + false -> + skip; + dump -> + case proplists:get_value(qoelog, Opts) of + "" -> + error("qoelog not specified"); + File -> + ok = open_qoe_disklog(File), + qoe_disklog_to_csv(File), + disk_log:close(?QoELog), + erlang:halt() + end; + true -> + ets:new(qoe_store, [named_table, public, set]), + %% QoE disk Log file + case proplists:get_value(qoelog, Opts) of + "" -> + skip; + File -> + put(is_qoe_dlog, true), + open_qoe_disklog(File) + end + end. + +open_qoe_disklog(File) -> + case disk_log:open([{name, ?QoELog}, {file, File}, {repair, true}, {type, halt}, + {size, infinity}]) of + {ok, _Log} -> ok; + {repaired, _Log, {recovered, Recov}, {badbytes, BadBytes}} -> + io:format("Open QoElog: ~p, recovered ~p bytes, ~p bad bytes~n", + [File, Recov, BadBytes]); + {error, Reason} -> + error({open_qoe_log, Reason}) + end. + +qoe_disklog_to_csv(File) -> + TargetFile = File++".csv", + {ok, CsvH} = file:open(TargetFile, [write]), + io:format("No execution, just dumping QoE dlog to csv file: ~p~n", [TargetFile]), + file:write(CsvH, "ClientId,TS,Handshake,Connect,Subscribe\n"), + Writer = fun(Terms) -> + Lines = lists:map( + fun([ClientId, TS, Handshake, Connect, Subscribe]) -> + io_lib:format("~s,~p,~p,~p,~p~n", + [ClientId, TS, Handshake, Connect, Subscribe]) + end, Terms), + file:write(CsvH, Lines) + end, + do_qoe_disklog_to_csv(?QoELog, start, Writer), + file:close(CsvH). + +do_qoe_disklog_to_csv(Log, Cont0, Writer) -> + case disk_log:chunk(Log, Cont0) of + {Cont, Terms} -> + Writer(Terms), + do_qoe_disklog_to_csv(Log, Cont, Writer); + {Cont, Terms, _BadBytes} -> + Writer(Terms), + do_qoe_disklog_to_csv(Log, Cont, Writer); + eof -> + ok + end. + +maybe_dlog_qoe(Data) -> + Offset = erlang:time_offset(millisecond), + DLog = [ [ClientId, Offset + StartTs, + ElapsedHandshake, ElapsedConn, ElapsedSub] + || {ClientId, {StartTs, ElapsedHandshake, ElapsedConn, ElapsedSub}} <- Data], + ok = disk_log:log_terms(?QoELog, DLog).