diff --git a/CMakeLists.txt b/CMakeLists.txt index 84c9a1f..8ff4582 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -5,18 +5,16 @@ find_package(daq-cmake REQUIRED ) daq_setup_environment() -find_package(rcif REQUIRED) find_package(appfwk REQUIRED) find_package(opmonlib REQUIRED) -find_package(logging REQUIRED) -find_package(daqconf REQUIRED) -find_package(ers REQUIRED) -daq_codegen( randomdatalistgenerator.jsonnet TEMPLATES Structs.hpp.j2 Nljs.hpp.j2) -daq_codegen( randomdatalistgeneratorinfo.jsonnet DEP_PKGS opmonlib TEMPLATES opmonlib/InfoStructs.hpp.j2 opmonlib/InfoNljs.hpp.j2 ) +daq_codegen( listreverser.jsonnet randomdatalistgenerator.jsonnet reversedlistvalidator.jsonnet TEMPLATES Structs.hpp.j2 Nljs.hpp.j2) +daq_codegen( listreverserinfo.jsonnet randomdatalistgeneratorinfo.jsonnet reversedlistvalidatorinfo.jsonnet DEP_PKGS opmonlib TEMPLATES opmonlib/InfoStructs.hpp.j2 opmonlib/InfoNljs.hpp.j2 ) -daq_add_plugin(ListReverser duneDAQModule LINK_LIBRARIES appfwk::appfwk) -daq_add_plugin(RandomDataListGenerator duneDAQModule LINK_LIBRARIES appfwk::appfwk) -daq_add_plugin(ReversedListValidator duneDAQModule LINK_LIBRARIES appfwk::appfwk) +daq_add_library(ListCreator.cpp ListStorage.cpp LINK_LIBRARIES appfwk::appfwk) + +daq_add_plugin(ListReverser duneDAQModule LINK_LIBRARIES listrev) +daq_add_plugin(RandomDataListGenerator duneDAQModule LINK_LIBRARIES listrev) +daq_add_plugin(ReversedListValidator duneDAQModule LINK_LIBRARIES listrev) daq_install() diff --git a/cmake/listrevConfig.cmake.in b/cmake/listrevConfig.cmake.in index 2683c8e..54ef9a0 100644 --- a/cmake/listrevConfig.cmake.in +++ b/cmake/listrevConfig.cmake.in @@ -5,9 +5,6 @@ include(CMakeFindDependencyMacro) find_dependency(opmonlib) find_dependency(appfwk) -find_dependency(rcif) -find_dependency(logging) -find_dependency(ers) # Figure out whether or not this dependency is an installed package or # in repo form diff --git a/integtest/listrev_test.py b/integtest/listrev_test.py index 298091f..1ee1c31 100755 --- a/integtest/listrev_test.py +++ b/integtest/listrev_test.py @@ -10,7 +10,8 @@ # Default values for validation parameters check_for_logfile_errors=True -expected_event_count=run_duration +expected_event_count=run_duration*10 +expected_event_count_tolerance = expected_event_count / 10 # The next three variable declarations *must* be present as globals in the test # file. They're read by the "fixtures" in conftest.py to determine how @@ -36,12 +37,14 @@ g_conf={"detector": {"op_env": "integtest"},"boot": { "use_connectivity_service": use_connectivity_service}, "listrev": {"apps": ["rv", "g"]}} r_conf={"detector": {"op_env": "integtest"},"boot": { "use_connectivity_service": use_connectivity_service}, "listrev": {"apps": ["gv", "r"]}} separate_conf={"detector": {"op_env": "integtest"},"boot": { "use_connectivity_service": use_connectivity_service}, "listrev": {"apps": ["g", "r", "v"]}} +multigen_conf={"detector": {"op_env": "integtest"},"boot": { "use_connectivity_service": use_connectivity_service}, "listrev": {"apps": ["g", "g", "g", "rr", "v"]}} confgen_arguments={"Single App": single_app_conf, "Separate Verifier": v_conf, "Separate Generator": g_conf, "Separate Reverser": r_conf, - "Independent Apps": separate_conf} + "Independent Apps": separate_conf, + "Multiple Generators": multigen_conf} # The commands to run in nanorc, as a list nanorc_command_list="integtest-partition boot conf".split() nanorc_command_list+="start_run --disable-data-storage 101 wait ".split() + [str(run_duration)] + "stop_run wait 2".split() @@ -78,38 +81,38 @@ def test_log_files(run_nanorc): # Check that there are no warnings or errors in the log files assert log_file_checks.logs_are_error_free(run_nanorc.log_files) - # Exiting the do_work() method, generated 31 lists and successfully sent 62 copies. DAQModule: rdlg + # Exiting do_stop() method, generated 2081 lists, and sent 2081 list messages DAQModule: rdlg0 generator_generated = 0 generator_sent = 0 - # Exiting do_work() method, received 31 lists and successfully sent 31. DAQModule: lr + # Exiting do_stop() method, received 2081 request messages, sent 2081, received 2081 lists, and sent 2081 reversed list messages DAQModule: lr0 reverser_received = 0 reverser_sent = 0 - # Exiting do_work() method, received 31 reversed lists, compared 31 of them to their original data, and found 0 mismatches. DAQModule: lrv + # Exiting do_stop() method, received 2081 reversed list messages, compared 2081 reversed lists to their original data, and found 0 mismatches. DAQModule: lrv validator_received = 0 validator_received_reversed = 0 validator_errors = 999 for idx in range(len(run_nanorc.log_files)): for line in open(run_nanorc.log_files[idx]).readlines(): - if "Exiting" in line: - if "generated" in line: - m = re.search("generated ([0-9]+) lists and successfully sent ([0-9]+) copies",line) + if "Exiting do_stop" in line: + if "RandomDataListGenerator" in line: + m = re.search("generated ([0-9]+) lists, and sent ([0-9]+) list messages",line) generator_generated = int(m.group(1)) generator_sent = int(m.group(2)) if "ListReverser" in line: - m = re.search("received ([0-9]+) lists and successfully sent ([0-9]+).",line) - reverser_received = int(m.group(1)) - reverser_sent = int(m.group(2)) - if "mismatches" in line: - m = re.search("received ([0-9]+) reversed lists, compared ([0-9]+) of them to their original data, and found ([0-9]+) mismatches.",line) + m = re.search("received ([0-9]+) request messages, sent ([0-9]+), received ([0-9]+) lists, and sent ([0-9]+) reversed list messages",line) + reverser_received = int(m.group(3)) + reverser_sent = int(m.group(4)) + if "ReversedListValidator" in line: + m = re.search("received ([0-9]+) reversed list messages, compared ([0-9]+) reversed lists to their original data, and found ([0-9]+) mismatches.",line) validator_received = int(m.group(2)) validator_received_reversed = int(m.group(1)) validator_errors = int(m.group(3)) - assert generator_generated >= expected_event_count - assert generator_sent >= expected_event_count * 2 - assert reverser_received >= expected_event_count - assert reverser_sent >= expected_event_count - assert validator_received >= expected_event_count - assert validator_received_reversed >= expected_event_count + assert generator_generated >= expected_event_count - expected_event_count_tolerance + assert generator_sent >= expected_event_count - expected_event_count_tolerance + assert reverser_received >= expected_event_count - expected_event_count_tolerance # times num generators? + assert reverser_sent >= expected_event_count - expected_event_count_tolerance + assert validator_received >= expected_event_count - expected_event_count_tolerance # times num generators? + assert validator_received_reversed >= expected_event_count - expected_event_count_tolerance assert validator_errors == 0 diff --git a/plugins/ListReverser.cpp b/plugins/ListReverser.cpp index 11dce07..f52d588 100644 --- a/plugins/ListReverser.cpp +++ b/plugins/ListReverser.cpp @@ -7,8 +7,11 @@ * received with this code. */ -#include "ListReverser.hpp" +#include "listrev/listreverser/Nljs.hpp" +#include "listrev/listreverserinfo/InfoNljs.hpp" + #include "CommonIssues.hpp" +#include "ListReverser.hpp" #include "appfwk/DAQModuleHelper.hpp" #include "iomanager/IOManager.hpp" @@ -25,44 +28,85 @@ #define TRACE_NAME "ListReverser" // NOLINT #define TLVL_ENTER_EXIT_METHODS 10 #define TLVL_LIST_REVERSAL 15 +#define TLVL_REQUEST_SENDING 16 +#define TLVL_CONFIGURE 17 namespace dunedaq { namespace listrev { ListReverser::ListReverser(const std::string& name) : DAQModule(name) - , thread_(std::bind(&ListReverser::do_work, this, std::placeholders::_1)) - , inputQueue_(nullptr) - , outputQueue_(nullptr) - , queueTimeout_(100) { - register_command("start", &ListReverser::do_start); - register_command("stop", &ListReverser::do_stop); + register_command("conf", &ListReverser::do_configure, std::set{ "INITIAL" }); + register_command("start", &ListReverser::do_start, std::set{ "CONFIGURED" }); + register_command("stop", &ListReverser::do_stop, std::set{ "TRIGGER_SOURCES_STOPPED" }); } void ListReverser::init(const nlohmann::json& iniobj) { TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering init() method"; - auto qi = appfwk::connection_index(iniobj, { "input", "output" }); + auto qi = appfwk::connection_index(iniobj, { "request_input", "list_input" }); + m_requests = qi["request_input"]; + m_list_connection = qi["list_input"]; + try { - inputQueue_ = get_iom_receiver(qi["input"]); + get_iom_receiver(m_list_connection); } catch (const ers::Issue& excpt) { throw InvalidQueueFatalError(ERS_HERE, get_name(), "input", excpt); } try { - outputQueue_ = get_iom_sender(qi["output"]); + get_iom_receiver(m_requests); } catch (const ers::Issue& excpt) { throw InvalidQueueFatalError(ERS_HERE, get_name(), "output", excpt); } + TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting init() method"; } +void +ListReverser::get_info(opmonlib::InfoCollector& ci, int /*level*/) +{ + listreverserinfo::Info fcr; + + fcr.requests_received = m_requests_received.exchange(0); + fcr.requests_sent = m_requests_sent.exchange(0); + fcr.lists_received = m_lists_received.exchange(0); + fcr.lists_sent = m_lists_sent.exchange(0); + fcr.total_requests_received = m_total_requests_received.load(); + fcr.total_requests_sent = m_total_requests_sent.load(); + fcr.total_lists_received = m_total_lists_received.load(); + fcr.total_lists_sent = m_total_lists_sent.load(); + ci.add(fcr); +} + +void +ListReverser::do_configure(const nlohmann::json& obj) +{ + TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_configure() method"; + auto parsed_conf = obj.get(); + m_send_timeout = std::chrono::milliseconds(parsed_conf.send_timeout_ms); + m_request_timeout = std::chrono::milliseconds(parsed_conf.request_timeout_ms); + m_num_generators = parsed_conf.num_generators; + m_reverser_id = parsed_conf.reverser_id; + + TLOG_DEBUG(TLVL_CONFIGURE) << "ListReverser " << m_reverser_id << " configured with " + << "send timeout " << parsed_conf.send_timeout_ms << " ms," + << " request timeout " << parsed_conf.request_timeout_ms << "ms, " + << " and num_generators " << parsed_conf.num_generators; + + TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_configure() method"; +} + void ListReverser::do_start(const nlohmann::json& /*startobj*/) { TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_start() method"; - thread_.start_working_thread(); + get_iomanager()->add_callback(m_list_connection, + std::bind(&ListReverser::process_list, this, std::placeholders::_1)); + get_iomanager()->add_callback( + m_requests, std::bind(&ListReverser::process_list_request, this, std::placeholders::_1)); + TLOG() << get_name() << " successfully started"; TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_start() method"; } @@ -71,11 +115,46 @@ void ListReverser::do_stop(const nlohmann::json& /*stopobj*/) { TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_stop() method"; - thread_.stop_working_thread(); + get_iomanager()->remove_callback(m_requests); + get_iomanager()->remove_callback(m_list_connection); TLOG() << get_name() << " successfully stopped"; + + std::ostringstream oss_summ; + oss_summ << ": Exiting do_stop() method, received " << m_total_requests_received.load() << " request messages, " + << "sent " << m_total_requests_sent.load() << ", received " << m_total_lists_received.load() + << " lists, and sent " << m_total_lists_sent.load() << " reversed list messages"; + ers::info(ProgressUpdate(ERS_HERE, get_name(), oss_summ.str())); + TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_stop() method"; } +void +ListReverser::process_list_request(const RequestList& request) +{ + TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering process_list_request() method"; + { + std::lock_guard lk(m_map_mutex); + if (!m_pending_lists.count(request.list_id)) { + m_pending_lists[request.list_id] = PendingList(request.destination, request.list_id, m_reverser_id); + ++m_requests_received; + ++m_total_requests_received; + } + } + + for (size_t gen_idx = 0; gen_idx < m_num_generators; ++gen_idx) { + TLOG_DEBUG(TLVL_REQUEST_SENDING) << "Sending request for " << request.list_id << " with destination " + << m_list_connection << " to rdlg" << gen_idx + << "_request_connection (num_generators=" << m_num_generators << ")"; + RequestList req(request.list_id, m_list_connection); + get_iomanager() + ->get_sender("rdlg" + std::to_string(gen_idx) + "_request_connection") + ->send(std::move(req), m_send_timeout); + ++m_requests_sent; + ++m_total_requests_sent; + } + TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting process_list_request() method"; +} + /** * @brief Format a std::vector to a stream * @param t ostream Instance @@ -97,59 +176,66 @@ operator<<(std::ostream& t, std::vector ints) } void -ListReverser::do_work(std::atomic& running_flag) +ListReverser::process_list(const IntList& list) { - TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_work() method"; - int receivedCount = 0; - int sentCount = 0; - std::vector workingVector; - - while (running_flag.load()) { - TLOG_DEBUG(TLVL_LIST_REVERSAL) << get_name() << ": Going to receive data from input queue"; - try { - workingVector = inputQueue_->receive(queueTimeout_).list; - } catch (const dunedaq::iomanager::TimeoutExpired& excpt) { - // it is perfectly reasonable that there might be no data in the queue - // some fraction of the times that we check, so we just continue on and try again - continue; - } + TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering process_list() method"; - ++receivedCount; - TLOG_DEBUG(TLVL_LIST_REVERSAL) << get_name() << ": Received list #" << receivedCount << ". It has size " - << workingVector.size() << ". Reversing its contents"; - std::reverse(workingVector.begin(), workingVector.end()); + std::lock_guard lk(m_map_mutex); + ++m_lists_received; + ++m_total_lists_received; + TLOG_DEBUG(TLVL_LIST_REVERSAL) << get_name() << ": Received list #" << list.list_id << " from " << list.generator_id + << ". It has size " << list.list.size() << ". Reversing its contents"; - std::ostringstream oss_prog; - oss_prog << "Reversed list #" << receivedCount << ", new contents " << workingVector << " and size " - << workingVector.size() << ". "; - ers::debug(ProgressUpdate(ERS_HERE, get_name(), oss_prog.str())); + if (m_pending_lists.count(list.list_id) == 0) { + + std::ostringstream oss_warn; + oss_warn << "send " << list.list_id << " to \"" << m_pending_lists[list.list_id].requestor + << "\" (late list receive)"; + ers::warning(dunedaq::iomanager::TimeoutExpired(ERS_HERE, get_name(), oss_warn.str(), m_send_timeout.count())); + return; + } + + auto workingVector = list.list; + std::reverse(workingVector.begin(), workingVector.end()); + IntList wrapped(list.list_id, m_reverser_id, workingVector); + + ReversedList::Data this_data; + this_data.original = list; + this_data.reversed = wrapped; + + m_pending_lists[list.list_id].list.lists.push_back(this_data); + + std::ostringstream oss_prog; + oss_prog << "Reversed list #" << list.list_id << " from " << list.generator_id << ", new contents " << workingVector + << " and size " << workingVector.size() << ". "; + ers::debug(ProgressUpdate(ERS_HERE, get_name(), oss_prog.str())); + + if (m_pending_lists[list.list_id].list.lists.size() >= m_num_generators || + std::chrono::duration_cast( + std::chrono::steady_clock::now() - m_pending_lists[list.list_id].start_time) > m_request_timeout) { bool successfullyWasSent = false; - while (!successfullyWasSent && running_flag.load()) { - TLOG_DEBUG(TLVL_LIST_REVERSAL) << get_name() << ": Pushing the reversed list onto the output queue"; + int failCount = 0; + while (!successfullyWasSent && failCount < 100) { + TLOG_DEBUG(TLVL_LIST_REVERSAL) << get_name() << ": Sending the reversed lists " << list.list_id; try { - IntList wrapped(workingVector); - outputQueue_->send(std::move(wrapped), queueTimeout_); + get_iomanager() + ->get_sender(m_pending_lists[list.list_id].requestor) + ->send(std::move(m_pending_lists[list.list_id].list), m_send_timeout); successfullyWasSent = true; - ++sentCount; + ++m_lists_sent; + ++m_total_lists_sent; + m_pending_lists.erase(list.list_id); } catch (const dunedaq::iomanager::TimeoutExpired& excpt) { std::ostringstream oss_warn; - oss_warn << "push to output queue \"" << outputQueue_->get_name() << "\""; - ers::warning(dunedaq::iomanager::TimeoutExpired( - ERS_HERE, - get_name(), - oss_warn.str(), - std::chrono::duration_cast(queueTimeout_).count())); + oss_warn << "send " << list.list_id << " to \"" << m_pending_lists[list.list_id].requestor << "\""; + ers::warning(dunedaq::iomanager::TimeoutExpired(ERS_HERE, get_name(), oss_warn.str(), m_send_timeout.count())); + ++failCount; } } - TLOG_DEBUG(TLVL_LIST_REVERSAL) << get_name() << ": End of do_work loop"; } - std::ostringstream oss_summ; - oss_summ << ": Exiting do_work() method, received " << receivedCount << " lists and successfully sent " << sentCount - << ". "; - ers::info(ProgressUpdate(ERS_HERE, get_name(), oss_summ.str())); - TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_work() method"; + TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting process_list() method"; } } // namespace listrev diff --git a/plugins/ListReverser.hpp b/plugins/ListReverser.hpp index bdd66d6..8ae3ce9 100644 --- a/plugins/ListReverser.hpp +++ b/plugins/ListReverser.hpp @@ -14,6 +14,7 @@ #define LISTREV_PLUGINS_LISTREVERSER_HPP_ #include "ListWrapper.hpp" +#include "ListStorage.hpp" #include "appfwk/DAQModule.hpp" #include "iomanager/Receiver.hpp" @@ -23,6 +24,7 @@ #include #include +#include #include #include @@ -48,22 +50,56 @@ class ListReverser : public dunedaq::appfwk::DAQModule ListReverser& operator=(ListReverser&&) = delete; ///< ListReverser is not move-assignable void init(const nlohmann::json& iniobj) override; + void get_info(opmonlib::InfoCollector& ci, int level) override; private: // Commands + void do_configure(const nlohmann::json& obj); void do_start(const nlohmann::json& obj); void do_stop(const nlohmann::json& obj); - // Threading - dunedaq::utilities::WorkerThread thread_; - void do_work(std::atomic&); + // Callbacks + void process_list_request(const RequestList& request); + void process_list(const IntList& list); + + // Data + struct PendingList + { + std::string requestor; + std::chrono::steady_clock::time_point start_time; + ReversedList list; + + PendingList() = default; + explicit PendingList(std::string req, int list_id, int rev_id) + : requestor(req) + , start_time(std::chrono::steady_clock::now()) + { + list.list_id = list_id; + list.reverser_id = rev_id; + } + }; + std::map m_pending_lists; + mutable std::mutex m_map_mutex; + + // Init + std::string m_requests; + std::string m_list_connection; // Configuration - using source_t = dunedaq::iomanager::ReceiverConcept; - std::shared_ptr inputQueue_; - using sink_t = dunedaq::iomanager::SenderConcept; - std::shared_ptr outputQueue_; - std::chrono::milliseconds queueTimeout_; + std::chrono::milliseconds m_send_timeout{ 100 }; + std::chrono::milliseconds m_request_timeout{ 1000 }; + size_t m_reverser_id{ 0 }; + size_t m_num_generators{ 0 }; + + // Monitoring + std::atomic m_requests_received{ 0 }; + std::atomic m_requests_sent{ 0 }; + std::atomic m_lists_received{ 0 }; + std::atomic m_lists_sent{ 0 }; + std::atomic m_total_requests_received{ 0 }; + std::atomic m_total_requests_sent{ 0 }; + std::atomic m_total_lists_received{ 0 }; + std::atomic m_total_lists_sent{ 0 }; }; } // namespace listrev } // namespace dunedaq diff --git a/plugins/ListWrapper.hpp b/plugins/ListWrapper.hpp deleted file mode 100755 index 6fd0594..0000000 --- a/plugins/ListWrapper.hpp +++ /dev/null @@ -1,36 +0,0 @@ -/** - * @file ListWrapper.hpp - * - * ListWrapper wraps a std::vector so that it can be transmitted over the network using the Unified Communications - * API (iomanager) - * - * This is part of the DUNE DAQ Software Suite, copyright 2020. - * Licensing/copyright details are in the COPYING file that you should have - * received with this code. - */ - -#ifndef LISTREV_PLUGINS_LISTWRAPPER_HPP_ -#define LISTREV_PLUGINS_LISTWRAPPER_HPP_ - -#include "serialization/Serialization.hpp" - -#include - -namespace dunedaq { -namespace listrev { -struct IntList -{ - std::vector list; - - IntList() = default; - explicit IntList(std::vector const& l) - : list(l.begin(), l.end()) - {} - - DUNE_DAQ_SERIALIZE(IntList, list); -}; -} // namespace listrev -DUNE_DAQ_SERIALIZABLE(listrev::IntList, "IntList"); -} // namespace dunedaq - -#endif // LISTREV_PLUGINS_LISTWRAPPER_HPP_ diff --git a/plugins/RandomDataListGenerator.cpp b/plugins/RandomDataListGenerator.cpp index dcf270c..3036dd4 100644 --- a/plugins/RandomDataListGenerator.cpp +++ b/plugins/RandomDataListGenerator.cpp @@ -13,6 +13,7 @@ #include "CommonIssues.hpp" #include "RandomDataListGenerator.hpp" +#include "appfwk/DAQModuleHelper.hpp" #include "appfwk/app/Nljs.hpp" #include "iomanager/IOManager.hpp" @@ -37,13 +38,10 @@ namespace listrev { RandomDataListGenerator::RandomDataListGenerator(const std::string& name) : dunedaq::appfwk::DAQModule(name) - , thread_(std::bind(&RandomDataListGenerator::do_work, this, std::placeholders::_1)) - , outputQueues_() - , queueTimeout_(100) { register_command("conf", &RandomDataListGenerator::do_configure, std::set{ "INITIAL" }); register_command("start", &RandomDataListGenerator::do_start, std::set{ "CONFIGURED" }); - register_command("stop", &RandomDataListGenerator::do_stop, std::set{ "TRIGGER_SOURCES_STOPPED"}); + register_command("stop", &RandomDataListGenerator::do_stop, std::set{ "TRIGGER_SOURCES_STOPPED" }); register_command("scrap", &RandomDataListGenerator::do_unconfigure, std::set{ "CONFIGURED" }); register_command("hello", &RandomDataListGenerator::do_hello, std::set{ "RUNNING", "READY" }); } @@ -52,14 +50,16 @@ void RandomDataListGenerator::init(const nlohmann::json& init_data) { TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering init() method"; - auto ini = init_data.get(); - for (const auto& cr : ini.conn_refs) { - try { - outputQueues_.emplace_back(get_iom_sender(cr.uid)); - } catch (const ers::Issue& excpt) { - throw InvalidQueueFatalError(ERS_HERE, get_name(), cr.name, excpt); - } - } + auto mandatory_connections = appfwk::connection_index(init_data, { "request_input", "create_input" }); + + m_request_connection = mandatory_connections["request_input"]; + m_create_connection = mandatory_connections["create_input"]; + + // these are just tests to check if the connections are ok + auto iom = iomanager::IOManager::get(); + iom->get_receiver(m_request_connection); + iom->get_receiver(m_create_connection); + TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting init() method"; } void @@ -69,14 +69,20 @@ RandomDataListGenerator::get_info(opmonlib::InfoCollector& ci, int /*level*/) fcr.generated_numbers = m_generated_tot.load(); fcr.new_generated_numbers = m_generated.exchange(0); + fcr.sent_lists = m_sent_tot.load(); + fcr.new_sent_lists = m_sent.exchange(0); ci.add(fcr); } void -RandomDataListGenerator::do_configure(const nlohmann::json& obj) +RandomDataListGenerator::do_configure(const nlohmann::json& payload) { TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_configure() method"; - cfg_ = obj.get(); + auto parsed_conf = payload.get(); + m_send_timeout = std::chrono::milliseconds(parsed_conf.send_timeout_ms); + m_request_timeout = std::chrono::milliseconds(parsed_conf.request_timeout_ms); + m_generator_id = parsed_conf.generator_id; + m_list_mode = static_cast(m_generator_id % (static_cast(ListMode::MAX) + 1)); TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_configure() method"; } @@ -84,7 +90,13 @@ void RandomDataListGenerator::do_start(const nlohmann::json& /*args*/) { TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_start() method"; - thread_.start_working_thread(); + + auto iom = iomanager::IOManager::get(); + iom->add_callback( + m_request_connection, std::bind(&RandomDataListGenerator::process_request_list, this, std::placeholders::_1)); + iom->add_callback(m_create_connection, + std::bind(&RandomDataListGenerator::process_create_list, this, std::placeholders::_1)); + TLOG() << get_name() << " successfully started"; TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_start() method"; } @@ -93,8 +105,20 @@ void RandomDataListGenerator::do_stop(const nlohmann::json& /*args*/) { TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_stop() method"; - thread_.stop_working_thread(); + + auto iom = iomanager::IOManager::get(); + iom->remove_callback(m_request_connection); + iom->remove_callback(m_create_connection); + m_storage.flush(); + TLOG() << get_name() << " successfully stopped"; + + std::ostringstream oss_summ; + oss_summ << ": Exiting do_stop() method, " + << "generated " << m_generated_tot.load() << " lists, " + << "and sent " << m_sent_tot.load() << " list messages"; + ers::info(ProgressUpdate(ERS_HERE, get_name(), oss_summ.str())); + TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_stop() method"; } @@ -102,7 +126,7 @@ void RandomDataListGenerator::do_unconfigure(const nlohmann::json& /*args*/) { TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_unconfigure() method"; - cfg_ = randomdatalistgenerator::ConfParams{}; // reset to defaults + TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_unconfigure() method"; } @@ -135,64 +159,87 @@ operator<<(std::ostream& t, std::vector ints) } void -RandomDataListGenerator::do_work(std::atomic& running_flag) +RandomDataListGenerator::process_create_list(const CreateList& create_request) { - TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_work() method"; - // size_t generatedCount = 0; - size_t sentCount = 0; - m_generated_tot = 0; - m_generated = 0; - while (running_flag.load()) { - TLOG_DEBUG(TLVL_LIST_GENERATION) << get_name() << ": Creating list of length " << cfg_.nIntsPerList; - std::vector theList(cfg_.nIntsPerList); - - TLOG_DEBUG(TLVL_LIST_GENERATION) << get_name() << ": Start of fill loop"; - for (size_t idx = 0; idx < cfg_.nIntsPerList; ++idx) { - theList[idx] = (rand() % 1000) + 1; - } - ++m_generated_tot; - ++m_generated; - std::ostringstream oss_prog; - oss_prog << "Generated list #" << m_generated_tot.load() << " with contents " << theList << " and size " - << theList.size() << ". "; - ers::debug(ProgressUpdate(ERS_HERE, get_name(), oss_prog.str())); - - TLOG_DEBUG(TLVL_LIST_GENERATION) << get_name() << ": Pushing list onto " << outputQueues_.size() << " outputQueues"; - for (auto& outQueue : outputQueues_) { - std::string thisQueueName = outQueue->get_name(); - bool successfullyWasSent = false; - while (!successfullyWasSent && running_flag.load()) { - TLOG_DEBUG(TLVL_LIST_GENERATION) << get_name() << ": Pushing the generated list onto queue " << thisQueueName; - try { - IntList wrapped(theList); - outQueue->send(std::move(wrapped), queueTimeout_); - successfullyWasSent = true; - ++sentCount; - } catch (const dunedaq::iomanager::TimeoutExpired& excpt) { - std::ostringstream oss_warn; - oss_warn << "push to output queue \"" << thisQueueName << "\""; - ers::warning(dunedaq::iomanager::TimeoutExpired( - ERS_HERE, - get_name(), - oss_warn.str(), - std::chrono::duration_cast(queueTimeout_).count())); - } - } + TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering process_create_list() method"; + std::vector theList(create_request.list_size); + + TLOG_DEBUG(TLVL_LIST_GENERATION) << get_name() << ": Start of fill loop"; + for (size_t idx = 0; idx < create_request.list_size; ++idx) { + switch (m_list_mode) { + case ListMode::Random: + theList[idx] = (rand() % 1000) + 1; + break; + case ListMode::Ascending: + theList[idx] = create_request.list_id + idx; + break; + case ListMode::Evens: + theList[idx] = (create_request.list_id % 2 == 0 ? 0 : 1) + create_request.list_id + idx * 2; + break; + case ListMode::Odds: + theList[idx] = (create_request.list_id % 2 == 0 ? 1 : 0) + create_request.list_id + idx * 2; + break; + case ListMode::Descending: + theList[idx] = create_request.list_id - idx; + break; } - if (outputQueues_.size() == 0) { - ers::warning(NoOutputQueuesAvailableWarning(ERS_HERE, get_name())); + } + ++m_generated_tot; + ++m_generated; + std::ostringstream oss_prog; + oss_prog << "Generated list #" << create_request.list_id << " with contents " << theList << " and size " + << theList.size() << ". "; + ers::debug(ProgressUpdate(ERS_HERE, get_name(), oss_prog.str())); + + m_storage.add_list(IntList(create_request.list_id, m_generator_id, theList)); + + TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting process_create_list() method"; +} + +void +RandomDataListGenerator::process_request_list(const RequestList& request) +{ + TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering process_request_list() method"; + auto start = std::chrono::steady_clock::now(); + IntList output; + bool list_found = false; + + while (std::chrono::duration_cast(std::chrono::steady_clock::now() - start) < + m_request_timeout) { + if (m_storage.has_list(request.list_id)) { + output = m_storage.get_list(request.list_id); + list_found = true; + break; } + std::this_thread::sleep_for(std::chrono::microseconds(100)); + } - TLOG_DEBUG(TLVL_LIST_GENERATION) << get_name() << ": Start of sleep between sends"; - std::this_thread::sleep_for(std::chrono::milliseconds(cfg_.waitBetweenSendsMsec)); - TLOG_DEBUG(TLVL_LIST_GENERATION) << get_name() << ": End of do_work loop"; + if (!list_found) { + std::ostringstream oss_warn; + oss_warn << "wait for list \"" << request.list_id << "\""; + ers::warning(dunedaq::iomanager::TimeoutExpired( + ERS_HERE, + get_name(), + oss_warn.str(), + std::chrono::duration_cast(m_send_timeout).count())); + return; } - std::ostringstream oss_summ; - oss_summ << ": Exiting the do_work() method, generated " << m_generated_tot.load() << " lists and successfully sent " - << sentCount << " copies. "; - ers::info(ProgressUpdate(ERS_HERE, get_name(), oss_summ.str())); - TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_work() method"; + try { + dunedaq::get_iomanager()->get_sender(request.destination)->send(std::move(output), m_send_timeout); + + ++m_sent; + ++m_sent_tot; + } catch (const dunedaq::iomanager::TimeoutExpired& excpt) { + std::ostringstream oss_warn; + oss_warn << "send to destination \"" << request.destination << "\""; + ers::warning(dunedaq::iomanager::TimeoutExpired( + ERS_HERE, + get_name(), + oss_warn.str(), + std::chrono::duration_cast(m_send_timeout).count())); + } + TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting process_request_list() method"; } } // namespace listrev diff --git a/plugins/RandomDataListGenerator.hpp b/plugins/RandomDataListGenerator.hpp index f67a831..16dcb43 100644 --- a/plugins/RandomDataListGenerator.hpp +++ b/plugins/RandomDataListGenerator.hpp @@ -13,6 +13,7 @@ #define LISTREV_PLUGINS_RANDOMDATALISTGENERATOR_HPP_ #include "ListWrapper.hpp" +#include "ListStorage.hpp" #include "listrev/randomdatalistgenerator/Structs.hpp" @@ -61,32 +62,41 @@ class RandomDataListGenerator : public dunedaq::appfwk::DAQModule void do_unconfigure(const nlohmann::json& obj); void do_hello(const nlohmann::json& obj); - // Threading - dunedaq::utilities::WorkerThread thread_; - void do_work(std::atomic&); + // Callbacks + void process_create_list(const CreateList& create_request); + void process_request_list(const RequestList& request_list); + + // Init + std::string m_request_connection; + std::string m_create_connection; // Configuration - using sink_t = dunedaq::iomanager::SenderConcept; - std::vector> outputQueues_; - std::chrono::milliseconds queueTimeout_; - randomdatalistgenerator::ConfParams cfg_; - // Statistic counters + enum class ListMode : uint16_t + { + Random = 0, + Ascending = 1, + Evens = 2, + Odds = 3, + Descending = 4, + MAX = Descending, + }; + ListMode m_list_mode{ ListMode::Random }; + std::chrono::milliseconds m_send_timeout{ 100 }; + std::chrono::milliseconds m_request_timeout{ 100 }; + size_t m_generator_id{ 0 }; + + // Data + ListStorage m_storage; + + // Monitoring std::atomic m_generated{ 0 }; // NOLINT(build/unsigned) std::atomic m_generated_tot{ 0 }; // NOLINT(build/unsigned) + std::atomic m_sent{ 0 }; + std::atomic m_sent_tot {0}; }; } // namespace listrev -// Disable coverage collection LCOV_EXCL_START -ERS_DECLARE_ISSUE_BASE(listrev, - NoOutputQueuesAvailableWarning, - appfwk::GeneralDAQModuleIssue, - "No output queues were available, so the generated list of integers will be dropped. Has " - "initialization been successfully completed?", - ((std::string)name), - ERS_EMPTY) -// Re-enable coverage collection LCOV_EXCL_STOP - } // namespace dunedaq #endif // LISTREV_PLUGINS_RANDOMDATALISTGENERATOR_HPP_ diff --git a/plugins/ReversedListValidator.cpp b/plugins/ReversedListValidator.cpp index 911660f..ed6c9a8 100644 --- a/plugins/ReversedListValidator.cpp +++ b/plugins/ReversedListValidator.cpp @@ -7,6 +7,9 @@ * received with this code. */ +#include "listrev/reversedlistvalidator/Nljs.hpp" +#include "listrev/reversedlistvalidatorinfo/InfoNljs.hpp" + #include "ReversedListValidator.hpp" #include "CommonIssues.hpp" @@ -26,46 +29,78 @@ #define TRACE_NAME "ReversedListValidator" // NOLINT #define TLVL_ENTER_EXIT_METHODS 10 #define TLVL_LIST_VALIDATION 15 +#define TLVL_REQUEST_SENDING 16 +#define TLVL_PROCESS_LIST 17 namespace dunedaq { namespace listrev { ReversedListValidator::ReversedListValidator(const std::string& name) : DAQModule(name) - , thread_(std::bind(&ReversedListValidator::do_work, this, std::placeholders::_1)) - , reversedDataQueue_(nullptr) - , originalDataQueue_(nullptr) - , queueTimeout_(100) + , m_work_thread(std::bind(&ReversedListValidator::do_work, this, std::placeholders::_1)) { - register_command("start", &ReversedListValidator::do_start); - register_command("stop", &ReversedListValidator::do_stop); + register_command("conf", &ReversedListValidator::do_configure, std::set{ "INITIAL" }); + register_command("start", &ReversedListValidator::do_start, std::set{ "CONFIGURED" }); + register_command("stop", &ReversedListValidator::do_stop, std::set{ "TRIGGER_SOURCES_STOPPED" }); } void ReversedListValidator::init(const nlohmann::json& obj) { TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering init() method"; - auto qi = appfwk::connection_index(obj, { "reversed_data_input", "original_data_input" }); - try { - reversedDataQueue_ = get_iom_receiver(qi["reversed_data_input"]); - } catch (const ers::Issue& excpt) { - throw InvalidQueueFatalError(ERS_HERE, get_name(), "reversed data input", excpt); - } + auto mandatory_connections = appfwk::connection_index(obj, { "list_input", "creates_out" }); - try { - originalDataQueue_ = get_iom_receiver(qi["original_data_input"]); - } catch (const ers::Issue& excpt) { - throw InvalidQueueFatalError(ERS_HERE, get_name(), "original data input", excpt); - } + m_list_connection = mandatory_connections["list_input"]; + m_create_connection = mandatory_connections["creates_out"]; + + // these are just tests to check if the connections are ok + auto iom = iomanager::IOManager::get(); + iom->get_receiver(m_list_connection); + iom->get_sender(m_create_connection); TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting init() method"; } +void +ReversedListValidator::get_info(opmonlib::InfoCollector& ci, int /*level*/) +{ + reversedlistvalidatorinfo::Info fcr; + + fcr.requests_total = m_requests_total.load(); + fcr.new_requests = m_new_requests.exchange(0); + fcr.total_lists = m_total_lists.load(); + fcr.new_lists = m_new_lists.exchange(0); + fcr.total_valid_pairs = m_total_valid_pairs.load(); + fcr.valid_list_pairs = m_valid_list_pairs.exchange(0); + fcr.total_invalid_pairs = m_total_invalid_pairs.load(); + fcr.invalid_list_pairs = m_invalid_list_pairs.exchange(0); + ci.add(fcr); +} + +void +ReversedListValidator::do_configure(const nlohmann::json& obj) +{ + TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_configure() method"; + auto parsed_conf = obj.get(); + m_send_timeout = std::chrono::milliseconds(parsed_conf.send_timeout_ms); + m_request_timeout = std::chrono::milliseconds(parsed_conf.request_timeout_ms); + m_max_outstanding_requests = parsed_conf.max_outstanding_requests; + m_num_generators = parsed_conf.num_generators; + m_num_reversers = parsed_conf.num_reversers; + m_list_creator = + ListCreator(m_create_connection, m_send_timeout, parsed_conf.min_list_size, parsed_conf.max_list_size); + TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_configure() method"; +} + void ReversedListValidator::do_start(const nlohmann::json& /*args*/) { TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_start() method"; - thread_.start_working_thread(); + m_next_id = 0; + m_work_thread.start_working_thread(); + get_iomanager()->add_callback( + m_list_connection, + std::bind(&ReversedListValidator::process_list, this, std::placeholders::_1)); TLOG() << get_name() << " successfully started"; TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_start() method"; } @@ -74,8 +109,30 @@ void ReversedListValidator::do_stop(const nlohmann::json& /*args*/) { TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_stop() method"; - thread_.stop_working_thread(); + m_work_thread.stop_working_thread(); + + std::chrono::milliseconds stop_timeout(10000); + auto stop_wait = std::chrono::steady_clock::now(); + size_t outstanding_wait = 1; + while (outstanding_wait > 0 && std::chrono::duration_cast( + std::chrono::steady_clock::now() - stop_wait) < stop_timeout) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + std::lock_guard lk(m_outstanding_id_mutex); + outstanding_wait = m_outstanding_ids.size(); + } + + TLOG() << get_name() << " Removing callback, there are " << outstanding_wait << " requests left outstanding."; + + get_iomanager()->remove_callback(m_list_connection); TLOG() << get_name() << " successfully stopped"; + + + std::ostringstream oss_summ; + oss_summ << ": Exiting do_stop() method, received " << m_total_lists.load() << " reversed list messages, " + << "compared " << m_total_valid_pairs.load() + m_total_invalid_pairs.load() + << " reversed lists to their original data, and found " << m_total_invalid_pairs.load() << " mismatches. "; + ers::info(ProgressUpdate(ERS_HERE, get_name(), oss_summ.str())); + TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_stop() method"; } @@ -103,75 +160,100 @@ void ReversedListValidator::do_work(std::atomic& running_flag) { TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_work() method"; - int reversedCount = 0; - int comparisonCount = 0; - int failureCount = 0; - std::vector reversedData; - std::vector originalData; + m_request_start = std::chrono::steady_clock::now(); while (running_flag.load()) { - TLOG_DEBUG(TLVL_LIST_VALIDATION) << get_name() << ": Going to receive data from the reversed list queue"; - try { - reversedData = reversedDataQueue_->receive(queueTimeout_).list; - } catch (const dunedaq::iomanager::TimeoutExpired& excpt) { - // it is perfectly reasonable that there might be no reversed data in the queue - // some fraction of the times that we check, so we just continue on and try again - continue; - } - ++reversedCount; - - TLOG_DEBUG(TLVL_LIST_VALIDATION) << get_name() << ": Received reversed list #" << reversedCount << ". It has size " - << reversedData.size() - << ". Now going to receive data from the original data queue."; - bool originalWasSuccessfullyReceived = false; - while (!originalWasSuccessfullyReceived && running_flag.load()) { - TLOG_DEBUG(TLVL_LIST_VALIDATION) << get_name() << ": Popping the next element off the original data queue"; - try { - originalData = originalDataQueue_->receive(queueTimeout_).list; - originalWasSuccessfullyReceived = true; - ++comparisonCount; - } catch (const dunedaq::iomanager::TimeoutExpired& excpt) { - std::ostringstream oss_warn; - oss_warn << "pop from original data queue"; - ers::warning(dunedaq::iomanager::TimeoutExpired( - ERS_HERE, - get_name(), - oss_warn.str(), - std::chrono::duration_cast(queueTimeout_).count())); - } - } + TLOG_DEBUG(TLVL_LIST_VALIDATION) << get_name() << ": Locking out id list"; + std::lock_guard lk(m_outstanding_id_mutex); - if (originalWasSuccessfullyReceived) { - std::ostringstream oss_prog; - oss_prog << "Validating list #" << reversedCount << ", original contents " << originalData - << " and reversed contents " << reversedData << ". "; - ers::debug(ProgressUpdate(ERS_HERE, get_name(), oss_prog.str())); - - TLOG_DEBUG(TLVL_LIST_VALIDATION) - << get_name() << ": Re-reversing the reversed list so that it can be compared to the original list"; - std::reverse(reversedData.begin(), reversedData.end()); - - TLOG_DEBUG(TLVL_LIST_VALIDATION) << get_name() << ": Comparing the doubly-reversed list with the original list"; - if (reversedData != originalData) { - std::ostringstream oss_rev; - oss_rev << reversedData; - std::ostringstream oss_orig; - oss_orig << originalData; - ers::error(DataMismatchError(ERS_HERE, get_name(), oss_rev.str(), oss_orig.str())); - ++failureCount; - } + TLOG_DEBUG(TLVL_LIST_VALIDATION) << get_name() << ": Sending new requests"; + auto next_req_time = [&]() { + auto ms = 1000.0 / m_request_rate_hz; + auto off = ms * m_next_id; + return m_request_start + std::chrono::milliseconds(static_cast(off)); + }; + + while (m_outstanding_ids.size() < m_max_outstanding_requests && std::chrono::steady_clock::now() > next_req_time()) { + m_list_creator.send_create(++m_next_id); + m_outstanding_ids[m_next_id] = std::chrono::steady_clock::now(); + send_request(m_next_id); + ++m_requests_total; + ++m_new_requests; } + TLOG_DEBUG(TLVL_LIST_VALIDATION) << get_name() << ": End of do_work loop"; } - std::ostringstream oss_summ; - oss_summ << ": Exiting do_work() method, received " << reversedCount << " reversed lists, " - << "compared " << comparisonCount << " of them to their original data, and found " << failureCount - << " mismatches. "; - ers::info(ProgressUpdate(ERS_HERE, get_name(), oss_summ.str())); TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_work() method"; } +void +ReversedListValidator::process_list(const ReversedList& list) +{ + TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering process_list() method"; + + ++m_total_lists; + ++m_new_lists; + + std::ostringstream oss_prog; + oss_prog << "Validating list set #" << list.list_id << " from reverser " << list.reverser_id << ". "; + ers::debug(ProgressUpdate(ERS_HERE, get_name(), oss_prog.str())); + + if (list.lists.size() != m_num_generators) { + ers::error(MissingListError(ERS_HERE, get_name(), list.list_id, m_num_generators, list.lists.size())); + } + + for (auto& list_data : list.lists) { + + std::ostringstream oss_prog; + oss_prog << "Validating list #" << list.list_id << " from generator " << list_data.original.generator_id << ", original contents " << list_data.original.list + << " and reversed contents " << list_data.reversed.list << ". "; + ers::debug(ProgressUpdate(ERS_HERE, get_name(), oss_prog.str())); + + TLOG_DEBUG(TLVL_LIST_VALIDATION) + << get_name() << ": Re-reversing the reversed list so that it can be compared to the original list"; + auto reversed = list_data.reversed.list; + std::reverse(reversed.begin(), reversed.end()); + + TLOG_DEBUG(TLVL_LIST_VALIDATION) << get_name() << ": Comparing the doubly-reversed list with the original list"; + if (reversed != list_data.original.list) { + std::ostringstream oss_rev; + oss_rev << reversed; + std::ostringstream oss_orig; + oss_orig << list_data.original.list; + ers::error(DataMismatchError(ERS_HERE, get_name(), list.list_id, oss_rev.str(), oss_orig.str())); + ++m_invalid_list_pairs; + ++m_total_invalid_pairs; + } else { + ++m_valid_list_pairs; + ++m_total_valid_pairs; + } + } + + std::lock_guard lk(m_outstanding_id_mutex); + m_outstanding_ids.erase(list.list_id); + + TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting process_list() method"; +} + +void +ReversedListValidator::send_request(int id) +{ + TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering send_request() method"; + + auto reverser_id = id % m_num_reversers; + + RequestList req; + req.list_id = id; + req.destination = m_list_connection; + + get_iomanager() + ->get_sender("lr" + std::to_string(reverser_id) + "_request_connection") + ->send(std::move(req), m_send_timeout); + + TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting send_request() method"; +} + } // namespace listrev } // namespace dunedaq diff --git a/plugins/ReversedListValidator.hpp b/plugins/ReversedListValidator.hpp index 49a34e0..37ffc85 100644 --- a/plugins/ReversedListValidator.hpp +++ b/plugins/ReversedListValidator.hpp @@ -15,9 +15,12 @@ #define LISTREV_PLUGINS_REVERSEDLISTVALIDATOR_HPP_ #include "ListWrapper.hpp" +#include "ListStorage.hpp" +#include "ListCreator.hpp" #include "appfwk/DAQModule.hpp" #include "iomanager/Receiver.hpp" +#include "iomanager/Sender.hpp" #include "utilities/WorkerThread.hpp" #include @@ -49,32 +52,72 @@ class ReversedListValidator : public dunedaq::appfwk::DAQModule ReversedListValidator& operator=(ReversedListValidator&&) = delete; ///< ReversedListValidator is not move-assignable void init(const nlohmann::json& obj) override; + void get_info(opmonlib::InfoCollector& ci, int level) override; private: // Commands + void do_configure(const nlohmann::json& obj); void do_start(const nlohmann::json& obj); void do_stop(const nlohmann::json& obj); // Threading - dunedaq::utilities::WorkerThread thread_; + dunedaq::utilities::WorkerThread m_work_thread; void do_work(std::atomic&); + // Callbacks + void process_list(const ReversedList& list); + + // Methods + void send_request(int id); + + // Data + std::map m_outstanding_ids; + int m_next_id{ 0 }; + std::chrono::steady_clock::time_point m_request_start; + mutable std::mutex m_outstanding_id_mutex; + ListCreator m_list_creator; + + // Init + std::string m_list_connection; + std::string m_create_connection; + // Configuration - using source_t = dunedaq::iomanager::ReceiverConcept; - std::shared_ptr reversedDataQueue_; - std::shared_ptr originalDataQueue_; - std::chrono::milliseconds queueTimeout_; + std::chrono::milliseconds m_send_timeout{ 100 }; + std::chrono::milliseconds m_request_timeout{ 1000 }; + size_t m_max_outstanding_requests{ 100 }; + size_t m_num_generators{ 0 }; + size_t m_num_reversers{ 0 }; + size_t m_request_rate_hz{ 100 }; + + // Monitoring + std::atomic m_requests_total{ 0 }; + std::atomic m_new_requests{ 0 }; + std::atomic m_total_lists{ 0 }; + std::atomic m_new_lists{ 0 }; + std::atomic m_total_reversed{ 0 }; + std::atomic m_new_reversed{ 0 }; + std::atomic m_total_valid_pairs{ 0 }; + std::atomic m_valid_list_pairs{ 0 }; + std::atomic m_total_invalid_pairs{ 0 }; + std::atomic m_invalid_list_pairs{ 0 }; }; } // namespace listrev // Disable coverage collection LCOV_EXCL_START +ERS_DECLARE_ISSUE_BASE(listrev, + MissingListError, + appfwk::GeneralDAQModuleIssue, + "Missing lists detected, for list set " << id << " expected " << n_gen << " lists, but received only " << n_lists, + ((std::string)name), + ((int)id)((int)n_gen)((int)n_lists)) + ERS_DECLARE_ISSUE_BASE(listrev, DataMismatchError, appfwk::GeneralDAQModuleIssue, - "Data mismatch when validating lists: doubly-reversed list contents = " + "Data mismatch when validating list" << id << ": doubly-reversed list contents = " << revContents << ", original list contents = " << origContents, ((std::string)name), - ((std::string)revContents)((std::string)origContents)) + ((int)id)((std::string)revContents)((std::string)origContents)) // Re-enable coverage collection LCOV_EXCL_STOP } // namespace dunedaq diff --git a/python/listrev/listrevapp_gen.py b/python/listrev/listrevapp_gen.py index 5517684..d261092 100644 --- a/python/listrev/listrevapp_gen.py +++ b/python/listrev/listrevapp_gen.py @@ -9,10 +9,14 @@ # Load configuration types import moo.otypes +moo.otypes.load_types("listrev/listreverser.jsonnet") moo.otypes.load_types("listrev/randomdatalistgenerator.jsonnet") +moo.otypes.load_types("listrev/reversedlistvalidator.jsonnet") # Import new types +import dunedaq.listrev.listreverser as lr import dunedaq.listrev.randomdatalistgenerator as rlg +import dunedaq.listrev.reversedlistvalidator as rlv from daqconf.core.app import App, ModuleGraph @@ -20,7 +24,21 @@ from daqconf.core.conf_utils import Endpoint, Direction # =============================================================================== -def get_listrev_app(nickname, host="localhost", n_ints=4, n_wait_ms=1000, gen_mode="s"): +def get_listrev_app( + nickname, + host="localhost", + n_wait_ms=100, + request_timeout_ms=1000, + request_rate_hz=10, + generator_indicies=[], + reverser_indicies=[], + has_validator=False, + n_generators=1, + n_reversers=1, + n_ints_min=50, + n_ints_max=200, + n_reqs=100, +): """ Here an entire application is generated. """ @@ -28,41 +46,81 @@ def get_listrev_app(nickname, host="localhost", n_ints=4, n_wait_ms=1000, gen_mo modules = [] - if gen_mode == "s" or "g" in gen_mode: - modules += [ + modules += [ DAQModule( - name="rdlg", + name=f"rdlg{gidx}", plugin="RandomDataListGenerator", - conf=rlg.ConfParams( - nIntsPerList=n_ints, waitBetweenSendsMsec=n_wait_ms - ), - ) + conf=rlg.ConfParams(send_timeout_ms=n_wait_ms, request_timeout_ms=request_timeout_ms, generator_id=gidx), + ) for gidx in generator_indicies ] - if gen_mode == "s" or "r" in gen_mode: - modules += [DAQModule(name="lr", plugin="ListReverser")] + modules += [ + DAQModule( + name=f"lr{ridx}", + plugin="ListReverser", + conf=lr.ConfParams( + send_timeout_ms=n_wait_ms, + request_timeout_ms=request_timeout_ms, + num_generators=n_generators, + reverser_id=ridx + ), + ) for ridx in reverser_indicies + ] - if gen_mode == "s" or "v" in gen_mode: - modules += [DAQModule(name="lrv", plugin="ReversedListValidator")] + if has_validator: + modules += [ + DAQModule( + name="lrv", + plugin="ReversedListValidator", + conf=rlv.ConfParams( + send_timeout_ms=n_wait_ms, + request_timeout_ms=request_timeout_ms, + request_rate_hz=request_rate_hz, + max_outstanding_requests=n_reqs, + num_reversers=n_reversers, + num_generators=n_generators, + min_list_size=n_ints_min, + max_list_size=n_ints_max + ), + ) + ] mgraph = ModuleGraph(modules) - if gen_mode == "s": - mgraph.connect_modules("rdlg.q1", "lrv.original_data_input", "IntList", "original", size_hint=10) - mgraph.connect_modules("rdlg.q2", "lr.input", "IntList", "to_reverse", size_hint=10) - mgraph.connect_modules("lr.output", "lrv.reversed_data_input", "IntList", "reversed", size_hint=10) - - if gen_mode != "s" and "g" in gen_mode: - mgraph.add_endpoint("original", "rdlg.q1", "IntList", Direction.OUT) - mgraph.add_endpoint("to_reverse", "rdlg.q2", "IntList", Direction.OUT) - - if gen_mode != "s" and "r" in gen_mode: - mgraph.add_endpoint("to_reverse", "lr.input", "IntList", Direction.IN) - mgraph.add_endpoint("reversed", "lr.output", "IntList", Direction.OUT) - - if gen_mode != "s" and "v" in gen_mode: - mgraph.add_endpoint("original", "lrv.original_data_input", "IntList", Direction.IN) - mgraph.add_endpoint("reversed", "lrv.reversed_data_input", "IntList", Direction.IN) + for gidx in generator_indicies: + for ridx in range(n_reversers): + mgraph.add_endpoint(f"lr{ridx}_list_connection", f"rdlg{gidx}.q{ridx}", "IntList", Direction.OUT) + mgraph.add_endpoint( + f"rdlg{gidx}_request_connection", + f"rdlg{gidx}.request_input", + "RequestList", + Direction.IN + ) + mgraph.add_endpoint(f"creates", f"rdlg{gidx}.create_input", "CreateList", Direction.IN, is_pubsub=True, toposort=False) + + for ridx in reverser_indicies: + mgraph.add_endpoint(f"lr{ridx}_list_connection", f"lr{ridx}.list_input", "IntList", Direction.IN) + mgraph.add_endpoint(f"validator_list_connection", f"lr{ridx}.output", "ReversedList", Direction.OUT) + mgraph.add_endpoint( + f"lr{ridx}_request_connection", + f"lr{ridx}.request_input", + "RequestList", + Direction.IN + ) + + for gidx in range(n_generators): + mgraph.add_endpoint(f"rdlg{gidx}_request_connection", f"lr{ridx}.request_output_{gidx}", "RequestList", Direction.OUT) + + if has_validator: + mgraph.add_endpoint("validator_list_connection", "lrv.list_input", "ReversedList", Direction.IN) + for ridx in range(n_reversers): + mgraph.add_endpoint( + f"lr{ridx}_request_connection", + f"lrv.request_output_{ridx}", + "RequestList", + Direction.OUT + ) + mgraph.add_endpoint(f"creates", "lrv.creates_out", "CreateList", Direction.OUT, is_pubsub=True, toposort=False) lr_app = App(modulegraph=mgraph, host=host, name=nickname) diff --git a/schema/listrev/confgen.jsonnet b/schema/listrev/confgen.jsonnet index 52c5450..ec624fd 100755 --- a/schema/listrev/confgen.jsonnet +++ b/schema/listrev/confgen.jsonnet @@ -19,17 +19,24 @@ local cs = { app: s.string ("app", doc="a string"), // !?!?! apps: s.sequence("apps", self.app, "some strings"), + listrevapp: s.record("listrevapp", [ + s.field('ints_per_list_min', self.number, default=50, doc='Minimum number of integers in the list'), + s.field('ints_per_list_max', self.number, default=200, doc='Maximum number of integers in the list'), + s.field('max_requests', self.number, default=100, doc='Maximum number of requests in-flight'), + s.field('wait_ms', self.number, default=100, doc='Number of ms to wait while sending'), + s.field('request_timeout_ms', self.number, default=1000, doc='Number of ms to wait for requests to be fulfilled'), + s.field('request_rate_hz', self.number, default=10, doc='Target rate for requests, in Hz'), + ]), + listrev: s.record("listrev", [ + s.field('config', self.listrevapp, default=self.listrevapp, doc='Configuration of the listrev apps'), s.field('host_app', daqconf.host, default='localhost', doc='Host to run the listrev sw app on'), - s.field('ints_per_list', self.number, default=4, doc='Number of integers in the list'), - s.field('wait_ms', self.number, default=1000, doc='Number of ms to wait between list sends'), s.field('apps', self.apps, default=['s'], doc="Apps to generate: \"s\" for single-app ListRev, otherwise specify \"g\", \"r\", and \"v\". E.g.: [\"gv\",\"r\"]") ]), commtest: s.record("commtest", [ s.field('hosts', daqconf.hosts, default=['localhost', 'localhost'], doc='Hosts to run test programs on. First host will receive \"rv\" app, while others will have \"g\" apps'), - s.field('ints_per_list', self.number, default=4, doc='Number of integers in the list'), - s.field('wait_ms', self.number, default=1000, doc='Number of ms to wait between list sends'), + s.field('config', self.listrevapp,default=self.listrevapp, doc='Configuration of the listrev apps'), ]), listrev_gen: s.record('listrev_gen', [ diff --git a/schema/listrev/listreverser.jsonnet b/schema/listrev/listreverser.jsonnet new file mode 100644 index 0000000..2ce2638 --- /dev/null +++ b/schema/listrev/listreverser.jsonnet @@ -0,0 +1,18 @@ +local moo = import "moo.jsonnet"; +local ns = "dunedaq.listrev.listreverser"; +local s = moo.oschema.schema(ns); + +local types = { + count : s.number("Count", "i4", + doc="A count of not too many things"), + + conf: s.record("ConfParams", [ + s.field("send_timeout_ms", self.count, 100, doc="Milliseconds to wait while sending"), + s.field("request_timeout_ms", self.count, 1000, doc="Milliseconds to wait before giving up on a request"), + s.field("num_generators", self.count, 1, doc="Number of RandomDataListGenerator instances in the system"), + s.field("reverser_id", self.count, 0, doc="Index of this ListReverser instance"), + ], doc="ListReverser configuration"), + +}; + +moo.oschema.sort_select(types, ns) diff --git a/schema/listrev/listreverserinfo.jsonnet b/schema/listrev/listreverserinfo.jsonnet new file mode 100644 index 0000000..49bd41f --- /dev/null +++ b/schema/listrev/listreverserinfo.jsonnet @@ -0,0 +1,22 @@ +// This is an example of how to define a schema for operational monitoring + +local moo = import "moo.jsonnet"; +local s = moo.oschema.schema("dunedaq.listrev.listreverserinfo"); + +local info = { + uint8 : s.number("uint8", "u8", + doc="An unsigned of 8 bytes"), + + info: s.record("Info", [ + s.field("requests_received", self.uint8, 0, doc="Count of received requests"), + s.field("requests_sent", self.uint8, 0, doc="Count of sent requests"), + s.field("lists_received", self.uint8, 0, doc="Count of lists received"), + s.field("lists_sent", self.uint8, 0, doc="Count of sent lists"), + s.field("total_requests_received", self.uint8, 0, doc="Count of received requests"), + s.field("total_requests_sent", self.uint8, 0, doc="Count of sent requests"), + s.field("total_lists_received", self.uint8, 0, doc="Count of lists received"), + s.field("total_lists_sent", self.uint8, 0, doc="Count of sent lists"), + ], doc="List generator information information") +}; + +moo.oschema.sort_select(info) diff --git a/schema/listrev/randomdatalistgenerator.jsonnet b/schema/listrev/randomdatalistgenerator.jsonnet index 6f6f8a5..9a77d29 100644 --- a/schema/listrev/randomdatalistgenerator.jsonnet +++ b/schema/listrev/randomdatalistgenerator.jsonnet @@ -3,17 +3,13 @@ local ns = "dunedaq.listrev.randomdatalistgenerator"; local s = moo.oschema.schema(ns); local types = { - size: s.number("Size", "u8", - doc="A count of very many things"), - count : s.number("Count", "i4", doc="A count of not too many things"), conf: s.record("ConfParams", [ - s.field("nIntsPerList", self.size, 4, - doc="Number of numbers"), - s.field("waitBetweenSendsMsec", self.count, 1000, - doc="Millisecs to wait between sending"), + s.field("send_timeout_ms", self.count, 100, doc="Milliseconds to wait while sending"), + s.field("request_timeout_ms", self.count, 1000, doc="Milliseconds to wait before giving up on a request"), + s.field("generator_id", self.count, 0, doc="Index of this RandomDataListGenerator instance"), ], doc="RandomDataListGenerator configuration"), }; diff --git a/schema/listrev/randomdatalistgeneratorinfo.jsonnet b/schema/listrev/randomdatalistgeneratorinfo.jsonnet index 0d25667..8e26d4b 100644 --- a/schema/listrev/randomdatalistgeneratorinfo.jsonnet +++ b/schema/listrev/randomdatalistgeneratorinfo.jsonnet @@ -10,6 +10,8 @@ local info = { info: s.record("Info", [ s.field("generated_numbers", self.uint8, 0, doc="Counting generated numbers"), s.field("new_generated_numbers", self.uint8, 0, doc="Counting incrementally generated numbers"), + s.field("sent_lists", self.uint8, 0, doc="Counting sent numbers"), + s.field("new_sent_lists", self.uint8, 0, doc="Counting incrementally sent numbers"), ], doc="List generator information information") }; diff --git a/schema/listrev/reversedlistvalidator.jsonnet b/schema/listrev/reversedlistvalidator.jsonnet new file mode 100644 index 0000000..26cb2dc --- /dev/null +++ b/schema/listrev/reversedlistvalidator.jsonnet @@ -0,0 +1,22 @@ +local moo = import "moo.jsonnet"; +local ns = "dunedaq.listrev.reversedlistvalidator"; +local s = moo.oschema.schema(ns); + +local types = { + count : s.number("Count", "i4", + doc="A count of not too many things"), + + conf: s.record("ConfParams", [ + s.field("send_timeout_ms", self.count, 100, doc="Milliseconds to wait while sending"), + s.field("request_timeout_ms", self.count, 1000, doc="Milliseconds to wait before giving up on a request"), + s.field("request_rate_hz", self.count, 10, doc="Target request rate, in Hz"), + s.field("max_outstanding_requests", self.count, 100, doc="Number of requests to handle at one time"), + s.field("num_reversers", self.count, 1, doc="Number of ListReverser instances in the system"), + s.field("num_generators", self.count, 1, doc="Number of RandomDataListGenerator instances in the system"), + s.field("min_list_size", self.count, 50, doc="Minimum size of created lists"), + s.field("max_list_size", self.count, 200, doc="Maximum size of created lists"), + ], doc="ReversedListValidator configuration"), + +}; + +moo.oschema.sort_select(types, ns) diff --git a/schema/listrev/reversedlistvalidatorinfo.jsonnet b/schema/listrev/reversedlistvalidatorinfo.jsonnet new file mode 100644 index 0000000..1095f4b --- /dev/null +++ b/schema/listrev/reversedlistvalidatorinfo.jsonnet @@ -0,0 +1,22 @@ +// This is an example of how to define a schema for operational monitoring + +local moo = import "moo.jsonnet"; +local s = moo.oschema.schema("dunedaq.listrev.reversedlistvalidatorinfo"); + +local info = { + uint8 : s.number("uint8", "u8", + doc="An unsigned of 8 bytes"), + + info: s.record("Info", [ + s.field("requests_total", self.uint8, 0, doc="Count of all requests"), + s.field("new_requests", self.uint8, 0, doc="Count of newly-generated requests"), + s.field("total_lists", self.uint8, 0, doc="Count of all lists received"), + s.field("new_lists", self.uint8, 0, doc="Count of new lists received"), + s.field("total_valid_pairs", self.uint8, 0, doc="Count of all valid list pairs"), + s.field("valid_list_pairs", self.uint8, 0, doc="Count of valid list pairs"), + s.field("total_invalid_pairs", self.uint8, 0, doc="Count of all invalid list pairs"), + s.field("invalid_list_pairs", self.uint8, 0, doc="Count of invalid list pairs"), + ], doc="List generator information information") +}; + +moo.oschema.sort_select(info) diff --git a/scripts/commtest_gen b/scripts/commtest_gen index 2f7d501..5c1f1f2 100755 --- a/scripts/commtest_gen +++ b/scripts/commtest_gen @@ -56,17 +56,24 @@ def cli(config, debug, json_dir): the_system.apps["listrev-app-rv-" + host] = listrevapp_gen.get_listrev_app( "listrev-app-rv-" + host, host, - n_ints = commtest.ints_per_list, - n_wait_ms = commtest.wait_ms, - gen_mode = "rv" + n_ints_min = commtest.config['ints_per_list_min'], + n_ints_max = commtest.config['ints_per_list_max'], + n_wait_ms = commtest.config['wait_ms'], + request_timeout_ms = commtest.config['request_timeout_ms'], + request_rate_hz = commtest.config['request_rate_hz'], + has_validator=True, + reverser_indicies=[0], + n_generators=len(commtest.hosts)-1, + n_reversers=1 ) else: the_system.apps[f"listrev-app-g-{hostidx}-{host}"] = listrevapp_gen.get_listrev_app( f"listrev-app-g-{hostidx}-{host}", host, - n_ints = commtest.ints_per_list, - n_wait_ms = commtest.wait_ms, - gen_mode="g" + n_wait_ms = commtest.config['wait_ms'], + request_timeout_ms = commtest.config['request_timeout_ms'], + generator_indicies=[hostidx - 1], + n_reversers=1 ) #################################################################### diff --git a/scripts/listrev_gen b/scripts/listrev_gen index 0790e63..5596883 100755 --- a/scripts/listrev_gen +++ b/scripts/listrev_gen @@ -46,13 +46,17 @@ def cli(config, debug, json_dir): if debug: console.log(f"listrev configuration object: {listrev.pod()}") # Validate apps - apps_check = "".join(listrev.apps) - if "s" in apps_check and not apps_check == "s": - raise RuntimeError(f"App spec {apps} contains an \"s\" entry, which must be alone!") + parsed_apps = [ a.replace("s", "grv") for a in listrev.apps ] + apps_check = "".join(parsed_apps) - if "s" not in apps_check and (len(apps_check) != 3 or "g" not in apps_check or "r" not in apps_check or "v" not in apps_check): + if ("g" not in apps_check or "r" not in apps_check or "v" not in apps_check): raise RuntimeError(f"App spec {apps} is not \"s\", so must contain one each of \"g\", \"r\", and \"v\"") + n_generators = apps_check.count("g") + n_reversers = apps_check.count("r") + + if apps_check.count("v") > 1: + raise RuntimeError(f"App spec {apps} contains more than one Validator!") console.log('Loading listrevapp config generator') from listrev import listrevapp_gen @@ -61,14 +65,32 @@ def cli(config, debug, json_dir): # add app - for appspec in listrev.apps: - the_system.apps["listrev-app-"+appspec] = listrevapp_gen.get_listrev_app( - nickname="listrev-app-"+appspec, + appidx=0 + generator_count=0 + reverser_count=0 + for appspec in parsed_apps: + generators=[generator_count+gg for gg in range(appspec.count("g"))] + generator_count += appspec.count("g") + reversers=[reverser_count+rr for rr in range(appspec.count("r"))] + reverser_count+= appspec.count("r") + has_validator=appspec.count("v") == 1 + + the_system.apps[f"listrev-app-{appspec}-{appidx}"] = listrevapp_gen.get_listrev_app( + nickname=f"listrev-app-{appspec}-{appidx}", host=listrev.host_app, - n_ints=listrev.ints_per_list, - n_wait_ms=listrev.wait_ms, - gen_mode=appspec + n_wait_ms=listrev.config['wait_ms'], + request_timeout_ms=listrev.config['request_timeout_ms'], + request_rate_hz=listrev.config['request_rate_hz'], + generator_indicies=generators, + reverser_indicies=reversers, + has_validator=has_validator, + n_generators=n_generators, + n_reversers=n_reversers, + n_ints_min=listrev.config['ints_per_list_min'], + n_ints_max=listrev.config['ints_per_list_max'], + n_reqs=listrev.config['max_requests'] ) + appidx+=1 #################################################################### # Application command data generation diff --git a/plugins/CommonIssues.hpp b/src/CommonIssues.hpp similarity index 76% rename from plugins/CommonIssues.hpp rename to src/CommonIssues.hpp index 57baf9d..3f807a7 100644 --- a/plugins/CommonIssues.hpp +++ b/src/CommonIssues.hpp @@ -33,6 +33,15 @@ ERS_DECLARE_ISSUE_BASE(listrev, "The " << queueType << " queue was not successfully created.", ((std::string)name), ((std::string)queueType)) + +ERS_DECLARE_ISSUE(listrev, + ListNotFound, + "An IntList with ID " << list_id << " was not found when requested.", + ((int)list_id)) +ERS_DECLARE_ISSUE(listrev, + ListExists, + "An IntList with ID " << list_id << " already is in storage.", + ((int)list_id)) // Re-enable coverage collection LCOV_EXCL_STOP } // namespace dunedaq diff --git a/src/ListCreator.cpp b/src/ListCreator.cpp new file mode 100755 index 0000000..adda45f --- /dev/null +++ b/src/ListCreator.cpp @@ -0,0 +1,43 @@ +/** + * @file ListCreator.cpp + * + * Helper methods for sending CreateList requests (implementation) + * + * This is part of the DUNE DAQ Software Suite, copyright 2020. + * Licensing/copyright details are in the COPYING file that you should have + * received with this code. + */ + +#include "ListCreator.hpp" + +#include "iomanager/IOManager.hpp" +#include "iomanager/Sender.hpp" + +dunedaq::listrev::ListCreator::ListCreator(std::string conn, + std::chrono::milliseconds tmo, + int min_list_size, + int max_list_size) + : m_create_connection(conn) + , m_send_timeout(tmo) +{ + std::random_device seed; + m_random_generator = std::mt19937(seed()); + + if (min_list_size < 0) { + min_list_size = 1; + } + if (max_list_size < min_list_size) { + max_list_size = min_list_size; + } + m_size_dist = std::uniform_int_distribution<>{ min_list_size, max_list_size }; +} + +void +dunedaq::listrev::ListCreator::send_create(int id) +{ + CreateList req; + req.list_id = id; + req.list_size = m_size_dist(m_random_generator); + + get_iomanager()->get_sender(m_create_connection)->send(std::move(req), m_send_timeout); +} diff --git a/src/ListCreator.hpp b/src/ListCreator.hpp new file mode 100755 index 0000000..8f3a9ea --- /dev/null +++ b/src/ListCreator.hpp @@ -0,0 +1,42 @@ +/** + * @file ListCreator.hpp + * + * Helper methods for sending CreateList requests + * + * This is part of the DUNE DAQ Software Suite, copyright 2020. + * Licensing/copyright details are in the COPYING file that you should have + * received with this code. + */ + +#ifndef LISTREV_PLUGINS_LISTCREATOR_HPP_ +#define LISTREV_PLUGINS_LISTCREATOR_HPP_ + +#include "ListWrapper.hpp" + +#include + +namespace dunedaq { +namespace listrev { + +class ListCreator +{ +public: + ListCreator() = default; + ListCreator(std::string conn, std::chrono::milliseconds tmo, int min_list_size, int max_list_size); + + // Methods + void send_create(int id); + +private: + // Data + std::mt19937 m_random_generator; + std::uniform_int_distribution<> m_size_dist; + + // Configuration + std::string m_create_connection; + std::chrono::milliseconds m_send_timeout; +}; +} // namespace listrev +} // namespace dunedaq + +#endif // LISTREV_PLUGINS_LISTCREATOR_HPP_ \ No newline at end of file diff --git a/src/ListStorage.cpp b/src/ListStorage.cpp new file mode 100755 index 0000000..fdd7822 --- /dev/null +++ b/src/ListStorage.cpp @@ -0,0 +1,56 @@ +/** + * @file ListStorage.cpp ListStorage implementations + * + * This is part of the DUNE DAQ Software Suite, copyright 2020. + * Licensing/copyright details are in the COPYING file that you should have + * received with this code. + */ + +#include "ListStorage.hpp" +#include "CommonIssues.hpp" + +bool +dunedaq::listrev::ListStorage::has_list(const int& id) const +{ + std::lock_guard lk(m_lists_mutex); + return m_lists.count(id); +} + +dunedaq::listrev::IntList +dunedaq::listrev::ListStorage::get_list(const int& id) const +{ + std::lock_guard lk(m_lists_mutex); + if (!m_lists.count(id)) { + throw ListNotFound(ERS_HERE, id); + } + + return m_lists.at(id); +} + +void +dunedaq::listrev::ListStorage::add_list(IntList list, bool ignoreDuplicates) +{ + std::lock_guard lk(m_lists_mutex); + if (m_lists.count(list.list_id) && !ignoreDuplicates) { + throw ListExists(ERS_HERE, list.list_id); + } + m_lists[list.list_id] = list; + + while (m_lists.size() > m_capacity) { + m_lists.erase(m_lists.begin()); + } +} + +size_t +dunedaq::listrev::ListStorage::size() const +{ + std::lock_guard lk(m_lists_mutex); + return m_lists.size(); +} + +void +dunedaq::listrev::ListStorage::flush() +{ + std::lock_guard lk(m_lists_mutex); + m_lists.clear(); +} \ No newline at end of file diff --git a/src/ListStorage.hpp b/src/ListStorage.hpp new file mode 100755 index 0000000..c115403 --- /dev/null +++ b/src/ListStorage.hpp @@ -0,0 +1,45 @@ +/** + * @file ListStorage.hpp + * + * ListStorage defines the data storage class used by the listrev modules + * + * This is part of the DUNE DAQ Software Suite, copyright 2020. + * Licensing/copyright details are in the COPYING file that you should have + * received with this code. + */ + +#ifndef LISTREV_PLUGINS_LISTSTORAGE_HPP_ +#define LISTREV_PLUGINS_LISTSTORAGE_HPP_ + +#include "ListWrapper.hpp" + +#include +#include +#include + +namespace dunedaq { +namespace listrev { + + class ListStorage + { + public: + ListStorage() {} + + bool has_list(const int& id) const; + IntList get_list(const int& id) const; + void add_list(IntList list, bool ignoreDuplicates = false); + + size_t size() const; + void set_capacity(const size_t& capacity) { m_capacity = capacity; } + size_t capacity() const { return m_capacity; } + void flush(); + + private: + std::map m_lists; + mutable std::mutex m_lists_mutex; + size_t m_capacity{ 1000 }; + }; +} // namespace listrev +} // namespace duneadq + +#endif // LISTREV_PLUGINS_LISTSTORAGE_HPP_ \ No newline at end of file diff --git a/src/ListWrapper.hpp b/src/ListWrapper.hpp new file mode 100755 index 0000000..f3d8058 --- /dev/null +++ b/src/ListWrapper.hpp @@ -0,0 +1,99 @@ +/** + * @file ListWrapper.hpp + * + * ListWrapper wraps a std::vector so that it can be transmitted over the network using the Unified Communications + * API (iomanager) + * + * This is part of the DUNE DAQ Software Suite, copyright 2020. + * Licensing/copyright details are in the COPYING file that you should have + * received with this code. + */ + +#ifndef LISTREV_PLUGINS_LISTWRAPPER_HPP_ +#define LISTREV_PLUGINS_LISTWRAPPER_HPP_ + +#include "serialization/Serialization.hpp" + +#include + +namespace dunedaq { +namespace listrev { +struct IntList +{ + int list_id; + int generator_id; + std::vector list; + + IntList() = default; + explicit IntList(const int& id, const int& gid, std::vector const& l) + : list_id(id) + , generator_id(gid) + , list(l.begin(), l.end()) + { + } + + DUNE_DAQ_SERIALIZE(IntList, list_id, generator_id, list); +}; + +struct ReversedList +{ + struct Data + { + IntList original; + IntList reversed; + + DUNE_DAQ_SERIALIZE(Data, original, reversed); + }; + int list_id; + int reverser_id; + std::vector lists; + + ReversedList() = default; + ReversedList(const int& id, const int& rid, std::vector const& ls) + : list_id(id) + , reverser_id(rid) + , lists(ls.begin(), ls.end()) + { + } + + DUNE_DAQ_SERIALIZE(ReversedList, list_id, reverser_id, lists); +}; + +struct CreateList +{ + int list_id; + uint16_t list_size; + + CreateList() = default; + CreateList(const int& id, const uint16_t& size) + : list_id(id) + , list_size(size) + { + } + + DUNE_DAQ_SERIALIZE(CreateList, list_id, list_size); +}; +struct RequestList +{ + int list_id; + std::string destination; + + RequestList() = default; + explicit RequestList(const int& id, const std::string& dest) + : list_id(id) + , destination(dest) + { + } + + DUNE_DAQ_SERIALIZE(RequestList, list_id, destination); +}; +} // namespace listrev + +DUNE_DAQ_SERIALIZABLE(listrev::IntList, "IntList"); +DUNE_DAQ_SERIALIZABLE(listrev::ReversedList::Data, "ReversedListData"); +DUNE_DAQ_SERIALIZABLE(listrev::ReversedList, "ReversedList"); +DUNE_DAQ_SERIALIZABLE(listrev::CreateList, "CreateList"); +DUNE_DAQ_SERIALIZABLE(listrev::RequestList, "RequestList"); +} // namespace dunedaq + +#endif // LISTREV_PLUGINS_LISTWRAPPER_HPP_