From dff35d5c40f9224dd4c092ca9e018bf4f05a7df7 Mon Sep 17 00:00:00 2001 From: Jari Nippula Date: Wed, 25 Oct 2023 15:19:28 +0300 Subject: [PATCH 1/8] Revert "Add ulog_stream_acked" This reverts commit 99e99f9cf6f05d6ae19ca62fbd0fdfbda9604405. --- msg/CMakeLists.txt | 1 - msg/UlogStreamAck.msg | 2 +- msg/UlogStreamAcked.msg | 19 ------------------- 3 files changed, 1 insertion(+), 21 deletions(-) delete mode 100644 msg/UlogStreamAcked.msg diff --git a/msg/CMakeLists.txt b/msg/CMakeLists.txt index 049ce8028ed3..fdbe1b94af24 100644 --- a/msg/CMakeLists.txt +++ b/msg/CMakeLists.txt @@ -192,7 +192,6 @@ set(msg_files UavcanParameterValue.msg UlogStream.msg UlogStreamAck.msg - UlogStreamAcked.msg VehicleAcceleration.msg VehicleAirData.msg VehicleAngularAccelerationSetpoint.msg diff --git a/msg/UlogStreamAck.msg b/msg/UlogStreamAck.msg index 9816d9bc0fdc..e3747fff6ea6 100644 --- a/msg/UlogStreamAck.msg +++ b/msg/UlogStreamAck.msg @@ -1,4 +1,4 @@ -# Ack a previously sent ulog_stream_acked message that had +# Ack a previously sent ulog_stream message that had # the NEED_ACK flag set uint64 timestamp # time since system start (microseconds) diff --git a/msg/UlogStreamAcked.msg b/msg/UlogStreamAcked.msg deleted file mode 100644 index 136b7b2dfe76..000000000000 --- a/msg/UlogStreamAcked.msg +++ /dev/null @@ -1,19 +0,0 @@ -# Message to stream ULog data from the logger. Corresponds to the LOGGING_DATA_ACKED -# mavlink message - -uint64 timestamp # time since system start (microseconds) - -# flags bitmasks -uint8 FLAGS_NEED_ACK = 1 # if set, this message requires to be acked. - # Acked messages are published synchronous: a - # publisher waits for an ack before sending the - # next message - -uint8 length # length of data -uint8 first_message_offset # offset into data where first message starts. This - # can be used for recovery, when a previous message got lost -uint16 msg_sequence # allows determine drops -uint8 flags # see FLAGS_* -uint8[249] data # ulog data - -uint8 ORB_QUEUE_LENGTH = 16 # TODO: we might be able to reduce this if mavlink polled on the topic From ccd604802b5a62087bf9c39846532e4aa45b4f1e Mon Sep 17 00:00:00 2001 From: Jari Nippula Date: Wed, 25 Oct 2023 15:20:02 +0300 Subject: [PATCH 2/8] Revert "mavlink_ulog: custom mavlink logging" This reverts commit 0a0777217780a7e00b1839db4e2cc4f0add89843. --- src/modules/mavlink/CMakeLists.txt | 5 -- src/modules/mavlink/Kconfig | 7 --- src/modules/mavlink/mavlink_ulog.cpp | 90 +--------------------------- src/modules/mavlink/mavlink_ulog.h | 8 +-- 4 files changed, 2 insertions(+), 108 deletions(-) diff --git a/src/modules/mavlink/CMakeLists.txt b/src/modules/mavlink/CMakeLists.txt index 599ec7e63e71..45f7737e6e03 100644 --- a/src/modules/mavlink/CMakeLists.txt +++ b/src/modules/mavlink/CMakeLists.txt @@ -31,10 +31,6 @@ # ############################################################################ -if(CONFIG_MAVLINK_PARALLEL_LOGGING) - set(MAVLINK_PARALLEL_COMPILE_FLAG "-DMAVLINK_PARALLEL_LOGGING") -endif() - set(MAVLINK_GIT_DIR "${CMAKE_CURRENT_LIST_DIR}/mavlink") set(MAVLINK_LIBRARY_DIR "${CMAKE_BINARY_DIR}/mavlink") file(RELATIVE_PATH MAVLINK_GIT_DIR_RELATIVE ${CMAKE_SOURCE_DIR} ${MAVLINK_GIT_DIR}) @@ -109,7 +105,6 @@ px4_add_module( MAIN mavlink COMPILE_FLAGS -Wno-enum-compare # ROTATION <-> MAV_SENSOR_ROTATION - ${MAVLINK_PARALLEL_COMPILE_FLAG} #-DDEBUG_BUILD SRCS mavlink.c diff --git a/src/modules/mavlink/Kconfig b/src/modules/mavlink/Kconfig index 7adde83e84cb..8fb9219d9d61 100644 --- a/src/modules/mavlink/Kconfig +++ b/src/modules/mavlink/Kconfig @@ -24,10 +24,3 @@ menuconfig USER_MAVLINK depends on BOARD_PROTECTED && MODULES_MAVLINK ---help--- Put mavlink in userspace memory - -menuconfig MAVLINK_PARALLEL_LOGGING - bool "Custom mavlink logging protocol" - default n - depends on MODULES_MAVLINK - ---help--- - Custom mavlink logging protocol for speed up logging start phase diff --git a/src/modules/mavlink/mavlink_ulog.cpp b/src/modules/mavlink/mavlink_ulog.cpp index 161ee0eae3eb..64b9ec981d20 100644 --- a/src/modules/mavlink/mavlink_ulog.cpp +++ b/src/modules/mavlink/mavlink_ulog.cpp @@ -60,15 +60,6 @@ MavlinkULog::MavlinkULog(int datarate, float max_rate_factor, uint8_t target_sys } -#ifdef MAVLINK_PARALLEL_LOGGING - - PX4_INFO("[Mavlink_ulog] Parallel logging enabled"); - - while (_ulog_stream_acked_sub.update()) { - - } - -#endif _waiting_for_initial_ack = true; _last_sent_time = hrt_absolute_time(); //(ab)use this timestamp during initialization _next_rate_check = _last_sent_time + _rate_calculation_delta_t; @@ -93,7 +84,7 @@ int MavlinkULog::handle_update(mavlink_channel_t channel) static_assert(sizeof(ulog_stream_s::data) == MAVLINK_MSG_LOGGING_DATA_FIELD_DATA_LEN, "Invalid uorb ulog_stream.data length"); static_assert(sizeof(ulog_stream_s::data) == MAVLINK_MSG_LOGGING_DATA_ACKED_FIELD_DATA_LEN, - "Invalid uorb ulog_stream_acked.data length"); + "Invalid uorb ulog_stream.data length"); if (_waiting_for_initial_ack) { if (hrt_elapsed_time(&_last_sent_time) > 3e5) { @@ -104,51 +95,6 @@ int MavlinkULog::handle_update(mavlink_channel_t channel) return 0; } -#ifdef MAVLINK_PARALLEL_LOGGING - - while ((_current_num_msgs < _max_num_messages) && _ulog_stream_sub.updated()) { - const unsigned last_generation = _ulog_stream_sub.get_last_generation(); - _ulog_stream_sub.update(); - - if (_ulog_stream_sub.get_last_generation() != last_generation + 1) { - perf_count(_msg_missed_ulog_stream_perf); - } - - const ulog_stream_s &ulog_data = _ulog_stream_sub.get(); - - if (ulog_data.timestamp > 0) { - mavlink_logging_data_t msg; - msg.sequence = ulog_data.msg_sequence; - msg.length = ulog_data.length; - msg.first_message_offset = ulog_data.first_message_offset; - msg.target_system = _target_system; - msg.target_component = _target_component; - memcpy(msg.data, ulog_data.data, sizeof(msg.data)); - mavlink_msg_logging_data_send_struct(channel, &msg); - } - - ++_current_num_msgs; - } - - //need to update the rate? - hrt_abstime t = hrt_absolute_time(); - - if (t > _next_rate_check) { - if (_current_num_msgs < _max_num_messages) { - _current_rate_factor = _max_rate_factor * (float)_current_num_msgs / _max_num_messages; - - } else { - _current_rate_factor = _max_rate_factor; - } - - _current_num_msgs = 0; - _next_rate_check = t + _rate_calculation_delta_t * 1.e6f; - PX4_DEBUG("current rate=%.3f (max=%i msgs in %.3fs)", (double)_current_rate_factor, _max_num_messages, - (double)_rate_calculation_delta_t); - } - -#endif - // check if we're waiting for an ACK if (_last_sent_time) { bool check_for_updates = false; @@ -167,11 +113,7 @@ int MavlinkULog::handle_update(mavlink_channel_t channel) PX4_DEBUG("re-sending ulog mavlink message (try=%i)", _sent_tries); _last_sent_time = hrt_absolute_time(); -#ifdef MAVLINK_PARALLEL_LOGGING - const ulog_stream_s &ulog_data = _ulog_stream_acked_sub.get(); -#else const ulog_stream_s &ulog_data = _ulog_stream_sub.get(); -#endif mavlink_logging_data_acked_t msg; msg.sequence = ulog_data.msg_sequence; @@ -190,35 +132,6 @@ int MavlinkULog::handle_update(mavlink_channel_t channel) } } -#ifdef MAVLINK_PARALLEL_LOGGING - - if (_ulog_stream_acked_sub.updated()) { - _ulog_stream_acked_sub.update(); - - const ulog_stream_s &ulog_data = _ulog_stream_acked_sub.get(); - - if (ulog_data.timestamp > 0) { - _sent_tries = 1; - _last_sent_time = hrt_absolute_time(); - lock(); - _wait_for_ack_sequence = ulog_data.msg_sequence; - _ack_received = false; - unlock(); - - mavlink_logging_data_acked_t msg; - msg.sequence = ulog_data.msg_sequence; - msg.length = ulog_data.length; - msg.first_message_offset = ulog_data.first_message_offset; - msg.target_system = _target_system; - msg.target_component = _target_component; - memcpy(msg.data, ulog_data.data, sizeof(msg.data)); - mavlink_msg_logging_data_acked_send_struct(channel, &msg); - - } - - } - -#else while ((_current_num_msgs < _max_num_messages) && _ulog_stream_sub.updated()) { const unsigned last_generation = _ulog_stream_sub.get_last_generation(); @@ -280,7 +193,6 @@ int MavlinkULog::handle_update(mavlink_channel_t channel) (double)(_rate_calculation_delta_t / 1e6)); } -#endif return 0; } diff --git a/src/modules/mavlink/mavlink_ulog.h b/src/modules/mavlink/mavlink_ulog.h index 15f4d8f70a7d..7c96553c6d49 100644 --- a/src/modules/mavlink/mavlink_ulog.h +++ b/src/modules/mavlink/mavlink_ulog.h @@ -52,9 +52,6 @@ #include #include -#ifdef MAVLINK_PARALLEL_LOGGING -#include -#endif #include "mavlink_bridge_header.h" using namespace time_literals; @@ -128,10 +125,7 @@ class MavlinkULog static constexpr hrt_abstime _rate_calculation_delta_t = 100_ms; ///< rate update interval uORB::SubscriptionData _ulog_stream_sub{ORB_ID(ulog_stream)}; -#ifdef MAVLINK_PARALLEL_LOGGING - uORB::SubscriptionData _ulog_stream_acked_sub {ORB_ID(ulog_stream_acked)}; -#endif - uORB::Publication _ulog_stream_ack_pub {ORB_ID(ulog_stream_ack)}; + uORB::Publication _ulog_stream_ack_pub{ORB_ID(ulog_stream_ack)}; uint16_t _wait_for_ack_sequence; uint8_t _sent_tries = 0; volatile bool _ack_received = false; ///< set to true if a matching ack received From 651aac5f9d2acb98a45adc4e7f80895e1adb73b2 Mon Sep 17 00:00:00 2001 From: Jari Nippula Date: Wed, 25 Oct 2023 15:21:22 +0300 Subject: [PATCH 3/8] Revert "Parallel_logging: Disable reliable_transfer for dynamic ADD_LOGGED_MSG" This reverts commit b338a3f50d91016a97f4b587e0a91a042b34bdbf. --- src/modules/logger/logger.cpp | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/modules/logger/logger.cpp b/src/modules/logger/logger.cpp index 4b62216b6c09..18b055a32c1a 100644 --- a/src/modules/logger/logger.cpp +++ b/src/modules/logger/logger.cpp @@ -1944,7 +1944,11 @@ void Logger::write_add_logged_msg(LogType type, LoggerSubscription &subscription bool prev_reliable = _writer.need_reliable_transfer(); _writer.set_need_reliable_transfer(true); - write_message(type, &msg, msg_size, acked); +#ifdef LOGGER_PARALLEL_LOGGING + write_message(type, &msg, msg_size, true); +#else + write_message(type, &msg, msg_size); +#endif _writer.set_need_reliable_transfer(prev_reliable); } From ed1dea575058ff527f71827d3572a2880639e723 Mon Sep 17 00:00:00 2001 From: Jari Nippula Date: Wed, 25 Oct 2023 15:21:43 +0300 Subject: [PATCH 4/8] Revert "logger: custom mavlink logging" This reverts commit 111b1ed3b7439cbbab32d848c4414f2bcc3f8c50. --- src/modules/logger/CMakeLists.txt | 4 - src/modules/logger/Kconfig | 7 - src/modules/logger/log_writer.cpp | 4 +- src/modules/logger/log_writer.h | 2 +- src/modules/logger/log_writer_mavlink.cpp | 70 ++---- src/modules/logger/log_writer_mavlink.h | 7 +- src/modules/logger/logger.cpp | 220 +++++------------- src/modules/logger/logger.h | 48 ++-- .../logger/parallel_mavlink_logging.md | 115 --------- 9 files changed, 102 insertions(+), 375 deletions(-) delete mode 100644 src/modules/logger/parallel_mavlink_logging.md diff --git a/src/modules/logger/CMakeLists.txt b/src/modules/logger/CMakeLists.txt index 8f06d20d54b6..ec07d9adae95 100644 --- a/src/modules/logger/CMakeLists.txt +++ b/src/modules/logger/CMakeLists.txt @@ -30,9 +30,6 @@ # POSSIBILITY OF SUCH DAMAGE. # ############################################################################ -if(CONFIG_LOGGER_PARALLEL_LOGGING) - set(LOGGER_PARALLEL_COMPILE_FLAG "-DLOGGER_PARALLEL_LOGGING") -endif() px4_add_module( MODULE modules__logger @@ -41,7 +38,6 @@ px4_add_module( COMPILE_FLAGS ${MAX_CUSTOM_OPT_LEVEL} -Wno-cast-align # TODO: fix and enable - ${LOGGER_PARALLEL_COMPILE_FLAG} SRCS logged_topics.cpp logger.cpp diff --git a/src/modules/logger/Kconfig b/src/modules/logger/Kconfig index 53e3566cd722..db2c5a75460e 100644 --- a/src/modules/logger/Kconfig +++ b/src/modules/logger/Kconfig @@ -10,10 +10,3 @@ menuconfig USER_LOGGER depends on BOARD_PROTECTED && MODULES_LOGGER ---help--- Put logger in userspace memory - -menuconfig LOGGER_PARALLEL_LOGGING - bool "Custom mavlink logging protocol in logger" - default n - depends on MAVLINK_PARALLEL_LOGGING && MODULES_LOGGER - ---help--- - Utilize custom mavlink logging protocol for speed up logging start phase diff --git a/src/modules/logger/log_writer.cpp b/src/modules/logger/log_writer.cpp index a16dd93f00f3..8511f9e92a3d 100644 --- a/src/modules/logger/log_writer.cpp +++ b/src/modules/logger/log_writer.cpp @@ -158,7 +158,7 @@ void LogWriter::thread_stop() } } -int LogWriter::write_message(LogType type, void *ptr, size_t size, uint64_t dropout_start, bool acked) +int LogWriter::write_message(LogType type, void *ptr, size_t size, uint64_t dropout_start) { int ret_file = 0, ret_mavlink = 0; @@ -167,7 +167,7 @@ int LogWriter::write_message(LogType type, void *ptr, size_t size, uint64_t drop } if (_log_writer_mavlink_for_write && type == LogType::Full) { - ret_mavlink = _log_writer_mavlink_for_write->write_message(ptr, size, acked); + ret_mavlink = _log_writer_mavlink_for_write->write_message(ptr, size); } // file backend errors takes precedence diff --git a/src/modules/logger/log_writer.h b/src/modules/logger/log_writer.h index ff256062359e..9635d22dadf0 100644 --- a/src/modules/logger/log_writer.h +++ b/src/modules/logger/log_writer.h @@ -90,7 +90,7 @@ class LogWriter * -1 if not enough space in the buffer left (file backend), -2 mavlink backend failed * add type -> pass through, but not to mavlink if mission log */ - int write_message(LogType type, void *ptr, size_t size, uint64_t dropout_start = 0, bool acked = false); + int write_message(LogType type, void *ptr, size_t size, uint64_t dropout_start = 0); /** * Select a backend, so that future calls to write_message() only write to the selected diff --git a/src/modules/logger/log_writer_mavlink.cpp b/src/modules/logger/log_writer_mavlink.cpp index 5446ef0a19eb..5322aef074e0 100644 --- a/src/modules/logger/log_writer_mavlink.cpp +++ b/src/modules/logger/log_writer_mavlink.cpp @@ -48,7 +48,6 @@ namespace logger LogWriterMavlink::LogWriterMavlink() { _ulog_stream_data.length = 0; - _ulog_stream_acked_data.length = 0; } bool LogWriterMavlink::init() @@ -77,51 +76,37 @@ void LogWriterMavlink::start_log() _ulog_stream_data.length = 0; _ulog_stream_data.first_message_offset = 0; - _ulog_stream_acked_data.msg_sequence = 0; - _ulog_stream_acked_data.length = 0; - _ulog_stream_acked_data.first_message_offset = 0; - _is_started = true; } void LogWriterMavlink::stop_log() { _ulog_stream_data.length = 0; - _ulog_stream_acked_data.length = 0; _is_started = false; } -int LogWriterMavlink::write_message(void *ptr, size_t size, bool acked) +int LogWriterMavlink::write_message(void *ptr, size_t size) { if (!is_started()) { return 0; } - ulog_stream_s *ulog_s_p; - - if (acked) { - ulog_s_p = &_ulog_stream_acked_data; - - } else { - ulog_s_p = &_ulog_stream_data; - } - - const uint8_t data_len = (uint8_t)sizeof(ulog_s_p->data); + const uint8_t data_len = (uint8_t)sizeof(_ulog_stream_data.data); uint8_t *ptr_data = (uint8_t *)ptr; - if (ulog_s_p->first_message_offset == 255) { - ulog_s_p->first_message_offset = ulog_s_p->length; + if (_ulog_stream_data.first_message_offset == 255) { + _ulog_stream_data.first_message_offset = _ulog_stream_data.length; } while (size > 0) { - size_t send_len = math::min((size_t)data_len - ulog_s_p->length, size); - memcpy(ulog_s_p->data + ulog_s_p->length, ptr_data, send_len); - ulog_s_p->length += send_len; + size_t send_len = math::min((size_t)data_len - _ulog_stream_data.length, size); + memcpy(_ulog_stream_data.data + _ulog_stream_data.length, ptr_data, send_len); + _ulog_stream_data.length += send_len; ptr_data += send_len; size -= send_len; - if (ulog_s_p->length >= data_len) { - if (publish_message(acked)) { + if (_ulog_stream_data.length >= data_len) { + if (publish_message()) { return -2; } } @@ -132,8 +117,6 @@ int LogWriterMavlink::write_message(void *ptr, size_t size, bool acked) void LogWriterMavlink::set_need_reliable_transfer(bool need_reliable) { -#ifndef LOGGER_PARALLEL_LOGGING - if (!need_reliable && _need_reliable_transfer) { if (_ulog_stream_data.length > 0) { // make sure to send previous data using reliable transfer @@ -142,32 +125,12 @@ void LogWriterMavlink::set_need_reliable_transfer(bool need_reliable) } _need_reliable_transfer = need_reliable; -#endif } -int LogWriterMavlink::publish_message(bool acked) +int LogWriterMavlink::publish_message() { - ulog_stream_s *ulog_s_p; - - if (acked) { - ulog_s_p = &_ulog_stream_acked_data; - - } else { - ulog_s_p = &_ulog_stream_data; - } - - ulog_s_p->timestamp = hrt_absolute_time(); - ulog_s_p->flags = 0; - -#ifdef LOGGER_PARALLEL_LOGGING - - if (!acked) { - _ulog_stream_pub.publish(*ulog_s_p); - - } else { - ulog_s_p->flags = ulog_s_p->FLAGS_NEED_ACK; - _ulog_stream_acked_pub.publish(*ulog_s_p); -#else + _ulog_stream_data.timestamp = hrt_absolute_time(); + _ulog_stream_data.flags = 0; if (_need_reliable_transfer) { _ulog_stream_data.flags = _ulog_stream_data.FLAGS_NEED_ACK; @@ -176,7 +139,6 @@ int LogWriterMavlink::publish_message(bool acked) _ulog_stream_pub.publish(_ulog_stream_data); if (_need_reliable_transfer) { -#endif // we need to wait for an ack. Note that this blocks the main logger thread, so if a file logging // is already running, it will miss samples. px4_pollfd_struct_t fds[1]; @@ -198,7 +160,7 @@ int LogWriterMavlink::publish_message(bool acked) ulog_stream_ack_s ack; orb_copy(ORB_ID(ulog_stream_ack), _ulog_stream_ack_sub, &ack); - if (ack.msg_sequence == ulog_s_p->msg_sequence) { + if (ack.msg_sequence == _ulog_stream_data.msg_sequence) { got_ack = true; } @@ -216,9 +178,9 @@ int LogWriterMavlink::publish_message(bool acked) PX4_DEBUG("got ack in %i ms", (int)(hrt_elapsed_time(&started) / 1000)); } - ulog_s_p->msg_sequence++; - ulog_s_p->length = 0; - ulog_s_p->first_message_offset = 255; + _ulog_stream_data.msg_sequence++; + _ulog_stream_data.length = 0; + _ulog_stream_data.first_message_offset = 255; return 0; } diff --git a/src/modules/logger/log_writer_mavlink.h b/src/modules/logger/log_writer_mavlink.h index 9b1cfa073892..8aabe2d79235 100644 --- a/src/modules/logger/log_writer_mavlink.h +++ b/src/modules/logger/log_writer_mavlink.h @@ -36,7 +36,6 @@ #include #include #include -#include #include namespace px4 @@ -63,7 +62,7 @@ class LogWriterMavlink bool is_started() const { return _is_started; } /** @see LogWriter::write_message() */ - int write_message(void *ptr, size_t size, bool acked = false); + int write_message(void *ptr, size_t size); void set_need_reliable_transfer(bool need_reliable); @@ -75,12 +74,10 @@ class LogWriterMavlink private: /** publish message, wait for ack if needed & reset message */ - int publish_message(bool acked = false); + int publish_message(); ulog_stream_s _ulog_stream_data{}; - ulog_stream_s _ulog_stream_acked_data{}; uORB::Publication _ulog_stream_pub{ORB_ID(ulog_stream)}; - uORB::Publication _ulog_stream_acked_pub{ORB_ID(ulog_stream_acked)}; orb_sub_t _ulog_stream_ack_sub{ORB_SUB_INVALID}; bool _need_reliable_transfer{false}; bool _is_started{false}; diff --git a/src/modules/logger/logger.cpp b/src/modules/logger/logger.cpp index 18b055a32c1a..50753c1e697f 100644 --- a/src/modules/logger/logger.cpp +++ b/src/modules/logger/logger.cpp @@ -1187,16 +1187,8 @@ void Logger::handle_vehicle_command_update() } } -bool Logger::write_message(LogType type, void *ptr, size_t size, bool acked) +bool Logger::write_message(LogType type, void *ptr, size_t size) { - if (acked) { - if (_writer.write_message(type, ptr, size, 0, true) != -1) { - return true; - } - - return false; - } - Statistics &stats = _statistics[(int)type]; if (_writer.write_message(type, ptr, size, stats.dropout_start) != -1) { @@ -1463,36 +1455,6 @@ void Logger::stop_log_file(LogType type) _writer.stop_log_file(type); } -#ifdef LOGGER_PARALLEL_LOGGING -void *Logger::mav_start_steps_helper(void *context) -{ - px4_prctl(PR_SET_NAME, "log_writer_mavlink_headers", px4_getpid()); - static_cast(context)->mav_start_steps(); - return nullptr; -} - -void Logger::mav_start_steps() -{ - /* This is running in separate thread to keep logging data while sending header&descriptions */ - PX4_INFO("[log_writer_mavlink_headers] - Begin"); - LogWriter::Backend bkend = _writer.backend(); - _writer.select_write_backend(LogWriter::BackendMavlink); - write_header(LogType::Full, true); - write_version(LogType::Full, true); - write_formats(LogType::Full, true); - write_parameters(LogType::Full, true); - write_parameter_defaults(LogType::Full, true); - write_perf_data(true, true); - write_console_output(true); - write_events_file(LogType::Full, true); - write_excluded_optional_topics(LogType::Full, true); - write_all_add_logged_msg(LogType::Full, true); - _writer.select_write_backend(bkend); - _writer.notify(); - PX4_INFO("[log_writer_mavlink_headers] - End"); -} -#endif - void Logger::start_log_mavlink() { if (!can_start_mavlink_log()) { @@ -1509,32 +1471,6 @@ void Logger::start_log_mavlink() PX4_INFO("Start mavlink log"); _writer.start_log_mavlink(); -#ifdef LOGGER_PARALLEL_LOGGING - - for (int sub = 0; sub < _num_subscriptions; ++sub) { - if (_subscriptions[sub].valid() && _subscriptions[sub].msg_id == MSG_ID_INVALID) { - if (_next_topic_id == MSG_ID_INVALID) { - // if we land here an uint8 is too small -> switch to uint16 - PX4_ERR("limit for _next_topic_id reached"); - return; - } - - _subscriptions[sub].msg_id = _next_topic_id++; - } - } - - pthread_attr_t thr_attr; - pthread_attr_init(&thr_attr); - pthread_attr_setstacksize(&thr_attr, PX4_STACK_ADJUSTED(8500)); - PX4_INFO("create mav_start_thread"); - int ret = pthread_create(&_mav_start_thread, &thr_attr, &Logger::mav_start_steps_helper, this); - - if (ret) { - PX4_WARN("mav_start_thread create failed: %d", ret); - } - - pthread_attr_destroy(&thr_attr); -#else _writer.select_write_backend(LogWriter::BackendMavlink); _writer.set_need_reliable_transfer(true); write_header(LogType::Full); @@ -1550,9 +1486,7 @@ void Logger::start_log_mavlink() _writer.set_need_reliable_transfer(false); _writer.unselect_write_backend(); _writer.notify(); -#endif - PX4_INFO("Mavlink logging started"); adjust_subscription_updates(); // redistribute updates as sending the header can take some time } @@ -1561,28 +1495,12 @@ void Logger::stop_log_mavlink() // don't write perf data since a client does not expect more data after a stop command PX4_INFO("Stop mavlink log"); -#ifdef LOGGER_PARALLEL_LOGGING - int ret = pthread_join(_mav_start_thread, nullptr); - - if (ret) { - PX4_WARN("mav_start_thread join failed: %d", ret); - } - -#endif - if (_writer.is_started(LogType::Full, LogWriter::BackendMavlink)) { -#ifdef LOGGER_PARALLEL_LOGGING - LogWriter::Backend bkend = _writer.backend(); - _writer.select_write_backend(LogWriter::BackendMavlink); - write_perf_data(PrintLoadReason::Postflight, true); - _writer.select_write_backend(bkend); -#else _writer.select_write_backend(LogWriter::BackendMavlink); _writer.set_need_reliable_transfer(true); write_perf_data(PrintLoadReason::Postflight); _writer.set_need_reliable_transfer(false); _writer.unselect_write_backend(); -#endif _writer.notify(); _writer.stop_log_mavlink(); } @@ -1592,7 +1510,6 @@ struct perf_callback_data_t { Logger *logger; int counter; Logger::PrintLoadReason reason; - bool acked; char *buffer; }; @@ -1620,18 +1537,16 @@ void Logger::perf_iterate_callback(perf_counter_t handle, void *user) break; } - callback_data->logger->write_info_multiple(LogType::Full, perf_name, buffer, callback_data->counter != 0, - callback_data->acked); + callback_data->logger->write_info_multiple(LogType::Full, perf_name, buffer, callback_data->counter != 0); ++callback_data->counter; } -void Logger::write_perf_data(PrintLoadReason reason, bool acked) +void Logger::write_perf_data(PrintLoadReason reason) { perf_callback_data_t callback_data = {}; callback_data.logger = this; callback_data.counter = 0; callback_data.reason = reason; - callback_data.acked = acked; // write the perf counters perf_iterate_all(perf_iterate_callback, &callback_data); @@ -1667,8 +1582,6 @@ void Logger::print_load_callback(void *user) callback_data->logger->write_info_multiple(LogType::Full, perf_name, callback_data->buffer, callback_data->counter != 0); - // TODO: Add call callback_data->acked to the last param. Currenlty it fails due to conflict with - // startup header/definition sending ++callback_data->counter; } @@ -1699,11 +1612,6 @@ void Logger::write_load_output() char buffer[140]; callback_data.logger = this; callback_data.counter = 0; -#ifdef LOGGER_PARALLEL_LOGGING - callback_data.acked = true; -#else - callback_data.acked = false; -#endif callback_data.buffer = buffer; // TODO: maybe we should restrict the output to a selected backend (eg. when file logging is running // and mavlink log is started, this will be added to the file as well) @@ -1712,7 +1620,7 @@ void Logger::write_load_output() _writer.set_need_reliable_transfer(false); } -void Logger::write_console_output(bool acked) +void Logger::write_console_output() { const int buffer_length = 220; char buffer[buffer_length]; @@ -1726,7 +1634,7 @@ void Logger::write_console_output(bool acked) if (read_size <= 0) { break; } buffer[math::min(read_size, size)] = '\0'; - write_info_multiple(LogType::Full, "boot_console_output", buffer, !first, acked); + write_info_multiple(LogType::Full, "boot_console_output", buffer, !first); size -= read_size; first = false; @@ -1735,7 +1643,7 @@ void Logger::write_console_output(bool acked) } void Logger::write_format(LogType type, const orb_metadata &meta, WrittenFormats &written_formats, - ulog_message_format_s &msg, int subscription_index, int level, bool acked) + ulog_message_format_s &msg, int subscription_index, int level) { if (level > 3) { // precaution: limit recursion level. If we land here it's either a bug or nested topic definitions. In the @@ -1796,7 +1704,7 @@ void Logger::write_format(LogType type, const orb_metadata &meta, WrittenFormats size_t msg_size = sizeof(msg) - sizeof(msg.format) + format_len; msg.msg_size = msg_size - ULOG_MSG_HEADER_LEN; - write_message(type, &msg, msg_size, acked); + write_message(type, &msg, msg_size); if (level > 1 && !written_formats.push_back(&meta)) { PX4_ERR("Array too small"); @@ -1850,7 +1758,7 @@ void Logger::write_format(LogType type, const orb_metadata &meta, WrittenFormats if (found_topic) { - write_format(type, *found_topic, written_formats, msg, subscription_index, level + 1, acked); + write_format(type, *found_topic, written_formats, msg, subscription_index, level + 1); } else { PX4_ERR("No definition for topic %s found", fmt); @@ -1863,7 +1771,7 @@ void Logger::write_format(LogType type, const orb_metadata &meta, WrittenFormats } } -void Logger::write_formats(LogType type, bool acked) +void Logger::write_formats(LogType type) { _writer.lock(); @@ -1880,15 +1788,15 @@ void Logger::write_formats(LogType type, bool acked) for (int i = 0; i < sub_count; ++i) { const LoggerSubscription &sub = _subscriptions[i]; - write_format(type, *sub.get_topic(), written_formats, msg, i, 1, acked); + write_format(type, *sub.get_topic(), written_formats, msg, i); } - write_format(type, *_event_subscription.get_topic(), written_formats, msg, sub_count, 1, acked); + write_format(type, *_event_subscription.get_topic(), written_formats, msg, sub_count); _writer.unlock(); } -void Logger::write_all_add_logged_msg(LogType type, bool acked) +void Logger::write_all_add_logged_msg(LogType type) { _writer.lock(); @@ -1904,12 +1812,12 @@ void Logger::write_all_add_logged_msg(LogType type, bool acked) LoggerSubscription &sub = _subscriptions[i]; if (sub.valid()) { - write_add_logged_msg(type, sub, acked); + write_add_logged_msg(type, sub); added_subscriptions = true; } } - write_add_logged_msg(type, _event_subscription, acked); // always add, even if not valid + write_add_logged_msg(type, _event_subscription); // always add, even if not valid _writer.unlock(); @@ -1918,7 +1826,7 @@ void Logger::write_all_add_logged_msg(LogType type, bool acked) } } -void Logger::write_add_logged_msg(LogType type, LoggerSubscription &subscription, bool acked) +void Logger::write_add_logged_msg(LogType type, LoggerSubscription &subscription) { ulog_message_add_logged_s msg; @@ -1944,15 +1852,11 @@ void Logger::write_add_logged_msg(LogType type, LoggerSubscription &subscription bool prev_reliable = _writer.need_reliable_transfer(); _writer.set_need_reliable_transfer(true); -#ifdef LOGGER_PARALLEL_LOGGING - write_message(type, &msg, msg_size, true); -#else write_message(type, &msg, msg_size); -#endif _writer.set_need_reliable_transfer(prev_reliable); } -void Logger::write_info(LogType type, const char *name, const char *value, bool acked) +void Logger::write_info(LogType type, const char *name, const char *value) { _writer.lock(); ulog_message_info_s msg = {}; @@ -1971,13 +1875,13 @@ void Logger::write_info(LogType type, const char *name, const char *value, bool msg.msg_size = msg_size - ULOG_MSG_HEADER_LEN; - write_message(type, buffer, msg_size, acked); + write_message(type, buffer, msg_size); } _writer.unlock(); } -void Logger::write_info_multiple(LogType type, const char *name, const char *value, bool is_continued, bool acked) +void Logger::write_info_multiple(LogType type, const char *name, const char *value, bool is_continued) { _writer.lock(); ulog_message_info_multiple_s msg; @@ -1997,7 +1901,7 @@ void Logger::write_info_multiple(LogType type, const char *name, const char *val msg.msg_size = msg_size - ULOG_MSG_HEADER_LEN; - write_message(type, buffer, msg_size, acked); + write_message(type, buffer, msg_size); } else { PX4_ERR("info_multiple str too long (%" PRIu8 "), key=%s", msg.key_len, msg.key_value_str); @@ -2006,7 +1910,7 @@ void Logger::write_info_multiple(LogType type, const char *name, const char *val _writer.unlock(); } -void Logger::write_info_multiple(LogType type, const char *name, int fd, bool acked) +void Logger::write_info_multiple(LogType type, const char *name, int fd) { // Get the file length struct stat stat_data; @@ -2042,7 +1946,7 @@ void Logger::write_info_multiple(LogType type, const char *name, int fd, bool ac msg_size += read_length; msg.msg_size = msg_size - ULOG_MSG_HEADER_LEN; - write_message(type, buffer, msg_size, acked); + write_message(type, buffer, msg_size); file_offset += ret; } else { @@ -2056,19 +1960,19 @@ void Logger::write_info_multiple(LogType type, const char *name, int fd, bool ac } } -void Logger::write_info(LogType type, const char *name, int32_t value, bool acked) +void Logger::write_info(LogType type, const char *name, int32_t value) { - write_info_template(type, name, value, "int32_t", acked); + write_info_template(type, name, value, "int32_t"); } -void Logger::write_info(LogType type, const char *name, uint32_t value, bool acked) +void Logger::write_info(LogType type, const char *name, uint32_t value) { - write_info_template(type, name, value, "uint32_t", acked); + write_info_template(type, name, value, "uint32_t"); } template -void Logger::write_info_template(LogType type, const char *name, T value, const char *type_str, bool acked) +void Logger::write_info_template(LogType type, const char *name, T value, const char *type_str) { _writer.lock(); ulog_message_info_s msg = {}; @@ -2085,23 +1989,23 @@ void Logger::write_info_template(LogType type, const char *name, T value, const msg.msg_size = msg_size - ULOG_MSG_HEADER_LEN; - write_message(type, buffer, msg_size, acked); + write_message(type, buffer, msg_size); _writer.unlock(); } -void Logger::write_excluded_optional_topics(LogType type, bool acked) +void Logger::write_excluded_optional_topics(LogType type) { for (int i = 0; i < _num_excluded_optional_topic_ids; ++i) { orb_id_t meta = get_orb_meta((ORB_ID)_excluded_optional_topic_ids[i]); if (meta) { - write_info_multiple(type, "excluded_optional_topics", meta->o_name, false, acked); + write_info_multiple(type, "excluded_optional_topics", meta->o_name, false); } } } -void Logger::write_header(LogType type, bool acked) +void Logger::write_header(LogType type) { ulog_file_header_s header = {}; header.magic[0] = 'U'; @@ -2114,7 +2018,7 @@ void Logger::write_header(LogType type, bool acked) header.magic[7] = 0x01; //file version 1 header.timestamp = hrt_absolute_time(); _writer.lock(); - write_message(type, &header, sizeof(header), acked); + write_message(type, &header, sizeof(header)); // write the Flags message: this MUST be written right after the ulog header ulog_message_flag_bits_s flag_bits{}; @@ -2124,57 +2028,57 @@ void Logger::write_header(LogType type, bool acked) flag_bits.msg_size = sizeof(flag_bits) - ULOG_MSG_HEADER_LEN; flag_bits.msg_type = static_cast(ULogMessageType::FLAG_BITS); - write_message(type, &flag_bits, sizeof(flag_bits), acked); + write_message(type, &flag_bits, sizeof(flag_bits)); _writer.unlock(); } -void Logger::write_version(LogType type, bool acked) +void Logger::write_version(LogType type) { - write_info(type, "ver_sw", px4_firmware_version_string(), acked); - write_info(type, "ver_sw_release", px4_firmware_version(), acked); + write_info(type, "ver_sw", px4_firmware_version_string()); + write_info(type, "ver_sw_release", px4_firmware_version()); uint32_t vendor_version = px4_firmware_vendor_version(); if (vendor_version > 0) { - write_info(type, "ver_vendor_sw_release", vendor_version, acked); + write_info(type, "ver_vendor_sw_release", vendor_version); } - write_info(type, "ver_hw", px4_board_name(), acked); + write_info(type, "ver_hw", px4_board_name()); const char *board_sub_type = px4_board_sub_type(); if (board_sub_type && board_sub_type[0]) { - write_info(type, "ver_hw_subtype", board_sub_type, acked); + write_info(type, "ver_hw_subtype", board_sub_type); } - write_info(type, "sys_name", "PX4", acked); - write_info(type, "sys_os_name", px4_os_name(), acked); + write_info(type, "sys_name", "PX4"); + write_info(type, "sys_os_name", px4_os_name()); const char *os_version = px4_os_version_string(); const char *git_branch = px4_firmware_git_branch(); if (git_branch && git_branch[0]) { - write_info(type, "ver_sw_branch", git_branch, acked); + write_info(type, "ver_sw_branch", git_branch); } if (os_version) { - write_info(type, "sys_os_ver", os_version, acked); + write_info(type, "sys_os_ver", os_version); } const char *oem_version = px4_firmware_oem_version_string(); if (oem_version && oem_version[0]) { - write_info(type, "ver_oem", oem_version, acked); + write_info(type, "ver_oem", oem_version); } - write_info(type, "sys_os_ver_release", px4_os_version(), acked); - write_info(type, "sys_toolchain", px4_toolchain_name(), acked); - write_info(type, "sys_toolchain_ver", px4_toolchain_version(), acked); + write_info(type, "sys_os_ver_release", px4_os_version()); + write_info(type, "sys_toolchain", px4_toolchain_name()); + write_info(type, "sys_toolchain_ver", px4_toolchain_version()); const char *ecl_version = px4_ecl_lib_version_string(); if (ecl_version && ecl_version[0]) { - write_info(type, "sys_lib_ecl_ver", ecl_version, acked); + write_info(type, "sys_lib_ecl_ver", ecl_version); } char revision = 'U'; @@ -2183,12 +2087,12 @@ void Logger::write_version(LogType type, bool acked) if (board_mcu_version(&revision, &chip_name, nullptr) >= 0) { char mcu_ver[64]; snprintf(mcu_ver, sizeof(mcu_ver), "%s, rev. %c", chip_name, revision); - write_info(type, "sys_mcu", mcu_ver, acked); + write_info(type, "sys_mcu", mcu_ver); } // data versioning: increase this on every larger data change (format/semantic) // 1: switch to FIFO drivers (disabled on-chip DLPF) - write_info(type, "ver_data_format", static_cast(1), acked); + write_info(type, "ver_data_format", static_cast(1)); #ifndef BOARD_HAS_NO_UUID @@ -2196,23 +2100,23 @@ void Logger::write_version(LogType type, bool acked) if (_param_sdlog_uuid.get() == 1) { char px4_uuid_string[PX4_GUID_FORMAT_SIZE]; board_get_px4_guid_formated(px4_uuid_string, sizeof(px4_uuid_string)); - write_info(type, "sys_uuid", px4_uuid_string, acked); + write_info(type, "sys_uuid", px4_uuid_string); } #endif /* BOARD_HAS_NO_UUID */ - write_info(type, "time_ref_utc", _param_sdlog_utc_offset.get() * 60, acked); + write_info(type, "time_ref_utc", _param_sdlog_utc_offset.get() * 60); if (_replay_file_name) { - write_info(type, "replay", _replay_file_name, acked); + write_info(type, "replay", _replay_file_name); } if (type == LogType::Mission) { - write_info(type, "log_type", "mission", acked); + write_info(type, "log_type", "mission"); } } -void Logger::write_parameter_defaults(LogType type, bool acked) +void Logger::write_parameter_defaults(LogType type) { _writer.lock(); ulog_message_parameter_default_s msg = {}; @@ -2283,20 +2187,20 @@ void Logger::write_parameter_defaults(LogType type, bool acked) if (memcmp(&value, &default_value, value_size) != 0) { memcpy(&buffer[msg_size - value_size], default_value, value_size); msg.default_types = ulog_parameter_default_type_t::current_setup | ulog_parameter_default_type_t::system; - write_message(type, buffer, msg_size, acked); + write_message(type, buffer, msg_size); } } else { if (memcmp(&value, &default_value, value_size) != 0) { memcpy(&buffer[msg_size - value_size], default_value, value_size); msg.default_types = ulog_parameter_default_type_t::current_setup; - write_message(type, buffer, msg_size, acked); + write_message(type, buffer, msg_size); } if (memcmp(&value, &system_default_value, value_size) != 0) { memcpy(&buffer[msg_size - value_size], system_default_value, value_size); msg.default_types = ulog_parameter_default_type_t::system; - write_message(type, buffer, msg_size, acked); + write_message(type, buffer, msg_size); } } } @@ -2306,7 +2210,7 @@ void Logger::write_parameter_defaults(LogType type, bool acked) _writer.notify(); } -void Logger::write_parameters(LogType type, bool acked) +void Logger::write_parameters(LogType type) { _writer.lock(); ulog_message_parameter_s msg = {}; @@ -2367,7 +2271,7 @@ void Logger::write_parameters(LogType type, bool acked) msg.msg_size = msg_size - ULOG_MSG_HEADER_LEN; - write_message(type, buffer, msg_size, acked); + write_message(type, buffer, msg_size); } } while ((param != PARAM_INVALID) && (param_idx < (int) param_count())); @@ -2446,7 +2350,7 @@ void Logger::write_changed_parameters(LogType type) _writer.notify(); } -void Logger::write_events_file(LogType type, bool acked) +void Logger::write_events_file(LogType type) { int fd = open(PX4_ROOTFSDIR "/etc/extras/all_events.json.xz", O_RDONLY); @@ -2458,11 +2362,11 @@ void Logger::write_events_file(LogType type, bool acked) return; } - write_info_multiple(type, "metadata_events", fd, acked); + write_info_multiple(type, "metadata_events", fd); close(fd); - write_info(type, "metadata_events_sha256", component_information::all_events_sha256, acked); + write_info(type, "metadata_events_sha256", component_information::all_events_sha256); } void Logger::ack_vehicle_command(vehicle_command_s *cmd, uint32_t result) diff --git a/src/modules/logger/logger.h b/src/modules/logger/logger.h index 7dc9daf66379..65a9f1d179b3 100644 --- a/src/modules/logger/logger.h +++ b/src/modules/logger/logger.h @@ -190,13 +190,13 @@ class Logger : public ModuleBase, public ModuleParams /** * Write an ADD_LOGGED_MSG to the log for a all current subscriptions and instances */ - void write_all_add_logged_msg(LogType type, bool acked = false); + void write_all_add_logged_msg(LogType type); /** * Write an ADD_LOGGED_MSG to the log for a given subscription and instance. * _writer.lock() must be held when calling this. */ - void write_add_logged_msg(LogType type, LoggerSubscription &subscription, bool acked = false); + void write_add_logged_msg(LogType type, LoggerSubscription &subscription); /** * Create logging directory @@ -221,16 +221,6 @@ class Logger : public ModuleBase, public ModuleParams void stop_log_mavlink(); -#ifdef LOGGER_PARALLEL_LOGGING - /** - * Run in separate thread to continue data logging while sending header&descriptions - */ - void mav_start_steps(); - - static void *mav_start_steps_helper(void *); - pthread_t _mav_start_thread = 0; -#endif - /** check if mavlink logging can be started */ bool can_start_mavlink_log() const { @@ -244,24 +234,24 @@ class Logger : public ModuleBase, public ModuleParams /** * write the file header with file magic and timestamp. */ - void write_header(LogType type, bool acked = false); + void write_header(LogType type); /// Array to store written formats for nested definitions (only) using WrittenFormats = Array < const orb_metadata *, 20 >; void write_format(LogType type, const orb_metadata &meta, WrittenFormats &written_formats, ulog_message_format_s &msg, - int subscription_index, int level = 1, bool acked = false); - void write_formats(LogType type, bool acked = false); + int subscription_index, int level = 1); + void write_formats(LogType type); /** * write performance counters */ - void write_perf_data(PrintLoadReason reason, bool acked = false); + void write_perf_data(PrintLoadReason reason); /** * write bootup console output */ - void write_console_output(bool acked = false); + void write_console_output(); /** * callback to write the performance counters @@ -273,25 +263,25 @@ class Logger : public ModuleBase, public ModuleParams */ static void print_load_callback(void *user); - void write_version(LogType type, bool acked = false); + void write_version(LogType type); - void write_excluded_optional_topics(LogType type, bool acked = false); + void write_excluded_optional_topics(LogType type); - void write_info(LogType type, const char *name, const char *value, bool acked = false); - void write_info_multiple(LogType type, const char *name, const char *value, bool is_continued, bool acked = false); - void write_info_multiple(LogType type, const char *name, int fd, bool acked = false); - void write_info(LogType type, const char *name, int32_t value, bool acked = false); - void write_info(LogType type, const char *name, uint32_t value, bool acked = false); + void write_info(LogType type, const char *name, const char *value); + void write_info_multiple(LogType type, const char *name, const char *value, bool is_continued); + void write_info_multiple(LogType type, const char *name, int fd); + void write_info(LogType type, const char *name, int32_t value); + void write_info(LogType type, const char *name, uint32_t value); /** generic common template method for write_info variants */ template - void write_info_template(LogType type, const char *name, T value, const char *type_str, bool acked = false); + void write_info_template(LogType type, const char *name, T value, const char *type_str); - void write_parameters(LogType type, bool acked = false); - void write_parameter_defaults(LogType type, bool acked = false); + void write_parameters(LogType type); + void write_parameter_defaults(LogType type); void write_changed_parameters(LogType type); - void write_events_file(LogType type, bool acked = false); + void write_events_file(LogType type); inline bool copy_if_updated(int sub_idx, void *buffer, bool try_to_subscribe); @@ -300,7 +290,7 @@ class Logger : public ModuleBase, public ModuleParams * Must be called with _writer.lock() held. * @return true if data written, false otherwise (on overflow) */ - bool write_message(LogType type, void *ptr, size_t size, bool acked = false); + bool write_message(LogType type, void *ptr, size_t size); /** * Add topic subscriptions from SD file if it exists, otherwise add topics based on the configured profile. diff --git a/src/modules/logger/parallel_mavlink_logging.md b/src/modules/logger/parallel_mavlink_logging.md deleted file mode 100644 index 3d31b52cd916..000000000000 --- a/src/modules/logger/parallel_mavlink_logging.md +++ /dev/null @@ -1,115 +0,0 @@ -# Parallel data logging over MAVLink - -### Problem -Starting flight logging over mavlink is a slow operation. In case logging is started from ARMING event the log file most likely have several seconds long missing trace period from the takeoff part. This is caused by the protocol that the log is first filled with header and all the static definitions, configuration, boot-log, etc. which are sent using *reliable_transfer*, meaning that every log message/package the px4 logger is sending needs to be acknowleged by the receiver. The round-trip time for this may be long depending on the MAVLink transfer media and the module receiving the log data in the other system behind the Mavlink. - -### Solution -To speed up logging startup and reduce that blackout period in the beginning of the log, the parallel data logging option is implemented. The trick here is that the actual uorb data logging is started as soon as possible when logging is started and the static definitions/configs are sent at the same time through another channel. The receiver end reads both streams and store them to two separate files and in the end of logging it combines them into one ulog file by appending the topic data file in the end of static definition data file. - - -This new protocol breaks the backward compatibility, so BOTH px4 logger and the reveicer MUST or MUST NOT implement the parallel logging to make it work! - - -### Original protocol -For Logger, there is one **ulog_stream** uorb channel for transfer data to receiver side and another **ulog_stream_ack** for receiveing ack events. First it collects all the static definitions and send them using *reliable_transfer* method. After static defs are sent it starts sending actual dynamic topic data. - -Mavlink_uorb module listen to **ulog_stream** topic and depending on FLAGS_NEED_ACK flag in the topic msg it sends either **MAVLINK_MSG_LOGGING_DATA** or **MAVLINK_MSG_LOGGING_DATA_ACKED** msg over mavlink. -If **MAVLINK_MSG_LOGGING_DATA_ACKED** is sent it starts waiting for **MAVLINK_MSG_LOGGING_DATA_ACK** and continue sending only after it receives the ack message. publish it to **ulog_stream_ack** - -The receiver reads **MAVLINK_MSG_LOGGING_DATA/_ACKED** messages and store them to ulg file. If **MAVLINK_MSG_LOGGING_DATA_ACKED** is received then **MAVLINK_MSG_LOGGING_DATA_ACK** is sent back to PX4. - -``` -+----------------------------------------------+ -| Logger | -| | -|----------------------------------------------| -| Static data | -| Dyn data | -+----------------------------------------------+ - | ^ - | Publish | Subs - | | - V | -+----------------------------------------------+ -| Mavlink_ulog | -| | -| | -| | -+----------------------------------------------+ - | ^ - Send | -MAVLINK_MSG_LOGGING_DATA Recv -MAVLINK_MSG_LOGGING_DATA_ACKED MAVLINK_MSG_ - | LOGGING_DATA_ACK - V | -+----------------------------------------------+ -| Receiver | -|----------------------------------------------| -| Static data | -| Dyn data | -+----------------------------------------------+ - | - | - | - V -+------------+ -| .ulg file | -+------------+ - -``` - - -### Parallel protocol -Logger spawns new thread for sending Static definitions data and continues to send dynamic topic data at the same time. Static data is published into **ulog_stream_acked** topic and the dynamic data into **ulog_stream** topic. The thread sending dynamic data does not need to wait anything, but continuously sending the data without waiting any ack. The static data sender thread publishes one msg at a time and waits for ack until it can publish next one. - -mavlink_uorb reads both **ulog_stream** and **ulog_stream_acked** streams and sends either **MAVLINK_MSG_LOGGING_DATA** or **MAVLINK_MSG_LOGGING_DATA_ACKED** msgs accordingly. Also it listens to **MAVLINK_MSG_LOGGING_DATA_ACK** messages and publish to **ulog_stream_ack** if one received. -Sending **MAVLINK_MSG_LOGGING_DATA_ACKED** raises a flag to wait for ack until next _acked message is sent, but **MAVLINK_MSG_LOGGING_DATA** messages are always sent without any blocking. - -Receiver listens to both **MAVLINK_MSG_LOGGING_DATA** and **MAVLINK_MSG_LOGGING_DATA_ACKED** messages and write them to separate files accordingly: _DATA into .data file and _DATA_ACKED into .ulg file. When logging is stopped, the receiver append the .data file content into the end of .ulg file. - -``` -+----------------------------------------------+ -| Logger | -| | -|-----------+ +--------------------------| -| Dyn data | | Static data | -+----------------------------------------------+ - | | ^ - | Publish | Publish | Subs - | | | - V V | -+----------------------------------------------+ -| MAvlink_ulog | -| | -| | -| | -+----------------------------------------------+ - | | ^ - Send Send Recv -MAVLINK_MSG_ MAVLINK_MSG_ MAVLINK_MSG_ -LOGGING_DATA LOGGING_DATA_ACKED LOGGING_DATA_ACK - | | | - V V | -+----------------------------------------------+ -| Receiver | -|-----------+ +--------------------------| -| Dyn data | | Static data | -+----------------------------------------------+ - | | - | | - | | - V V -+------------+ +------------+ -| .data file | | .ulg file | -+------------+ +------------+ - | | - | ---- Stop logging---- | - | | - +------------+--------------+ - | - V - +-----------+ - | .ulg file | - +-----------+ - -``` From 3e82771fe786347a20068af6daa43f16afbc5f70 Mon Sep 17 00:00:00 2001 From: Jari Nippula Date: Fri, 22 Sep 2023 20:51:02 +0300 Subject: [PATCH 5/8] Add ulog_stream_acked --- msg/UlogStream.msg | 1 + msg/UlogStreamAck.msg | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/msg/UlogStream.msg b/msg/UlogStream.msg index d206b4a42595..471683417674 100644 --- a/msg/UlogStream.msg +++ b/msg/UlogStream.msg @@ -17,3 +17,4 @@ uint8 flags # see FLAGS_* uint8[249] data # ulog data uint8 ORB_QUEUE_LENGTH = 16 # TODO: we might be able to reduce this if mavlink polled on the topic +# TOPICS ulog_stream ulog_stream_acked diff --git a/msg/UlogStreamAck.msg b/msg/UlogStreamAck.msg index e3747fff6ea6..9816d9bc0fdc 100644 --- a/msg/UlogStreamAck.msg +++ b/msg/UlogStreamAck.msg @@ -1,4 +1,4 @@ -# Ack a previously sent ulog_stream message that had +# Ack a previously sent ulog_stream_acked message that had # the NEED_ACK flag set uint64 timestamp # time since system start (microseconds) From de894a9cb7d1ccdd5eef13a724b3318782541780 Mon Sep 17 00:00:00 2001 From: Jari Nippula Date: Wed, 25 Oct 2023 15:24:40 +0300 Subject: [PATCH 6/8] saluki-v2: enable pthread_key feature --- boards/ssrc/saluki-v2 | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/boards/ssrc/saluki-v2 b/boards/ssrc/saluki-v2 index 6e5545d6cb62..b2b18de1897a 160000 --- a/boards/ssrc/saluki-v2 +++ b/boards/ssrc/saluki-v2 @@ -1 +1 @@ -Subproject commit 6e5545d6cb623d832eb1ec397566780dc41590fe +Subproject commit b2b18de1897aa2b204e573a6d33823db68204899 From ed52432b13fe5aad09964cc267a1ce9c433c0fda Mon Sep 17 00:00:00 2001 From: Jari Nippula Date: Fri, 22 Sep 2023 20:53:23 +0300 Subject: [PATCH 7/8] mavlink_ulog: parallel mavlink logging --- src/modules/mavlink/mavlink_ulog.cpp | 85 +++++++++++++++++++--------- src/modules/mavlink/mavlink_ulog.h | 4 +- 2 files changed, 62 insertions(+), 27 deletions(-) diff --git a/src/modules/mavlink/mavlink_ulog.cpp b/src/modules/mavlink/mavlink_ulog.cpp index 64b9ec981d20..bc7f05d33205 100644 --- a/src/modules/mavlink/mavlink_ulog.cpp +++ b/src/modules/mavlink/mavlink_ulog.cpp @@ -60,6 +60,12 @@ MavlinkULog::MavlinkULog(int datarate, float max_rate_factor, uint8_t target_sys } + PX4_INFO("[Mavlink_ulog] Parallel logging supported"); + + while (_ulog_stream_acked_sub.update()) { + + } + _waiting_for_initial_ack = true; _last_sent_time = hrt_absolute_time(); //(ab)use this timestamp during initialization _next_rate_check = _last_sent_time + _rate_calculation_delta_t; @@ -84,7 +90,7 @@ int MavlinkULog::handle_update(mavlink_channel_t channel) static_assert(sizeof(ulog_stream_s::data) == MAVLINK_MSG_LOGGING_DATA_FIELD_DATA_LEN, "Invalid uorb ulog_stream.data length"); static_assert(sizeof(ulog_stream_s::data) == MAVLINK_MSG_LOGGING_DATA_ACKED_FIELD_DATA_LEN, - "Invalid uorb ulog_stream.data length"); + "Invalid uorb ulog_stream_acked.data length"); if (_waiting_for_initial_ack) { if (hrt_elapsed_time(&_last_sent_time) > 3e5) { @@ -96,8 +102,10 @@ int MavlinkULog::handle_update(mavlink_channel_t channel) } // check if we're waiting for an ACK + bool check_for_updates = true; + if (_last_sent_time) { - bool check_for_updates = false; + check_for_updates = false; if (_ack_received) { _last_sent_time = 0; @@ -113,7 +121,7 @@ int MavlinkULog::handle_update(mavlink_channel_t channel) PX4_DEBUG("re-sending ulog mavlink message (try=%i)", _sent_tries); _last_sent_time = hrt_absolute_time(); - const ulog_stream_s &ulog_data = _ulog_stream_sub.get(); + const ulog_stream_s &ulog_data = _ulog_stream_acked_sub.get(); mavlink_logging_data_acked_t msg; msg.sequence = ulog_data.msg_sequence; @@ -126,25 +134,58 @@ int MavlinkULog::handle_update(mavlink_channel_t channel) } } } - - if (!check_for_updates) { - return 0; - } } + while ((_current_num_msgs < _max_num_messages) && (_ulog_stream_sub.updated() || _ulog_stream_acked_sub.updated())) { + if (_ulog_stream_sub.updated()) { + const unsigned last_generation = _ulog_stream_sub.get_last_generation(); + _ulog_stream_sub.update(); + + if (_ulog_stream_sub.get_last_generation() != last_generation + 1) { + perf_count(_msg_missed_ulog_stream_perf); + } + + const ulog_stream_s &ulog_data = _ulog_stream_sub.get(); + + if (ulog_data.timestamp > 0) { + if (ulog_data.flags & ulog_stream_s::FLAGS_NEED_ACK) { + _sent_tries = 1; + _last_sent_time = hrt_absolute_time(); + lock(); + _wait_for_ack_sequence = ulog_data.msg_sequence; + _ack_received = false; + unlock(); + + mavlink_logging_data_acked_t msg; + msg.sequence = ulog_data.msg_sequence; + msg.length = ulog_data.length; + msg.first_message_offset = ulog_data.first_message_offset; + msg.target_system = _target_system; + msg.target_component = _target_component; + memcpy(msg.data, ulog_data.data, sizeof(msg.data)); + mavlink_msg_logging_data_acked_send_struct(channel, &msg); - while ((_current_num_msgs < _max_num_messages) && _ulog_stream_sub.updated()) { - const unsigned last_generation = _ulog_stream_sub.get_last_generation(); - _ulog_stream_sub.update(); + } else { + mavlink_logging_data_t msg; + msg.sequence = ulog_data.msg_sequence; + msg.length = ulog_data.length; + msg.first_message_offset = ulog_data.first_message_offset; + msg.target_system = _target_system; + msg.target_component = _target_component; + memcpy(msg.data, ulog_data.data, sizeof(msg.data)); + mavlink_msg_logging_data_send_struct(channel, &msg); + } + } - if (_ulog_stream_sub.get_last_generation() != last_generation + 1) { - perf_count(_msg_missed_ulog_stream_perf); + ++_current_num_msgs; } - const ulog_stream_s &ulog_data = _ulog_stream_sub.get(); + if (check_for_updates && _ulog_stream_acked_sub.updated()) { + _ulog_stream_acked_sub.update(); + + const ulog_stream_s &ulog_data = _ulog_stream_acked_sub.get(); - if (ulog_data.timestamp > 0) { - if (ulog_data.flags & ulog_stream_s::FLAGS_NEED_ACK) { + if (ulog_data.timestamp > 0) { _sent_tries = 1; _last_sent_time = hrt_absolute_time(); lock(); @@ -161,19 +202,10 @@ int MavlinkULog::handle_update(mavlink_channel_t channel) memcpy(msg.data, ulog_data.data, sizeof(msg.data)); mavlink_msg_logging_data_acked_send_struct(channel, &msg); - } else { - mavlink_logging_data_t msg; - msg.sequence = ulog_data.msg_sequence; - msg.length = ulog_data.length; - msg.first_message_offset = ulog_data.first_message_offset; - msg.target_system = _target_system; - msg.target_component = _target_component; - memcpy(msg.data, ulog_data.data, sizeof(msg.data)); - mavlink_msg_logging_data_send_struct(channel, &msg); } - } - ++_current_num_msgs; + ++_current_num_msgs; + } } //need to update the rate? @@ -247,6 +279,7 @@ void MavlinkULog::handle_ack(mavlink_logging_ack_t ack) lock(); if (_instance) { // make sure stop() was not called right before + //PX4_INFO("<< %u (wait: %u)", ack.sequence, _wait_for_ack_sequence); if (_wait_for_ack_sequence == ack.sequence) { _ack_received = true; publish_ack(ack.sequence); diff --git a/src/modules/mavlink/mavlink_ulog.h b/src/modules/mavlink/mavlink_ulog.h index 7c96553c6d49..9e534075920a 100644 --- a/src/modules/mavlink/mavlink_ulog.h +++ b/src/modules/mavlink/mavlink_ulog.h @@ -125,7 +125,9 @@ class MavlinkULog static constexpr hrt_abstime _rate_calculation_delta_t = 100_ms; ///< rate update interval uORB::SubscriptionData _ulog_stream_sub{ORB_ID(ulog_stream)}; - uORB::Publication _ulog_stream_ack_pub{ORB_ID(ulog_stream_ack)}; + uORB::SubscriptionData _ulog_stream_acked_sub {ORB_ID(ulog_stream_acked)}; + + uORB::Publication _ulog_stream_ack_pub {ORB_ID(ulog_stream_ack)}; uint16_t _wait_for_ack_sequence; uint8_t _sent_tries = 0; volatile bool _ack_received = false; ///< set to true if a matching ack received From 2f9d8dc5bf77ef4fd64627ffc44c514e87625dbe Mon Sep 17 00:00:00 2001 From: Jari Nippula Date: Thu, 12 Oct 2023 09:50:19 +0300 Subject: [PATCH 8/8] logger: parallel mavlink logging --- src/modules/logger/CMakeLists.txt | 4 + src/modules/logger/Kconfig | 7 + src/modules/logger/log_writer.cpp | 13 +- src/modules/logger/log_writer.h | 23 +- src/modules/logger/log_writer_mavlink.cpp | 234 ++++++++++++++++-- src/modules/logger/log_writer_mavlink.h | 52 +++- src/modules/logger/logger.cpp | 156 +++++++++++- src/modules/logger/logger.h | 26 +- .../logger/parallel_mavlink_logging.md | 118 +++++++++ 9 files changed, 603 insertions(+), 30 deletions(-) create mode 100644 src/modules/logger/parallel_mavlink_logging.md diff --git a/src/modules/logger/CMakeLists.txt b/src/modules/logger/CMakeLists.txt index ec07d9adae95..477c87df976e 100644 --- a/src/modules/logger/CMakeLists.txt +++ b/src/modules/logger/CMakeLists.txt @@ -30,6 +30,9 @@ # POSSIBILITY OF SUCH DAMAGE. # ############################################################################ +if(CONFIG_LOGGER_PARALLEL_LOGGING) + set(LOGGER_PARALLEL_COMPILE_FLAG "-DLOGGER_PARALLEL_LOGGING") +endif() px4_add_module( MODULE modules__logger @@ -38,6 +41,7 @@ px4_add_module( COMPILE_FLAGS ${MAX_CUSTOM_OPT_LEVEL} -Wno-cast-align # TODO: fix and enable + ${LOGGER_PARALLEL_COMPILE_FLAG} SRCS logged_topics.cpp logger.cpp diff --git a/src/modules/logger/Kconfig b/src/modules/logger/Kconfig index db2c5a75460e..519afccd190c 100644 --- a/src/modules/logger/Kconfig +++ b/src/modules/logger/Kconfig @@ -10,3 +10,10 @@ menuconfig USER_LOGGER depends on BOARD_PROTECTED && MODULES_LOGGER ---help--- Put logger in userspace memory + +menuconfig LOGGER_PARALLEL_LOGGING + bool "Custom mavlink logging protocol in logger" + default n + depends on MODULES_LOGGER + ---help--- + Utilize custom mavlink logging protocol for speed up logging start phase diff --git a/src/modules/logger/log_writer.cpp b/src/modules/logger/log_writer.cpp index 8511f9e92a3d..402bc888486f 100644 --- a/src/modules/logger/log_writer.cpp +++ b/src/modules/logger/log_writer.cpp @@ -158,7 +158,7 @@ void LogWriter::thread_stop() } } -int LogWriter::write_message(LogType type, void *ptr, size_t size, uint64_t dropout_start) +int LogWriter::write_message(LogType type, void *ptr, size_t size, uint64_t dropout_start, bool reliable, bool wait) { int ret_file = 0, ret_mavlink = 0; @@ -167,7 +167,16 @@ int LogWriter::write_message(LogType type, void *ptr, size_t size, uint64_t drop } if (_log_writer_mavlink_for_write && type == LogType::Full) { - ret_mavlink = _log_writer_mavlink_for_write->write_message(ptr, size); +#ifdef LOGGER_PARALLEL_LOGGING + + if (reliable) { + ret_mavlink = _log_writer_mavlink_for_write->write_reliable_message(ptr, size, wait); + + } else +#endif + { + ret_mavlink = _log_writer_mavlink_for_write->write_message(ptr, size); + } } // file backend errors takes precedence diff --git a/src/modules/logger/log_writer.h b/src/modules/logger/log_writer.h index 9635d22dadf0..59ec75bc9945 100644 --- a/src/modules/logger/log_writer.h +++ b/src/modules/logger/log_writer.h @@ -73,6 +73,19 @@ class LogWriter void stop_log_mavlink(); +#ifdef LOGGER_PARALLEL_LOGGING + void wait_fifo_empty() + { + if (_log_writer_mavlink) { + _log_writer_mavlink->wait_fifo_count(0); + + while (_log_writer_mavlink->reliable_fifo_is_sending()) { + usleep(10000); + } + } + } +#endif + /** * whether logging is currently active or not (any of the selected backends). */ @@ -90,7 +103,8 @@ class LogWriter * -1 if not enough space in the buffer left (file backend), -2 mavlink backend failed * add type -> pass through, but not to mavlink if mission log */ - int write_message(LogType type, void *ptr, size_t size, uint64_t dropout_start = 0); + int write_message(LogType type, void *ptr, size_t size, uint64_t dropout_start = 0, bool reliable = false, + bool wait = false); /** * Select a backend, so that future calls to write_message() only write to the selected @@ -98,7 +112,7 @@ class LogWriter * @param backend */ void select_write_backend(Backend sel_backend); - void unselect_write_backend() { select_write_backend(BackendAll); } + void unselect_write_backend() { select_write_backend(_backend); } /* file logging methods */ @@ -154,7 +168,11 @@ class LogWriter { if (_log_writer_file) { _log_writer_file->set_need_reliable_transfer(need_reliable); } +#ifndef LOGGER_PARALLEL_LOGGING + if (_log_writer_mavlink) { _log_writer_mavlink->set_need_reliable_transfer(need_reliable && mavlink_backed_too); } + +#endif } bool need_reliable_transfer() const @@ -165,7 +183,6 @@ class LogWriter return false; } - #if defined(PX4_CRYPTO) void set_encryption_parameters(px4_crypto_algorithm_t algorithm, uint8_t key_idx, uint8_t exchange_key_idx) { diff --git a/src/modules/logger/log_writer_mavlink.cpp b/src/modules/logger/log_writer_mavlink.cpp index 5322aef074e0..971c6913a5c4 100644 --- a/src/modules/logger/log_writer_mavlink.cpp +++ b/src/modules/logger/log_writer_mavlink.cpp @@ -48,18 +48,142 @@ namespace logger LogWriterMavlink::LogWriterMavlink() { _ulog_stream_data.length = 0; +#ifdef LOGGER_PARALLEL_LOGGING + _ulog_stream_acked_data.length = 0; + pthread_mutex_init(&_fifo.mtx, nullptr); + pthread_cond_init(&_fifo.cv, nullptr); +#endif } +#ifdef LOGGER_PARALLEL_LOGGING +ReliableMsg *LogWriterMavlink::reliable_fifo_pop() +{ + pthread_mutex_lock(&_fifo.mtx); + PX4_DEBUG("reliable POP - wait.."); + + while (_fifo.empty() && !_fifo.sender_should_exit.load()) { + pthread_cond_wait(&_fifo.cv, &_fifo.mtx); + } + + PX4_DEBUG("reliable POP: signalled"); + ReliableMsg *node = _fifo.getHead(); + _fifo.remove(node); + + if (node) { + _fifo.sending = true; + } + + pthread_mutex_unlock(&_fifo.mtx); + return node; +} + +bool LogWriterMavlink::reliable_fifo_push(ReliableMsg *node) +{ + if (!node) { + PX4_ERR("[reliable_fifo_push] nullptr"); + return false; + } + + pthread_mutex_lock(&_fifo.mtx); + _fifo.add(node); + PX4_DEBUG("reliable PUSH - signal sender"); + pthread_cond_signal(&_fifo.cv); + pthread_mutex_unlock(&_fifo.mtx); + return true; +} + +void LogWriterMavlink::reliable_fifo_set_sender_idle() +{ + pthread_mutex_lock(&_fifo.mtx); + _fifo.sending = false; + pthread_mutex_unlock(&_fifo.mtx); +} + +bool LogWriterMavlink::reliable_fifo_is_sending() +{ + bool sending; + pthread_mutex_lock(&_fifo.mtx); + sending = _fifo.sending; + pthread_mutex_unlock(&_fifo.mtx); + return sending; +} + +void LogWriterMavlink::wait_fifo_count(size_t count) +{ + while (reliable_fifo_count() > count) { + usleep(30000); + } +} + +size_t LogWriterMavlink::reliable_fifo_count() +{ + size_t count = 0; + pthread_mutex_lock(&_fifo.mtx); + count = _fifo.size(); + pthread_mutex_unlock(&_fifo.mtx); + return count; +} + +void *LogWriterMavlink::mav_reliable_sender_helper(void *context) +{ + px4_prctl(PR_SET_NAME, "log_writer_mav_reliable_sender", px4_getpid()); + static_cast(context)->mav_reliable_sender(); + return nullptr; +} + +void LogWriterMavlink::mav_reliable_sender() +{ + while (!_fifo.sender_should_exit.load()) { + ReliableMsg *msg = reliable_fifo_pop(); + + PX4_DEBUG("[sender] - msg:%p", msg); + + if (msg) { + write_message(msg->data, msg->len, true); + PX4_DEBUG("[sender] - delete msg"); + delete msg; + reliable_fifo_set_sender_idle(); + } + } +} +#endif + bool LogWriterMavlink::init() { +#ifdef LOGGER_PARALLEL_LOGGING + pthread_attr_t thr_attr; + pthread_attr_init(&thr_attr); + pthread_attr_setstacksize(&thr_attr, PX4_STACK_ADJUSTED(8500)); + PX4_INFO("create mav_reliable_sender_thread"); + int ret = pthread_create(&_mav_reliable_sender_thread, &thr_attr, &LogWriterMavlink::mav_reliable_sender_helper, this); + + if (ret) { + PX4_ERR("mav_reliable_sender_thread create failed: %d", ret); + } + + pthread_attr_destroy(&thr_attr); +#endif return true; } LogWriterMavlink::~LogWriterMavlink() { +#ifdef LOGGER_PARALLEL_LOGGING + pthread_mutex_lock(&_fifo.mtx); + _fifo.sender_should_exit.store(true); + pthread_cond_signal(&_fifo.cv); + pthread_mutex_unlock(&_fifo.mtx); + pthread_join(_mav_reliable_sender_thread, nullptr); +#endif + if (orb_sub_valid(_ulog_stream_ack_sub)) { orb_unsubscribe(_ulog_stream_ack_sub); } + +#ifdef LOGGER_PARALLEL_LOGGING + pthread_mutex_destroy(&_fifo.mtx); + pthread_cond_destroy(&_fifo.cv); +#endif } void LogWriterMavlink::start_log() @@ -76,37 +200,60 @@ void LogWriterMavlink::start_log() _ulog_stream_data.length = 0; _ulog_stream_data.first_message_offset = 0; +#ifdef LOGGER_PARALLEL_LOGGING + _ulog_stream_acked_data.msg_sequence = 0; + _ulog_stream_acked_data.length = 0; + _ulog_stream_acked_data.first_message_offset = 0; +#endif _is_started = true; } void LogWriterMavlink::stop_log() { _ulog_stream_data.length = 0; +#ifdef LOGGER_PARALLEL_LOGGING + _ulog_stream_acked_data.length = 0; +#endif _is_started = false; } -int LogWriterMavlink::write_message(void *ptr, size_t size) +int LogWriterMavlink::write_message(void *ptr, size_t size, bool reliable) { if (!is_started()) { return 0; } - const uint8_t data_len = (uint8_t)sizeof(_ulog_stream_data.data); + ulog_stream_s *ulog_s_p; + +#ifdef LOGGER_PARALLEL_LOGGING + + if (reliable) { + ulog_s_p = &_ulog_stream_acked_data; + + } else { + ulog_s_p = &_ulog_stream_data; + } + +#else + ulog_s_p = &_ulog_stream_data; +#endif + + const uint8_t data_len = (uint8_t)sizeof(ulog_s_p->data); uint8_t *ptr_data = (uint8_t *)ptr; - if (_ulog_stream_data.first_message_offset == 255) { - _ulog_stream_data.first_message_offset = _ulog_stream_data.length; + if (ulog_s_p->first_message_offset == 255) { + ulog_s_p->first_message_offset = ulog_s_p->length; } while (size > 0) { - size_t send_len = math::min((size_t)data_len - _ulog_stream_data.length, size); - memcpy(_ulog_stream_data.data + _ulog_stream_data.length, ptr_data, send_len); - _ulog_stream_data.length += send_len; + size_t send_len = math::min((size_t)data_len - ulog_s_p->length, size); + memcpy(ulog_s_p->data + ulog_s_p->length, ptr_data, send_len); + ulog_s_p->length += send_len; ptr_data += send_len; size -= send_len; - if (_ulog_stream_data.length >= data_len) { - if (publish_message()) { + if (ulog_s_p->length >= data_len) { + if (publish_message(reliable)) { return -2; } } @@ -115,6 +262,30 @@ int LogWriterMavlink::write_message(void *ptr, size_t size) return 0; } +#ifdef LOGGER_PARALLEL_LOGGING +int LogWriterMavlink::write_reliable_message(void *ptr, size_t size, bool wait) +{ + if (wait) { + wait_fifo_count(LOGGER_RELIABLE_FIFO_WAIT_THRESHOLD); + } + + uint8_t *p = (uint8_t *) ptr; + + while (size > 0) { + size_t len = math::min(size, LOGGER_ULOG_STREAM_DATA_LEN); + size -= len; + ReliableMsg *msg = new ReliableMsg(); + memcpy(msg->data, p, len); + p += len; + msg->len = len; + reliable_fifo_push(msg); + } + + return 0; +} +#endif + +#ifndef LOGGER_PARALLEL_LOGGING void LogWriterMavlink::set_need_reliable_transfer(bool need_reliable) { if (!need_reliable && _need_reliable_transfer) { @@ -126,19 +297,46 @@ void LogWriterMavlink::set_need_reliable_transfer(bool need_reliable) _need_reliable_transfer = need_reliable; } +#endif -int LogWriterMavlink::publish_message() +int LogWriterMavlink::publish_message(bool reliable) { - _ulog_stream_data.timestamp = hrt_absolute_time(); - _ulog_stream_data.flags = 0; + ulog_stream_s *ulog_s_p; + +#ifdef LOGGER_PARALLEL_LOGGING + + if (reliable) { + ulog_s_p = &_ulog_stream_acked_data; + + } else { + ulog_s_p = &_ulog_stream_data; + } + +#else + ulog_s_p = &_ulog_stream_data; +#endif + + ulog_s_p->timestamp = hrt_absolute_time(); + ulog_s_p->flags = 0; + +#ifdef LOGGER_PARALLEL_LOGGING + + if (!reliable) { + _ulog_stream_pub.publish(*ulog_s_p); + + } else { + ulog_s_p->flags = ulog_s_p->FLAGS_NEED_ACK; + _ulog_stream_acked_pub.publish(*ulog_s_p); +#else if (_need_reliable_transfer) { - _ulog_stream_data.flags = _ulog_stream_data.FLAGS_NEED_ACK; + ulog_s_p->flags = ulog_s_p->FLAGS_NEED_ACK; } - _ulog_stream_pub.publish(_ulog_stream_data); + _ulog_stream_pub.publish(*ulog_s_p); if (_need_reliable_transfer) { +#endif // we need to wait for an ack. Note that this blocks the main logger thread, so if a file logging // is already running, it will miss samples. px4_pollfd_struct_t fds[1]; @@ -160,7 +358,7 @@ int LogWriterMavlink::publish_message() ulog_stream_ack_s ack; orb_copy(ORB_ID(ulog_stream_ack), _ulog_stream_ack_sub, &ack); - if (ack.msg_sequence == _ulog_stream_data.msg_sequence) { + if (ack.msg_sequence == ulog_s_p->msg_sequence) { got_ack = true; } @@ -178,9 +376,9 @@ int LogWriterMavlink::publish_message() PX4_DEBUG("got ack in %i ms", (int)(hrt_elapsed_time(&started) / 1000)); } - _ulog_stream_data.msg_sequence++; - _ulog_stream_data.length = 0; - _ulog_stream_data.first_message_offset = 255; + ulog_s_p->msg_sequence++; + ulog_s_p->length = 0; + ulog_s_p->first_message_offset = 255; return 0; } diff --git a/src/modules/logger/log_writer_mavlink.h b/src/modules/logger/log_writer_mavlink.h index 8aabe2d79235..a3722dbdb64e 100644 --- a/src/modules/logger/log_writer_mavlink.h +++ b/src/modules/logger/log_writer_mavlink.h @@ -37,12 +37,37 @@ #include #include #include +#include + +#ifdef LOGGER_PARALLEL_LOGGING +static constexpr size_t LOGGER_ULOG_STREAM_DATA_LEN {249}; // Size of ulog_stream data buffer +static constexpr size_t LOGGER_RELIABLE_FIFO_WAIT_THRESHOLD{10}; // Msg count threshold for wait fifo trigger +#endif namespace px4 { namespace logger { +#ifdef LOGGER_PARALLEL_LOGGING +class ReliableMsg : public ListNode +{ +public: + uint16_t len; + uint8_t data[LOGGER_ULOG_STREAM_DATA_LEN]; +}; + +class ReliableFifo : public List +{ +public: + pthread_mutex_t mtx; + pthread_cond_t cv; + px4::atomic_bool sender_should_exit{false}; + bool sending{false}; +}; + +#endif + /** * @class LogWriterMavlink * Writes logging data to uORB, and then sent via mavlink @@ -62,9 +87,14 @@ class LogWriterMavlink bool is_started() const { return _is_started; } /** @see LogWriter::write_message() */ - int write_message(void *ptr, size_t size); - + int write_message(void *ptr, size_t size, bool reliable = false); +#ifdef LOGGER_PARALLEL_LOGGING + int write_reliable_message(void *ptr, size_t size, bool wait = false); + bool reliable_fifo_is_sending(); + void wait_fifo_count(size_t count); +#else void set_need_reliable_transfer(bool need_reliable); +#endif bool need_reliable_transfer() const { @@ -72,15 +102,31 @@ class LogWriterMavlink } private: +#ifdef LOGGER_PARALLEL_LOGGING + static void *mav_reliable_sender_helper(void *context); + void mav_reliable_sender(); + + ReliableMsg *reliable_fifo_pop(); + bool reliable_fifo_push(ReliableMsg *node); + void reliable_fifo_set_sender_idle(); + + size_t reliable_fifo_count(); +#endif /** publish message, wait for ack if needed & reset message */ - int publish_message(); + int publish_message(bool reliable = false); ulog_stream_s _ulog_stream_data{}; uORB::Publication _ulog_stream_pub{ORB_ID(ulog_stream)}; orb_sub_t _ulog_stream_ack_sub{ORB_SUB_INVALID}; bool _need_reliable_transfer{false}; bool _is_started{false}; +#ifdef LOGGER_PARALLEL_LOGGING + ulog_stream_s _ulog_stream_acked_data {}; + uORB::Publication _ulog_stream_acked_pub{ORB_ID(ulog_stream_acked)}; + ReliableFifo _fifo; + pthread_t _mav_reliable_sender_thread = 0; +#endif }; } diff --git a/src/modules/logger/logger.cpp b/src/modules/logger/logger.cpp index 50753c1e697f..18aaee5893a3 100644 --- a/src/modules/logger/logger.cpp +++ b/src/modules/logger/logger.cpp @@ -64,6 +64,7 @@ #include #include #include +#include //#define DBGPRINT //write status output every few seconds @@ -400,6 +401,18 @@ Logger::Logger(LogWriter::Backend backend, size_t buffer_size, uint32_t log_inte PX4_ERR("Failed to find topic %s", poll_topic_name); } } + +#ifdef LOGGER_PARALLEL_LOGGING + + if (pthread_key_create(&pthread_data_key, NULL) < 0) { + PX4_ERR("Creating pthread data key failed"); + pthread_setspecific(pthread_data_key, (void *)nullptr); + + } else { + pthread_setspecific(pthread_data_key, (void *)&_thread_main_data); + } + +#endif } Logger::~Logger() @@ -593,7 +606,7 @@ void Logger::run() return; } - //all topics added. Get required message buffer size + //Get required message buffer size int max_msg_size = 0; for (int sub = 0; sub < _num_subscriptions; ++sub) { @@ -1187,11 +1200,11 @@ void Logger::handle_vehicle_command_update() } } -bool Logger::write_message(LogType type, void *ptr, size_t size) +bool Logger::write_message(LogType type, void *ptr, size_t size, bool reliable, bool wait) { Statistics &stats = _statistics[(int)type]; - if (_writer.write_message(type, ptr, size, stats.dropout_start) != -1) { + if (_writer.write_message(type, ptr, size, stats.dropout_start, reliable, wait) != -1) { if (stats.dropout_start) { float dropout_duration = (float)(hrt_elapsed_time(&stats.dropout_start) / 1000) / 1.e3f; @@ -1455,6 +1468,35 @@ void Logger::stop_log_file(LogType type) _writer.stop_log_file(type); } +#ifdef LOGGER_PARALLEL_LOGGING +void *Logger::mav_start_steps_helper(void *context) +{ + px4_prctl(PR_SET_NAME, "log_writer_mavlink_headers", px4_getpid()); + static_cast(context)->mav_start_steps(); + return nullptr; +} + +void Logger::mav_start_steps() +{ + /* This is running in separate thread to keep logging data while sending header&descriptions */ + _thread_mav_start_sender_data.wait_for_ack = true; + pthread_setspecific(pthread_data_key, (void *)&_thread_mav_start_sender_data); + + PX4_INFO("Write static data - Begin"); + write_header(LogType::Full); + write_version(LogType::Full); + write_formats(LogType::Full); + write_parameters(LogType::Full); + write_parameter_defaults(LogType::Full); + write_perf_data(PrintLoadReason::Preflight); + write_console_output(); + write_events_file(LogType::Full); + write_excluded_optional_topics(LogType::Full); + write_all_add_logged_msg(LogType::Full); + PX4_INFO("Write static data - End"); +} +#endif + void Logger::start_log_mavlink() { if (!can_start_mavlink_log()) { @@ -1472,6 +1514,33 @@ void Logger::start_log_mavlink() _writer.start_log_mavlink(); _writer.select_write_backend(LogWriter::BackendMavlink); + +#ifdef LOGGER_PARALLEL_LOGGING + + for (int sub = 0; sub < _num_subscriptions; ++sub) { + if (_subscriptions[sub].valid() && _subscriptions[sub].msg_id == MSG_ID_INVALID) { + if (_next_topic_id == MSG_ID_INVALID) { + // if we land here an uint8 is too small -> switch to uint16 + PX4_ERR("limit for _next_topic_id reached"); + return; + } + + _subscriptions[sub].msg_id = _next_topic_id++; + } + } + + pthread_attr_t thr_attr; + pthread_attr_init(&thr_attr); + pthread_attr_setstacksize(&thr_attr, PX4_STACK_ADJUSTED(8500)); + PX4_INFO("create mav_start_thread"); + int ret = pthread_create(&_mav_start_thread, &thr_attr, &Logger::mav_start_steps_helper, this); + + if (ret) { + PX4_WARN("mav_start_thread create failed: %d", ret); + } + + pthread_attr_destroy(&thr_attr); +#else _writer.set_need_reliable_transfer(true); write_header(LogType::Full); write_version(LogType::Full); @@ -1484,9 +1553,11 @@ void Logger::start_log_mavlink() write_excluded_optional_topics(LogType::Full); write_all_add_logged_msg(LogType::Full); _writer.set_need_reliable_transfer(false); +#endif _writer.unselect_write_backend(); _writer.notify(); + PX4_INFO("Mavlink logging started"); adjust_subscription_updates(); // redistribute updates as sending the header can take some time } @@ -1499,11 +1570,16 @@ void Logger::stop_log_mavlink() _writer.select_write_backend(LogWriter::BackendMavlink); _writer.set_need_reliable_transfer(true); write_perf_data(PrintLoadReason::Postflight); +#ifdef LOGGER_PARALLEL_LOGGING + _writer.wait_fifo_empty(); +#endif _writer.set_need_reliable_transfer(false); _writer.unselect_write_backend(); _writer.notify(); _writer.stop_log_mavlink(); } + + PX4_INFO("Mavlink logging stopped"); } struct perf_callback_data_t { @@ -1704,7 +1780,12 @@ void Logger::write_format(LogType type, const orb_metadata &meta, WrittenFormats size_t msg_size = sizeof(msg) - sizeof(msg.format) + format_len; msg.msg_size = msg_size - ULOG_MSG_HEADER_LEN; +#ifdef LOGGER_PARALLEL_LOGGING + thread_data_t *th_data = (thread_data_t *) pthread_getspecific(pthread_data_key); + write_message(type, &msg, msg_size, true, th_data->wait_for_ack); +#else write_message(type, &msg, msg_size); +#endif if (level > 1 && !written_formats.push_back(&meta)) { PX4_ERR("Array too small"); @@ -1852,7 +1933,12 @@ void Logger::write_add_logged_msg(LogType type, LoggerSubscription &subscription bool prev_reliable = _writer.need_reliable_transfer(); _writer.set_need_reliable_transfer(true); +#ifdef LOGGER_PARALLEL_LOGGING + thread_data_t *th_data = (thread_data_t *) pthread_getspecific(pthread_data_key); + write_message(type, &msg, msg_size, true, th_data->wait_for_ack); +#else write_message(type, &msg, msg_size); +#endif _writer.set_need_reliable_transfer(prev_reliable); } @@ -1875,7 +1961,12 @@ void Logger::write_info(LogType type, const char *name, const char *value) msg.msg_size = msg_size - ULOG_MSG_HEADER_LEN; +#ifdef LOGGER_PARALLEL_LOGGING + thread_data_t *th_data = (thread_data_t *) pthread_getspecific(pthread_data_key); + write_message(type, buffer, msg_size, true, th_data->wait_for_ack); +#else write_message(type, buffer, msg_size); +#endif } _writer.unlock(); @@ -1901,7 +1992,12 @@ void Logger::write_info_multiple(LogType type, const char *name, const char *val msg.msg_size = msg_size - ULOG_MSG_HEADER_LEN; +#ifdef LOGGER_PARALLEL_LOGGING + thread_data_t *th_data = (thread_data_t *) pthread_getspecific(pthread_data_key); + write_message(type, buffer, msg_size, true, th_data->wait_for_ack); +#else write_message(type, buffer, msg_size); +#endif } else { PX4_ERR("info_multiple str too long (%" PRIu8 "), key=%s", msg.key_len, msg.key_value_str); @@ -1930,6 +2026,10 @@ void Logger::write_info_multiple(LogType type, const char *name, int fd) int file_offset = 0; +#ifdef LOGGER_PARALLEL_LOGGING + thread_data_t *th_data = (thread_data_t *) pthread_getspecific(pthread_data_key); +#endif + while (file_offset < file_size) { _writer.lock(); @@ -1946,7 +2046,11 @@ void Logger::write_info_multiple(LogType type, const char *name, int fd) msg_size += read_length; msg.msg_size = msg_size - ULOG_MSG_HEADER_LEN; +#ifdef LOGGER_PARALLEL_LOGGING + write_message(type, buffer, msg_size, true, th_data->wait_for_ack); +#else write_message(type, buffer, msg_size); +#endif file_offset += ret; } else { @@ -1989,7 +2093,12 @@ void Logger::write_info_template(LogType type, const char *name, T value, const msg.msg_size = msg_size - ULOG_MSG_HEADER_LEN; +#ifdef LOGGER_PARALLEL_LOGGING + thread_data_t *th_data = (thread_data_t *) pthread_getspecific(pthread_data_key); + write_message(type, buffer, msg_size, true, th_data->wait_for_ack); +#else write_message(type, buffer, msg_size); +#endif _writer.unlock(); } @@ -2018,7 +2127,13 @@ void Logger::write_header(LogType type) header.magic[7] = 0x01; //file version 1 header.timestamp = hrt_absolute_time(); _writer.lock(); + +#ifdef LOGGER_PARALLEL_LOGGING + thread_data_t *th_data = (thread_data_t *) pthread_getspecific(pthread_data_key); + write_message(type, &header, sizeof(header), true, th_data->wait_for_ack); +#else write_message(type, &header, sizeof(header)); +#endif // write the Flags message: this MUST be written right after the ulog header ulog_message_flag_bits_s flag_bits{}; @@ -2028,7 +2143,11 @@ void Logger::write_header(LogType type) flag_bits.msg_size = sizeof(flag_bits) - ULOG_MSG_HEADER_LEN; flag_bits.msg_type = static_cast(ULogMessageType::FLAG_BITS); +#ifdef LOGGER_PARALLEL_LOGGING + write_message(type, &flag_bits, sizeof(flag_bits), true, th_data->wait_for_ack); +#else write_message(type, &flag_bits, sizeof(flag_bits)); +#endif _writer.unlock(); } @@ -2125,6 +2244,9 @@ void Logger::write_parameter_defaults(LogType type) msg.msg_type = static_cast(ULogMessageType::PARAMETER_DEFAULT); int param_idx = 0; param_t param = 0; +#ifdef LOGGER_PARALLEL_LOGGING + thread_data_t *th_data = (thread_data_t *) pthread_getspecific(pthread_data_key); +#endif do { // skip over all parameters which are not used @@ -2187,20 +2309,32 @@ void Logger::write_parameter_defaults(LogType type) if (memcmp(&value, &default_value, value_size) != 0) { memcpy(&buffer[msg_size - value_size], default_value, value_size); msg.default_types = ulog_parameter_default_type_t::current_setup | ulog_parameter_default_type_t::system; +#ifdef LOGGER_PARALLEL_LOGGING + write_message(type, buffer, msg_size, true, th_data->wait_for_ack); +#else write_message(type, buffer, msg_size); +#endif } } else { if (memcmp(&value, &default_value, value_size) != 0) { memcpy(&buffer[msg_size - value_size], default_value, value_size); msg.default_types = ulog_parameter_default_type_t::current_setup; +#ifdef LOGGER_PARALLEL_LOGGING + write_message(type, buffer, msg_size, true, th_data->wait_for_ack); +#else write_message(type, buffer, msg_size); +#endif } if (memcmp(&value, &system_default_value, value_size) != 0) { memcpy(&buffer[msg_size - value_size], system_default_value, value_size); msg.default_types = ulog_parameter_default_type_t::system; +#ifdef LOGGER_PARALLEL_LOGGING + write_message(type, buffer, msg_size, true, th_data->wait_for_ack); +#else write_message(type, buffer, msg_size); +#endif } } } @@ -2220,6 +2354,10 @@ void Logger::write_parameters(LogType type) int param_idx = 0; param_t param = 0; +#ifdef LOGGER_PARALLEL_LOGGING + thread_data_t *th_data = (thread_data_t *) pthread_getspecific(pthread_data_key); +#endif + do { // skip over all parameters which are not used do { @@ -2271,7 +2409,11 @@ void Logger::write_parameters(LogType type) msg.msg_size = msg_size - ULOG_MSG_HEADER_LEN; +#ifdef LOGGER_PARALLEL_LOGGING + write_message(type, buffer, msg_size, true, th_data->wait_for_ack); +#else write_message(type, buffer, msg_size); +#endif } } while ((param != PARAM_INVALID) && (param_idx < (int) param_count())); @@ -2289,6 +2431,10 @@ void Logger::write_changed_parameters(LogType type) int param_idx = 0; param_t param = 0; +#ifdef LOGGER_PARALLEL_LOGGING + thread_data_t *th_data = (thread_data_t *) pthread_getspecific(pthread_data_key); +#endif + do { // skip over all parameters which are not used do { @@ -2342,7 +2488,11 @@ void Logger::write_changed_parameters(LogType type) // msg_size is now 1 (msg_type) + 2 (msg_size) + 1 (key_len) + key_len + value_size msg.msg_size = msg_size - ULOG_MSG_HEADER_LEN; +#ifdef LOGGER_PARALLEL_LOGGING + write_message(type, buffer, msg_size, true, th_data->wait_for_ack); +#else write_message(type, buffer, msg_size); +#endif } } while ((param != PARAM_INVALID) && (param_idx < (int) param_count())); diff --git a/src/modules/logger/logger.h b/src/modules/logger/logger.h index 65a9f1d179b3..e215401d0861 100644 --- a/src/modules/logger/logger.h +++ b/src/modules/logger/logger.h @@ -69,6 +69,14 @@ namespace px4 namespace logger { +#ifdef LOGGER_PARALLEL_LOGGING +typedef struct thread_data { + bool wait_for_ack{false}; +} thread_data_t; + +pthread_key_t pthread_data_key; +#endif + static constexpr uint8_t MSG_ID_INVALID = UINT8_MAX; struct LoggerSubscription : public uORB::SubscriptionInterval { @@ -221,6 +229,16 @@ class Logger : public ModuleBase, public ModuleParams void stop_log_mavlink(); +#ifdef LOGGER_PARALLEL_LOGGING + /** + * Run in separate thread to continue data logging while sending header&descriptions + */ + void mav_start_steps(); + + static void *mav_start_steps_helper(void *); + +#endif + /** check if mavlink logging can be started */ bool can_start_mavlink_log() const { @@ -290,7 +308,7 @@ class Logger : public ModuleBase, public ModuleParams * Must be called with _writer.lock() held. * @return true if data written, false otherwise (on overflow) */ - bool write_message(LogType type, void *ptr, size_t size); + bool write_message(LogType type, void *ptr, size_t size, bool reliable = false, bool wait = false); /** * Add topic subscriptions from SD file if it exists, otherwise add topics based on the configured profile. @@ -341,6 +359,12 @@ class Logger : public ModuleBase, public ModuleParams void adjust_subscription_updates(); +#ifdef LOGGER_PARALLEL_LOGGING + pthread_t _mav_start_thread {0}; + thread_data_t _thread_main_data; + thread_data_t _thread_mav_start_sender_data; +#endif + uint8_t *_msg_buffer{nullptr}; int _msg_buffer_len{0}; diff --git a/src/modules/logger/parallel_mavlink_logging.md b/src/modules/logger/parallel_mavlink_logging.md new file mode 100644 index 000000000000..d506a53aba65 --- /dev/null +++ b/src/modules/logger/parallel_mavlink_logging.md @@ -0,0 +1,118 @@ +# Parallel data logging over MAVLink + +### Problem +Starting flight logging over mavlink is a slow operation. In case logging is started from ARMING event the log file most likely have several seconds long missing trace period from the takeoff part. This is caused by the protocol that the log is first filled with header and all the static definitions, configuration, boot-log, etc. which are sent using *reliable_transfer*, meaning that every log message/package the px4 logger is sending needs to be acknowleged by the receiver. The round-trip time for this may be long depending on the MAVLink transfer media and the module receiving the log data in the other system behind the Mavlink. + +### Solution +To speed up logging startup and reduce that blackout period in the beginning of the log, the parallel data logging option is implemented. The trick here is that the actual uorb data logging is started as soon as possible when logging is started and the static definitions/configs are sent at the same time through another channel. The receiver end reads both streams and store them to two separate files and in the end of logging it combines them into one ulog file by appending the topic data file in the end of static definition data file. + + +This new protocol is not backward compatible, so BOTH px4 logger and the receiver MUST or MUST NOT implement the parallel logging to make it work! + + +### Original protocol +For Logger, there is one **ulog_stream** uorb channel for transfer data to receiver side and another **ulog_stream_ack** for receiveing ack events. First it collects all the static definitions and send them using *reliable_transfer* method. After static defs are sent it starts sending actual dynamic topic data. + +Mavlink_uorb module listen to **ulog_stream** topic and depending on FLAGS_NEED_ACK flag in the topic msg it sends either **MAVLINK_MSG_LOGGING_DATA** or **MAVLINK_MSG_LOGGING_DATA_ACKED** msg over mavlink. +If **MAVLINK_MSG_LOGGING_DATA_ACKED** is sent it starts waiting for **MAVLINK_MSG_LOGGING_DATA_ACK** and continue sending only after it receives the ack message. publish it to **ulog_stream_ack** + +The receiver reads **MAVLINK_MSG_LOGGING_DATA/_ACKED** messages and store them to ulg file. If **MAVLINK_MSG_LOGGING_DATA_ACKED** is received then **MAVLINK_MSG_LOGGING_DATA_ACK** is sent back to PX4. + +``` ++----------------------------------------------+ +| Logger | +| | +|----------------------------------------------| +| Static data | +| Dyn data | ++----------------------------------------------+ + | ^ + | Publish | Subs + | | + V | ++----------------------------------------------+ +| Mavlink_ulog | +| | +| | +| | ++----------------------------------------------+ + | ^ + Send | +MAVLINK_MSG_LOGGING_DATA Recv +MAVLINK_MSG_LOGGING_DATA_ACKED MAVLINK_MSG_ + | LOGGING_DATA_ACK + V | ++----------------------------------------------+ +| Receiver | +|----------------------------------------------| +| Static data | +| Dyn data | ++----------------------------------------------+ + | + | + | + V ++------------+ +| .ulg file | ++------------+ + +``` + + +### Parallel logging protocol +Logger spawns new thread for sending Static definitions data (reliable transfer enabled) and continues to send dynamic topic data at the same time. Static data is published into **ulog_stream_acked** topic and the dynamic data into **ulog_stream** topic. The thread sending dynamic data does not need to wait anything, but continuously sending the data without waiting any ack. The static data sender thread publishes one message at a time and waits for ack until publishing next one. + +mavlink_uorb reads both **ulog_stream** and **ulog_stream_acked** streams and sends either **MAVLINK_MSG_LOGGING_DATA** or **MAVLINK_MSG_LOGGING_DATA_ACKED** mavlink msgs accordingly. Also it listens to **MAVLINK_MSG_LOGGING_DATA_ACK** messages and publish to **ulog_stream_ack** if one received. +Sending **MAVLINK_MSG_LOGGING_DATA_ACKED** raises a flag to wait for ack. A **MAVLINK_MSG_LOGGING_DATA_ACK** message with expected sequence number shall be received before next _acked message can be sent, but the **MAVLINK_MSG_LOGGING_DATA** messages are always sent in parallel of that without any blocking. + +Receiver listens to both **MAVLINK_MSG_LOGGING_DATA** and **MAVLINK_MSG_LOGGING_DATA_ACKED** messages and write them to separate files accordingly: _DATA into .data file and _DATA_ACKED into .ulg file. For each **MAVLINK_MSG_LOGGING_DATA_ACKED** message it sends back a **MAVLINK_MSG_LOGGING_DATA_ACK** message. When logging is stopped, the receiver append the .data file content into the end of .ulg file and removes the .data file. + +``` ++----------------------------------------------+ +| Logger | +| | +|-----------+ +--------------------------| +| Dyn data | | Static data | ++----------------------------------------------+ + | | ^ + | Publish | Publish | Subs + | | | + v v | ++----------------------------------------------+ +| MAvlink_ulog | +| | +| | +| | ++----------------------------------------------+ + | | ^ +MAVLINK_MSG_ MAVLINK_MSG_ | +LOGGING_DATA LOGGING_DATA_ACKED | + | | MAVLINK_MSG_ + | | LOGGING_DATA_ACK + v v | ++----------------------------------------------+ +| Receiver | +|-----------+ +--------------------------| +| Dyn data | | Static data | ++----------------------------------------------+ + | | + | | + | | + v v ++------------+ +------------+ +| .data file | | .ulg file | +| DATA | | DEFS | ++------------+ +------------+ + | | + | ---- Stop logging---- | + | | + +-------------------------->+ + | + v + +------------+ + | .ulg file | + | DEFS | + | DATA | + +------------+ + +```