Skip to content

Commit

Permalink
Revert "Implement RW as YARP periodic threads"
Browse files Browse the repository at this point in the history
* this reverts commit 908bc29
* see #231
  • Loading branch information
PeterBowman committed Jan 18, 2020
1 parent 908bc29 commit 059dd41
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 37 deletions.
65 changes: 52 additions & 13 deletions libraries/YarpPlugins/CanBusControlboard/CanRxTxThreads.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

#include <cstring>

#include <yarp/os/Time.h>

#include <ColorDebug.h>

#include "YarpCanSenderDelegate.hpp"
Expand All @@ -26,6 +28,20 @@ void CanReaderWriterThread::afterStart(bool success)

// -----------------------------------------------------------------------------

void CanReaderWriterThread::onStop()
{
CD_INFO("Stopping CanBusControlboard %s thread %s.\n", type.c_str(), id.c_str());
}

// -----------------------------------------------------------------------------

void CanReaderWriterThread::setDelay(double delay)
{
this->delay = delay <= 0.0 ? std::numeric_limits<double>::min() : delay;
}

// -----------------------------------------------------------------------------

CanReaderThread::CanReaderThread(const std::string & id)
: CanReaderWriterThread("read", id)
{}
Expand All @@ -47,22 +63,30 @@ void CanReaderThread::registerHandle(ICanBusSharer * p)
void CanReaderThread::run()
{
unsigned int read;
bool ok;

//-- Return immediately if there is nothing to be read (non-blocking call), return false on errors.
bool ok = iCanBus->canRead(canBuffer, bufferSize, &read);
while (!isStopping())
{
//-- Lend CPU time to write threads.
// https://github.com/roboticslab-uc3m/yarp-devices/issues/191
yarp::os::Time::delay(delay);

//-- All debugging messages should be contained in canRead, so just loop again.
if (!ok || read == 0) return;
//-- Return immediately if there is nothing to be read (non-blocking call), return false on errors.
ok = iCanBus->canRead(canBuffer, bufferSize, &read);

for (int i = 0; i < read; i++)
{
const yarp::dev::CanMessage & msg = canBuffer[i];
auto it = canIdToHandle.find(msg.getId() & 0x7F);
//-- All debugging messages should be contained in canRead, so just loop again.
if (!ok || read == 0) continue;

if (it != canIdToHandle.end())
for (int i = 0; i < read; i++)
{
it->second->interpretMessage(msg);
}
const yarp::dev::CanMessage & msg = canBuffer[i];
auto it = canIdToHandle.find(msg.getId() & 0x7F);

if (it != canIdToHandle.end())
{
it->second->interpretMessage(msg);
}
}
}
}

Expand All @@ -83,11 +107,11 @@ CanWriterThread::~CanWriterThread()

// -----------------------------------------------------------------------------

void CanWriterThread::run()
void CanWriterThread::flush()
{
std::lock_guard<std::mutex> lock(bufferMutex);

//-- Nothing to write, just loop again.
//-- Nothing to write, exit.
if (preparedMessages == 0) return;

yarp::dev::CanErrors errors;
Expand Down Expand Up @@ -120,6 +144,21 @@ void CanWriterThread::run()

// -----------------------------------------------------------------------------

void CanWriterThread::run()
{
while (!isStopping())
{
//-- Lend CPU time to read threads.
// https://github.com/roboticslab-uc3m/yarp-devices/issues/191
yarp::os::Time::delay(delay);

//-- Send everything and reset the queue.
flush();
}
}

// -----------------------------------------------------------------------------

void CanWriterThread::handlePartialWrite(unsigned int sent)
{
for (int i = sent, j = 0; i < preparedMessages; i++, j++)
Expand Down
39 changes: 22 additions & 17 deletions libraries/YarpPlugins/CanBusControlboard/CanRxTxThreads.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
#include <string>
#include <unordered_map>

#include <yarp/os/PeriodicThread.h>
#include <yarp/os/Thread.h>
#include <yarp/dev/CanBusInterface.h>

#include "CanSenderDelegate.hpp"
Expand All @@ -21,31 +21,20 @@ namespace roboticslab
* @brief Base class for a thread that attends CAN reads or writes.
*
* Child classes take advantage of CAN message buffers to perform bulk reads
* and writes. Non-zero wait periods aim to lend CPU time to other threads, see
* <a href="https://github.com/roboticslab-uc3m/yarp-devices/issues/191">#191</a>.
* and writes.
*/
class CanReaderWriterThread : public yarp::os::PeriodicThread
class CanReaderWriterThread : public yarp::os::Thread
{
public:
//! Constructor.
CanReaderWriterThread(const std::string & type, const std::string & id)
: yarp::os::PeriodicThread(0.0),
iCanBus(nullptr), iCanBusErrors(nullptr), iCanBufferFactory(nullptr),
: iCanBus(nullptr), iCanBusErrors(nullptr), iCanBufferFactory(nullptr),
type(type), id(id), bufferSize(0), delay(0.0)
{ }

//! Virtual destructor.
virtual ~CanReaderWriterThread() = default;

//! Configure CAN interface handles.
virtual void setCanHandles(yarp::dev::ICanBus * iCanBus, yarp::dev::ICanBusErrors * iCanBusErrors,
yarp::dev::ICanBufferFactory * iCanBufferFactory, unsigned int bufferSize)
{
this->iCanBus = iCanBus; this->iCanBusErrors = iCanBusErrors; this->iCanBufferFactory = iCanBufferFactory;
this->bufferSize = bufferSize;
}

protected:
//! Invoked by the thread right before it is started.
virtual bool threadInit() override
{ canBuffer = iCanBufferFactory->createBuffer(bufferSize); return true; }
Expand All @@ -60,9 +49,24 @@ class CanReaderWriterThread : public yarp::os::PeriodicThread
//! Invoked by the caller right after the thread is started.
virtual void afterStart(bool success) override;

//! Callback on thread stop.
virtual void onStop() override;

//! The thread will invoke this once.
virtual void run() override = 0;

//! Configure CAN interface handles.
virtual void setCanHandles(yarp::dev::ICanBus * iCanBus, yarp::dev::ICanBusErrors * iCanBusErrors,
yarp::dev::ICanBufferFactory * iCanBufferFactory, unsigned int bufferSize)
{
this->iCanBus = iCanBus; this->iCanBusErrors = iCanBusErrors; this->iCanBufferFactory = iCanBufferFactory;
this->bufferSize = bufferSize;
}

//! Configure a delay (in seconds) before each read/write.
void setDelay(double delay);

protected:
yarp::dev::ICanBus * iCanBus;
yarp::dev::ICanBusErrors * iCanBusErrors;
yarp::dev::ICanBufferFactory * iCanBufferFactory;
Expand Down Expand Up @@ -95,7 +99,6 @@ class CanReaderThread : public CanReaderWriterThread
const std::unordered_map<unsigned int, ICanBusSharer *> & getHandleMap()
{ return canIdToHandle; }

protected:
virtual void run() override;

private:
Expand All @@ -121,10 +124,12 @@ class CanWriterThread : public CanReaderWriterThread
//! Retrieve a handle to the CAN sender delegate.
CanSenderDelegate * getDelegate();

//! Send awaiting messages and clear the queue.
void flush();

virtual void setCanHandles(yarp::dev::ICanBus * iCanBus, yarp::dev::ICanBusErrors * iCanBusErrors,
yarp::dev::ICanBufferFactory * iCanBufferFactory, unsigned int bufferSize) override;

protected:
virtual void run() override;

private:
Expand Down
14 changes: 7 additions & 7 deletions libraries/YarpPlugins/CanBusControlboard/DeviceDriverImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,16 +102,16 @@ bool CanBusControlboard::open(yarp::os::Searchable & config)
{
int rxBufferSize = canBusOptions.check("rxBufferSize", yarp::os::Value(100), "CAN bus RX buffer size").asInt32();
int txBufferSize = canBusOptions.check("txBufferSize", yarp::os::Value(100), "CAN bus TX buffer size").asInt32();
double rxPeriod = canBusOptions.check("rxPeriod", yarp::os::Value(0.0), "CAN bus RX period (seconds)").asFloat64();
double txPeriod = canBusOptions.check("txPeriod", yarp::os::Value(0.0), "CAN bus TX period (seconds)").asFloat64();
double rxDelay = canBusOptions.check("rxDelay", yarp::os::Value(0.0), "CAN bus RX delay (seconds)").asFloat64();
double txDelay = canBusOptions.check("txDelay", yarp::os::Value(0.0), "CAN bus TX delay (seconds)").asFloat64();

CanReaderThread * reader = new CanReaderThread(canBus);
reader->setCanHandles(iCanBus, iCanBusErrors, iCanBufferFactory, rxBufferSize);
reader->setPeriod(rxPeriod);
reader->setDelay(rxDelay);

CanWriterThread * writer = new CanWriterThread(canBus);
writer->setCanHandles(iCanBus, iCanBusErrors, iCanBufferFactory, txBufferSize);
writer->setPeriod(txPeriod);
writer->setDelay(txDelay);

canThreads.push_back({canBus, reader, writer});
}
Expand Down Expand Up @@ -285,7 +285,7 @@ bool CanBusControlboard::open(yarp::os::Searchable & config)
}

writer->getDelegate()->prepareMessage({0x80, 0, nullptr}); // SYNC
writer->step();
writer->flush();
}

return true;
Expand Down Expand Up @@ -329,14 +329,14 @@ bool CanBusControlboard::close()
{
if (bundle.reader && bundle.reader->isRunning())
{
bundle.reader->stop();
ok &= bundle.reader->stop();
}

delete bundle.reader;

if (bundle.writer && bundle.writer->isRunning())
{
bundle.writer->stop();
ok &= bundle.writer->stop();
}

delete bundle.writer;
Expand Down

0 comments on commit 059dd41

Please sign in to comment.