Skip to content

Commit

Permalink
Allow event statuses to be taken anytime
Browse files Browse the repository at this point in the history
Signed-off-by: Yadunund <[email protected]>
  • Loading branch information
Yadunund committed Dec 28, 2024
1 parent e7493d3 commit 41b5de7
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 156 deletions.
66 changes: 33 additions & 33 deletions rmw_zenoh_cpp/src/detail/event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <algorithm>
#include <deque>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <unordered_map>
#include <utility>

Expand Down Expand Up @@ -52,6 +54,18 @@ rmw_zenoh_event_type_t zenoh_event_from_rmw_event(rmw_event_type_t rmw_event_typ
return ZENOH_EVENT_INVALID;
}

///=============================================================================
rmw_zenoh_event_status_t::rmw_zenoh_event_status_t()
: total_count(0),
total_count_change(0),
current_count(0),
current_count_change(0),
data(std::string()),
changed(false)
{
// Do nothing.
}

///=============================================================================
void DataCallbackManager::set_callback(
const void * user_data, rmw_event_callback_t callback)
Expand Down Expand Up @@ -134,35 +148,30 @@ void EventsManager::trigger_event_callback(rmw_zenoh_event_type_t event_id)
}

///=============================================================================
std::unique_ptr<rmw_zenoh_event_status_t> EventsManager::pop_next_event(
rmw_zenoh_event_status_t EventsManager::take_event_status(
rmw_zenoh_event_type_t event_id)
{
if (event_id > ZENOH_EVENT_ID_MAX) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
"RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. "
"Report this bug.",
event_id);
return nullptr;
throw std::runtime_error("Invalid event_type");
}

std::lock_guard<std::mutex> lock(event_mutex_);

if (event_queues_[event_id].empty()) {
// This tells rcl that the check for a new events was done, but no events have come in yet.
return nullptr;
}

std::unique_ptr<rmw_zenoh_event_status_t> event_status =
std::move(event_queues_[event_id].front());
event_queues_[event_id].pop_front();

return event_status;
// Create a copy to return before resetting counters.
auto ret = event_statuses_[event_id];
event_statuses_[event_id].current_count_change = 0;
event_statuses_[event_id].total_count_change = 0;
event_statuses_[event_id].changed = false;
return ret;
}

///=============================================================================
void EventsManager::add_new_event(
void EventsManager::update_event_status(
rmw_zenoh_event_type_t event_id,
std::unique_ptr<rmw_zenoh_event_status_t> event)
int32_t current_count_change)
{
if (event_id > ZENOH_EVENT_ID_MAX) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
Expand All @@ -174,24 +183,15 @@ void EventsManager::add_new_event(

{
std::lock_guard<std::mutex> lock(event_mutex_);

std::deque<std::unique_ptr<rmw_zenoh_event_status_t>> & event_queue = event_queues_[event_id];
if (event_queue.size() >= event_queue_depth_) {
// Log warning if message is discarded due to hitting the queue depth
RMW_ZENOH_LOG_DEBUG_NAMED(
"rmw_zenoh_cpp",
"Event queue depth of %ld reached, discarding oldest message "
"for event type %d",
event_queue_depth_,
event_id);

event_queue.pop_front();
}

event_queue.emplace_back(std::move(event));
rmw_zenoh_event_status_t & status_to_update = event_statuses_[event_id];
status_to_update.total_count += std::max(0, current_count_change);
status_to_update.total_count_change += std::max(0, current_count_change);
status_to_update.current_count += current_count_change;
status_to_update.current_count_change = current_count_change;
status_to_update.changed = true;
}

// Since we added new data, trigger event callback and guard condition if they are available
// Since we updated data, trigger event callback and guard condition if they are available
trigger_event_callback(event_id);
notify_event(event_id);
}
Expand All @@ -211,7 +211,7 @@ bool EventsManager::queue_has_data_and_attach_condition_if_not(

std::lock_guard<std::mutex> lock(event_condition_mutex_);

if (!event_queues_[event_id].empty()) {
if (event_statuses_[event_id].changed) {
return true;
}

Expand All @@ -235,7 +235,7 @@ bool EventsManager::detach_condition_and_event_queue_is_empty(rmw_zenoh_event_ty

wait_set_data_[event_id] = nullptr;

return event_queues_[event_id].empty();
return !event_statuses_[event_id].changed;
}

///=============================================================================
Expand Down
28 changes: 14 additions & 14 deletions rmw_zenoh_cpp/src/detail/event.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,11 @@ struct rmw_zenoh_event_status_t
int32_t current_count_change;
// The data field can be used to store serialized information for more complex statuses.
std::string data;
// A boolean field to indicate if the status changed since the last take.
bool changed;

rmw_zenoh_event_status_t()
: total_count(0),
total_count_change(0),
current_count(0),
current_count_change(0)
{}
// Constructor.
rmw_zenoh_event_status_t();
};

///=============================================================================
Expand Down Expand Up @@ -110,16 +108,16 @@ class EventsManager
rmw_event_callback_t callback,
const void * user_data);

/// Pop the next event in the queue.
/// @param event_id the event id whose queue should be popped.
std::unique_ptr<rmw_zenoh_event_status_t> pop_next_event(
rmw_zenoh_event_type_t event_id);
/// @brief Get the status for an event.
/// @param event_id the id for the event whose status should be retrieved.
rmw_zenoh_event_status_t take_event_status(rmw_zenoh_event_type_t event_id);

/// Add an event status for an event.
/// @param event_id the event id queue to which the status should be added.
void add_new_event(
/// @brief Update the status for an event.
/// @param event_id the id for the event whose status should be changed.
/// @param current_count_change the change in the current count.
void update_event_status(
rmw_zenoh_event_type_t event_id,
std::unique_ptr<rmw_zenoh_event_status_t> event);
int32_t current_count_change);

/// @brief Attach the condition variable provided by rmw_wait.
/// @param condition_variable to attach.
Expand Down Expand Up @@ -151,6 +149,8 @@ class EventsManager
// A dequeue of events for each type of event this RMW supports.
std::deque<std::unique_ptr<rmw_zenoh_event_status_t>> event_queues_[ZENOH_EVENT_ID_MAX + 1] {};
const std::size_t event_queue_depth_ = 10;
// Statuses for events supported.
rmw_zenoh_event_status_t event_statuses_[ZENOH_EVENT_ID_MAX + 1];
};
} // namespace rmw_zenoh_cpp

Expand Down
86 changes: 18 additions & 68 deletions rmw_zenoh_cpp/src/detail/graph_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <algorithm>
#include <array>
#include <functional>
#include <limits>
Expand Down Expand Up @@ -229,14 +228,14 @@ void GraphCache::handle_matched_events_for_put(
if (entity->topic_info()->topic_keyexpr_ ==
sub_entity->topic_info().value().topic_keyexpr_)
{
update_event_counters(topic_info.name_, ZENOH_EVENT_SUBSCRIPTION_MATCHED, 1);
update_event_counters(entity, ZENOH_EVENT_SUBSCRIPTION_MATCHED, 1);
if (is_entity_local(*sub_entity)) {
local_entities_with_events[sub_entity].insert(ZENOH_EVENT_SUBSCRIPTION_MATCHED);
}
}
}
// Update event counters for the new entity->
update_event_counters(topic_info.name_,
// Update event counters for the new entity.
update_event_counters(entity,
ZENOH_EVENT_PUBLICATION_MATCHED,
match_count_for_entity);
if (is_entity_local(*entity) && match_count_for_entity > 0) {
Expand All @@ -259,23 +258,22 @@ void GraphCache::handle_matched_events_for_put(
if (entity->topic_info()->topic_keyexpr_ ==
pub_entity->topic_info().value().topic_keyexpr_)
{
update_event_counters(topic_info.name_, ZENOH_EVENT_PUBLICATION_MATCHED, 1);
update_event_counters(entity, ZENOH_EVENT_PUBLICATION_MATCHED, 1);
if (is_entity_local(*pub_entity)) {
local_entities_with_events[pub_entity].insert(ZENOH_EVENT_PUBLICATION_MATCHED);
}
}
}
// Update event counters for the new entity->
// Update event counters for the new entity.
update_event_counters(
topic_info.name_,
entity,
ZENOH_EVENT_SUBSCRIPTION_MATCHED,
match_count_for_entity);
if (is_entity_local(*entity) && match_count_for_entity > 0) {
local_entities_with_events[entity].insert(ZENOH_EVENT_SUBSCRIPTION_MATCHED);
}
}
}
take_entities_with_events(local_entities_with_events);
}

///=============================================================================
Expand All @@ -296,7 +294,7 @@ void GraphCache::handle_matched_events_for_del(
for (const auto & [_, topic_data_ptr] : topic_qos_map) {
for (liveliness::ConstEntityPtr sub_entity : topic_data_ptr->subs_) {
update_event_counters(
topic_info.name_,
entity,
ZENOH_EVENT_SUBSCRIPTION_MATCHED,
static_cast<int32_t>(-1));
if (is_entity_local(*sub_entity)) {
Expand All @@ -309,7 +307,7 @@ void GraphCache::handle_matched_events_for_del(
for (const auto & [_, topic_data_ptr] : topic_qos_map) {
for (liveliness::ConstEntityPtr pub_entity : topic_data_ptr->pubs_) {
update_event_counters(
topic_info.name_,
entity,
ZENOH_EVENT_PUBLICATION_MATCHED,
static_cast<int32_t>(-1));
if (is_entity_local(*pub_entity)) {
Expand All @@ -318,28 +316,6 @@ void GraphCache::handle_matched_events_for_del(
}
}
}
take_entities_with_events(local_entities_with_events);
}

///=============================================================================
void GraphCache::take_entities_with_events(const EntityEventMap & entities_with_events)
{
for (const auto & [local_entity, event_set] : entities_with_events) {
// Trigger callback set for this entity for the event type.
GraphEventCallbackMap::const_iterator event_callbacks_it =
event_callbacks_.find(local_entity->keyexpr_hash());
if (event_callbacks_it != event_callbacks_.end()) {
for (const rmw_zenoh_event_type_t & event_type : event_set) {
GraphEventCallbacks::const_iterator callback_it =
event_callbacks_it->second.find(event_type);
if (callback_it != event_callbacks_it->second.end()) {
std::unique_ptr<rmw_zenoh_event_status_t> taken_event =
take_event_status(local_entity->topic_info()->name_, event_type);
callback_it->second(std::move(taken_event));
}
}
}
}
}

///=============================================================================
Expand Down Expand Up @@ -1265,7 +1241,7 @@ bool GraphCache::is_entity_pub(const liveliness::Entity & entity)

///=============================================================================
void GraphCache::update_event_counters(
const std::string & topic_name,
liveliness::ConstEntityPtr entity,
const rmw_zenoh_event_type_t event_id,
int32_t change)
{
Expand All @@ -1275,42 +1251,16 @@ void GraphCache::update_event_counters(

std::lock_guard<std::mutex> lock(events_mutex_);

auto event_statuses_it = event_statuses_.find(topic_name);
if (event_statuses_it == event_statuses_.end()) {
// Initialize statuses.
std::array<rmw_zenoh_event_status_t, ZENOH_EVENT_ID_MAX + 1> status_array {};
event_statuses_[topic_name] = std::move(status_array);
}

rmw_zenoh_event_status_t & status_to_update = event_statuses_[topic_name][event_id];
status_to_update.total_count += std::max(0, change);
status_to_update.total_count_change += std::max(0, change);
status_to_update.current_count += change;
status_to_update.current_count_change = change;
}

///=============================================================================
std::unique_ptr<rmw_zenoh_event_status_t> GraphCache::take_event_status(
const std::string & topic_name,
const rmw_zenoh_event_type_t event_id)
{
if (event_id > ZENOH_EVENT_ID_MAX) {
return nullptr;
}

std::lock_guard<std::mutex> lock(events_mutex_);

auto event_statuses_it = event_statuses_.find(topic_name);
if (event_statuses_it == event_statuses_.end()) {
return nullptr;
// Trigger callback set for this entity for the event type.
GraphEventCallbackMap::const_iterator event_callbacks_it =
event_callbacks_.find(entity->keyexpr_hash());
if (event_callbacks_it != event_callbacks_.end()) {
GraphEventCallbacks::const_iterator callback_it =
event_callbacks_it->second.find(event_id);
if (callback_it != event_callbacks_it->second.end()) {
callback_it->second(change);
}
}

rmw_zenoh_event_status_t & status_to_take = event_statuses_[topic_name][event_id];
auto result = std::make_unique<rmw_zenoh_event_status_t>(status_to_take);
// Reset changes.
status_to_take.total_count_change = 0;
status_to_take.current_count_change = 0;
return result;
}

///=============================================================================
Expand Down
12 changes: 2 additions & 10 deletions rmw_zenoh_cpp/src/detail/graph_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ class GraphCache final
public:
/// @brief Signature for a function that will be invoked by the GraphCache when a QoS
/// event is detected.
using GraphCacheEventCallback = std::function<void (std::unique_ptr<rmw_zenoh_event_status_t>)>;
using GraphCacheEventCallback = std::function<void (int32_t change)>;
/// Callback to be triggered when a publication cache is detected in the ROS Graph.
using QueryingSubscriberCallback = std::function<void (const std::string & queryable_prefix)>;

Expand Down Expand Up @@ -230,15 +230,10 @@ class GraphCache final
bool is_entity_local(const liveliness::Entity & entity) const;

void update_event_counters(
const std::string & topic_name,
liveliness::ConstEntityPtr entity,
const rmw_zenoh_event_type_t event_id,
int32_t change);

// Take status and reset change counters.
std::unique_ptr<rmw_zenoh_event_status_t> take_event_status(
const std::string & topic_name,
const rmw_zenoh_event_type_t event_id);

void handle_matched_events_for_put(
liveliness::ConstEntityPtr entity,
const GraphNode::TopicQoSMap & topic_qos_map);
Expand Down Expand Up @@ -299,9 +294,6 @@ class GraphCache final
// Map key expressions to a map of sub keyexpr_hash and QueryingSubscriberCallback.
std::unordered_map<std::string, std::unordered_map<std::size_t,
QueryingSubscriberCallback>> querying_subs_cbs_;
// Counters to track changes to event statues for each topic.
std::unordered_map<std::string,
std::array<rmw_zenoh_event_status_t, ZENOH_EVENT_ID_MAX + 1>> event_statuses_;
std::mutex events_mutex_;

// Mutex to lock before modifying the members above.
Expand Down
9 changes: 2 additions & 7 deletions rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,6 @@ SubscriptionData::SubscriptionData(
type_support_impl_(type_support_impl),
type_support_(std::move(type_support)),
last_known_published_msg_({}),
total_messages_lost_(0),
wait_set_data_(nullptr),
is_shutdown_(false),
initialized_(false)
Expand Down Expand Up @@ -624,13 +623,9 @@ void SubscriptionData::add_new_message(
last_known_pub_it->second);
if (seq_increment > 1) {
const size_t num_msg_lost = seq_increment - 1;
total_messages_lost_ += num_msg_lost;
auto event_status = std::make_unique<rmw_zenoh_event_status_t>();
event_status->total_count_change = num_msg_lost;
event_status->total_count = total_messages_lost_;
events_mgr_->add_new_event(
events_mgr_->update_event_status(
ZENOH_EVENT_MESSAGE_LOST,
std::move(event_status));
num_msg_lost);
}
}
// Always update the last known sequence number for the publisher.
Expand Down
Loading

0 comments on commit 41b5de7

Please sign in to comment.