diff --git a/libraries/YarpPlugins/CanBusControlboard/CanRxTxThreads.cpp b/libraries/YarpPlugins/CanBusControlboard/CanRxTxThreads.cpp index 269709cb4..bd37dafd5 100644 --- a/libraries/YarpPlugins/CanBusControlboard/CanRxTxThreads.cpp +++ b/libraries/YarpPlugins/CanBusControlboard/CanRxTxThreads.cpp @@ -4,6 +4,8 @@ #include +#include + #include #include "YarpCanSenderDelegate.hpp" @@ -26,6 +28,20 @@ void CanReaderWriterThread::afterStart(bool success) // ----------------------------------------------------------------------------- +void CanReaderWriterThread::onStop() +{ + CD_INFO("Stopping CanBusControlboard %s thread %s.\n", type.c_str(), id.c_str()); +} + +// ----------------------------------------------------------------------------- + +void CanReaderWriterThread::setDelay(double delay) +{ + this->delay = delay <= 0.0 ? std::numeric_limits::min() : delay; +} + +// ----------------------------------------------------------------------------- + CanReaderThread::CanReaderThread(const std::string & id) : CanReaderWriterThread("read", id) {} @@ -47,22 +63,30 @@ void CanReaderThread::registerHandle(ICanBusSharer * p) void CanReaderThread::run() { unsigned int read; + bool ok; - //-- Return immediately if there is nothing to be read (non-blocking call), return false on errors. - bool ok = iCanBus->canRead(canBuffer, bufferSize, &read); + while (!isStopping()) + { + //-- Lend CPU time to write threads. + // https://github.com/roboticslab-uc3m/yarp-devices/issues/191 + yarp::os::Time::delay(delay); - //-- All debugging messages should be contained in canRead, so just loop again. - if (!ok || read == 0) return; + //-- Return immediately if there is nothing to be read (non-blocking call), return false on errors. + ok = iCanBus->canRead(canBuffer, bufferSize, &read); - for (int i = 0; i < read; i++) - { - const yarp::dev::CanMessage & msg = canBuffer[i]; - auto it = canIdToHandle.find(msg.getId() & 0x7F); + //-- All debugging messages should be contained in canRead, so just loop again. + if (!ok || read == 0) continue; - if (it != canIdToHandle.end()) + for (int i = 0; i < read; i++) { - it->second->interpretMessage(msg); - } + const yarp::dev::CanMessage & msg = canBuffer[i]; + auto it = canIdToHandle.find(msg.getId() & 0x7F); + + if (it != canIdToHandle.end()) + { + it->second->interpretMessage(msg); + } + } } } @@ -83,11 +107,11 @@ CanWriterThread::~CanWriterThread() // ----------------------------------------------------------------------------- -void CanWriterThread::run() +void CanWriterThread::flush() { std::lock_guard lock(bufferMutex); - //-- Nothing to write, just loop again. + //-- Nothing to write, exit. if (preparedMessages == 0) return; yarp::dev::CanErrors errors; @@ -120,6 +144,21 @@ void CanWriterThread::run() // ----------------------------------------------------------------------------- +void CanWriterThread::run() +{ + while (!isStopping()) + { + //-- Lend CPU time to read threads. + // https://github.com/roboticslab-uc3m/yarp-devices/issues/191 + yarp::os::Time::delay(delay); + + //-- Send everything and reset the queue. + flush(); + } +} + +// ----------------------------------------------------------------------------- + void CanWriterThread::handlePartialWrite(unsigned int sent) { for (int i = sent, j = 0; i < preparedMessages; i++, j++) diff --git a/libraries/YarpPlugins/CanBusControlboard/CanRxTxThreads.hpp b/libraries/YarpPlugins/CanBusControlboard/CanRxTxThreads.hpp index 1b78bb4f3..26c0012e1 100644 --- a/libraries/YarpPlugins/CanBusControlboard/CanRxTxThreads.hpp +++ b/libraries/YarpPlugins/CanBusControlboard/CanRxTxThreads.hpp @@ -7,7 +7,7 @@ #include #include -#include +#include #include #include "CanSenderDelegate.hpp" @@ -21,31 +21,20 @@ namespace roboticslab * @brief Base class for a thread that attends CAN reads or writes. * * Child classes take advantage of CAN message buffers to perform bulk reads - * and writes. Non-zero wait periods aim to lend CPU time to other threads, see - * #191. + * and writes. */ -class CanReaderWriterThread : public yarp::os::PeriodicThread +class CanReaderWriterThread : public yarp::os::Thread { public: //! Constructor. CanReaderWriterThread(const std::string & type, const std::string & id) - : yarp::os::PeriodicThread(0.0), - iCanBus(nullptr), iCanBusErrors(nullptr), iCanBufferFactory(nullptr), + : iCanBus(nullptr), iCanBusErrors(nullptr), iCanBufferFactory(nullptr), type(type), id(id), bufferSize(0), delay(0.0) { } //! Virtual destructor. virtual ~CanReaderWriterThread() = default; - //! Configure CAN interface handles. - virtual void setCanHandles(yarp::dev::ICanBus * iCanBus, yarp::dev::ICanBusErrors * iCanBusErrors, - yarp::dev::ICanBufferFactory * iCanBufferFactory, unsigned int bufferSize) - { - this->iCanBus = iCanBus; this->iCanBusErrors = iCanBusErrors; this->iCanBufferFactory = iCanBufferFactory; - this->bufferSize = bufferSize; - } - -protected: //! Invoked by the thread right before it is started. virtual bool threadInit() override { canBuffer = iCanBufferFactory->createBuffer(bufferSize); return true; } @@ -60,9 +49,24 @@ class CanReaderWriterThread : public yarp::os::PeriodicThread //! Invoked by the caller right after the thread is started. virtual void afterStart(bool success) override; + //! Callback on thread stop. + virtual void onStop() override; + //! The thread will invoke this once. virtual void run() override = 0; + //! Configure CAN interface handles. + virtual void setCanHandles(yarp::dev::ICanBus * iCanBus, yarp::dev::ICanBusErrors * iCanBusErrors, + yarp::dev::ICanBufferFactory * iCanBufferFactory, unsigned int bufferSize) + { + this->iCanBus = iCanBus; this->iCanBusErrors = iCanBusErrors; this->iCanBufferFactory = iCanBufferFactory; + this->bufferSize = bufferSize; + } + + //! Configure a delay (in seconds) before each read/write. + void setDelay(double delay); + +protected: yarp::dev::ICanBus * iCanBus; yarp::dev::ICanBusErrors * iCanBusErrors; yarp::dev::ICanBufferFactory * iCanBufferFactory; @@ -95,7 +99,6 @@ class CanReaderThread : public CanReaderWriterThread const std::unordered_map & getHandleMap() { return canIdToHandle; } -protected: virtual void run() override; private: @@ -121,10 +124,12 @@ class CanWriterThread : public CanReaderWriterThread //! Retrieve a handle to the CAN sender delegate. CanSenderDelegate * getDelegate(); + //! Send awaiting messages and clear the queue. + void flush(); + virtual void setCanHandles(yarp::dev::ICanBus * iCanBus, yarp::dev::ICanBusErrors * iCanBusErrors, yarp::dev::ICanBufferFactory * iCanBufferFactory, unsigned int bufferSize) override; -protected: virtual void run() override; private: diff --git a/libraries/YarpPlugins/CanBusControlboard/DeviceDriverImpl.cpp b/libraries/YarpPlugins/CanBusControlboard/DeviceDriverImpl.cpp index 0cf8f5b89..f36cf0023 100644 --- a/libraries/YarpPlugins/CanBusControlboard/DeviceDriverImpl.cpp +++ b/libraries/YarpPlugins/CanBusControlboard/DeviceDriverImpl.cpp @@ -102,16 +102,16 @@ bool CanBusControlboard::open(yarp::os::Searchable & config) { int rxBufferSize = canBusOptions.check("rxBufferSize", yarp::os::Value(100), "CAN bus RX buffer size").asInt32(); int txBufferSize = canBusOptions.check("txBufferSize", yarp::os::Value(100), "CAN bus TX buffer size").asInt32(); - double rxPeriod = canBusOptions.check("rxPeriod", yarp::os::Value(0.0), "CAN bus RX period (seconds)").asFloat64(); - double txPeriod = canBusOptions.check("txPeriod", yarp::os::Value(0.0), "CAN bus TX period (seconds)").asFloat64(); + double rxDelay = canBusOptions.check("rxDelay", yarp::os::Value(0.0), "CAN bus RX delay (seconds)").asFloat64(); + double txDelay = canBusOptions.check("txDelay", yarp::os::Value(0.0), "CAN bus TX delay (seconds)").asFloat64(); CanReaderThread * reader = new CanReaderThread(canBus); reader->setCanHandles(iCanBus, iCanBusErrors, iCanBufferFactory, rxBufferSize); - reader->setPeriod(rxPeriod); + reader->setDelay(rxDelay); CanWriterThread * writer = new CanWriterThread(canBus); writer->setCanHandles(iCanBus, iCanBusErrors, iCanBufferFactory, txBufferSize); - writer->setPeriod(txPeriod); + writer->setDelay(txDelay); canThreads.push_back({canBus, reader, writer}); } @@ -285,7 +285,7 @@ bool CanBusControlboard::open(yarp::os::Searchable & config) } writer->getDelegate()->prepareMessage({0x80, 0, nullptr}); // SYNC - writer->step(); + writer->flush(); } return true; @@ -329,14 +329,14 @@ bool CanBusControlboard::close() { if (bundle.reader && bundle.reader->isRunning()) { - bundle.reader->stop(); + ok &= bundle.reader->stop(); } delete bundle.reader; if (bundle.writer && bundle.writer->isRunning()) { - bundle.writer->stop(); + ok &= bundle.writer->stop(); } delete bundle.writer;