Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Request-Based listrev #52

Merged
merged 3 commits into from
Mar 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 7 additions & 9 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,16 @@ find_package(daq-cmake REQUIRED )

daq_setup_environment()

find_package(rcif REQUIRED)
find_package(appfwk REQUIRED)
find_package(opmonlib REQUIRED)
find_package(logging REQUIRED)
find_package(daqconf REQUIRED)
find_package(ers REQUIRED)

daq_codegen( randomdatalistgenerator.jsonnet TEMPLATES Structs.hpp.j2 Nljs.hpp.j2)
daq_codegen( randomdatalistgeneratorinfo.jsonnet DEP_PKGS opmonlib TEMPLATES opmonlib/InfoStructs.hpp.j2 opmonlib/InfoNljs.hpp.j2 )
daq_codegen( listreverser.jsonnet randomdatalistgenerator.jsonnet reversedlistvalidator.jsonnet TEMPLATES Structs.hpp.j2 Nljs.hpp.j2)
daq_codegen( listreverserinfo.jsonnet randomdatalistgeneratorinfo.jsonnet reversedlistvalidatorinfo.jsonnet DEP_PKGS opmonlib TEMPLATES opmonlib/InfoStructs.hpp.j2 opmonlib/InfoNljs.hpp.j2 )

daq_add_plugin(ListReverser duneDAQModule LINK_LIBRARIES appfwk::appfwk)
daq_add_plugin(RandomDataListGenerator duneDAQModule LINK_LIBRARIES appfwk::appfwk)
daq_add_plugin(ReversedListValidator duneDAQModule LINK_LIBRARIES appfwk::appfwk)
daq_add_library(ListCreator.cpp ListStorage.cpp LINK_LIBRARIES appfwk::appfwk)

daq_add_plugin(ListReverser duneDAQModule LINK_LIBRARIES listrev)
daq_add_plugin(RandomDataListGenerator duneDAQModule LINK_LIBRARIES listrev)
daq_add_plugin(ReversedListValidator duneDAQModule LINK_LIBRARIES listrev)

daq_install()
3 changes: 0 additions & 3 deletions cmake/listrevConfig.cmake.in
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,6 @@ include(CMakeFindDependencyMacro)

find_dependency(opmonlib)
find_dependency(appfwk)
find_dependency(rcif)
find_dependency(logging)
find_dependency(ers)

# Figure out whether or not this dependency is an installed package or
# in repo form
Expand Down
41 changes: 22 additions & 19 deletions integtest/listrev_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@

# Default values for validation parameters
check_for_logfile_errors=True
expected_event_count=run_duration
expected_event_count=run_duration*10
expected_event_count_tolerance = expected_event_count / 10

# The next three variable declarations *must* be present as globals in the test
# file. They're read by the "fixtures" in conftest.py to determine how
Expand All @@ -36,12 +37,14 @@
g_conf={"detector": {"op_env": "integtest"},"boot": { "use_connectivity_service": use_connectivity_service}, "listrev": {"apps": ["rv", "g"]}}
r_conf={"detector": {"op_env": "integtest"},"boot": { "use_connectivity_service": use_connectivity_service}, "listrev": {"apps": ["gv", "r"]}}
separate_conf={"detector": {"op_env": "integtest"},"boot": { "use_connectivity_service": use_connectivity_service}, "listrev": {"apps": ["g", "r", "v"]}}
multigen_conf={"detector": {"op_env": "integtest"},"boot": { "use_connectivity_service": use_connectivity_service}, "listrev": {"apps": ["g", "g", "g", "rr", "v"]}}

confgen_arguments={"Single App": single_app_conf,
"Separate Verifier": v_conf,
"Separate Generator": g_conf,
"Separate Reverser": r_conf,
"Independent Apps": separate_conf}
"Independent Apps": separate_conf,
"Multiple Generators": multigen_conf}
# The commands to run in nanorc, as a list
nanorc_command_list="integtest-partition boot conf".split()
nanorc_command_list+="start_run --disable-data-storage 101 wait ".split() + [str(run_duration)] + "stop_run wait 2".split()
Expand Down Expand Up @@ -78,38 +81,38 @@ def test_log_files(run_nanorc):
# Check that there are no warnings or errors in the log files
assert log_file_checks.logs_are_error_free(run_nanorc.log_files)

# Exiting the do_work() method, generated 31 lists and successfully sent 62 copies. DAQModule: rdlg
# Exiting do_stop() method, generated 2081 lists, and sent 2081 list messages DAQModule: rdlg0
generator_generated = 0
generator_sent = 0
# Exiting do_work() method, received 31 lists and successfully sent 31. DAQModule: lr
# Exiting do_stop() method, received 2081 request messages, sent 2081, received 2081 lists, and sent 2081 reversed list messages DAQModule: lr0
reverser_received = 0
reverser_sent = 0
# Exiting do_work() method, received 31 reversed lists, compared 31 of them to their original data, and found 0 mismatches. DAQModule: lrv
# Exiting do_stop() method, received 2081 reversed list messages, compared 2081 reversed lists to their original data, and found 0 mismatches. DAQModule: lrv
validator_received = 0
validator_received_reversed = 0
validator_errors = 999

for idx in range(len(run_nanorc.log_files)):
for line in open(run_nanorc.log_files[idx]).readlines():
if "Exiting" in line:
if "generated" in line:
m = re.search("generated ([0-9]+) lists and successfully sent ([0-9]+) copies",line)
if "Exiting do_stop" in line:
if "RandomDataListGenerator" in line:
m = re.search("generated ([0-9]+) lists, and sent ([0-9]+) list messages",line)
generator_generated = int(m.group(1))
generator_sent = int(m.group(2))
if "ListReverser" in line:
m = re.search("received ([0-9]+) lists and successfully sent ([0-9]+).",line)
reverser_received = int(m.group(1))
reverser_sent = int(m.group(2))
if "mismatches" in line:
m = re.search("received ([0-9]+) reversed lists, compared ([0-9]+) of them to their original data, and found ([0-9]+) mismatches.",line)
m = re.search("received ([0-9]+) request messages, sent ([0-9]+), received ([0-9]+) lists, and sent ([0-9]+) reversed list messages",line)
reverser_received = int(m.group(3))
reverser_sent = int(m.group(4))
if "ReversedListValidator" in line:
m = re.search("received ([0-9]+) reversed list messages, compared ([0-9]+) reversed lists to their original data, and found ([0-9]+) mismatches.",line)
validator_received = int(m.group(2))
validator_received_reversed = int(m.group(1))
validator_errors = int(m.group(3))

assert generator_generated >= expected_event_count
assert generator_sent >= expected_event_count * 2
assert reverser_received >= expected_event_count
assert reverser_sent >= expected_event_count
assert validator_received >= expected_event_count
assert validator_received_reversed >= expected_event_count
assert generator_generated >= expected_event_count - expected_event_count_tolerance
assert generator_sent >= expected_event_count - expected_event_count_tolerance
assert reverser_received >= expected_event_count - expected_event_count_tolerance # times num generators?
assert reverser_sent >= expected_event_count - expected_event_count_tolerance
assert validator_received >= expected_event_count - expected_event_count_tolerance # times num generators?
assert validator_received_reversed >= expected_event_count - expected_event_count_tolerance
assert validator_errors == 0
190 changes: 138 additions & 52 deletions plugins/ListReverser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@
* received with this code.
*/

#include "ListReverser.hpp"
#include "listrev/listreverser/Nljs.hpp"
#include "listrev/listreverserinfo/InfoNljs.hpp"

#include "CommonIssues.hpp"
#include "ListReverser.hpp"

#include "appfwk/DAQModuleHelper.hpp"
#include "iomanager/IOManager.hpp"
Expand All @@ -25,44 +28,85 @@
#define TRACE_NAME "ListReverser" // NOLINT
#define TLVL_ENTER_EXIT_METHODS 10
#define TLVL_LIST_REVERSAL 15
#define TLVL_REQUEST_SENDING 16
#define TLVL_CONFIGURE 17

namespace dunedaq {
namespace listrev {

ListReverser::ListReverser(const std::string& name)
: DAQModule(name)
, thread_(std::bind(&ListReverser::do_work, this, std::placeholders::_1))
, inputQueue_(nullptr)
, outputQueue_(nullptr)
, queueTimeout_(100)
{
register_command("start", &ListReverser::do_start);
register_command("stop", &ListReverser::do_stop);
register_command("conf", &ListReverser::do_configure, std::set<std::string>{ "INITIAL" });
register_command("start", &ListReverser::do_start, std::set<std::string>{ "CONFIGURED" });
register_command("stop", &ListReverser::do_stop, std::set<std::string>{ "TRIGGER_SOURCES_STOPPED" });
}

void
ListReverser::init(const nlohmann::json& iniobj)
{
TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering init() method";
auto qi = appfwk::connection_index(iniobj, { "input", "output" });
auto qi = appfwk::connection_index(iniobj, { "request_input", "list_input" });
m_requests = qi["request_input"];
m_list_connection = qi["list_input"];

try {
inputQueue_ = get_iom_receiver<IntList>(qi["input"]);
get_iom_receiver<IntList>(m_list_connection);
} catch (const ers::Issue& excpt) {
throw InvalidQueueFatalError(ERS_HERE, get_name(), "input", excpt);
}
try {
outputQueue_ = get_iom_sender<IntList>(qi["output"]);
get_iom_receiver<RequestList>(m_requests);
} catch (const ers::Issue& excpt) {
throw InvalidQueueFatalError(ERS_HERE, get_name(), "output", excpt);
}

TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting init() method";
}

void
ListReverser::get_info(opmonlib::InfoCollector& ci, int /*level*/)
{
listreverserinfo::Info fcr;

fcr.requests_received = m_requests_received.exchange(0);
fcr.requests_sent = m_requests_sent.exchange(0);
fcr.lists_received = m_lists_received.exchange(0);
fcr.lists_sent = m_lists_sent.exchange(0);
fcr.total_requests_received = m_total_requests_received.load();
fcr.total_requests_sent = m_total_requests_sent.load();
fcr.total_lists_received = m_total_lists_received.load();
fcr.total_lists_sent = m_total_lists_sent.load();
ci.add(fcr);
}

void
ListReverser::do_configure(const nlohmann::json& obj)
{
TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_configure() method";
auto parsed_conf = obj.get<listreverser::ConfParams>();
m_send_timeout = std::chrono::milliseconds(parsed_conf.send_timeout_ms);
m_request_timeout = std::chrono::milliseconds(parsed_conf.request_timeout_ms);
m_num_generators = parsed_conf.num_generators;
m_reverser_id = parsed_conf.reverser_id;

TLOG_DEBUG(TLVL_CONFIGURE) << "ListReverser " << m_reverser_id << " configured with "
<< "send timeout " << parsed_conf.send_timeout_ms << " ms,"
<< " request timeout " << parsed_conf.request_timeout_ms << "ms, "
<< " and num_generators " << parsed_conf.num_generators;

TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_configure() method";
}

void
ListReverser::do_start(const nlohmann::json& /*startobj*/)
{
TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_start() method";
thread_.start_working_thread();
get_iomanager()->add_callback<IntList>(m_list_connection,
std::bind(&ListReverser::process_list, this, std::placeholders::_1));
get_iomanager()->add_callback<RequestList>(
m_requests, std::bind(&ListReverser::process_list_request, this, std::placeholders::_1));

TLOG() << get_name() << " successfully started";
TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_start() method";
}
Expand All @@ -71,11 +115,46 @@ void
ListReverser::do_stop(const nlohmann::json& /*stopobj*/)
{
TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_stop() method";
thread_.stop_working_thread();
get_iomanager()->remove_callback<RequestList>(m_requests);
get_iomanager()->remove_callback<IntList>(m_list_connection);
TLOG() << get_name() << " successfully stopped";

std::ostringstream oss_summ;
oss_summ << ": Exiting do_stop() method, received " << m_total_requests_received.load() << " request messages, "
<< "sent " << m_total_requests_sent.load() << ", received " << m_total_lists_received.load()
<< " lists, and sent " << m_total_lists_sent.load() << " reversed list messages";
ers::info(ProgressUpdate(ERS_HERE, get_name(), oss_summ.str()));

TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_stop() method";
}

void
ListReverser::process_list_request(const RequestList& request)
{
TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering process_list_request() method";
{
std::lock_guard<std::mutex> lk(m_map_mutex);
if (!m_pending_lists.count(request.list_id)) {
m_pending_lists[request.list_id] = PendingList(request.destination, request.list_id, m_reverser_id);
++m_requests_received;
++m_total_requests_received;
}
}

for (size_t gen_idx = 0; gen_idx < m_num_generators; ++gen_idx) {
TLOG_DEBUG(TLVL_REQUEST_SENDING) << "Sending request for " << request.list_id << " with destination "
<< m_list_connection << " to rdlg" << gen_idx
<< "_request_connection (num_generators=" << m_num_generators << ")";
RequestList req(request.list_id, m_list_connection);
get_iomanager()
->get_sender<RequestList>("rdlg" + std::to_string(gen_idx) + "_request_connection")
->send(std::move(req), m_send_timeout);
++m_requests_sent;
++m_total_requests_sent;
}
TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting process_list_request() method";
}

/**
* @brief Format a std::vector<int> to a stream
* @param t ostream Instance
Expand All @@ -97,59 +176,66 @@ operator<<(std::ostream& t, std::vector<int> ints)
}

void
ListReverser::do_work(std::atomic<bool>& running_flag)
ListReverser::process_list(const IntList& list)
{
TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_work() method";
int receivedCount = 0;
int sentCount = 0;
std::vector<int> workingVector;

while (running_flag.load()) {
TLOG_DEBUG(TLVL_LIST_REVERSAL) << get_name() << ": Going to receive data from input queue";
try {
workingVector = inputQueue_->receive(queueTimeout_).list;
} catch (const dunedaq::iomanager::TimeoutExpired& excpt) {
// it is perfectly reasonable that there might be no data in the queue
// some fraction of the times that we check, so we just continue on and try again
continue;
}
TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering process_list() method";

++receivedCount;
TLOG_DEBUG(TLVL_LIST_REVERSAL) << get_name() << ": Received list #" << receivedCount << ". It has size "
<< workingVector.size() << ". Reversing its contents";
std::reverse(workingVector.begin(), workingVector.end());
std::lock_guard<std::mutex> lk(m_map_mutex);
++m_lists_received;
++m_total_lists_received;
TLOG_DEBUG(TLVL_LIST_REVERSAL) << get_name() << ": Received list #" << list.list_id << " from " << list.generator_id
<< ". It has size " << list.list.size() << ". Reversing its contents";

std::ostringstream oss_prog;
oss_prog << "Reversed list #" << receivedCount << ", new contents " << workingVector << " and size "
<< workingVector.size() << ". ";
ers::debug(ProgressUpdate(ERS_HERE, get_name(), oss_prog.str()));
if (m_pending_lists.count(list.list_id) == 0) {

std::ostringstream oss_warn;
oss_warn << "send " << list.list_id << " to \"" << m_pending_lists[list.list_id].requestor
<< "\" (late list receive)";
ers::warning(dunedaq::iomanager::TimeoutExpired(ERS_HERE, get_name(), oss_warn.str(), m_send_timeout.count()));
return;
}

auto workingVector = list.list;
std::reverse(workingVector.begin(), workingVector.end());
IntList wrapped(list.list_id, m_reverser_id, workingVector);

ReversedList::Data this_data;
this_data.original = list;
this_data.reversed = wrapped;

m_pending_lists[list.list_id].list.lists.push_back(this_data);

std::ostringstream oss_prog;
oss_prog << "Reversed list #" << list.list_id << " from " << list.generator_id << ", new contents " << workingVector
<< " and size " << workingVector.size() << ". ";
ers::debug(ProgressUpdate(ERS_HERE, get_name(), oss_prog.str()));

if (m_pending_lists[list.list_id].list.lists.size() >= m_num_generators ||
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - m_pending_lists[list.list_id].start_time) > m_request_timeout) {

bool successfullyWasSent = false;
while (!successfullyWasSent && running_flag.load()) {
TLOG_DEBUG(TLVL_LIST_REVERSAL) << get_name() << ": Pushing the reversed list onto the output queue";
int failCount = 0;
while (!successfullyWasSent && failCount < 100) {
TLOG_DEBUG(TLVL_LIST_REVERSAL) << get_name() << ": Sending the reversed lists " << list.list_id;
try {
IntList wrapped(workingVector);
outputQueue_->send(std::move(wrapped), queueTimeout_);
get_iomanager()
->get_sender<ReversedList>(m_pending_lists[list.list_id].requestor)
->send(std::move(m_pending_lists[list.list_id].list), m_send_timeout);
successfullyWasSent = true;
++sentCount;
++m_lists_sent;
++m_total_lists_sent;
m_pending_lists.erase(list.list_id);
} catch (const dunedaq::iomanager::TimeoutExpired& excpt) {
std::ostringstream oss_warn;
oss_warn << "push to output queue \"" << outputQueue_->get_name() << "\"";
ers::warning(dunedaq::iomanager::TimeoutExpired(
ERS_HERE,
get_name(),
oss_warn.str(),
std::chrono::duration_cast<std::chrono::milliseconds>(queueTimeout_).count()));
oss_warn << "send " << list.list_id << " to \"" << m_pending_lists[list.list_id].requestor << "\"";
ers::warning(dunedaq::iomanager::TimeoutExpired(ERS_HERE, get_name(), oss_warn.str(), m_send_timeout.count()));
++failCount;
}
}
TLOG_DEBUG(TLVL_LIST_REVERSAL) << get_name() << ": End of do_work loop";
}

std::ostringstream oss_summ;
oss_summ << ": Exiting do_work() method, received " << receivedCount << " lists and successfully sent " << sentCount
<< ". ";
ers::info(ProgressUpdate(ERS_HERE, get_name(), oss_summ.str()));
TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_work() method";
TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting process_list() method";
}

} // namespace listrev
Expand Down
Loading