Skip to content

Commit

Permalink
Merge pull request #43 from kjellwinblad/kjell/jq_port/start_on_demand
Browse files Browse the repository at this point in the history
start port programs on demand and auto stop after configurable idle time
  • Loading branch information
kjellwinblad authored Sep 4, 2023
2 parents 02474bf + b1f1d5d commit 1b6f52d
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 17 deletions.
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,16 @@ application is loaded (the jq Erlang application is loaded automatically when
* `jq_port_restart_period` (default value = 1000000) (only relevant for the
`jq_port` option) - Use this option to set how many `jq:process_json/2` calls
a port program can process before it is restarted. This is a safety option
that can be handy if it turn out that the jq port program has memory leaks.
that can be handy if it turns out that the jq port program has memory leaks.
As far as we know the port program does not have any memory leaks but it is
possible that it is leaky for inputs that we have not tested (please report a
bug if you find any leaks).
* `jq_port_auto_turn_off_time_seconds` (default value = 300) (only relevant for
the `jq_port` option) - Use this option to set how long a port program
can be idle before it can be turned off. The port program will be automatically
turned on again when it is needed so this feature should be invisible from the
point of view of the user and is only meant to save resources. If this option
is set to 0 then the port programs will never be turned off automatically.

## Test with address sanitizer

Expand Down
4 changes: 0 additions & 4 deletions src/jq_app.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@ start(_StartType, _StartArgs) ->
application:get_env(jq, jq_port_nr_of_jq_port_servers, erlang:system_info(schedulers)),
jq_port:set_nr_of_jq_port_servers(NrOfJQPortServers),
Res = jq_port_sup:start_link(),
%% Configure the jq port servers once they are up and running
CacheMaxSize =
application:get_env(jq, jq_filter_program_lru_cache_max_size, 500),
jq:set_filter_program_lru_cache_max_size(CacheMaxSize),
Res.

stop(_State) ->
Expand Down
81 changes: 69 additions & 12 deletions src/jq_port.erl
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,19 @@ get_filter_program_lru_cache_max_size() ->
end,
do_op_ensure_started(Op).

%% Synchronously asks the given port server to change the maximum size of
%% its filter program LRU cache. The call waits without a time limit and
%% returns the server's reply.
set_filter_program_lru_cache_max_size(PortServer, NewSize) ->
    Request = {set_filter_program_lru_cache_max_size, NewSize},
    gen_server:call(PortServer, Request, infinity).

set_filter_program_lru_cache_max_size(NewSize)
when is_integer(NewSize), NewSize >= 0, NewSize < 1073741824 ->
Op =
fun() ->
gen_server:call(port_server(),
{set_filter_program_lru_cache_max_size, NewSize},
infinity)
Expect = lists:duplicate(jq_port:nr_of_jq_port_servers(), ok),
Expect = [set_filter_program_lru_cache_max_size(port_server_by_id(Id), NewSize) ||
Id <- lists:seq(0, jq_port:nr_of_jq_port_servers() - 1)],
ok
end,
do_op_ensure_started(Op).

Expand Down Expand Up @@ -137,9 +143,8 @@ remove_from_lookup_table(Id) ->

init(Id) ->
process_flag(trap_exit, true),
Port = start_port_program(),
State = #{
port => Port,
port => port_not_started,
id => Id,
processed_json_calls => 0,
restart_period => application:get_env(jq, jq_port_restart_period, 1000000)
Expand Down Expand Up @@ -198,6 +203,8 @@ is_port_alive(Port) ->
error({bad_ping_response, Other})
end.

kill_port(port_not_started) ->
ok;
kill_port(Port) ->
Port ! {self(), {command, <<"exit\0">>}},
erlang:port_close(Port),
Expand Down Expand Up @@ -247,7 +254,7 @@ misbehaving_port_program(State, ErrorClass, Reason) ->
end,
OldPort = state_port(State),
kill_port(OldPort),
NewPort = start_port_program(),
NewPort = port_not_started, %% will be started by the next call
NewState = State#{port => NewPort},
{reply, Ret, NewState}.

Expand All @@ -273,12 +280,45 @@ new_state_after_process_json(State) ->
0 ->
OldPort = state_port(State),
kill_port(OldPort),
NewPort = start_port_program(),
NewPort = port_not_started,
State#{port => NewPort, processed_json_calls => NrOfCalls + 1};
_ ->
State#{processed_json_calls => NrOfCalls + 1}
end.

%% Returns the value the processed_json_calls counter of the state will
%% have once the given call has been handled: a jq_process_json call
%% (a 4-tuple tagged jq_process_json) adds one to the counter, any other
%% call leaves it as it is.
processed_json_calls_after_call(Call, #{processed_json_calls := Count}) ->
    case Call of
        {jq_process_json, _, _, _} -> Count + 1;
        _ -> Count
    end.

%% Reads the jq_port_auto_turn_off_time_seconds application environment
%% variable of the jq application (300 if it is not set) and returns it
%% converted from seconds to milliseconds.
jq_port_auto_turn_off_time_ms() ->
    Seconds = application:get_env(jq, jq_port_auto_turn_off_time_seconds, 300),
    Seconds * 1000.

%% Schedules an idle check for Port, unless automatic turn off is disabled.
%%
%% When the configured auto turn off time is 0 nothing is scheduled.
%% Otherwise a {turn_off_port, Info} message is sent to the calling process
%% after the configured time. Info carries the port and the number of
%% processed JSON calls at scheduling time, so that the receiver can tell
%% whether the port has been used since the timer was set.
%%
%% Always returns ok. The timer is created with erlang:send_after/3 so that
%% the message is delivered by the runtime system directly instead of going
%% through the timer module.
maybe_send_idle_check_message(Port, NrOfProcessJSONCalls) ->
    case jq_port_auto_turn_off_time_ms() of
        0 ->
            ok;
        Timeout ->
            TimeoutMessage = {turn_off_port,
                              #{port => Port,
                                processed_json_calls_on_start => NrOfProcessJSONCalls}},
            _TimerRef = erlang:send_after(Timeout, self(), TimeoutMessage),
            ok
    end.

handle_call(Call, From, #{port := port_not_started} = State) ->
NewPort = start_port_program(),
StateWithPort = State#{port => NewPort},
%% Configure the jq port server once it is up and running
CacheMaxSize =
application:get_env(jq, jq_filter_program_lru_cache_max_size, 500),
{reply, ok, NewState} = handle_call({set_filter_program_lru_cache_max_size, CacheMaxSize}, From, StateWithPort),
%% Set a timer to turn off the port after a while if idle
maybe_send_idle_check_message(NewPort, processed_json_calls_after_call(Call, NewState)),
%% Do the original call with the updated state
handle_call(Call, From, NewState);
handle_call({jq_process_json, FilterProgram, JSONText, TimeoutMs}, _From, State) ->
Port = state_port(State),
try
Expand Down Expand Up @@ -353,12 +393,29 @@ terminate(_Reason, State) ->
remove_from_lookup_table({?MODULE, Id}),
ok.

handle_info({turn_off_port, #{port := PortToTurnOff,
processed_json_calls_on_start := NrOfCallsOnStart}},
#{port := StatePort,
processed_json_calls := NrOfCallsNow} = State)
when PortToTurnOff =:= StatePort,
NrOfCallsOnStart =:= NrOfCallsNow ->
%% Port is idle, stop it, it will be started on next request
kill_port(PortToTurnOff),
{noreply, State#{port => port_not_started}};
handle_info({turn_off_port, #{port := PortToTurnOff}},
#{port := StatePort,
processed_json_calls := NrOfCallsNow} = State)
when PortToTurnOff =:= StatePort ->
%% Port is not idle enough to be turned off, so we set a new timer
maybe_send_idle_check_message(StatePort, NrOfCallsNow),
{noreply, State};
handle_info({turn_off_port, _}, State) ->
%% Flush message for old port
{noreply, State};
handle_info({'EXIT', Port, Reason}, #{port := Port} = State) ->
logger:error(io_lib:format("jq port program has died unexpectedly for reason ~p (state = ~p) \nTrying to restart...",
logger:error(io_lib:format("jq port program has died unexpectedly for reason ~p (state = ~p) \nTrying to restart on next request...",
[Reason, State])),
%% Let us try to start a new port
NewPort = start_port_program(),
{noreply, State#{port => NewPort}};
{noreply, State#{port => port_not_started}};
handle_info({'EXIT', Port, _Reason}, State) when is_port(Port) ->
%% Flush message from old port
{noreply, State};
Expand All @@ -374,7 +431,7 @@ handle_info(UnknownMessage, State) ->
code_change(_OldVsn, State, _Extra) ->
OldPort = state_port(State),
kill_port(OldPort),
NewPort = start_port_program(),
NewPort = port_not_started, %% Will be started on next request
NewState = State#{port => NewPort},
{ok, NewState}.

Expand Down
23 changes: 23 additions & 0 deletions test/jq_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,29 @@ concurrent_queries_t_() ->
concurrent_queries_test_() -> wrap_setup_cleanup(concurrent_queries_t_()).

-ifndef(TEST_ONLY_NIF).

%% Runs a few jq:process_json/2 calls against the jq_port implementation
%% with the auto turn off time set to one second, sleeping in between so
%% that the idle timer has a chance to fire. The previous value of the
%% jq_port_auto_turn_off_time_seconds setting is restored afterwards so that
%% the short timeout does not leak into other tests.
port_program_turn_off_automatically_test_() ->
    {timeout, 10,
     fun() ->
             %% This test tries to trigger the relevant code paths, but
             %% at the time of writing, one has to check manually
             %% that the right code is triggered.
             %% TODO: Use snabbkaffe to check that the right code is triggered
             EnvKey = jq_port_auto_turn_off_time_seconds,
             %% Remember the current setting ({ok, Value} or undefined)
             OldSetting = application:get_env(jq, EnvKey),
             jq:set_implementation_module(jq_port),
             application:set_env(jq, EnvKey, 1),
             try
                 {ok, [<<"42">>]} = jq:process_json(<<"42">>, <<"42">>),
                 timer:sleep(1100),
                 %% should be off now
                 {ok, [<<"42">>]} = jq:process_json(<<"42">>, <<"42">>),
                 {ok, [<<"42">>]} = jq:process_json(<<"42">>, <<"42">>),
                 timer:sleep(1100),
                 %% Should still be on now
                 {ok, [<<"42">>]} = jq:process_json(<<"42">>, <<"42">>),
                 ok
             after
                 %% Put the setting back the way it was before the test
                 case OldSetting of
                     {ok, OldValue} -> application:set_env(jq, EnvKey, OldValue);
                     undefined -> application:unset_env(jq, EnvKey)
                 end
             end
     end}.



port_program_valgrind_test_() ->
{timeout, 30,
fun() ->
Expand Down

0 comments on commit 1b6f52d

Please sign in to comment.