Skip to content

Commit

Permalink
Merge pull request #488 from ZeroCM/bridge-startup_watchdog
Browse files Browse the repository at this point in the history
Adds the ability to specify watchdog timeouts for the bridge that apply before the first message is received
  • Loading branch information
jbendes authored May 20, 2024
2 parents 9f1d928 + e66f569 commit 7881609
Showing 1 changed file with 77 additions and 43 deletions.
120 changes: 77 additions & 43 deletions tools/cpp/bridge/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ static void sighandler(int signal)

struct Watchdog
{
uint64_t startTime = 0;
uint64_t lastFeedTimeA = 0, lastFeedTimeB = 0;
uint64_t timeoutA, timeoutB;
uint64_t startupA, startupB;

std::unique_ptr<std::thread> thr;
std::mutex lock;
Expand All @@ -42,20 +44,36 @@ struct Watchdog

void run()
{
startTime = TimeUtil::utime();

while (!done) {
uint64_t now = TimeUtil::utime();

uint64_t dtA = timeoutA, dtB = timeoutB;

if (lastFeedTimeA != 0 && timeoutA != std::numeric_limits<uint64_t>::max()) {
uint64_t expireTimeA = lastFeedTimeA + timeoutA;
if (now > expireTimeA) assert(false && "Watchdog A exceeded");
if (lastFeedTimeA != 0) {
if (timeoutA != std::numeric_limits<uint64_t>::max()) {
uint64_t expireTimeA = lastFeedTimeA + timeoutA;
if (now > expireTimeA) assert(false && "Watchdog A exceeded");
dtA = expireTimeA - now;
}
} else if (startupA != std::numeric_limits<uint64_t>::max()) {
uint64_t expireTimeA = startTime + startupA;
if (now > expireTimeA) assert(false && "Watchdog A never fed");
dtA = expireTimeA - now;
}
if (lastFeedTimeB != 0 && timeoutB != std::numeric_limits<uint64_t>::max()) {
uint64_t expireTimeB = lastFeedTimeB + timeoutB;
if (now > expireTimeB) assert(false && "Watchdog B exceeded");

if (lastFeedTimeB != 0) {
if (dtB != std::numeric_limits<uint64_t>::max()) {
uint64_t expireTimeB = lastFeedTimeB + dtB;
if (now > expireTimeB) assert(false && "Watchdog B exceeded");
dtB = expireTimeB - now;
}
} else if (startupB != std::numeric_limits<uint64_t>::max()) {
uint64_t expireTimeB = startTime + startupB;
if (now > expireTimeB) assert(false && "Watchdog B never fed");
dtB = expireTimeB - now;

}

auto dt = std::chrono::microseconds(std::min(std::min(dtA, dtB), (uint64_t) 1e8));
Expand All @@ -65,8 +83,8 @@ struct Watchdog
}
}

Watchdog(uint64_t timeoutA, uint64_t timeoutB) :
timeoutA(timeoutA), timeoutB(timeoutB)
Watchdog(uint64_t timeoutA, uint64_t timeoutB, uint64_t startupA, uint64_t startupB) :
timeoutA(timeoutA), timeoutB(timeoutB), startupA(startupA), startupB(startupB)
{
thr.reset(new std::thread(&Watchdog::run, this));
}
Expand Down Expand Up @@ -98,6 +116,8 @@ struct Args

uint64_t Awatchdog = std::numeric_limits<uint64_t>::max();
uint64_t Bwatchdog = std::numeric_limits<uint64_t>::max();
uint64_t Astartup = std::numeric_limits<uint64_t>::max();
uint64_t Bstartup = std::numeric_limits<uint64_t>::max();

string plugin_path = "";

Expand All @@ -108,18 +128,20 @@ struct Args
// set some defaults
const char *optstring = "hA:B:a:b:D:p:d";
struct option long_opts[] = {
{ "help", no_argument, 0, 'h' },
{ "A-prefix", required_argument, 0, 0 },
{ "B-prefix", required_argument, 0, 0 },
{ "A-endpt", required_argument, 0, 'A' },
{ "B-endpt", required_argument, 0, 'B' },
{ "A-channel", required_argument, 0, 'a' },
{ "B-channel", required_argument, 0, 'b' },
{ "A-watchdog", required_argument, 0, 0 },
{ "B-watchdog", required_argument, 0, 0 },
{ "decimation", required_argument, 0, 'D' },
{ "plugin-path", required_argument, 0, 'p' },
{ "debug", no_argument, 0, 'd' },
{ "help", no_argument, 0, 'h' },
{ "A-prefix", required_argument, 0, 0 },
{ "B-prefix", required_argument, 0, 0 },
{ "A-endpt", required_argument, 0, 'A' },
{ "B-endpt", required_argument, 0, 'B' },
{ "A-channel", required_argument, 0, 'a' },
{ "B-channel", required_argument, 0, 'b' },
{ "A-watchdog", required_argument, 0, 0 },
{ "B-watchdog", required_argument, 0, 0 },
{ "A-startup-watchdog", required_argument, 0, 0 },
{ "B-startup-watchdog", required_argument, 0, 0 },
{ "decimation", required_argument, 0, 'D' },
{ "plugin-path", required_argument, 0, 'p' },
{ "debug", no_argument, 0, 'd' },
{ 0, 0, 0, 0 }
};

Expand Down Expand Up @@ -161,8 +183,16 @@ struct Args
Bprefix = optarg;
} else if (string(long_opts[option_index].name) == "A-watchdog") {
Awatchdog = atol(optarg);
assert(Awatchdog && "A-watchdog specified as 0, likely input parsing error");
} else if (string(long_opts[option_index].name) == "B-watchdog") {
Bwatchdog = atol(optarg);
assert(Bwatchdog && "B-watchdog specified as 0, likely input parsing error");
} else if (string(long_opts[option_index].name) == "A-startup-watchdog") {
Astartup = atol(optarg);
assert(Astartup && "A-startup-watchdog specified as 0, likely input parsing error");
} else if (string(long_opts[option_index].name) == "B-startup-watchdog") {
Bstartup = atol(optarg);
assert(Bstartup && "B-startup-watchdog specified as 0, likely input parsing error");
}
break;
case 'h': default: usage(); return false;
Expand Down Expand Up @@ -198,28 +228,32 @@ struct Args
<< "" << endl
<< "Options:" << endl
<< "" << endl
<< " -h, --help Shows this help text and exits" << endl
<< " -A, --A-enpt=URL One end of the bridge. Ex: zcm-bridge -A ipc" << endl
<< " -B, --B-endpt=URL One end of the bridge. Ex: zcm-bridge -B udpm://239.255.76.67:7667?ttl=0" << endl
<< " --A-prefix=PREFIX Specify a prefix for all messages published on the A url" << endl
<< " --B-prefix=PREFIX Specify a prefix for all messages published on the B url" << endl
<< " -a, --A-channel=CHANNEL One channel to subscribe to on A and repeat to B." << endl
<< " This argument can be specified multiple times. If this option is not," << endl
<< " present then we subscribe to all messages on the A interface." << endl
<< " Ex: zcm-bridge -A ipc -a EXAMPLE -B udpm://239.255.76.67:7667?ttl=0" << endl
<< " -b, --B-channel=CHANNEL One channel to subscribe to on B and repeat to A." << endl
<< " This argument can be specified multiple times. If this option is not," << endl
<< " present then we subscribe to all messages on the B interface." << endl
<< " Ex: zcm-bridge -A ipc -B udpm://239.255.76.67:7667?ttl=0 -b EXAMPLE" << endl
<< " --A-watchdog=TIMEOUT Timeout (in microseconds) since the most recently received message" << endl
<< " on the A transport after which bridge will assert" << endl
<< " --B-watchdog=TIMEOUT Timeout (in microseconds) since the most recently received message" << endl
<< " on the B transport after which bridge will assert" << endl
<< " -D, --decimation Decimation level for the preceeding A-channel or B-channel. " << endl
<< " Ex: zcm-bridge -A ipc -B udpm://239.255.76.67:7667?ttl=0 -b EXAMPLE -d 2" << endl
<< " This example would result in the message on EXAMPLE being rebroadcast on" << endl
<< " the A url every third message." << endl
<< " -p, --plugin-path=path Path to shared library containing transcoder plugins" << endl
<< " -h, --help Shows this help text and exits" << endl
<< " -A, --A-enpt=URL One end of the bridge. Ex: zcm-bridge -A ipc" << endl
<< " -B, --B-endpt=URL One end of the bridge. Ex: zcm-bridge -B udpm://239.255.76.67:7667?ttl=0" << endl
<< " --A-prefix=PREFIX Specify a prefix for all messages published on the A url" << endl
<< " --B-prefix=PREFIX Specify a prefix for all messages published on the B url" << endl
<< " -a, --A-channel=CHANNEL One channel to subscribe to on A and repeat to B." << endl
<< " This argument can be specified multiple times. If this option is not," << endl
<< " present then we subscribe to all messages on the A interface." << endl
<< " Ex: zcm-bridge -A ipc -a EXAMPLE -B udpm://239.255.76.67:7667?ttl=0" << endl
<< " -b, --B-channel=CHANNEL One channel to subscribe to on B and repeat to A." << endl
<< " This argument can be specified multiple times. If this option is not," << endl
<< " present then we subscribe to all messages on the B interface." << endl
<< " Ex: zcm-bridge -A ipc -B udpm://239.255.76.67:7667?ttl=0 -b EXAMPLE" << endl
<< " --A-watchdog=TIMEOUT Timeout (in microseconds) since the most recently received message" << endl
<< " on the A transport after which bridge will assert (default = inf)" << endl
<< " --B-watchdog=TIMEOUT Timeout (in microseconds) since the most recently received message" << endl
<< " on the B transport after which bridge will assert (default = inf)" << endl
<< " --A-startup-watchdog=TIMEOUT Timeout (in microseconds) from startup to receiving the first message" << endl
<< " on the A transport after which bridge will assert (default = inf)" << endl
<< " --B-startup-watchdog=TIMEOUT Timeout (in microseconds) from startup to receiving the first message" << endl
<< " on the B transport after which bridge will assert (default = inf)" << endl
<< " -D, --decimation Decimation level for the preceeding A-channel or B-channel. " << endl
<< " Ex: zcm-bridge -A ipc -B udpm://239.255.76.67:7667?ttl=0 -b EXAMPLE -d 2" << endl
<< " This example would result in the message on EXAMPLE being rebroadcast on" << endl
<< " the A url every third message." << endl
<< " -p, --plugin-path=path Path to shared library containing transcoder plugins" << endl
<< "" << endl << endl;
}
};
Expand Down Expand Up @@ -355,7 +389,7 @@ struct Bridge

void run()
{
wd.reset(new Watchdog(args.Awatchdog, args.Bwatchdog));
wd.reset(new Watchdog(args.Awatchdog, args.Bwatchdog, args.Astartup, args.Bstartup));

vector<BridgeInfo> infoA, infoB;

Expand Down

0 comments on commit 7881609

Please sign in to comment.