diff --git a/rmf_notification/CMakeLists.txt b/rmf_notification/CMakeLists.txt index cfbd285..692f709 100644 --- a/rmf_notification/CMakeLists.txt +++ b/rmf_notification/CMakeLists.txt @@ -24,6 +24,8 @@ include_directories( add_library(${PROJECT_NAME} SHARED src/notification_manager.cpp + src/http_client.cpp + src/websocket_client.cpp src/message.cpp ) diff --git a/rmf_notification/example/notification_node.cpp b/rmf_notification/example/notification_node.cpp index 568e858..ecfa894 100644 --- a/rmf_notification/example/notification_node.cpp +++ b/rmf_notification/example/notification_node.cpp @@ -16,6 +16,8 @@ #include "rmf_notification/message.hpp" #include "rmf_notification/notification_manager.hpp" +#include "rmf_notification/websocket_client.hpp" +#include "rmf_notification/http_client.hpp" const rclcpp::Logger LOGGER = rclcpp::get_logger("test_notification"); @@ -32,29 +34,35 @@ class TestNotification : public rclcpp::Node std::bind(&TestNotification::timer_callback, this)); std::string websocket_uri = "ws://localhost:8000/_internal"; std::string mobile_uri = "http://localhost:8000/notification/telegram"; - notification_manager_ptr_ = new rmf_notification::NotificationManager( - websocket_uri, - mobile_uri - ); + notify_ptr_ = rmf_notification::NotificationManager::get(); + notify_ptr_->create_client(websocket_uri); + notify_ptr_->create_client(mobile_uri); RCLCPP_INFO(LOGGER, "Starting test notification node"); } ~TestNotification() { timer_->cancel(); - delete notification_manager_ptr_; } private: void timer_callback() { + auto connections = notify_ptr_->get_connections(); + RCLCPP_INFO(LOGGER, "Connections:"); + for (auto & connection : connections) { + RCLCPP_INFO( + LOGGER, "- endpoint_uri: %s, connected: %s", + connection.first.c_str(), + (connection.second ? "true" : "false")); + } counter_ = counter_ + 1; std::string message = "test message: " + std::to_string(counter_); - notification_manager_ptr_->publish_state(message, "maintenance_log_update"); + notify_ptr_->publish(message, "maintenance_log_update"); } rclcpp::TimerBase::SharedPtr timer_; - rmf_notification::NotificationManager * notification_manager_ptr_; + std::shared_ptr notify_ptr_; int counter_; }; diff --git a/rmf_notification/include/rmf_notification/http_client.hpp b/rmf_notification/include/rmf_notification/http_client.hpp new file mode 100644 index 0000000..c17a243 --- /dev/null +++ b/rmf_notification/include/rmf_notification/http_client.hpp @@ -0,0 +1,48 @@ +// Copyright 2023 ROS Industrial Consortium Asia Pacific +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RMF_NOTIFICATION__HTTP_CLIENT_HPP_ +#define RMF_NOTIFICATION__HTTP_CLIENT_HPP_ + +#include +#include + +#include "rmf_notification/notification_client_base.hpp" + +namespace rmf_notification +{ + +class HTTPClient : public NotificationClientBase +{ +public: + HTTPClient(); + virtual ~HTTPClient(); + void init(const std::string & endpoint_uri) override; + + bool is_connected() const override; + + void publish( + const Message & message) override; + + void shutdown() override; + + class Impl; + +private: + std::unique_ptr impl_ptr_; +}; + +} // namespace rmf_notification + +#endif // RMF_NOTIFICATION__HTTP_CLIENT_HPP_ diff --git a/rmf_notification/include/rmf_notification/message.hpp b/rmf_notification/include/rmf_notification/message.hpp index 1e3e3b0..6964d7b 100644 --- a/rmf_notification/include/rmf_notification/message.hpp +++ b/rmf_notification/include/rmf_notification/message.hpp @@ -21,9 +21,7 @@ namespace rmf_notification { -enum ConnectionState { CONNECTED, DISCONNECTED }; -enum NotificationType { WEBSOCKET, REST }; - +/// Message wrapper for notifications struct Message { explicit Message( diff --git a/rmf_notification/include/rmf_notification/notification_client_base.hpp b/rmf_notification/include/rmf_notification/notification_client_base.hpp new file mode 100644 index 0000000..cb52de7 --- /dev/null +++ b/rmf_notification/include/rmf_notification/notification_client_base.hpp @@ -0,0 +1,40 @@ +// Copyright 2023 ROS Industrial Consortium Asia Pacific +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RMF_NOTIFICATION__NOTIFICATION_CLIENT_BASE_HPP_ +#define RMF_NOTIFICATION__NOTIFICATION_CLIENT_BASE_HPP_ + +#include +#include "rmf_notification/message.hpp" + +namespace rmf_notification +{ + +class NotificationClientBase +{ +public: + virtual ~NotificationClientBase() {} + virtual void init( + const std::string & endpoint_uri) = 0; + + virtual bool is_connected() const = 0; + + virtual void publish( + const Message & message) = 0; + + virtual void shutdown() = 0; +}; +} // namespace rmf_notification + +#endif // RMF_NOTIFICATION__NOTIFICATION_CLIENT_BASE_HPP_ diff --git a/rmf_notification/include/rmf_notification/notification_manager.hpp b/rmf_notification/include/rmf_notification/notification_manager.hpp index d569d8a..ac5b1bf 100644 --- a/rmf_notification/include/rmf_notification/notification_manager.hpp +++ b/rmf_notification/include/rmf_notification/notification_manager.hpp @@ -19,9 +19,11 @@ #include #include #include +#include #include #include "rmf_notification/message.hpp" +#include "rmf_notification/notification_client_base.hpp" namespace rmf_notification { @@ -29,27 +31,88 @@ namespace rmf_notification class NotificationManager { public: - NotificationManager( - std::string websocket_uri, - std::string mobile_uri); - std::string publish_state( + /// Method to get singleton instance of notification manager + /** + * This method is threadsafe. It has mutex lock which allows only one + * thread to access this method at the same time frame + * + * \return[out] Shared pointer instance of notification manager + */ + static std::shared_ptr get(); + + /// Method to publish notifications via both websocket and telegram + /** + * This method is asynchronous type as the message to be sent is stored in a + * local cache and sent over by separate threads + * + * \param[in] payload message to be sent via notification system + * \param[in] type categories for different notifications + * \return[out] Id assigned to the messages to be sent + */ + std::string publish( const std::string payload, const std::string type); - std::vector> check_connection_state(); - void clear_messages(); - ~NotificationManager(); + + /// Method to check connection status with both websocket and telegram modules + /** + * For websocket live connection tracking is performed which means the connection status + * changes to disconnected on immediate disconnection. For telegram connection status + * is updated only when a new request is triggered + * + * \return[out] Vector specifying the endpoint uri and it status + */ + std::unordered_map get_connections() const; + + /// Method to create a client to a specified uri + /** + * This method is NOT thread safe + */ + template + std::shared_ptr create_client( + const std::string & uri, + ArgsT &&... args); + + virtual ~NotificationManager(); private: - class WebsocketImplementation; - class MobileImplementation; - - std::string websocket_uri_; - std::string mobile_uri_; - std::unique_ptr websocket_pimpl_; - std::unique_ptr mobile_pimpl_; - std::unordered_map message_map_; + // unique pointer to store instance for mobile notification manager + std::unordered_map> clients_; + + // shared pointer to store singleton instance for notification manager + static std::shared_ptr instance_; + // mutex lock which is used when acquiring instance of notification manager object + static std::mutex m_instance_; + + /// Constructor + NotificationManager(); + // Not copiable + NotificationManager(const NotificationManager &) = delete; + NotificationManager & operator=(const NotificationManager &) = delete; }; +template +std::shared_ptr NotificationManager::create_client( + const std::string & uri, + ArgsT &&... args) +{ + static_assert( + std::is_base_of_v, + "Notification Client must be derived from NotificationClientBase" + ); + + // use a local variable to mimic the constructor + auto ptr = std::make_shared(std::forward(args)...); + + // Initialize the client + ptr->init(uri); + + clients_.emplace( + uri, + std::static_pointer_cast(ptr)); + + return ptr; +} + } // namespace rmf_notification #endif // RMF_NOTIFICATION__NOTIFICATION_MANAGER_HPP_ diff --git a/rmf_notification/include/rmf_notification/websocket_client.hpp b/rmf_notification/include/rmf_notification/websocket_client.hpp new file mode 100644 index 0000000..437d074 --- /dev/null +++ b/rmf_notification/include/rmf_notification/websocket_client.hpp @@ -0,0 +1,48 @@ +// Copyright 2023 ROS Industrial Consortium Asia Pacific +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RMF_NOTIFICATION__WEBSOCKET_CLIENT_HPP_ +#define RMF_NOTIFICATION__WEBSOCKET_CLIENT_HPP_ + +#include +#include + +#include "rmf_notification/notification_client_base.hpp" + +namespace rmf_notification +{ + +class WebsocketClient : public NotificationClientBase +{ +public: + WebsocketClient(); + virtual ~WebsocketClient(); + void init(const std::string & endpoint_uri) override; + + bool is_connected() const override; + + void publish( + const Message & message) override; + + void shutdown() override; + + class Impl; + +private: + std::unique_ptr impl_ptr_; +}; + +} // namespace rmf_notification + +#endif // RMF_NOTIFICATION__WEBSOCKET_CLIENT_HPP_ diff --git a/rmf_notification/src/http_client.cpp b/rmf_notification/src/http_client.cpp new file mode 100644 index 0000000..cd3d129 --- /dev/null +++ b/rmf_notification/src/http_client.cpp @@ -0,0 +1,168 @@ +// Copyright 2023 ROS Industrial Consortium Asia Pacific +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include "cpprest/http_client.h" + +#include "rmf_notification/http_client.hpp" +#include "rmf_scheduler/log.hpp" + +namespace rmf_notification +{ + +class HTTPClient::Impl +{ +public: + using http_request = web::http::http_request; + using http_response = web::http::http_response; + using http_client = web::http::client::http_client; + + void init(const std::string & url) + { + // creating http client instance + http_client_ptr_ = std::make_shared(U(url)); + // starting new thread for handling request process in background + shutdown_ = false; + connected_ = false; + processing_thread_ = std::thread( + [this]() + { + RS_LOG_INFO( + "starting mobile notification background runner in processing thread"); + runner(); + }); + } + + void publish(const Message & message) + { + RS_LOG_INFO( + "mobile notification request raised for message with id: %s", + message.message_id.c_str()); + std::unique_lock lock(notify_mtx_); + message_process_queue_.push(message); + notify_trigger_.notify_one(); + } + + bool is_connected() const + { + return connected_; + } + + bool is_shutdown() + { + return shutdown_; + } + + void shutdown() + { + shutdown_ = true; + notify_trigger_.notify_one(); + if (processing_thread_.joinable()) { + processing_thread_.join(); + } + connected_ = false; + } + +private: + void runner() + { + while (!shutdown_) { + std::unique_lock lock(notify_mtx_); + + while (!message_process_queue_.empty()) { + auto message = message_process_queue_.front(); + + web::http::uri_builder builder(U("")); + builder.append_query(U("message"), message.payload); + builder.append_query(U("type"), message.type); + + bool status = false; + + try { + http_client_ptr_->request( + web::http::methods::POST, builder.to_string()).then( + [message, &status](http_response response) + { + if (response.status_code() < 400) { + RS_LOG_INFO( + "HTTP notification sent for message with id: %s", + message.message_id.c_str()); + status = true; + } else { + RS_LOG_ERROR( + "HTTP notification error for message with id: %s", + message.message_id.c_str()); + status = false; + } + }).wait(); + } catch (const std::exception & e) { + RS_LOG_ERROR("HTTP notification error: %s", e.what()); + connected_ = false; + break; + } + + connected_ = status; + if (status) { + message_process_queue_.pop(); + } else { + break; + } + } + + notify_trigger_.wait(lock); + } + } + std::shared_ptr http_client_ptr_; + std::queue message_process_queue_; + std::mutex notify_mtx_; + std::condition_variable notify_trigger_; + std::thread processing_thread_; + std::atomic_bool connected_ = false; + std::atomic_bool shutdown_ = true; +}; + +HTTPClient::HTTPClient() +: impl_ptr_(std::make_unique()) +{ +} + +HTTPClient::~HTTPClient() +{ + if (!impl_ptr_->is_shutdown()) { + impl_ptr_->shutdown(); + } +} + +void HTTPClient::init(const std::string & endpoint_uri) +{ + return impl_ptr_->init(endpoint_uri); +} + +bool HTTPClient::is_connected() const +{ + return impl_ptr_->is_connected(); +} + +void HTTPClient::publish(const Message & message) +{ + impl_ptr_->publish(message); +} + +void HTTPClient::shutdown() +{ + impl_ptr_->shutdown(); +} + +} // namespace rmf_notification diff --git a/rmf_notification/src/notification_manager.cpp b/rmf_notification/src/notification_manager.cpp index c49b3b7..db29206 100644 --- a/rmf_notification/src/notification_manager.cpp +++ b/rmf_notification/src/notification_manager.cpp @@ -13,7 +13,6 @@ // limitations under the License. #include -#include #include #include "cpprest/http_client.h" @@ -22,7 +21,6 @@ #include "websocketpp/config/asio_client.hpp" #include "websocketpp/client.hpp" - #include "rmf_scheduler/utils/uuid.hpp" #include "rmf_scheduler/log.hpp" #include "rmf_notification/notification_manager.hpp" @@ -30,308 +28,29 @@ namespace rmf_notification { -using http_request = web::http::http_request; -using http_response = web::http::http_response; -using http_client = web::http::client::http_client; - -using asio_client = websocketpp::config::asio_client; +std::shared_ptr NotificationManager::instance_(nullptr); +std::mutex NotificationManager::m_instance_; -class NotificationManager::WebsocketImplementation +std::shared_ptr NotificationManager::get() { -public: - WebsocketImplementation( - std::string & websocket_uri, - std::unordered_map & message_map) - : websocket_uri_(websocket_uri), message_map_(message_map) - { - websocket_client_ptr_ = std::make_unique>(); - websocket_client_ptr_->clear_access_channels(websocketpp::log::alevel::all); - websocket_client_ptr_->clear_error_channels(websocketpp::log::elevel::all); - - websocket_client_ptr_->init_asio(); - websocket_client_ptr_->start_perpetual(); - - // websocket handler callback to be used when new connection is open - websocket_client_ptr_->set_open_handler( - [this]( - websocketpp::connection_hdl hdl) - { - RS_LOG_INFO("websocket connection is open"); - hdl_ptr_ = hdl.lock(); - connected_ = true; - }); - - // websocket handler callback to be used when existing connection is closed - websocket_client_ptr_->set_close_handler( - [this]( - websocketpp::connection_hdl) - { - RS_LOG_INFO("websocket connection is closed"); - connected_ = false; - }); - - // websocket handler callback to be used when connection request is failed - websocket_client_ptr_->set_fail_handler( - [this]( - websocketpp::connection_hdl) - { - RS_LOG_INFO("websocket connection request failed"); - connected_ = false; - }); - - websocketpp::lib::error_code ec_for_new; - websocket_client_ptr_->connect( - websocket_client_ptr_->get_connection(websocket_uri_, ec_for_new)); - if (ec_for_new) { - RS_LOG_ERROR( - "new websocket connection failed for reason: %s", - ec_for_new.message().c_str()); - } - - // new thread for handling websocket client connections - client_thread_ = std::thread( - [this]() - { - RS_LOG_INFO("starting websocket run inside client thread"); - websocket_client_ptr_->run(); - }); - - // new thread for background processing of messages to be sent via websocket client - processing_thread_ = std::thread( - [this]() - { - RS_LOG_INFO("starting websocket background runner in processing thread"); - runner(); - }); - } - - void publish_state(std::string & message_id) - { - RS_LOG_INFO( - "websocket notification request raised for message with id: %s", - message_id.c_str()); - std::unique_lock lock(notify_mtx_); - message_process_queue_.push(message_id); - notify_trigger_.notify_one(); + std::lock_guard lock(m_instance_); + if (!instance_) { + std::shared_ptr temp_instance( + new NotificationManager()); + instance_.swap(temp_instance); } + return instance_; +} - bool check_connection_state() - { - return connected_; - } - - ~WebsocketImplementation() - { - shutdown_ = true; - // triggering continuation of background runner for starting shutdown process - notify_trigger_.notify_one(); - // closing websocket connection - websocket_client_ptr_->close( - hdl_ptr_, - websocketpp::close::status::going_away, - ""); - // stopping all websocket process - websocket_client_ptr_->stop(); - - // waiting for processing thread to complete its run - if (processing_thread_.joinable()) { - processing_thread_.join(); - } - // waiting for client thread to complete its run - if (client_thread_.joinable()) { - client_thread_.join(); - } - } - -private: - std::string & websocket_uri_; - std::unordered_map & message_map_; - std::unique_ptr> websocket_client_ptr_; - std::weak_ptr hdl_ptr_; - std::queue message_process_queue_; - std::mutex notify_mtx_; - std::condition_variable notify_trigger_; - std::thread processing_thread_; - std::thread client_thread_; - bool connected_; - bool shutdown_; - - void runner() - { - while (!shutdown_) { - // acquiring lock for notification - std::unique_lock lock(notify_mtx_); - - // trying to reconnect if disconnected - if (!connected_) { - websocketpp::lib::error_code ec_for_reconnection; - websocket_client_ptr_->connect( - websocket_client_ptr_->get_connection( - websocket_uri_, - ec_for_reconnection - )); - - if (ec_for_reconnection) { - RS_LOG_INFO( - "reconnecting websocket failed for reason: %s", - ec_for_reconnection.message().c_str()); - } - } - - // looping through the message process queue for sending them over websocket - while (!message_process_queue_.empty() && connected_) { - std::string message_id = message_process_queue_.front(); - auto message = message_map_.at(message_id); - - nlohmann::json msg; - msg["type"] = message.type; - msg["payload"] = message.payload; - msg["timestamp"] = message.timestamp; - msg["message_id"] = message.message_id; - - websocketpp::lib::error_code ec_for_sending; - websocket_client_ptr_->send( - hdl_ptr_, - msg.dump(), - websocketpp::frame::opcode::text, - ec_for_sending - ); - - if (ec_for_sending) { - RS_LOG_ERROR( - "unable to publish websocket message: %s", - ec_for_sending.message().c_str()); - } else { - RS_LOG_INFO( - "websocket broacasted message with id: %s", - message.message_id.c_str()); - message_process_queue_.pop(); - } - } - - // releasing the lock and waiting for notify from other methods - notify_trigger_.wait(lock); - } - } -}; - -class NotificationManager::MobileImplementation +NotificationManager::NotificationManager() { -public: - MobileImplementation( - std::string & mobile_uri, - std::unordered_map & message_map) - : message_map_(message_map) - { - // creating http client instance - http_client_ptr_ = std::make_shared(U(mobile_uri)); - // starting new thread for handling request process in background - processing_thread_ = std::thread( - [this]() - { - RS_LOG_INFO( - "starting mobile notification background runner in processing thread"); - runner(); - }); - } - - void publish_state(std::string & message_id) - { - RS_LOG_INFO( - "mobile notification request raised for message with id: %s", - message_id.c_str()); - std::unique_lock lock(notify_mtx_); - message_process_queue_.push(message_id); - notify_trigger_.notify_one(); - } - - bool check_connection_state() - { - return connected_; - } - - ~MobileImplementation() - { - shutdown_ = true; - notify_trigger_.notify_one(); - if (processing_thread_.joinable()) { - processing_thread_.join(); - } - } - -private: - std::unordered_map & message_map_; - std::shared_ptr http_client_ptr_; - std::queue message_process_queue_; - std::mutex notify_mtx_; - std::condition_variable notify_trigger_; - std::thread processing_thread_; - bool connected_; - bool shutdown_; - - void runner() - { - while (!shutdown_) { - std::unique_lock lock(notify_mtx_); - - while (!message_process_queue_.empty()) { - std::string message_id = message_process_queue_.front(); - auto message = message_map_.at(message_id); - - web::http::uri_builder builder(U("")); - builder.append_query(U("message"), message.payload); - builder.append_query(U("type"), message.type); - - bool status = false; - - try { - http_client_ptr_->request( - web::http::methods::POST, builder.to_string()).then( - [message, &status](http_response response) - { - if (response.status_code() < 400) { - RS_LOG_INFO( - "mobile notification sent for message with id: %s", - message.message_id.c_str()); - status = true; - } else { - RS_LOG_ERROR( - "mobile notification error for message with id: %s", - message.message_id.c_str()); - status = false; - } - }).wait(); - } catch (const std::exception & e) { - RS_LOG_ERROR("mobile notification error: %s", e.what()); - break; - } - - if (status) { - message_process_queue_.pop(); - } else { - break; - } - } - - notify_trigger_.wait(lock); - } - } -}; +} -NotificationManager::NotificationManager( - std::string websocket_uri, - std::string mobile_uri) -: websocket_uri_(websocket_uri), mobile_uri_(mobile_uri) +NotificationManager::~NotificationManager() { - websocket_pimpl_ = std::make_unique( - websocket_uri_, - message_map_); - mobile_pimpl_ = std::make_unique( - mobile_uri_, - message_map_); } -std::string NotificationManager::publish_state( +std::string NotificationManager::publish( const std::string payload, const std::string type) { @@ -339,37 +58,19 @@ std::string NotificationManager::publish_state( message.payload = payload; message.type = type; message.message_id = rmf_scheduler::utils::gen_uuid(); - message_map_[message.message_id] = message; - websocket_pimpl_->publish_state(message.message_id); - mobile_pimpl_->publish_state(message.message_id); + for (auto & client : clients_) { + client.second->publish(message); + } return message.message_id; } -std::vector> NotificationManager::check_connection_state() +std::unordered_map NotificationManager::get_connections() const { - std::pair websocket_connection_state; - websocket_connection_state.first = NotificationType::WEBSOCKET; - websocket_connection_state.second = websocket_pimpl_->check_connection_state() ? - ConnectionState::CONNECTED : ConnectionState::DISCONNECTED; - - std::pair mobile_connection_state; - mobile_connection_state.first = NotificationType::REST; - mobile_connection_state.second = mobile_pimpl_->check_connection_state() ? - ConnectionState::CONNECTED : ConnectionState::DISCONNECTED; - - std::vector> results; - results.push_back(websocket_connection_state); - results.push_back(mobile_connection_state); - + std::unordered_map results; + for (auto & client : clients_) { + results[client.first] = client.second->is_connected(); + } return results; } -void NotificationManager::clear_messages() -{ - message_map_.clear(); -} - -NotificationManager::~NotificationManager() {} - } // namespace rmf_notification diff --git a/rmf_notification/src/websocket_client.cpp b/rmf_notification/src/websocket_client.cpp new file mode 100644 index 0000000..3321b52 --- /dev/null +++ b/rmf_notification/src/websocket_client.cpp @@ -0,0 +1,245 @@ +// Copyright 2023 ROS Industrial Consortium Asia Pacific +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "websocketpp/config/asio_client.hpp" +#include "websocketpp/client.hpp" + +#include "nlohmann/json.hpp" + +#include "rmf_notification/websocket_client.hpp" +#include "rmf_scheduler/log.hpp" + +namespace rmf_notification +{ + +class WebsocketClient::Impl +{ +public: + using asio_client = websocketpp::config::asio_client; + + void init(const std::string & websocket_uri) + { + websocket_uri_ = websocket_uri; + websocket_client_ptr_ = std::make_unique>(); + websocket_client_ptr_->clear_access_channels(websocketpp::log::alevel::all); + websocket_client_ptr_->clear_error_channels(websocketpp::log::elevel::all); + + websocket_client_ptr_->init_asio(); + websocket_client_ptr_->start_perpetual(); + + // websocket handler callback to be used when new connection is open + websocket_client_ptr_->set_open_handler( + [this]( + websocketpp::connection_hdl hdl) + { + RS_LOG_INFO("websocket connection is open"); + hdl_ptr_ = hdl.lock(); + connected_ = true; + }); + + // websocket handler callback to be used when existing connection is closed + websocket_client_ptr_->set_close_handler( + [this]( + websocketpp::connection_hdl) + { + RS_LOG_INFO("websocket connection is closed"); + connected_ = false; + }); + + // websocket handler callback to be used when connection request is failed + websocket_client_ptr_->set_fail_handler( + [this]( + websocketpp::connection_hdl) + { + RS_LOG_INFO("websocket connection request failed"); + connected_ = false; + }); + + websocketpp::lib::error_code ec_for_new; + websocket_client_ptr_->connect( + websocket_client_ptr_->get_connection(websocket_uri, ec_for_new)); + if (ec_for_new) { + RS_LOG_ERROR( + "new websocket connection failed for reason: %s", + ec_for_new.message().c_str()); + } + + // new thread for handling websocket client connections + client_thread_ = std::thread( + [this]() + { + RS_LOG_INFO("starting websocket run inside client thread"); + websocket_client_ptr_->run(); + }); + + // new thread for background processing of messages to be sent via websocket client + shutdown_ = false; + processing_thread_ = std::thread( + [this]() + { + RS_LOG_INFO("starting websocket background runner in processing thread"); + runner(); + }); + } + + void publish(const Message & message) + { + RS_LOG_INFO( + "websocket notification request raised for message with id: %s", + message.message_id.c_str()); + std::unique_lock lock(notify_mtx_); + message_process_queue_.push(message); + notify_trigger_.notify_one(); + } + + bool is_connected() const + { + return connected_; + } + + bool is_shutdown() const + { + return shutdown_; + } + + void shutdown() + { + shutdown_ = true; + // triggering continuation of background runner for starting shutdown process + notify_trigger_.notify_one(); + // closing websocket connection + if (connected_) { + websocket_client_ptr_->close( + hdl_ptr_, + websocketpp::close::status::going_away, + ""); + } + // stopping all websocket process + websocket_client_ptr_->stop(); + + // waiting for processing thread to complete its run + if (processing_thread_.joinable()) { + processing_thread_.join(); + } + // waiting for client thread to complete its run + if (client_thread_.joinable()) { + client_thread_.join(); + } + } + +private: + void runner() + { + while (!shutdown_) { + // acquiring lock for notification + std::unique_lock lock(notify_mtx_); + + // trying to reconnect if disconnected + if (!connected_) { + websocketpp::lib::error_code ec_for_reconnection; + websocket_client_ptr_->connect( + websocket_client_ptr_->get_connection( + websocket_uri_, + ec_for_reconnection + )); + + if (ec_for_reconnection) { + RS_LOG_INFO( + "reconnecting websocket failed for reason: %s", + ec_for_reconnection.message().c_str()); + } + } + + // looping through the message process queue for sending them over websocket + while (!message_process_queue_.empty() && connected_) { + auto message = message_process_queue_.front(); + + nlohmann::json msg; + msg["type"] = message.type; + msg["payload"] = message.payload; + msg["timestamp"] = message.timestamp; + msg["message_id"] = message.message_id; + + websocketpp::lib::error_code ec_for_sending; + websocket_client_ptr_->send( + hdl_ptr_, + msg.dump(), + websocketpp::frame::opcode::text, + ec_for_sending + ); + + if (ec_for_sending) { + RS_LOG_ERROR( + "unable to publish websocket message: %s", + ec_for_sending.message().c_str()); + } else { + RS_LOG_INFO( + "websocket broacasted message with id: %s", + message.message_id.c_str()); + message_process_queue_.pop(); + } + } + + // releasing the lock and waiting for notify from other methods + notify_trigger_.wait(lock); + } + } + + std::string websocket_uri_; + std::unique_ptr> websocket_client_ptr_; + std::weak_ptr hdl_ptr_; + std::queue message_process_queue_; + std::mutex notify_mtx_; + std::condition_variable notify_trigger_; + std::thread processing_thread_; + std::thread client_thread_; + + std::atomic_bool connected_ = false; + std::atomic_bool shutdown_ = true; +}; + +WebsocketClient::WebsocketClient() +: impl_ptr_(std::make_unique()) +{ +} + +WebsocketClient::~WebsocketClient() +{ + if (!impl_ptr_->is_shutdown()) { + impl_ptr_->shutdown(); + } +} + +void WebsocketClient::init( + const std::string & endpoint_uri) +{ + impl_ptr_->init(endpoint_uri); +} + +bool WebsocketClient::is_connected() const +{ + return impl_ptr_->is_connected(); +} + +void WebsocketClient::publish(const Message & message) +{ + impl_ptr_->publish(message); +} + +void WebsocketClient::shutdown() +{ + impl_ptr_->shutdown(); +} + +} // namespace rmf_notification