diff --git a/rebar.config b/rebar.config index 8b82bf2..ddb3ed8 100644 --- a/rebar.config +++ b/rebar.config @@ -21,7 +21,16 @@ {alias, [{test, - [compile, format, hank, lint, xref, dialyzer, ct, cover, {covertool, "generate"}]}]}. + [compile, + format, + hank, + lint, + xref, + dialyzer, + eunit, + ct, + cover, + {covertool, "generate"}]}]}. {covertool, [{coverdata_files, ["ct.coverdata"]}]}. diff --git a/src/epr.erl b/src/epr.erl new file mode 100644 index 0000000..a931ae9 --- /dev/null +++ b/src/epr.erl @@ -0,0 +1,30 @@ +-module(epr). + +-compile({nowarn_unused_function, [init/1, run/1, shutdown/2]}). + +-ifdef(TEST). + +-export([init/1, run/1, shutdown/2]). + +-endif. + +init(Files) -> + ProcessingFiles = Files, + {_, AggServerPid} = epr_data_aggregator_server:start_link(), + + Pids = + lists:map(fun(FileName) -> + {ok, Pid} = epr_processor_server:start_link(FileName, AggServerPid), + Pid + end, + ProcessingFiles), + + {AggServerPid, Pids}. + +run(Pids) -> + lists:map(fun(Pid) -> spawn(gen_server, cast, [Pid, run]) end, Pids). + +shutdown(AggServerPid, ProcessorPids) -> + Pids = [AggServerPid | ProcessorPids], + % gen_server:cast(AggServerPid, stop), + lists:map(fun(Pid) -> gen_server:cast(Pid, stop) end, Pids). diff --git a/src/epr_app.erl b/src/epr_app.erl index a99286c..e75600c 100644 --- a/src/epr_app.erl +++ b/src/epr_app.erl @@ -10,11 +10,6 @@ -export([start/2, stop/1]). start(_StartType, _StartArgs) -> - ProcessingFiles = ["python/processing.py", "python/processing2.py"], - - Pids = epr_processor_engine:init(ProcessingFiles), - epr_processor_engine:run(Pids), - epr_sup:start_link(). stop(_State) -> diff --git a/src/epr_data_aggregator_server.erl b/src/epr_data_aggregator_server.erl index 727fa20..07a30fc 100644 --- a/src/epr_data_aggregator_server.erl +++ b/src/epr_data_aggregator_server.erl @@ -3,7 +3,7 @@ -behaviour(gen_server). -export([start_link/0]). --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). +-export([init/1, handle_call/3, handle_cast/2, terminate/2]). -type state() :: map(). @@ -18,19 +18,14 @@ init(Param) -> handle_cast({add_data, Id, Data}, State) -> NewState = State#{Id => Data}, - io:format(user, ": ~p~n", [NewState]), - {noreply, NewState}. + io:format(user, "New State: ~p~n", [NewState]), + {noreply, NewState}; +handle_cast(stop, State) -> + {stop, normal, State}. -handle_call(_, _, State) -> - {reply, ok, State}. - -handle_info(Msg, State) -> - io:format("Unexpected message: ~p~n", [Msg]), - {noreply, State}. +handle_call(get_state, _From, State) -> + {reply, State, State}. terminate(normal, State) -> - io:format(user, "Processor terminated with file name: ~p~n", [State]), - ok; -terminate(Error, _State) -> - io:format(user, "Error: ~p~n", [Error]), + io:format(user, "Processor terminated with state: ~p~n", [State]), ok. diff --git a/src/epr_processor_engine.erl b/src/epr_processor_engine.erl deleted file mode 100644 index b0765ab..0000000 --- a/src/epr_processor_engine.erl +++ /dev/null @@ -1,13 +0,0 @@ --module(epr_processor_engine). - --export([init/1, run/1]). - -init(Filenames) -> - %TODO: I want to add some error handling and only - %return those that are passed. Now we return all. - {_, AggServerPid} = epr_data_aggregator_server:start_link(), - lists:map(fun(FileName) -> epr_processor_server:start_link(FileName, AggServerPid) end, - Filenames). - -run(PidList) -> - lists:map(fun({ok, Pid}) -> spawn(gen_server, cast, [Pid, run]) end, PidList). diff --git a/src/epr_processor_server.erl b/src/epr_processor_server.erl index 215e114..63c7c3d 100644 --- a/src/epr_processor_server.erl +++ b/src/epr_processor_server.erl @@ -3,7 +3,7 @@ -behaviour(gen_server). -export([start_link/2]). --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). +-export([init/1, handle_call/3, handle_cast/2, terminate/2]). -type state() :: #{file_name => string(), aggregator_server => pid()}. @@ -20,35 +20,19 @@ init({FileName, AggServer}) -> handle_cast(run, #{file_name := FileName, aggregator_server := AggServer} = State) -> Command = "python3 " ++ FileName, Port = open_port({spawn, Command}, [binary, exit_status]), - loop(Port, FileName, AggServer), - {noreply, State}. + receive + {Port, {data, Data}} -> + gen_server:cast(AggServer, {add_data, FileName, Data}) + end, + + {noreply, State}; +handle_cast(stop, State) -> + {stop, normal, State}. handle_call(_, _, State) -> {reply, ok, State}. -handle_info(Msg, State) -> - io:format("Unexpected message: ~p~n", [Msg]), - {noreply, State}. - terminate(normal, State) -> io:format(user, "Processor terminated with file name: ~p~n", [State]), - ok; -terminate(Error, State) -> - io:format(user, "Error: ~p~n", [Error]), - io:format(user, "State: ~p~n", [State]), ok. - -%%%%%%%%%%%%%%%%%%%%% -%%% Private functions -%%%%%%%%%%%%%%%%%%%%% - -loop(Port, FileName, AggServer) -> - receive - {Port, {data, Data}} -> - gen_server:cast(AggServer, {add_data, FileName, Data}), - loop(Port, FileName, AggServer); - {Port, closed} -> - io:format("Python script finished~n"), - ok - end. diff --git a/test/epr_SUITE.erl b/test/epr_SUITE.erl index 492f5c5..773f94f 100644 --- a/test/epr_SUITE.erl +++ b/test/epr_SUITE.erl @@ -10,7 +10,7 @@ -export([all/0]). -export([init_per_suite/1, end_per_suite/1]). --export([start_test/1]). +-export([run_hello_world_from_python/1]). -elvis([{elvis_style, no_block_expressions, disable}]). @@ -18,7 +18,7 @@ -spec all() -> [atom()]. all() -> - [start_test]. + [run_hello_world_from_python]. -spec init_per_suite(config()) -> config(). init_per_suite(Config) -> @@ -30,6 +30,6 @@ end_per_suite(Config) -> application:stop(epr), Config. --spec start_test(Config :: config()) -> {ok, ok}. -start_test(_Config) -> +-spec run_hello_world_from_python(Config :: config()) -> ok. +run_hello_world_from_python(_Config) -> ok. diff --git a/test/epr_test.erl b/test/epr_test.erl new file mode 100644 index 0000000..4e14edc --- /dev/null +++ b/test/epr_test.erl @@ -0,0 +1,20 @@ +-module(epr_test). + +-include_lib("eunit/include/eunit.hrl"). + +workflow_test() -> + ProcessingFiles = + ["test/epr_test_python_data/processing.py", "test/epr_test_python_data/processing2.py"], + + {Aggregator, Pids} = epr:init(ProcessingFiles), + epr:run(Pids), + + timer:sleep(1000), + State = gen_server:call(Aggregator, get_state), + + ?assertEqual(#{"test/epr_test_python_data/processing.py" => <<"szia lajos\n">>, + "test/epr_test_python_data/processing2.py" => <<"szia retek\n">>}, + State), + + epr:shutdown(Aggregator, Pids), + ok. diff --git a/test/epr_test_python_data/processing.py b/test/epr_test_python_data/processing.py new file mode 100644 index 0000000..4aeb1b1 --- /dev/null +++ b/test/epr_test_python_data/processing.py @@ -0,0 +1 @@ +print("szia lajos") diff --git a/test/epr_test_python_data/processing2.py b/test/epr_test_python_data/processing2.py new file mode 100644 index 0000000..91b2049 --- /dev/null +++ b/test/epr_test_python_data/processing2.py @@ -0,0 +1 @@ +print("szia retek")