diff --git a/rmw_zenoh_cpp/src/detail/attachment_helpers.cpp b/rmw_zenoh_cpp/src/detail/attachment_helpers.cpp index 19d30327..6f128b47 100644 --- a/rmw_zenoh_cpp/src/detail/attachment_helpers.cpp +++ b/rmw_zenoh_cpp/src/detail/attachment_helpers.cpp @@ -12,13 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include #include #include -#include #include -#include #include @@ -33,7 +32,7 @@ namespace rmw_zenoh_cpp AttachmentData::AttachmentData( const int64_t sequence_number, const int64_t source_timestamp, - const std::vector source_gid) + const std::array source_gid) : sequence_number_(sequence_number), source_timestamp_(source_timestamp), source_gid_(source_gid), @@ -62,7 +61,7 @@ int64_t AttachmentData::source_timestamp() const } ///============================================================================= -std::vector AttachmentData::copy_gid() const +std::array AttachmentData::copy_gid() const { return source_gid_; } @@ -104,7 +103,7 @@ AttachmentData::AttachmentData(const zenoh::Bytes & bytes) if (source_gid_str != "source_gid") { throw std::runtime_error("source_gid is not found in the attachment."); } - this->source_gid_ = deserializer.deserialize>(); + this->source_gid_ = deserializer.deserialize>(); gid_hash_ = hash_gid(this->source_gid_); } } // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/attachment_helpers.hpp b/rmw_zenoh_cpp/src/detail/attachment_helpers.hpp index 7f9aa3ee..64f444d1 100644 --- a/rmw_zenoh_cpp/src/detail/attachment_helpers.hpp +++ b/rmw_zenoh_cpp/src/detail/attachment_helpers.hpp @@ -15,8 +15,8 @@ #ifndef DETAIL__ATTACHMENT_HELPERS_HPP_ #define DETAIL__ATTACHMENT_HELPERS_HPP_ +#include #include -#include #include @@ -31,14 +31,14 @@ class AttachmentData final AttachmentData( const int64_t sequence_number, const int64_t source_timestamp, - const std::vector source_gid); + const std::array source_gid); explicit AttachmentData(const zenoh::Bytes & bytes); explicit AttachmentData(AttachmentData && data); int64_t sequence_number() const; int64_t source_timestamp() const; - std::vector copy_gid() const; + std::array copy_gid() const; size_t gid_hash() const; zenoh::Bytes serialize_to_zbytes(); @@ -46,7 +46,7 @@ class AttachmentData final private: int64_t sequence_number_; int64_t source_timestamp_; - std::vector source_gid_; + std::array source_gid_; size_t gid_hash_; }; } // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index fef824c8..a1af2d46 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -1168,7 +1168,6 @@ rmw_ret_t GraphCache::get_entities_info_by_topic( } } - memset(ep.endpoint_gid, 0, RMW_GID_STORAGE_SIZE); memcpy(ep.endpoint_gid, entity->copy_gid().data(), RMW_GID_STORAGE_SIZE); endpoints.push_back(ep); diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp index 214ab1b1..c1792091 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp @@ -437,7 +437,6 @@ Entity::Entity( // returned to the RMW layer as necessary. simplified_XXH128_hash_t keyexpr_gid = simplified_XXH3_128bits(this->liveliness_keyexpr_.c_str(), this->liveliness_keyexpr_.length()); - this->gid_.resize(RMW_GID_STORAGE_SIZE); memcpy(this->gid_.data(), &keyexpr_gid.low64, sizeof(keyexpr_gid.low64)); memcpy(this->gid_.data() + sizeof(keyexpr_gid.low64), &keyexpr_gid.high64, sizeof(keyexpr_gid.high64)); @@ -632,7 +631,7 @@ std::string Entity::liveliness_keyexpr() const } ///============================================================================= -std::vector Entity::copy_gid() const +std::array Entity::copy_gid() const { return gid_; } @@ -673,7 +672,7 @@ std::string demangle_name(const std::string & input) } // namespace liveliness ///============================================================================= -size_t hash_gid(const std::vector gid) +size_t hash_gid(const std::array gid) { std::stringstream hash_str; hash_str << std::hex; diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp index 8dbadfb8..81880df6 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp @@ -15,6 +15,7 @@ #ifndef DETAIL__LIVELINESS_UTILS_HPP_ #define DETAIL__LIVELINESS_UTILS_HPP_ +#include #include #include #include @@ -172,7 +173,7 @@ class Entity // Two entities are equal if their keyexpr_hash are equal. bool operator==(const Entity & other) const; - std::vector copy_gid() const; + std::array copy_gid() const; private: Entity( @@ -191,7 +192,7 @@ class Entity NodeInfo node_info_; std::optional topic_info_; std::string liveliness_keyexpr_; - std::vector gid_; + std::array gid_{}; }; ///============================================================================= @@ -234,7 +235,7 @@ std::optional keyexpr_to_qos(const std::string & keyexpr); } // namespace liveliness ///============================================================================= -size_t hash_gid(const std::vector gid); +size_t hash_gid(const std::array gid); } // namespace rmw_zenoh_cpp ///============================================================================= diff --git a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp index 35093240..1c24570d 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp @@ -16,6 +16,7 @@ #include +#include #include #include #include @@ -141,11 +142,6 @@ std::shared_ptr ClientData::make( response_type_support }); - if (!client_data->init()) { - // init() already set the error. - return nullptr; - } - return client_data; } @@ -171,12 +167,6 @@ ClientData::ClientData( sequence_number_(1), is_shutdown_(false), initialized_(false) -{ - // Do nothing. -} - -///============================================================================= -bool ClientData::init() { std::string topic_keyexpr = this->entity_->topic_info().value().topic_keyexpr_; keyexpr_ = zenoh::KeyExpr(topic_keyexpr); @@ -190,12 +180,10 @@ bool ClientData::init() RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Unable to create liveliness token for the client."); - return false; + throw std::runtime_error("Unable to create liveliness token for the client."); } initialized_ = true; - - return true; } ///============================================================================= @@ -216,7 +204,7 @@ bool ClientData::liveliness_is_valid() const } ///============================================================================= -std::vector ClientData::copy_gid() const +std::array ClientData::copy_gid() const { return entity_->copy_gid(); } @@ -276,54 +264,55 @@ rmw_ret_t ClientData::take_response( const zenoh::Sample & sample = reply.get_ok(); // Object that manages the raw buffer - auto payload = sample.get_payload().as_vector(); - if (payload.size() > 0) { - eprosima::fastcdr::FastBuffer fastbuffer( - reinterpret_cast(const_cast(payload.data())), payload.size()); - - // Object that serializes the data - rmw_zenoh_cpp::Cdr deser(fastbuffer); - if (!response_type_support_->deserialize_ros_message( - deser.get_cdr(), - ros_response, - response_type_support_impl_)) - { - RMW_SET_ERROR_MSG("could not deserialize ROS response"); - return RMW_RET_ERROR; - } - - // Fill in the request_header - if (!sample.get_attachment().has_value()) { - RMW_ZENOH_LOG_DEBUG_NAMED( - "rmw_zenoh_cpp", - "ClientData take_request attachment is empty"); - return RMW_RET_ERROR; - } - rmw_zenoh_cpp::AttachmentData attachment(std::move(sample.get_attachment().value().get())); - request_header->request_id.sequence_number = attachment.sequence_number(); - if (request_header->request_id.sequence_number < 0) { - RMW_SET_ERROR_MSG("Failed to get sequence_number from client call attachment"); - return RMW_RET_ERROR; - } - request_header->source_timestamp = attachment.source_timestamp(); - if (request_header->source_timestamp < 0) { - RMW_SET_ERROR_MSG("Failed to get source_timestamp from client call attachment"); - return RMW_RET_ERROR; - } - memcpy( - request_header->request_id.writer_guid, - attachment.copy_gid().data(), - RMW_GID_STORAGE_SIZE); - request_header->received_timestamp = latest_reply->get_received_timestamp(); - - *taken = true; - } else { + std::vector payload = sample.get_payload().as_vector(); + if (payload.size() == 0) { RMW_ZENOH_LOG_DEBUG_NAMED( "rmw_zenoh_cpp", "ClientData not able to get slice data"); return RMW_RET_ERROR; } + // Fill in the request_header + if (!sample.get_attachment().has_value()) { + RMW_ZENOH_LOG_DEBUG_NAMED( + "rmw_zenoh_cpp", + "ClientData take_request attachment is empty"); + return RMW_RET_ERROR; + } + + eprosima::fastcdr::FastBuffer fastbuffer( + reinterpret_cast(const_cast(payload.data())), payload.size()); + + // Object that serializes the data + rmw_zenoh_cpp::Cdr deser(fastbuffer); + if (!response_type_support_->deserialize_ros_message( + deser.get_cdr(), + ros_response, + response_type_support_impl_)) + { + RMW_SET_ERROR_MSG("could not deserialize ROS response"); + return RMW_RET_ERROR; + } + + rmw_zenoh_cpp::AttachmentData attachment(std::move(sample.get_attachment().value().get())); + request_header->request_id.sequence_number = attachment.sequence_number(); + if (request_header->request_id.sequence_number < 0) { + RMW_SET_ERROR_MSG("Failed to get sequence_number from client call attachment"); + return RMW_RET_ERROR; + } + request_header->source_timestamp = attachment.source_timestamp(); + if (request_header->source_timestamp < 0) { + RMW_SET_ERROR_MSG("Failed to get source_timestamp from client call attachment"); + return RMW_RET_ERROR; + } + memcpy( + request_header->request_id.writer_guid, + attachment.copy_gid().data(), + RMW_GID_STORAGE_SIZE); + request_header->received_timestamp = latest_reply->get_received_timestamp(); + + *taken = true; + return RMW_RET_OK; } @@ -376,8 +365,7 @@ rmw_ret_t ClientData::send_request( // Send request zenoh::Session::GetOptions opts = zenoh::Session::GetOptions::create_default(); - std::vector local_gid; - local_gid = entity_->copy_gid(); + std::array local_gid = entity_->copy_gid(); opts.attachment = rmw_zenoh_cpp::create_map_and_set_sequence_num(*sequence_id, local_gid); opts.target = Z_QUERY_TARGET_ALL_COMPLETE; // The default timeout for a z_get query is 10 seconds and if a response is not received within @@ -425,11 +413,10 @@ rmw_ret_t ClientData::send_request( return; } - std::chrono::nanoseconds::rep received_timestamp = - std::chrono::system_clock::now().time_since_epoch().count(); + std::chrono::time_point now = std::chrono::system_clock::now(); sub_data->add_new_reply( - std::make_unique(reply, received_timestamp)); + std::make_unique(reply, now.time_since_epoch().count())); }, zenoh::closures::none, std::move(opts), diff --git a/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp index d31ac343..44b187ba 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp @@ -15,6 +15,7 @@ #ifndef DETAIL__RMW_CLIENT_DATA_HPP_ #define DETAIL__RMW_CLIENT_DATA_HPP_ +#include #include #include #include @@ -23,7 +24,6 @@ #include #include #include -#include #include @@ -65,7 +65,7 @@ class ClientData final : public std::enable_shared_from_this bool liveliness_is_valid() const; // Copy the GID of this ClientData into an rmw_gid_t. - std::vector copy_gid() const; + std::array copy_gid() const; // Add a new ZenohReply to the queue. void add_new_reply(std::unique_ptr reply); @@ -114,9 +114,6 @@ class ClientData final : public std::enable_shared_from_this std::shared_ptr request_type_support, std::shared_ptr response_type_support); - // Initialize the Zenoh objects for this entity. - bool init(); - // Internal mutex. mutable std::recursive_mutex mutex_; // The parent node. diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp index b91e7687..206d2d3f 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp @@ -96,8 +96,6 @@ class rmw_context_impl_s::Data final throw std::runtime_error("Error setting up zenoh session. "); } - rmw_ret_t ret = RMW_RET_ERROR; - // Verify if the zenoh router is running if configured. const std::optional configured_connection_attempts = rmw_zenoh_cpp::zenoh_router_check_attempts(); @@ -106,25 +104,23 @@ class rmw_context_impl_s::Data final // Retry until the connection is successful. constexpr std::chrono::milliseconds sleep_time(1000); constexpr int64_t ticks_between_print(std::chrono::milliseconds(1000) / sleep_time); - while (ret != RMW_RET_OK && connection_attempts < configured_connection_attempts.value()) { + do { + zenoh::ZResult result; + this->session_->get_routers_z_id(&result); + if (result == Z_OK) { + break; + } if ((connection_attempts % ticks_between_print) == 0) { RMW_ZENOH_LOG_WARN_NAMED( "rmw_zenoh_cpp", "Unable to connect to a Zenoh router. " "Have you started a router with `ros2 run rmw_zenoh_cpp rmw_zenohd`?"); } - zenoh::ZResult result; - this->session_->get_routers_z_id(&result); - if (result != Z_OK) { - ++connection_attempts; - } else { - ret = RMW_RET_OK; - } if (++connection_attempts >= configured_connection_attempts.value()) { break; } std::this_thread::sleep_for(sleep_time); - } + } while(connection_attempts < configured_connection_attempts.value()); } // Initialize the graph cache. diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp index 3b112892..b228c7cb 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp @@ -16,6 +16,7 @@ #include +#include #include #include #include @@ -326,7 +327,7 @@ liveliness::TopicInfo PublisherData::topic_info() const return entity_->topic_info().value(); } -std::vector PublisherData::copy_gid() const +std::array PublisherData::copy_gid() const { std::lock_guard lock(mutex_); return entity_->copy_gid(); diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp index d5e8a502..52c22523 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp @@ -15,13 +15,13 @@ #ifndef DETAIL__RMW_PUBLISHER_DATA_HPP_ #define DETAIL__RMW_PUBLISHER_DATA_HPP_ +#include #include #include #include #include #include #include -#include #include @@ -70,7 +70,7 @@ class PublisherData final liveliness::TopicInfo topic_info() const; // Return a copy of the GID of this publisher. - std::vector copy_gid() const; + std::array copy_gid() const; // Returns true if liveliness token is still valid. bool liveliness_is_valid() const; diff --git a/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp index a5c83c8d..9fe075a9 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp @@ -16,6 +16,7 @@ #include +#include #include #include #include @@ -248,7 +249,7 @@ void ServiceData::add_new_query(std::unique_ptr query) RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Query queue depth of %ld reached, discarding oldest Query " - "for service for %.*s", + "for service for %s", adapted_qos_profile.depth, keyexpr_.c_str()); query_queue_.pop_front(); @@ -331,11 +332,8 @@ rmw_ret_t ServiceData::take_request( return RMW_RET_ERROR; } - auto writter_gid_v = attachment.copy_gid(); - memcpy( - request_header->request_id.writer_guid, - writter_gid_v.data(), - RMW_GID_STORAGE_SIZE); + std::array writer_guid = attachment.copy_gid(); + memcpy(request_header->request_id.writer_guid, writer_guid.data(), RMW_GID_STORAGE_SIZE); request_header->source_timestamp = attachment.source_timestamp(); if (request_header->source_timestamp < 0) { @@ -345,7 +343,7 @@ rmw_ret_t ServiceData::take_request( request_header->received_timestamp = query->get_received_timestamp(); // Add this query to the map, so that rmw_send_response can quickly look it up later. - const size_t hash = rmw_zenoh_cpp::hash_gid(writter_gid_v); + const size_t hash = rmw_zenoh_cpp::hash_gid(writer_guid); std::unordered_map::iterator it = sequence_to_query_map_.find(hash); if (it == sequence_to_query_map_.end()) { SequenceToQuery stq; @@ -379,8 +377,7 @@ rmw_ret_t ServiceData::send_response( return RMW_RET_OK; } - std::vector writer_guid; - writer_guid.resize(RMW_GID_STORAGE_SIZE); + std::array writer_guid; memcpy(writer_guid.data(), request_id->writer_guid, RMW_GID_STORAGE_SIZE); // Create the queryable payload @@ -439,10 +436,8 @@ rmw_ret_t ServiceData::send_response( const zenoh::Query & loaned_query = query->get_query(); zenoh::Query::ReplyOptions options = zenoh::Query::ReplyOptions::create_default(); - std::vector writer_gid; - for (size_t i = 0; i < RMW_GID_STORAGE_SIZE; ++i) { - writer_gid.push_back(request_id->writer_guid[RMW_GID_STORAGE_SIZE - 1 - i]); - } + std::array writer_gid; + memcpy(writer_gid.data(), request_id->writer_guid, RMW_GID_STORAGE_SIZE); options.attachment = create_map_and_set_sequence_num( request_id->sequence_number, writer_gid); diff --git a/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp b/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp index 3e0f6a42..35173656 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp @@ -14,10 +14,10 @@ #include "zenoh_utils.hpp" +#include #include #include #include -#include #include "attachment_helpers.hpp" #include "rcpputils/scope_exit.hpp" @@ -42,7 +42,7 @@ ZenohSession::~ZenohSession() ///============================================================================= zenoh::Bytes create_map_and_set_sequence_num( - int64_t sequence_number, std::vector gid) + int64_t sequence_number, std::array gid) { auto now = std::chrono::system_clock::now().time_since_epoch(); auto now_ns = std::chrono::duration_cast(now); diff --git a/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp b/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp index 4642799b..1b1efc87 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp @@ -17,10 +17,10 @@ #include +#include #include #include #include -#include #include "rmw/types.h" @@ -43,7 +43,7 @@ class ZenohSession final ///============================================================================= zenoh::Bytes create_map_and_set_sequence_num( - int64_t sequence_number, std::vector gid); + int64_t sequence_number, std::array gid); ///============================================================================= // A class to store the replies to service requests.