Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow event statuses to be taken anytime #365

Merged
merged 1 commit into from
Dec 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 33 additions & 33 deletions rmw_zenoh_cpp/src/detail/event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <algorithm>
#include <deque>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <unordered_map>
#include <utility>

Expand Down Expand Up @@ -52,6 +54,18 @@ rmw_zenoh_event_type_t zenoh_event_from_rmw_event(rmw_event_type_t rmw_event_typ
return ZENOH_EVENT_INVALID;
}

///=============================================================================
rmw_zenoh_event_status_t::rmw_zenoh_event_status_t()
: total_count(0),
total_count_change(0),
current_count(0),
current_count_change(0),
data(std::string()),
changed(false)
{
// Do nothing.
}

///=============================================================================
void DataCallbackManager::set_callback(
const void * user_data, rmw_event_callback_t callback)
Expand Down Expand Up @@ -134,35 +148,30 @@ void EventsManager::trigger_event_callback(rmw_zenoh_event_type_t event_id)
}

///=============================================================================
std::unique_ptr<rmw_zenoh_event_status_t> EventsManager::pop_next_event(
rmw_zenoh_event_status_t EventsManager::take_event_status(
rmw_zenoh_event_type_t event_id)
{
if (event_id > ZENOH_EVENT_ID_MAX) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
"RMW Zenoh is not correctly configured to handle rmw_zenoh_event_type_t [%d]. "
"Report this bug.",
event_id);
return nullptr;
throw std::runtime_error("Invalid event_type");
}

std::lock_guard<std::mutex> lock(event_mutex_);

if (event_queues_[event_id].empty()) {
// This tells rcl that the check for a new events was done, but no events have come in yet.
return nullptr;
}

std::unique_ptr<rmw_zenoh_event_status_t> event_status =
std::move(event_queues_[event_id].front());
event_queues_[event_id].pop_front();

return event_status;
// Create a copy to return before resetting counters.
auto ret = event_statuses_[event_id];
event_statuses_[event_id].current_count_change = 0;
event_statuses_[event_id].total_count_change = 0;
event_statuses_[event_id].changed = false;
return ret;
}

///=============================================================================
void EventsManager::add_new_event(
void EventsManager::update_event_status(
rmw_zenoh_event_type_t event_id,
std::unique_ptr<rmw_zenoh_event_status_t> event)
int32_t current_count_change)
{
if (event_id > ZENOH_EVENT_ID_MAX) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
Expand All @@ -174,24 +183,15 @@ void EventsManager::add_new_event(

{
std::lock_guard<std::mutex> lock(event_mutex_);

std::deque<std::unique_ptr<rmw_zenoh_event_status_t>> & event_queue = event_queues_[event_id];
if (event_queue.size() >= event_queue_depth_) {
// Log warning if message is discarded due to hitting the queue depth
RMW_ZENOH_LOG_DEBUG_NAMED(
"rmw_zenoh_cpp",
"Event queue depth of %ld reached, discarding oldest message "
"for event type %d",
event_queue_depth_,
event_id);

event_queue.pop_front();
}

event_queue.emplace_back(std::move(event));
rmw_zenoh_event_status_t & status_to_update = event_statuses_[event_id];
status_to_update.total_count += std::max(0, current_count_change);
status_to_update.total_count_change += std::max(0, current_count_change);
status_to_update.current_count += current_count_change;
status_to_update.current_count_change = current_count_change;
status_to_update.changed = true;
}

// Since we added new data, trigger event callback and guard condition if they are available
// Since we updated data, trigger event callback and guard condition if they are available
trigger_event_callback(event_id);
notify_event(event_id);
}
Expand All @@ -211,7 +211,7 @@ bool EventsManager::queue_has_data_and_attach_condition_if_not(

std::lock_guard<std::mutex> lock(event_condition_mutex_);

if (!event_queues_[event_id].empty()) {
if (event_statuses_[event_id].changed) {
return true;
}

Expand All @@ -235,7 +235,7 @@ bool EventsManager::detach_condition_and_event_queue_is_empty(rmw_zenoh_event_ty

wait_set_data_[event_id] = nullptr;

return event_queues_[event_id].empty();
return !event_statuses_[event_id].changed;
}

///=============================================================================
Expand Down
31 changes: 14 additions & 17 deletions rmw_zenoh_cpp/src/detail/event.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,11 @@ struct rmw_zenoh_event_status_t
int32_t current_count_change;
// The data field can be used to store serialized information for more complex statuses.
std::string data;
// A boolean field to indicate if the status changed since the last take.
bool changed;

rmw_zenoh_event_status_t()
: total_count(0),
total_count_change(0),
current_count(0),
current_count_change(0)
{}
// Constructor.
rmw_zenoh_event_status_t();
};

///=============================================================================
Expand Down Expand Up @@ -110,16 +108,16 @@ class EventsManager
rmw_event_callback_t callback,
const void * user_data);

/// Pop the next event in the queue.
/// @param event_id the event id whose queue should be popped.
std::unique_ptr<rmw_zenoh_event_status_t> pop_next_event(
rmw_zenoh_event_type_t event_id);
/// @brief Get the status for an event.
/// @param event_id the id for the event whose status should be retrieved.
rmw_zenoh_event_status_t take_event_status(rmw_zenoh_event_type_t event_id);

/// Add an event status for an event.
/// @param event_id the event id queue to which the status should be added.
void add_new_event(
/// @brief Update the status for an event.
/// @param event_id the id for the event whose status should be changed.
/// @param current_count_change the change in the current count.
void update_event_status(
rmw_zenoh_event_type_t event_id,
std::unique_ptr<rmw_zenoh_event_status_t> event);
int32_t current_count_change);

/// @brief Attach the condition variable provided by rmw_wait.
/// @param condition_variable to attach.
Expand Down Expand Up @@ -148,9 +146,8 @@ class EventsManager
rmw_event_callback_t event_callback_[ZENOH_EVENT_ID_MAX + 1] {nullptr};
const void * event_data_[ZENOH_EVENT_ID_MAX + 1] {nullptr};
size_t event_unread_count_[ZENOH_EVENT_ID_MAX + 1] {0};
// A dequeue of events for each type of event this RMW supports.
std::deque<std::unique_ptr<rmw_zenoh_event_status_t>> event_queues_[ZENOH_EVENT_ID_MAX + 1] {};
const std::size_t event_queue_depth_ = 10;
// Statuses for events supported.
rmw_zenoh_event_status_t event_statuses_[ZENOH_EVENT_ID_MAX + 1];
};
} // namespace rmw_zenoh_cpp

Expand Down
121 changes: 31 additions & 90 deletions rmw_zenoh_cpp/src/detail/graph_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <algorithm>
#include <array>
#include <functional>
#include <limits>
Expand Down Expand Up @@ -207,8 +206,6 @@ void GraphCache::handle_matched_events_for_put(
}
const liveliness::TopicInfo topic_info = entity->topic_info().value();
const bool is_pub = is_entity_pub(*entity);
// Initialize a map that will be populated with any QoS events that may be detected.
EntityEventMap local_entities_with_events = {};
// The entity added may be local with callbacks registered but there
// may be other local entities in the graph that are matched.
int32_t match_count_for_entity = 0;
Expand All @@ -229,18 +226,16 @@ void GraphCache::handle_matched_events_for_put(
if (entity->topic_info()->topic_keyexpr_ ==
sub_entity->topic_info().value().topic_keyexpr_)
{
update_event_counters(topic_info.name_, ZENOH_EVENT_SUBSCRIPTION_MATCHED, 1);
if (is_entity_local(*sub_entity)) {
local_entities_with_events[sub_entity].insert(ZENOH_EVENT_SUBSCRIPTION_MATCHED);
update_event_counters(sub_entity, ZENOH_EVENT_SUBSCRIPTION_MATCHED, 1);
}
}
}
// Update event counters for the new entity->
update_event_counters(topic_info.name_,
ZENOH_EVENT_PUBLICATION_MATCHED,
match_count_for_entity);
// Update event counters for the new entity.
if (is_entity_local(*entity) && match_count_for_entity > 0) {
local_entities_with_events[entity].insert(ZENOH_EVENT_PUBLICATION_MATCHED);
update_event_counters(entity,
ZENOH_EVENT_PUBLICATION_MATCHED,
match_count_for_entity);
}
} else {
// Entity is a sub.
Expand All @@ -259,23 +254,20 @@ void GraphCache::handle_matched_events_for_put(
if (entity->topic_info()->topic_keyexpr_ ==
pub_entity->topic_info().value().topic_keyexpr_)
{
update_event_counters(topic_info.name_, ZENOH_EVENT_PUBLICATION_MATCHED, 1);
if (is_entity_local(*pub_entity)) {
local_entities_with_events[pub_entity].insert(ZENOH_EVENT_PUBLICATION_MATCHED);
update_event_counters(pub_entity, ZENOH_EVENT_PUBLICATION_MATCHED, 1);
}
}
}
// Update event counters for the new entity->
update_event_counters(
topic_info.name_,
ZENOH_EVENT_SUBSCRIPTION_MATCHED,
match_count_for_entity);
// Update event counters for the new entity.
if (is_entity_local(*entity) && match_count_for_entity > 0) {
local_entities_with_events[entity].insert(ZENOH_EVENT_SUBSCRIPTION_MATCHED);
update_event_counters(
entity,
ZENOH_EVENT_SUBSCRIPTION_MATCHED,
match_count_for_entity);
}
}
}
take_entities_with_events(local_entities_with_events);
}

///=============================================================================
Expand All @@ -290,52 +282,27 @@ void GraphCache::handle_matched_events_for_del(
return;
}
const liveliness::TopicInfo topic_info = entity->topic_info().value();
EntityEventMap local_entities_with_events;
if (is_entity_pub(*entity)) {
// Notify any local subs of a matched event with change -1.
for (const auto & [_, topic_data_ptr] : topic_qos_map) {
for (liveliness::ConstEntityPtr sub_entity : topic_data_ptr->subs_) {
update_event_counters(
topic_info.name_,
ZENOH_EVENT_SUBSCRIPTION_MATCHED,
static_cast<int32_t>(-1));
if (is_entity_local(*sub_entity)) {
local_entities_with_events[sub_entity].insert(ZENOH_EVENT_SUBSCRIPTION_MATCHED);
update_event_counters(
sub_entity,
ZENOH_EVENT_SUBSCRIPTION_MATCHED,
static_cast<int32_t>(-1));
}
}
}
} else {
// Notify any local pubs of a matched event with change -1.
for (const auto & [_, topic_data_ptr] : topic_qos_map) {
for (liveliness::ConstEntityPtr pub_entity : topic_data_ptr->pubs_) {
update_event_counters(
topic_info.name_,
ZENOH_EVENT_PUBLICATION_MATCHED,
static_cast<int32_t>(-1));
if (is_entity_local(*pub_entity)) {
local_entities_with_events[pub_entity].insert(ZENOH_EVENT_PUBLICATION_MATCHED);
}
}
}
}
take_entities_with_events(local_entities_with_events);
}

///=============================================================================
void GraphCache::take_entities_with_events(const EntityEventMap & entities_with_events)
{
for (const auto & [local_entity, event_set] : entities_with_events) {
// Trigger callback set for this entity for the event type.
GraphEventCallbackMap::const_iterator event_callbacks_it =
event_callbacks_.find(local_entity->keyexpr_hash());
if (event_callbacks_it != event_callbacks_.end()) {
for (const rmw_zenoh_event_type_t & event_type : event_set) {
GraphEventCallbacks::const_iterator callback_it =
event_callbacks_it->second.find(event_type);
if (callback_it != event_callbacks_it->second.end()) {
std::unique_ptr<rmw_zenoh_event_status_t> taken_event =
take_event_status(local_entity->topic_info()->name_, event_type);
callback_it->second(std::move(taken_event));
update_event_counters(
pub_entity,
ZENOH_EVENT_PUBLICATION_MATCHED,
static_cast<int32_t>(-1));
}
}
}
Expand Down Expand Up @@ -1218,7 +1185,7 @@ void GraphCache::set_qos_event_callback(
const rmw_zenoh_event_type_t & event_type,
GraphCacheEventCallback callback)
{
std::lock_guard<std::mutex> lock(graph_mutex_);
std::lock_guard<std::mutex> lock(events_mutex_);

if (event_type > ZENOH_EVENT_ID_MAX) {
RMW_ZENOH_LOG_WARN_NAMED(
Expand All @@ -1239,7 +1206,7 @@ void GraphCache::set_qos_event_callback(
///=============================================================================
void GraphCache::remove_qos_event_callbacks(std::size_t entity_keyexpr_hash)
{
std::lock_guard<std::mutex> lock(graph_mutex_);
std::lock_guard<std::mutex> lock(events_mutex_);
event_callbacks_.erase(entity_keyexpr_hash);
}

Expand All @@ -1265,7 +1232,7 @@ bool GraphCache::is_entity_pub(const liveliness::Entity & entity)

///=============================================================================
void GraphCache::update_event_counters(
const std::string & topic_name,
liveliness::ConstEntityPtr entity,
const rmw_zenoh_event_type_t event_id,
int32_t change)
{
Expand All @@ -1275,42 +1242,16 @@ void GraphCache::update_event_counters(

std::lock_guard<std::mutex> lock(events_mutex_);

auto event_statuses_it = event_statuses_.find(topic_name);
if (event_statuses_it == event_statuses_.end()) {
// Initialize statuses.
std::array<rmw_zenoh_event_status_t, ZENOH_EVENT_ID_MAX + 1> status_array {};
event_statuses_[topic_name] = std::move(status_array);
}

rmw_zenoh_event_status_t & status_to_update = event_statuses_[topic_name][event_id];
status_to_update.total_count += std::max(0, change);
status_to_update.total_count_change += std::max(0, change);
status_to_update.current_count += change;
status_to_update.current_count_change = change;
}

///=============================================================================
std::unique_ptr<rmw_zenoh_event_status_t> GraphCache::take_event_status(
const std::string & topic_name,
const rmw_zenoh_event_type_t event_id)
{
if (event_id > ZENOH_EVENT_ID_MAX) {
return nullptr;
}

std::lock_guard<std::mutex> lock(events_mutex_);

auto event_statuses_it = event_statuses_.find(topic_name);
if (event_statuses_it == event_statuses_.end()) {
return nullptr;
// Trigger callback set for this entity for the event type.
GraphEventCallbackMap::const_iterator event_callbacks_it =
event_callbacks_.find(entity->keyexpr_hash());
if (event_callbacks_it != event_callbacks_.end()) {
GraphEventCallbacks::const_iterator callback_it =
event_callbacks_it->second.find(event_id);
if (callback_it != event_callbacks_it->second.end()) {
callback_it->second(change);
}
}

rmw_zenoh_event_status_t & status_to_take = event_statuses_[topic_name][event_id];
auto result = std::make_unique<rmw_zenoh_event_status_t>(status_to_take);
// Reset changes.
status_to_take.total_count_change = 0;
status_to_take.current_count_change = 0;
return result;
}

///=============================================================================
Expand Down
Loading
Loading