Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

able to specify watchdog timeouts for bridge prior to first message received #488

Merged
merged 2 commits into from
May 20, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 77 additions & 43 deletions tools/cpp/bridge/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ static void sighandler(int signal)

struct Watchdog
{
uint64_t startTime = 0;
uint64_t lastFeedTimeA = 0, lastFeedTimeB = 0;
uint64_t timeoutA, timeoutB;
uint64_t startupA, startupB;

std::unique_ptr<std::thread> thr;
std::mutex lock;
Expand All @@ -42,20 +44,36 @@ struct Watchdog

void run()
{
startTime = TimeUtil::utime();

while (!done) {
uint64_t now = TimeUtil::utime();

uint64_t dtA = timeoutA, dtB = timeoutB;

if (lastFeedTimeA != 0 && timeoutA != std::numeric_limits<uint64_t>::max()) {
uint64_t expireTimeA = lastFeedTimeA + timeoutA;
if (now > expireTimeA) assert(false && "Watchdog A exceeded");
if (lastFeedTimeA != 0) {
if (timeoutA != std::numeric_limits<uint64_t>::max()) {
uint64_t expireTimeA = lastFeedTimeA + timeoutA;
if (now > expireTimeA) assert(false && "Watchdog A exceeded");
dtA = expireTimeA - now;
}
} else if (startupA != std::numeric_limits<uint64_t>::max()) {
uint64_t expireTimeA = startTime + startupA;
if (now > expireTimeA) assert(false && "Watchdog A never fed");
dtA = expireTimeA - now;
}
if (lastFeedTimeB != 0 && timeoutB != std::numeric_limits<uint64_t>::max()) {
uint64_t expireTimeB = lastFeedTimeB + timeoutB;
if (now > expireTimeB) assert(false && "Watchdog B exceeded");

if (lastFeedTimeB != 0) {
if (dtB != std::numeric_limits<uint64_t>::max()) {
uint64_t expireTimeB = lastFeedTimeB + dtB;
if (now > expireTimeB) assert(false && "Watchdog B exceeded");
dtB = expireTimeB - now;
}
} else if (startupB != std::numeric_limits<uint64_t>::max()) {
uint64_t expireTimeB = startTime + startupB;
if (now > expireTimeB) assert(false && "Watchdog B never fed");
dtB = expireTimeB - now;

}

auto dt = std::chrono::microseconds(std::min(std::min(dtA, dtB), (uint64_t) 1e8));
Expand All @@ -65,8 +83,8 @@ struct Watchdog
}
}

Watchdog(uint64_t timeoutA, uint64_t timeoutB) :
timeoutA(timeoutA), timeoutB(timeoutB)
Watchdog(uint64_t timeoutA, uint64_t timeoutB, uint64_t startupA, uint64_t startupB) :
timeoutA(timeoutA), timeoutB(timeoutB), startupA(startupA), startupB(startupB)
{
thr.reset(new std::thread(&Watchdog::run, this));
}
Expand Down Expand Up @@ -98,6 +116,8 @@ struct Args

uint64_t Awatchdog = std::numeric_limits<uint64_t>::max();
uint64_t Bwatchdog = std::numeric_limits<uint64_t>::max();
uint64_t Astartup = std::numeric_limits<uint64_t>::max();
uint64_t Bstartup = std::numeric_limits<uint64_t>::max();

string plugin_path = "";

Expand All @@ -108,18 +128,20 @@ struct Args
// set some defaults
const char *optstring = "hA:B:a:b:D:p:d";
struct option long_opts[] = {
{ "help", no_argument, 0, 'h' },
{ "A-prefix", required_argument, 0, 0 },
{ "B-prefix", required_argument, 0, 0 },
{ "A-endpt", required_argument, 0, 'A' },
{ "B-endpt", required_argument, 0, 'B' },
{ "A-channel", required_argument, 0, 'a' },
{ "B-channel", required_argument, 0, 'b' },
{ "A-watchdog", required_argument, 0, 0 },
{ "B-watchdog", required_argument, 0, 0 },
{ "decimation", required_argument, 0, 'D' },
{ "plugin-path", required_argument, 0, 'p' },
{ "debug", no_argument, 0, 'd' },
{ "help", no_argument, 0, 'h' },
{ "A-prefix", required_argument, 0, 0 },
{ "B-prefix", required_argument, 0, 0 },
{ "A-endpt", required_argument, 0, 'A' },
{ "B-endpt", required_argument, 0, 'B' },
{ "A-channel", required_argument, 0, 'a' },
{ "B-channel", required_argument, 0, 'b' },
{ "A-watchdog", required_argument, 0, 0 },
{ "B-watchdog", required_argument, 0, 0 },
{ "A-startup-watchdog", required_argument, 0, 0 },
{ "B-startup-watchdog", required_argument, 0, 0 },
{ "decimation", required_argument, 0, 'D' },
{ "plugin-path", required_argument, 0, 'p' },
{ "debug", no_argument, 0, 'd' },
{ 0, 0, 0, 0 }
};

Expand Down Expand Up @@ -161,8 +183,16 @@ struct Args
Bprefix = optarg;
} else if (string(long_opts[option_index].name) == "A-watchdog") {
Awatchdog = atol(optarg);
assert(Awatchdog && "A-watchdog specified as 0, likely input parsing error");
} else if (string(long_opts[option_index].name) == "B-watchdog") {
Bwatchdog = atol(optarg);
assert(Bwatchdog && "B-watchdog specified as 0, likely input parsing error");
} else if (string(long_opts[option_index].name) == "A-startup-watchdog") {
Astartup = atol(optarg);
assert(Astartup && "A-startup-watchdog specified as 0, likely input parsing error");
} else if (string(long_opts[option_index].name) == "B-startup-watchdog") {
Bstartup = atol(optarg);
assert(Bstartup && "B-startup-watchdog specified as 0, likely input parsing error");
}
break;
case 'h': default: usage(); return false;
Expand Down Expand Up @@ -198,28 +228,32 @@ struct Args
<< "" << endl
<< "Options:" << endl
<< "" << endl
<< " -h, --help Shows this help text and exits" << endl
<< " -A, --A-enpt=URL One end of the bridge. Ex: zcm-bridge -A ipc" << endl
<< " -B, --B-endpt=URL One end of the bridge. Ex: zcm-bridge -B udpm://239.255.76.67:7667?ttl=0" << endl
<< " --A-prefix=PREFIX Specify a prefix for all messages published on the A url" << endl
<< " --B-prefix=PREFIX Specify a prefix for all messages published on the B url" << endl
<< " -a, --A-channel=CHANNEL One channel to subscribe to on A and repeat to B." << endl
<< " This argument can be specified multiple times. If this option is not," << endl
<< " present then we subscribe to all messages on the A interface." << endl
<< " Ex: zcm-bridge -A ipc -a EXAMPLE -B udpm://239.255.76.67:7667?ttl=0" << endl
<< " -b, --B-channel=CHANNEL One channel to subscribe to on B and repeat to A." << endl
<< " This argument can be specified multiple times. If this option is not," << endl
<< " present then we subscribe to all messages on the B interface." << endl
<< " Ex: zcm-bridge -A ipc -B udpm://239.255.76.67:7667?ttl=0 -b EXAMPLE" << endl
<< " --A-watchdog=TIMEOUT Timeout (in microseconds) since the most recently received message" << endl
<< " on the A transport after which bridge will assert" << endl
<< " --B-watchdog=TIMEOUT Timeout (in microseconds) since the most recently received message" << endl
<< " on the B transport after which bridge will assert" << endl
<< " -D, --decimation Decimation level for the preceeding A-channel or B-channel. " << endl
<< " Ex: zcm-bridge -A ipc -B udpm://239.255.76.67:7667?ttl=0 -b EXAMPLE -d 2" << endl
<< " This example would result in the message on EXAMPLE being rebroadcast on" << endl
<< " the A url every third message." << endl
<< " -p, --plugin-path=path Path to shared library containing transcoder plugins" << endl
<< " -h, --help Shows this help text and exits" << endl
<< " -A, --A-enpt=URL One end of the bridge. Ex: zcm-bridge -A ipc" << endl
<< " -B, --B-endpt=URL One end of the bridge. Ex: zcm-bridge -B udpm://239.255.76.67:7667?ttl=0" << endl
<< " --A-prefix=PREFIX Specify a prefix for all messages published on the A url" << endl
<< " --B-prefix=PREFIX Specify a prefix for all messages published on the B url" << endl
<< " -a, --A-channel=CHANNEL One channel to subscribe to on A and repeat to B." << endl
<< " This argument can be specified multiple times. If this option is not," << endl
<< " present then we subscribe to all messages on the A interface." << endl
<< " Ex: zcm-bridge -A ipc -a EXAMPLE -B udpm://239.255.76.67:7667?ttl=0" << endl
<< " -b, --B-channel=CHANNEL One channel to subscribe to on B and repeat to A." << endl
<< " This argument can be specified multiple times. If this option is not," << endl
<< " present then we subscribe to all messages on the B interface." << endl
<< " Ex: zcm-bridge -A ipc -B udpm://239.255.76.67:7667?ttl=0 -b EXAMPLE" << endl
<< " --A-watchdog=TIMEOUT Timeout (in microseconds) since the most recently received message" << endl
<< " on the A transport after which bridge will assert (default = inf)" << endl
<< " --B-watchdog=TIMEOUT Timeout (in microseconds) since the most recently received message" << endl
<< " on the B transport after which bridge will assert (default = inf)" << endl
<< " --A-startup-watchdog=TIMEOUT Timeout (in microseconds) from startup to receiving the first message" << endl
<< " on the A transport after which bridge will assert (default = inf)" << endl
<< " --B-startup-watchdog=TIMEOUT Timeout (in microseconds) from startup to receiving the first message" << endl
<< " on the B transport after which bridge will assert (default = inf)" << endl
<< " -D, --decimation Decimation level for the preceeding A-channel or B-channel. " << endl
<< " Ex: zcm-bridge -A ipc -B udpm://239.255.76.67:7667?ttl=0 -b EXAMPLE -d 2" << endl
<< " This example would result in the message on EXAMPLE being rebroadcast on" << endl
<< " the A url every third message." << endl
<< " -p, --plugin-path=path Path to shared library containing transcoder plugins" << endl
<< "" << endl << endl;
}
};
Expand Down Expand Up @@ -355,7 +389,7 @@ struct Bridge

void run()
{
wd.reset(new Watchdog(args.Awatchdog, args.Bwatchdog));
wd.reset(new Watchdog(args.Awatchdog, args.Bwatchdog, args.Astartup, args.Bstartup));

vector<BridgeInfo> infoA, infoB;

Expand Down
Loading