Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configure IOManager with OKS objects #303

Merged
merged 7 commits into from
Oct 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ find_package(oksdalgen REQUIRED)
daq_oks_codegen(appfwk.schema.xml TEST NAMESPACE dunedaq::appfwk::dal
DALDIR dal DEP_PKGS confmodel)

daq_codegen( app.jsonnet cmd.jsonnet DEP_PKGS iomanager rcif cmdlib TEMPLATES Structs.hpp.j2 Nljs.hpp.j2 )
daq_codegen( cmd.jsonnet DEP_PKGS rcif cmdlib TEMPLATES Structs.hpp.j2 Nljs.hpp.j2 )
daq_protobuf_codegen( opmon/*.proto )

##############################################################################
Expand Down
40 changes: 12 additions & 28 deletions apps/daq_application.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ int
main(int argc, char* argv[])
{

dunedaq::logging::Logging().setup();

// Setup signals
// std::signal(SIGABRT, signal_handler);
Expand All @@ -66,45 +65,30 @@ main(int argc, char* argv[])
appfwk::CommandLineInterpreter args;
try {
args = appfwk::CommandLineInterpreter::parse(argc, argv);
} catch (ers::Issue& e) {
} catch ( bpo::error const& e ) {
// Die but do it gracefully gracefully.
ers::error(appfwk::BadCliUsage(ERS_HERE, e.message()));
exit(-1);
std::cerr << "Failed to interpret command line: " << e.what();
exit(1);
}

if (args.help_requested) {
exit(0);
}

// Get the application and session name from the environment.
std::string app_name = "";
std::string session_name = "";

char* app_name_c = getenv("DUNEDAQ_APPLICATION_NAME");
char* session_name_c = getenv("DUNEDAQ_SESSION");

bool missing_env_var =
!app_name_c || std::string(app_name_c) == "" || !session_name_c || std::string(session_name_c) == "";
if (missing_env_var) {
ers::error(appfwk::EnvironmentVariableNotFound(ERS_HERE, "DUNEDAQ_APPLICATION_NAME or DUNEDAQ_SESSION"));
exit(1);
}

app_name = app_name_c;
session_name = session_name_c;

if (args.app_name != app_name || args.session_name != session_name) {
ers::error(appfwk::MismatchedEnvAndCLI(ERS_HERE, "name", "DUNEDAQ_APPLICATION_NAME", args.app_name, app_name));
ers::error(appfwk::MismatchedEnvAndCLI(ERS_HERE, "session", "DUNEDAQ_SESSION", args.session_name, session_name));
exit(1);
}
// up to here it was not possible to use ERS messages

dunedaq::logging::Logging().setup( args.session_name, args.app_name );

// from now on, it's possible to use ERS messages

// Create the Application
appfwk::Application app(app_name, session_name, args.command_facility_plugin_name, args.conf_service_plugin_name);
appfwk::Application app(args.app_name, args.session_name,
args.command_facility_plugin_name,
args.conf_service_plugin_name);

app.init();
app.run(run_marker);

TLOG() << "Application " << app_name << " exiting.";
TLOG() << "Application " << args.session_name << '.' << args.app_name << " exiting.";
return 0;
}
23 changes: 14 additions & 9 deletions include/appfwk/ModuleConfiguration.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@
#define APPFWK_INCLUDE_MODULECONFIGURATION_HPP_

#include "appfwk/ConfigurationManager.hpp"
#include "conffwk/Configuration.hpp"
#include "confmodel/ActionPlan.hpp"
#include "confmodel/DaqModule.hpp"
#include "iomanager/IOManager.hpp"
#include "conffwk/Configuration.hpp"

#include <string>
#include <vector>
Expand Down Expand Up @@ -43,20 +43,25 @@ namespace appfwk {
class ModuleConfiguration
{
std::shared_ptr<ConfigurationManager> m_config_mgr;
std::unordered_map<std::string, const dunedaq::confmodel::ActionPlan*> m_action_plans;
std::vector<const dunedaq::confmodel::DaqModule*> m_modules;
iomanager::Queues_t m_queues;
iomanager::Connections_t m_networkconnections;
std::unordered_map<std::string, const confmodel::ActionPlan*> m_action_plans;
std::vector<const confmodel::DaqModule*> m_modules;
std::vector<const confmodel::Queue*> m_queues;
std::vector<const confmodel::NetworkConnection*> m_networkconnections;
const confmodel::ConnectivityService* m_connsvc_config;

public:
explicit ModuleConfiguration(std::shared_ptr<ConfigurationManager> mgr);

const iomanager::Queues_t& queues() { return m_queues; }
const iomanager::Connections_t& networkconnections() { return m_networkconnections; }
const std::vector<const confmodel::Queue*>& queues() { return m_queues; }
const std::vector<const confmodel::NetworkConnection*>& networkconnections() { return m_networkconnections; }
const std::vector<const confmodel::DaqModule*>& modules() { return m_modules; }
const confmodel::ConnectivityService* connectivity_service() { return m_connsvc_config; }

const std::unordered_map<std::string, const dunedaq::confmodel::ActionPlan*>& action_plans() { return m_action_plans; }
const dunedaq::confmodel::ActionPlan* action_plan(std::string cmd) const;
const std::unordered_map<std::string, const confmodel::ActionPlan*>& action_plans()
{
return m_action_plans;
}
const confmodel::ActionPlan* action_plan(std::string cmd) const;

std::shared_ptr<ConfigurationManager> configuration_manager() { return m_config_mgr; }

Expand Down
68 changes: 0 additions & 68 deletions schema/appfwk/app.jsonnet

This file was deleted.

4 changes: 2 additions & 2 deletions src/Application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ Application::Application(std::string appname,

m_cmd_fac = cmdlib::make_command_facility(
cmdlibimpl,
m_config_mgr->session()->get_connectivity_service_interval_ms(),
m_config_mgr->session()->get_use_connectivity_server()
session,
m_config_mgr->session()->get_connectivity_service()
);

set_opmon_conf(m_config_mgr->application()->get_opmon_conf());
Expand Down
33 changes: 11 additions & 22 deletions src/CommandLineInterpreter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,23 @@
#define APPFWK_INCLUDE_APPFWK_COMMANDLINEINTERPRETER_HPP_

#include "boost/program_options.hpp"
#include "ers/ers.hpp"

#include <iostream>
#include <string>
#include <vector>

namespace bpo = boost::program_options;

namespace dunedaq {

// Disable coverage collection LCOV_EXCL_START
ERS_DECLARE_ISSUE(appfwk, // Namespace
CommandLineIssue, // Class name
"Command-line processing issue in " << app_name << ": " << message, // Message
((std::string)app_name)((std::string)message)) // Args

// Re-enable coverage collection LCOV_EXCL_STOP
namespace appfwk {
/**
* @brief CommandLineInterpreter parses the command-line options given to the
* application and stores the results as validated data members
*
* @details Please note that contrary to the rest of the framework this class is not supposed to use ERS
* because the ERS cannot be instantiated until the command line is parsed
*/
struct CommandLineInterpreter
{
Expand All @@ -53,33 +50,25 @@ struct CommandLineInterpreter
"passed on)";
bpo::options_description desc(descstr.str());
desc.add_options()("name,n", bpo::value<std::string>()->required(), "Application name")(
"session,s", bpo::value<std::string>()->default_value("global"), "Session name")(
"session,s", bpo::value<std::string>()->required(), "Session name")(
"commandFacility,c", bpo::value<std::string>()->required(), "CommandFacility URI")(
"configurationService,d", bpo::value<std::string>()->required(), "Configuration Service URI")(
"help,h", "produce help message");

bpo::variables_map vm;
try {
auto parsed = bpo::command_line_parser(argc, argv).options(desc).allow_unregistered().run();

output.other_options = bpo::collect_unrecognized(parsed.options, bpo::include_positional);
bpo::store(parsed, vm);
} catch (bpo::error const& e) {
throw CommandLineIssue(ERS_HERE, *argv, e.what());
}
auto parsed = bpo::command_line_parser(argc, argv).options(desc).allow_unregistered().run();

output.other_options = bpo::collect_unrecognized(parsed.options, bpo::include_positional);
bpo::store(parsed, vm);

if (vm.count("help")) {
std::cout << desc << std::endl; // NOLINT
output.help_requested = true;
return output;
}

try {
bpo::notify(vm);
} catch (bpo::error const& e) {
throw CommandLineIssue(ERS_HERE, *argv, e.what());
}

bpo::notify(vm);

output.app_name = vm["name"].as<std::string>();
output.session_name = vm["session"].as<std::string>();
output.command_facility_plugin_name = vm["commandFacility"].as<std::string>();
Expand Down
53 changes: 26 additions & 27 deletions src/DAQModuleManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
#include "cmdlib/cmd/Nljs.hpp"

#include "appfwk/Issues.hpp"
#include "appfwk/app/Nljs.hpp"
#include "appfwk/cmd/Nljs.hpp"

#include "appfwk/DAQModule.hpp"
Expand Down Expand Up @@ -43,33 +42,32 @@ DAQModuleManager::DAQModuleManager()
void
DAQModuleManager::initialize(std::shared_ptr<ConfigurationManager> cfgMgr, opmonlib::OpMonManager& opm)
{
auto csInterval = cfgMgr->session()->get_connectivity_service_interval_ms();
m_module_configuration = std::make_shared<ModuleConfiguration>(cfgMgr);
get_iomanager()->configure(m_module_configuration->queues(),
get_iomanager()->configure(cfgMgr->session()->UID(),
m_module_configuration->queues(),
m_module_configuration->networkconnections(),
cfgMgr->session()->get_use_connectivity_server(),
std::chrono::milliseconds(csInterval),
m_module_configuration->connectivity_service(),
opm);
init_modules(m_module_configuration->modules(), opm);

for (auto& plan_pair : m_module_configuration->action_plans()) {
auto cmd = plan_pair.first;

for (auto& step : plan_pair.second->get_steps()) {
auto byType = step->cast<confmodel::DaqModulesGroupByType>();
auto byMod = step->cast<confmodel::DaqModulesGroupById>();
if (byType != nullptr) {
for (auto& mod_type : byType->get_modules()) {
check_mod_has_cmd(cmd, mod_type);
}
} else if (byMod != nullptr) {
for (auto& mod : byMod->get_modules()) {
check_mod_has_cmd(cmd, mod->class_name(), mod->UID());
}
} else {
throw ActionPlanValidationFailed(ERS_HERE, cmd, "", "Invalid subclass of DaqModulesGroup encountered!");
for (auto& step : plan_pair.second->get_steps()) {
auto byType = step->cast<confmodel::DaqModulesGroupByType>();
auto byMod = step->cast<confmodel::DaqModulesGroupById>();
if (byType != nullptr) {
for (auto& mod_type : byType->get_modules()) {
check_mod_has_cmd(cmd, mod_type);
}
} else if (byMod != nullptr) {
for (auto& mod : byMod->get_modules()) {
check_mod_has_cmd(cmd, mod->class_name(), mod->UID());
}
} else {
throw ActionPlanValidationFailed(ERS_HERE, cmd, "", "Invalid subclass of DaqModulesGroup encountered!");
}
}
}
this->m_initialized = true;
}
Expand Down Expand Up @@ -174,7 +172,8 @@ DAQModuleManager::execute_action(const std::string& module_name, const std::stri
void
DAQModuleManager::execute_action_plan_step(std::string const& cmd,
const confmodel::DaqModulesGroup* step,
const dataobj_t& cmd_data, bool execution_mode_is_serial)
const dataobj_t& cmd_data,
bool execution_mode_is_serial)
{
std::string failed_mod_names("");
std::unordered_map<std::string, std::future<bool>> futures;
Expand All @@ -185,19 +184,20 @@ DAQModuleManager::execute_action_plan_step(std::string const& cmd,
for (auto& mod_class : byType->get_modules()) {
auto modules = m_modules_by_type[mod_class];
for (auto& mod_name : modules) {
auto data_obj = get_dataobj_for_module(mod_name, cmd_data);
TLOG_DEBUG(1) << "Executing action " << cmd << " on module " << mod_name << " (class " << mod_class << ")";
futures[mod_name] =
std::async(std::launch::async, &DAQModuleManager::execute_action, this, mod_name, cmd, data_obj);
if (execution_mode_is_serial)
futures[mod_name].wait();
auto data_obj = get_dataobj_for_module(mod_name, cmd_data);
TLOG_DEBUG(1) << "Executing action " << cmd << " on module " << mod_name << " (class " << mod_class << ")";
futures[mod_name] =
std::async(std::launch::async, &DAQModuleManager::execute_action, this, mod_name, cmd, data_obj);
if (execution_mode_is_serial)
futures[mod_name].wait();
}
}
} else if (byMod != nullptr) {
for (auto& mod : byMod->get_modules()) {
auto mod_name = mod->UID();
auto data_obj = get_dataobj_for_module(mod_name, cmd_data);
TLOG_DEBUG(1) << "Executing action " << cmd << " on module " << mod_name << " (class " << mod->class_name() << ")";
TLOG_DEBUG(1) << "Executing action " << cmd << " on module " << mod_name << " (class " << mod->class_name()
<< ")";
futures[mod_name] =
std::async(std::launch::async, &DAQModuleManager::execute_action, this, mod_name, cmd, data_obj);
if (execution_mode_is_serial)
Expand Down Expand Up @@ -338,7 +338,6 @@ DAQModuleManager::execute(const std::string& cmd, const dataobj_t& cmd_data)
for (auto& step : action_plan->get_steps()) {
execute_action_plan_step(cmd, step, cmd_data, serial_execution);
}

}

// Shutdown IOManager at scrap
Expand Down
1 change: 0 additions & 1 deletion src/DAQModuleManager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
#include "confmodel/DaqModule.hpp"
#include "conffwk/Configuration.hpp"

#include "appfwk/app/Structs.hpp"
#include "cmdlib/cmd/Structs.hpp"
#include "opmonlib/OpMonManager.hpp"

Expand Down
Loading
Loading