diff --git a/boards/ssrc/saluki-v2 b/boards/ssrc/saluki-v2 index 6e5545d6cb62..b2b18de1897a 160000 --- a/boards/ssrc/saluki-v2 +++ b/boards/ssrc/saluki-v2 @@ -1 +1 @@ -Subproject commit 6e5545d6cb623d832eb1ec397566780dc41590fe +Subproject commit b2b18de1897aa2b204e573a6d33823db68204899 diff --git a/msg/CMakeLists.txt b/msg/CMakeLists.txt index 049ce8028ed3..fdbe1b94af24 100644 --- a/msg/CMakeLists.txt +++ b/msg/CMakeLists.txt @@ -192,7 +192,6 @@ set(msg_files UavcanParameterValue.msg UlogStream.msg UlogStreamAck.msg - UlogStreamAcked.msg VehicleAcceleration.msg VehicleAirData.msg VehicleAngularAccelerationSetpoint.msg diff --git a/msg/UlogStream.msg b/msg/UlogStream.msg index d206b4a42595..471683417674 100644 --- a/msg/UlogStream.msg +++ b/msg/UlogStream.msg @@ -17,3 +17,4 @@ uint8 flags # see FLAGS_* uint8[249] data # ulog data uint8 ORB_QUEUE_LENGTH = 16 # TODO: we might be able to reduce this if mavlink polled on the topic +# TOPICS ulog_stream ulog_stream_acked diff --git a/msg/UlogStreamAcked.msg b/msg/UlogStreamAcked.msg deleted file mode 100644 index 136b7b2dfe76..000000000000 --- a/msg/UlogStreamAcked.msg +++ /dev/null @@ -1,19 +0,0 @@ -# Message to stream ULog data from the logger. Corresponds to the LOGGING_DATA_ACKED -# mavlink message - -uint64 timestamp # time since system start (microseconds) - -# flags bitmasks -uint8 FLAGS_NEED_ACK = 1 # if set, this message requires to be acked. - # Acked messages are published synchronous: a - # publisher waits for an ack before sending the - # next message - -uint8 length # length of data -uint8 first_message_offset # offset into data where first message starts. This - # can be used for recovery, when a previous message got lost -uint16 msg_sequence # allows determine drops -uint8 flags # see FLAGS_* -uint8[249] data # ulog data - -uint8 ORB_QUEUE_LENGTH = 16 # TODO: we might be able to reduce this if mavlink polled on the topic diff --git a/src/modules/logger/CMakeLists.txt b/src/modules/logger/CMakeLists.txt index 8f06d20d54b6..477c87df976e 100644 --- a/src/modules/logger/CMakeLists.txt +++ b/src/modules/logger/CMakeLists.txt @@ -31,7 +31,7 @@ # ############################################################################ if(CONFIG_LOGGER_PARALLEL_LOGGING) - set(LOGGER_PARALLEL_COMPILE_FLAG "-DLOGGER_PARALLEL_LOGGING") + set(LOGGER_PARALLEL_COMPILE_FLAG "-DLOGGER_PARALLEL_LOGGING") endif() px4_add_module( diff --git a/src/modules/logger/Kconfig b/src/modules/logger/Kconfig index 53e3566cd722..519afccd190c 100644 --- a/src/modules/logger/Kconfig +++ b/src/modules/logger/Kconfig @@ -14,6 +14,6 @@ menuconfig USER_LOGGER menuconfig LOGGER_PARALLEL_LOGGING bool "Custom mavlink logging protocol in logger" default n - depends on MAVLINK_PARALLEL_LOGGING && MODULES_LOGGER + depends on MODULES_LOGGER ---help--- Utilize custom mavlink logging protocol for speed up logging start phase diff --git a/src/modules/logger/log_writer.cpp b/src/modules/logger/log_writer.cpp index a16dd93f00f3..402bc888486f 100644 --- a/src/modules/logger/log_writer.cpp +++ b/src/modules/logger/log_writer.cpp @@ -158,7 +158,7 @@ void LogWriter::thread_stop() } } -int LogWriter::write_message(LogType type, void *ptr, size_t size, uint64_t dropout_start, bool acked) +int LogWriter::write_message(LogType type, void *ptr, size_t size, uint64_t dropout_start, bool reliable, bool wait) { int ret_file = 0, ret_mavlink = 0; @@ -167,7 +167,16 @@ int LogWriter::write_message(LogType type, void *ptr, size_t size, uint64_t drop } if (_log_writer_mavlink_for_write && type == LogType::Full) { - ret_mavlink = _log_writer_mavlink_for_write->write_message(ptr, size, acked); +#ifdef LOGGER_PARALLEL_LOGGING + + if (reliable) { + ret_mavlink = _log_writer_mavlink_for_write->write_reliable_message(ptr, size, wait); + + } else +#endif + { + ret_mavlink = _log_writer_mavlink_for_write->write_message(ptr, size); + } } // file backend errors takes precedence diff --git a/src/modules/logger/log_writer.h b/src/modules/logger/log_writer.h index ff256062359e..59ec75bc9945 100644 --- a/src/modules/logger/log_writer.h +++ b/src/modules/logger/log_writer.h @@ -73,6 +73,19 @@ class LogWriter void stop_log_mavlink(); +#ifdef LOGGER_PARALLEL_LOGGING + void wait_fifo_empty() + { + if (_log_writer_mavlink) { + _log_writer_mavlink->wait_fifo_count(0); + + while (_log_writer_mavlink->reliable_fifo_is_sending()) { + usleep(10000); + } + } + } +#endif + /** * whether logging is currently active or not (any of the selected backends). */ @@ -90,7 +103,8 @@ class LogWriter * -1 if not enough space in the buffer left (file backend), -2 mavlink backend failed * add type -> pass through, but not to mavlink if mission log */ - int write_message(LogType type, void *ptr, size_t size, uint64_t dropout_start = 0, bool acked = false); + int write_message(LogType type, void *ptr, size_t size, uint64_t dropout_start = 0, bool reliable = false, + bool wait = false); /** * Select a backend, so that future calls to write_message() only write to the selected @@ -98,7 +112,7 @@ class LogWriter * @param backend */ void select_write_backend(Backend sel_backend); - void unselect_write_backend() { select_write_backend(BackendAll); } + void unselect_write_backend() { select_write_backend(_backend); } /* file logging methods */ @@ -154,7 +168,11 @@ class LogWriter { if (_log_writer_file) { _log_writer_file->set_need_reliable_transfer(need_reliable); } +#ifndef LOGGER_PARALLEL_LOGGING + if (_log_writer_mavlink) { _log_writer_mavlink->set_need_reliable_transfer(need_reliable && mavlink_backed_too); } + +#endif } bool need_reliable_transfer() const @@ -165,7 +183,6 @@ class LogWriter return false; } - #if defined(PX4_CRYPTO) void set_encryption_parameters(px4_crypto_algorithm_t algorithm, uint8_t key_idx, uint8_t exchange_key_idx) { diff --git a/src/modules/logger/log_writer_mavlink.cpp b/src/modules/logger/log_writer_mavlink.cpp index 5446ef0a19eb..971c6913a5c4 100644 --- a/src/modules/logger/log_writer_mavlink.cpp +++ b/src/modules/logger/log_writer_mavlink.cpp @@ -48,19 +48,142 @@ namespace logger LogWriterMavlink::LogWriterMavlink() { _ulog_stream_data.length = 0; +#ifdef LOGGER_PARALLEL_LOGGING _ulog_stream_acked_data.length = 0; + pthread_mutex_init(&_fifo.mtx, nullptr); + pthread_cond_init(&_fifo.cv, nullptr); +#endif +} + +#ifdef LOGGER_PARALLEL_LOGGING +ReliableMsg *LogWriterMavlink::reliable_fifo_pop() +{ + pthread_mutex_lock(&_fifo.mtx); + PX4_DEBUG("reliable POP - wait.."); + + while (_fifo.empty() && !_fifo.sender_should_exit.load()) { + pthread_cond_wait(&_fifo.cv, &_fifo.mtx); + } + + PX4_DEBUG("reliable POP: signalled"); + ReliableMsg *node = _fifo.getHead(); + _fifo.remove(node); + + if (node) { + _fifo.sending = true; + } + + pthread_mutex_unlock(&_fifo.mtx); + return node; } +bool LogWriterMavlink::reliable_fifo_push(ReliableMsg *node) +{ + if (!node) { + PX4_ERR("[reliable_fifo_push] nullptr"); + return false; + } + + pthread_mutex_lock(&_fifo.mtx); + _fifo.add(node); + PX4_DEBUG("reliable PUSH - signal sender"); + pthread_cond_signal(&_fifo.cv); + pthread_mutex_unlock(&_fifo.mtx); + return true; +} + +void LogWriterMavlink::reliable_fifo_set_sender_idle() +{ + pthread_mutex_lock(&_fifo.mtx); + _fifo.sending = false; + pthread_mutex_unlock(&_fifo.mtx); +} + +bool LogWriterMavlink::reliable_fifo_is_sending() +{ + bool sending; + pthread_mutex_lock(&_fifo.mtx); + sending = _fifo.sending; + pthread_mutex_unlock(&_fifo.mtx); + return sending; +} + +void LogWriterMavlink::wait_fifo_count(size_t count) +{ + while (reliable_fifo_count() > count) { + usleep(30000); + } +} + +size_t LogWriterMavlink::reliable_fifo_count() +{ + size_t count = 0; + pthread_mutex_lock(&_fifo.mtx); + count = _fifo.size(); + pthread_mutex_unlock(&_fifo.mtx); + return count; +} + +void *LogWriterMavlink::mav_reliable_sender_helper(void *context) +{ + px4_prctl(PR_SET_NAME, "log_writer_mav_reliable_sender", px4_getpid()); + static_cast(context)->mav_reliable_sender(); + return nullptr; +} + +void LogWriterMavlink::mav_reliable_sender() +{ + while (!_fifo.sender_should_exit.load()) { + ReliableMsg *msg = reliable_fifo_pop(); + + PX4_DEBUG("[sender] - msg:%p", msg); + + if (msg) { + write_message(msg->data, msg->len, true); + PX4_DEBUG("[sender] - delete msg"); + delete msg; + reliable_fifo_set_sender_idle(); + } + } +} +#endif + bool LogWriterMavlink::init() { +#ifdef LOGGER_PARALLEL_LOGGING + pthread_attr_t thr_attr; + pthread_attr_init(&thr_attr); + pthread_attr_setstacksize(&thr_attr, PX4_STACK_ADJUSTED(8500)); + PX4_INFO("create mav_reliable_sender_thread"); + int ret = pthread_create(&_mav_reliable_sender_thread, &thr_attr, &LogWriterMavlink::mav_reliable_sender_helper, this); + + if (ret) { + PX4_ERR("mav_reliable_sender_thread create failed: %d", ret); + } + + pthread_attr_destroy(&thr_attr); +#endif return true; } LogWriterMavlink::~LogWriterMavlink() { +#ifdef LOGGER_PARALLEL_LOGGING + pthread_mutex_lock(&_fifo.mtx); + _fifo.sender_should_exit.store(true); + pthread_cond_signal(&_fifo.cv); + pthread_mutex_unlock(&_fifo.mtx); + pthread_join(_mav_reliable_sender_thread, nullptr); +#endif + if (orb_sub_valid(_ulog_stream_ack_sub)) { orb_unsubscribe(_ulog_stream_ack_sub); } + +#ifdef LOGGER_PARALLEL_LOGGING + pthread_mutex_destroy(&_fifo.mtx); + pthread_cond_destroy(&_fifo.cv); +#endif } void LogWriterMavlink::start_log() @@ -77,21 +200,24 @@ void LogWriterMavlink::start_log() _ulog_stream_data.length = 0; _ulog_stream_data.first_message_offset = 0; +#ifdef LOGGER_PARALLEL_LOGGING _ulog_stream_acked_data.msg_sequence = 0; _ulog_stream_acked_data.length = 0; _ulog_stream_acked_data.first_message_offset = 0; - +#endif _is_started = true; } void LogWriterMavlink::stop_log() { _ulog_stream_data.length = 0; +#ifdef LOGGER_PARALLEL_LOGGING _ulog_stream_acked_data.length = 0; +#endif _is_started = false; } -int LogWriterMavlink::write_message(void *ptr, size_t size, bool acked) +int LogWriterMavlink::write_message(void *ptr, size_t size, bool reliable) { if (!is_started()) { return 0; @@ -99,13 +225,19 @@ int LogWriterMavlink::write_message(void *ptr, size_t size, bool acked) ulog_stream_s *ulog_s_p; - if (acked) { +#ifdef LOGGER_PARALLEL_LOGGING + + if (reliable) { ulog_s_p = &_ulog_stream_acked_data; } else { ulog_s_p = &_ulog_stream_data; } +#else + ulog_s_p = &_ulog_stream_data; +#endif + const uint8_t data_len = (uint8_t)sizeof(ulog_s_p->data); uint8_t *ptr_data = (uint8_t *)ptr; @@ -121,7 +253,7 @@ int LogWriterMavlink::write_message(void *ptr, size_t size, bool acked) size -= send_len; if (ulog_s_p->length >= data_len) { - if (publish_message(acked)) { + if (publish_message(reliable)) { return -2; } } @@ -130,10 +262,32 @@ int LogWriterMavlink::write_message(void *ptr, size_t size, bool acked) return 0; } -void LogWriterMavlink::set_need_reliable_transfer(bool need_reliable) +#ifdef LOGGER_PARALLEL_LOGGING +int LogWriterMavlink::write_reliable_message(void *ptr, size_t size, bool wait) { -#ifndef LOGGER_PARALLEL_LOGGING + if (wait) { + wait_fifo_count(LOGGER_RELIABLE_FIFO_WAIT_THRESHOLD); + } + + uint8_t *p = (uint8_t *) ptr; + + while (size > 0) { + size_t len = math::min(size, LOGGER_ULOG_STREAM_DATA_LEN); + size -= len; + ReliableMsg *msg = new ReliableMsg(); + memcpy(msg->data, p, len); + p += len; + msg->len = len; + reliable_fifo_push(msg); + } + return 0; +} +#endif + +#ifndef LOGGER_PARALLEL_LOGGING +void LogWriterMavlink::set_need_reliable_transfer(bool need_reliable) +{ if (!need_reliable && _need_reliable_transfer) { if (_ulog_stream_data.length > 0) { // make sure to send previous data using reliable transfer @@ -142,26 +296,32 @@ void LogWriterMavlink::set_need_reliable_transfer(bool need_reliable) } _need_reliable_transfer = need_reliable; -#endif } +#endif -int LogWriterMavlink::publish_message(bool acked) +int LogWriterMavlink::publish_message(bool reliable) { ulog_stream_s *ulog_s_p; - if (acked) { +#ifdef LOGGER_PARALLEL_LOGGING + + if (reliable) { ulog_s_p = &_ulog_stream_acked_data; } else { ulog_s_p = &_ulog_stream_data; } +#else + ulog_s_p = &_ulog_stream_data; +#endif + ulog_s_p->timestamp = hrt_absolute_time(); ulog_s_p->flags = 0; #ifdef LOGGER_PARALLEL_LOGGING - if (!acked) { + if (!reliable) { _ulog_stream_pub.publish(*ulog_s_p); } else { @@ -170,10 +330,10 @@ int LogWriterMavlink::publish_message(bool acked) #else if (_need_reliable_transfer) { - _ulog_stream_data.flags = _ulog_stream_data.FLAGS_NEED_ACK; + ulog_s_p->flags = ulog_s_p->FLAGS_NEED_ACK; } - _ulog_stream_pub.publish(_ulog_stream_data); + _ulog_stream_pub.publish(*ulog_s_p); if (_need_reliable_transfer) { #endif diff --git a/src/modules/logger/log_writer_mavlink.h b/src/modules/logger/log_writer_mavlink.h index 9b1cfa073892..a3722dbdb64e 100644 --- a/src/modules/logger/log_writer_mavlink.h +++ b/src/modules/logger/log_writer_mavlink.h @@ -36,14 +36,38 @@ #include #include #include -#include #include +#include + +#ifdef LOGGER_PARALLEL_LOGGING +static constexpr size_t LOGGER_ULOG_STREAM_DATA_LEN {249}; // Size of ulog_stream data buffer +static constexpr size_t LOGGER_RELIABLE_FIFO_WAIT_THRESHOLD{10}; // Msg count threshold for wait fifo trigger +#endif namespace px4 { namespace logger { +#ifdef LOGGER_PARALLEL_LOGGING +class ReliableMsg : public ListNode +{ +public: + uint16_t len; + uint8_t data[LOGGER_ULOG_STREAM_DATA_LEN]; +}; + +class ReliableFifo : public List +{ +public: + pthread_mutex_t mtx; + pthread_cond_t cv; + px4::atomic_bool sender_should_exit{false}; + bool sending{false}; +}; + +#endif + /** * @class LogWriterMavlink * Writes logging data to uORB, and then sent via mavlink @@ -63,9 +87,14 @@ class LogWriterMavlink bool is_started() const { return _is_started; } /** @see LogWriter::write_message() */ - int write_message(void *ptr, size_t size, bool acked = false); - + int write_message(void *ptr, size_t size, bool reliable = false); +#ifdef LOGGER_PARALLEL_LOGGING + int write_reliable_message(void *ptr, size_t size, bool wait = false); + bool reliable_fifo_is_sending(); + void wait_fifo_count(size_t count); +#else void set_need_reliable_transfer(bool need_reliable); +#endif bool need_reliable_transfer() const { @@ -73,17 +102,31 @@ class LogWriterMavlink } private: +#ifdef LOGGER_PARALLEL_LOGGING + static void *mav_reliable_sender_helper(void *context); + void mav_reliable_sender(); + + ReliableMsg *reliable_fifo_pop(); + bool reliable_fifo_push(ReliableMsg *node); + void reliable_fifo_set_sender_idle(); + + size_t reliable_fifo_count(); +#endif /** publish message, wait for ack if needed & reset message */ - int publish_message(bool acked = false); + int publish_message(bool reliable = false); ulog_stream_s _ulog_stream_data{}; - ulog_stream_s _ulog_stream_acked_data{}; uORB::Publication _ulog_stream_pub{ORB_ID(ulog_stream)}; - uORB::Publication _ulog_stream_acked_pub{ORB_ID(ulog_stream_acked)}; orb_sub_t _ulog_stream_ack_sub{ORB_SUB_INVALID}; bool _need_reliable_transfer{false}; bool _is_started{false}; +#ifdef LOGGER_PARALLEL_LOGGING + ulog_stream_s _ulog_stream_acked_data {}; + uORB::Publication _ulog_stream_acked_pub{ORB_ID(ulog_stream_acked)}; + ReliableFifo _fifo; + pthread_t _mav_reliable_sender_thread = 0; +#endif }; } diff --git a/src/modules/logger/logger.cpp b/src/modules/logger/logger.cpp index 4b62216b6c09..18aaee5893a3 100644 --- a/src/modules/logger/logger.cpp +++ b/src/modules/logger/logger.cpp @@ -64,6 +64,7 @@ #include #include #include +#include //#define DBGPRINT //write status output every few seconds @@ -400,6 +401,18 @@ Logger::Logger(LogWriter::Backend backend, size_t buffer_size, uint32_t log_inte PX4_ERR("Failed to find topic %s", poll_topic_name); } } + +#ifdef LOGGER_PARALLEL_LOGGING + + if (pthread_key_create(&pthread_data_key, NULL) < 0) { + PX4_ERR("Creating pthread data key failed"); + pthread_setspecific(pthread_data_key, (void *)nullptr); + + } else { + pthread_setspecific(pthread_data_key, (void *)&_thread_main_data); + } + +#endif } Logger::~Logger() @@ -593,7 +606,7 @@ void Logger::run() return; } - //all topics added. Get required message buffer size + //Get required message buffer size int max_msg_size = 0; for (int sub = 0; sub < _num_subscriptions; ++sub) { @@ -1187,19 +1200,11 @@ void Logger::handle_vehicle_command_update() } } -bool Logger::write_message(LogType type, void *ptr, size_t size, bool acked) +bool Logger::write_message(LogType type, void *ptr, size_t size, bool reliable, bool wait) { - if (acked) { - if (_writer.write_message(type, ptr, size, 0, true) != -1) { - return true; - } - - return false; - } - Statistics &stats = _statistics[(int)type]; - if (_writer.write_message(type, ptr, size, stats.dropout_start) != -1) { + if (_writer.write_message(type, ptr, size, stats.dropout_start, reliable, wait) != -1) { if (stats.dropout_start) { float dropout_duration = (float)(hrt_elapsed_time(&stats.dropout_start) / 1000) / 1.e3f; @@ -1474,22 +1479,21 @@ void *Logger::mav_start_steps_helper(void *context) void Logger::mav_start_steps() { /* This is running in separate thread to keep logging data while sending header&descriptions */ - PX4_INFO("[log_writer_mavlink_headers] - Begin"); - LogWriter::Backend bkend = _writer.backend(); - _writer.select_write_backend(LogWriter::BackendMavlink); - write_header(LogType::Full, true); - write_version(LogType::Full, true); - write_formats(LogType::Full, true); - write_parameters(LogType::Full, true); - write_parameter_defaults(LogType::Full, true); - write_perf_data(true, true); - write_console_output(true); - write_events_file(LogType::Full, true); - write_excluded_optional_topics(LogType::Full, true); - write_all_add_logged_msg(LogType::Full, true); - _writer.select_write_backend(bkend); - _writer.notify(); - PX4_INFO("[log_writer_mavlink_headers] - End"); + _thread_mav_start_sender_data.wait_for_ack = true; + pthread_setspecific(pthread_data_key, (void *)&_thread_mav_start_sender_data); + + PX4_INFO("Write static data - Begin"); + write_header(LogType::Full); + write_version(LogType::Full); + write_formats(LogType::Full); + write_parameters(LogType::Full); + write_parameter_defaults(LogType::Full); + write_perf_data(PrintLoadReason::Preflight); + write_console_output(); + write_events_file(LogType::Full); + write_excluded_optional_topics(LogType::Full); + write_all_add_logged_msg(LogType::Full); + PX4_INFO("Write static data - End"); } #endif @@ -1509,6 +1513,8 @@ void Logger::start_log_mavlink() PX4_INFO("Start mavlink log"); _writer.start_log_mavlink(); + _writer.select_write_backend(LogWriter::BackendMavlink); + #ifdef LOGGER_PARALLEL_LOGGING for (int sub = 0; sub < _num_subscriptions; ++sub) { @@ -1535,7 +1541,6 @@ void Logger::start_log_mavlink() pthread_attr_destroy(&thr_attr); #else - _writer.select_write_backend(LogWriter::BackendMavlink); _writer.set_need_reliable_transfer(true); write_header(LogType::Full); write_version(LogType::Full); @@ -1548,9 +1553,9 @@ void Logger::start_log_mavlink() write_excluded_optional_topics(LogType::Full); write_all_add_logged_msg(LogType::Full); _writer.set_need_reliable_transfer(false); +#endif _writer.unselect_write_backend(); _writer.notify(); -#endif PX4_INFO("Mavlink logging started"); adjust_subscription_updates(); // redistribute updates as sending the header can take some time @@ -1561,38 +1566,26 @@ void Logger::stop_log_mavlink() // don't write perf data since a client does not expect more data after a stop command PX4_INFO("Stop mavlink log"); -#ifdef LOGGER_PARALLEL_LOGGING - int ret = pthread_join(_mav_start_thread, nullptr); - - if (ret) { - PX4_WARN("mav_start_thread join failed: %d", ret); - } - -#endif - if (_writer.is_started(LogType::Full, LogWriter::BackendMavlink)) { -#ifdef LOGGER_PARALLEL_LOGGING - LogWriter::Backend bkend = _writer.backend(); - _writer.select_write_backend(LogWriter::BackendMavlink); - write_perf_data(PrintLoadReason::Postflight, true); - _writer.select_write_backend(bkend); -#else _writer.select_write_backend(LogWriter::BackendMavlink); _writer.set_need_reliable_transfer(true); write_perf_data(PrintLoadReason::Postflight); +#ifdef LOGGER_PARALLEL_LOGGING + _writer.wait_fifo_empty(); +#endif _writer.set_need_reliable_transfer(false); _writer.unselect_write_backend(); -#endif _writer.notify(); _writer.stop_log_mavlink(); } + + PX4_INFO("Mavlink logging stopped"); } struct perf_callback_data_t { Logger *logger; int counter; Logger::PrintLoadReason reason; - bool acked; char *buffer; }; @@ -1620,18 +1613,16 @@ void Logger::perf_iterate_callback(perf_counter_t handle, void *user) break; } - callback_data->logger->write_info_multiple(LogType::Full, perf_name, buffer, callback_data->counter != 0, - callback_data->acked); + callback_data->logger->write_info_multiple(LogType::Full, perf_name, buffer, callback_data->counter != 0); ++callback_data->counter; } -void Logger::write_perf_data(PrintLoadReason reason, bool acked) +void Logger::write_perf_data(PrintLoadReason reason) { perf_callback_data_t callback_data = {}; callback_data.logger = this; callback_data.counter = 0; callback_data.reason = reason; - callback_data.acked = acked; // write the perf counters perf_iterate_all(perf_iterate_callback, &callback_data); @@ -1667,8 +1658,6 @@ void Logger::print_load_callback(void *user) callback_data->logger->write_info_multiple(LogType::Full, perf_name, callback_data->buffer, callback_data->counter != 0); - // TODO: Add call callback_data->acked to the last param. Currenlty it fails due to conflict with - // startup header/definition sending ++callback_data->counter; } @@ -1699,11 +1688,6 @@ void Logger::write_load_output() char buffer[140]; callback_data.logger = this; callback_data.counter = 0; -#ifdef LOGGER_PARALLEL_LOGGING - callback_data.acked = true; -#else - callback_data.acked = false; -#endif callback_data.buffer = buffer; // TODO: maybe we should restrict the output to a selected backend (eg. when file logging is running // and mavlink log is started, this will be added to the file as well) @@ -1712,7 +1696,7 @@ void Logger::write_load_output() _writer.set_need_reliable_transfer(false); } -void Logger::write_console_output(bool acked) +void Logger::write_console_output() { const int buffer_length = 220; char buffer[buffer_length]; @@ -1726,7 +1710,7 @@ void Logger::write_console_output(bool acked) if (read_size <= 0) { break; } buffer[math::min(read_size, size)] = '\0'; - write_info_multiple(LogType::Full, "boot_console_output", buffer, !first, acked); + write_info_multiple(LogType::Full, "boot_console_output", buffer, !first); size -= read_size; first = false; @@ -1735,7 +1719,7 @@ void Logger::write_console_output(bool acked) } void Logger::write_format(LogType type, const orb_metadata &meta, WrittenFormats &written_formats, - ulog_message_format_s &msg, int subscription_index, int level, bool acked) + ulog_message_format_s &msg, int subscription_index, int level) { if (level > 3) { // precaution: limit recursion level. If we land here it's either a bug or nested topic definitions. In the @@ -1796,7 +1780,12 @@ void Logger::write_format(LogType type, const orb_metadata &meta, WrittenFormats size_t msg_size = sizeof(msg) - sizeof(msg.format) + format_len; msg.msg_size = msg_size - ULOG_MSG_HEADER_LEN; - write_message(type, &msg, msg_size, acked); +#ifdef LOGGER_PARALLEL_LOGGING + thread_data_t *th_data = (thread_data_t *) pthread_getspecific(pthread_data_key); + write_message(type, &msg, msg_size, true, th_data->wait_for_ack); +#else + write_message(type, &msg, msg_size); +#endif if (level > 1 && !written_formats.push_back(&meta)) { PX4_ERR("Array too small"); @@ -1850,7 +1839,7 @@ void Logger::write_format(LogType type, const orb_metadata &meta, WrittenFormats if (found_topic) { - write_format(type, *found_topic, written_formats, msg, subscription_index, level + 1, acked); + write_format(type, *found_topic, written_formats, msg, subscription_index, level + 1); } else { PX4_ERR("No definition for topic %s found", fmt); @@ -1863,7 +1852,7 @@ void Logger::write_format(LogType type, const orb_metadata &meta, WrittenFormats } } -void Logger::write_formats(LogType type, bool acked) +void Logger::write_formats(LogType type) { _writer.lock(); @@ -1880,15 +1869,15 @@ void Logger::write_formats(LogType type, bool acked) for (int i = 0; i < sub_count; ++i) { const LoggerSubscription &sub = _subscriptions[i]; - write_format(type, *sub.get_topic(), written_formats, msg, i, 1, acked); + write_format(type, *sub.get_topic(), written_formats, msg, i); } - write_format(type, *_event_subscription.get_topic(), written_formats, msg, sub_count, 1, acked); + write_format(type, *_event_subscription.get_topic(), written_formats, msg, sub_count); _writer.unlock(); } -void Logger::write_all_add_logged_msg(LogType type, bool acked) +void Logger::write_all_add_logged_msg(LogType type) { _writer.lock(); @@ -1904,12 +1893,12 @@ void Logger::write_all_add_logged_msg(LogType type, bool acked) LoggerSubscription &sub = _subscriptions[i]; if (sub.valid()) { - write_add_logged_msg(type, sub, acked); + write_add_logged_msg(type, sub); added_subscriptions = true; } } - write_add_logged_msg(type, _event_subscription, acked); // always add, even if not valid + write_add_logged_msg(type, _event_subscription); // always add, even if not valid _writer.unlock(); @@ -1918,7 +1907,7 @@ void Logger::write_all_add_logged_msg(LogType type, bool acked) } } -void Logger::write_add_logged_msg(LogType type, LoggerSubscription &subscription, bool acked) +void Logger::write_add_logged_msg(LogType type, LoggerSubscription &subscription) { ulog_message_add_logged_s msg; @@ -1944,11 +1933,16 @@ void Logger::write_add_logged_msg(LogType type, LoggerSubscription &subscription bool prev_reliable = _writer.need_reliable_transfer(); _writer.set_need_reliable_transfer(true); - write_message(type, &msg, msg_size, acked); +#ifdef LOGGER_PARALLEL_LOGGING + thread_data_t *th_data = (thread_data_t *) pthread_getspecific(pthread_data_key); + write_message(type, &msg, msg_size, true, th_data->wait_for_ack); +#else + write_message(type, &msg, msg_size); +#endif _writer.set_need_reliable_transfer(prev_reliable); } -void Logger::write_info(LogType type, const char *name, const char *value, bool acked) +void Logger::write_info(LogType type, const char *name, const char *value) { _writer.lock(); ulog_message_info_s msg = {}; @@ -1967,13 +1961,18 @@ void Logger::write_info(LogType type, const char *name, const char *value, bool msg.msg_size = msg_size - ULOG_MSG_HEADER_LEN; - write_message(type, buffer, msg_size, acked); +#ifdef LOGGER_PARALLEL_LOGGING + thread_data_t *th_data = (thread_data_t *) pthread_getspecific(pthread_data_key); + write_message(type, buffer, msg_size, true, th_data->wait_for_ack); +#else + write_message(type, buffer, msg_size); +#endif } _writer.unlock(); } -void Logger::write_info_multiple(LogType type, const char *name, const char *value, bool is_continued, bool acked) +void Logger::write_info_multiple(LogType type, const char *name, const char *value, bool is_continued) { _writer.lock(); ulog_message_info_multiple_s msg; @@ -1993,7 +1992,12 @@ void Logger::write_info_multiple(LogType type, const char *name, const char *val msg.msg_size = msg_size - ULOG_MSG_HEADER_LEN; - write_message(type, buffer, msg_size, acked); +#ifdef LOGGER_PARALLEL_LOGGING + thread_data_t *th_data = (thread_data_t *) pthread_getspecific(pthread_data_key); + write_message(type, buffer, msg_size, true, th_data->wait_for_ack); +#else + write_message(type, buffer, msg_size); +#endif } else { PX4_ERR("info_multiple str too long (%" PRIu8 "), key=%s", msg.key_len, msg.key_value_str); @@ -2002,7 +2006,7 @@ void Logger::write_info_multiple(LogType type, const char *name, const char *val _writer.unlock(); } -void Logger::write_info_multiple(LogType type, const char *name, int fd, bool acked) +void Logger::write_info_multiple(LogType type, const char *name, int fd) { // Get the file length struct stat stat_data; @@ -2022,6 +2026,10 @@ void Logger::write_info_multiple(LogType type, const char *name, int fd, bool ac int file_offset = 0; +#ifdef LOGGER_PARALLEL_LOGGING + thread_data_t *th_data = (thread_data_t *) pthread_getspecific(pthread_data_key); +#endif + while (file_offset < file_size) { _writer.lock(); @@ -2038,7 +2046,11 @@ void Logger::write_info_multiple(LogType type, const char *name, int fd, bool ac msg_size += read_length; msg.msg_size = msg_size - ULOG_MSG_HEADER_LEN; - write_message(type, buffer, msg_size, acked); +#ifdef LOGGER_PARALLEL_LOGGING + write_message(type, buffer, msg_size, true, th_data->wait_for_ack); +#else + write_message(type, buffer, msg_size); +#endif file_offset += ret; } else { @@ -2052,19 +2064,19 @@ void Logger::write_info_multiple(LogType type, const char *name, int fd, bool ac } } -void Logger::write_info(LogType type, const char *name, int32_t value, bool acked) +void Logger::write_info(LogType type, const char *name, int32_t value) { - write_info_template(type, name, value, "int32_t", acked); + write_info_template(type, name, value, "int32_t"); } -void Logger::write_info(LogType type, const char *name, uint32_t value, bool acked) +void Logger::write_info(LogType type, const char *name, uint32_t value) { - write_info_template(type, name, value, "uint32_t", acked); + write_info_template(type, name, value, "uint32_t"); } template -void Logger::write_info_template(LogType type, const char *name, T value, const char *type_str, bool acked) +void Logger::write_info_template(LogType type, const char *name, T value, const char *type_str) { _writer.lock(); ulog_message_info_s msg = {}; @@ -2081,23 +2093,28 @@ void Logger::write_info_template(LogType type, const char *name, T value, const msg.msg_size = msg_size - ULOG_MSG_HEADER_LEN; - write_message(type, buffer, msg_size, acked); +#ifdef LOGGER_PARALLEL_LOGGING + thread_data_t *th_data = (thread_data_t *) pthread_getspecific(pthread_data_key); + write_message(type, buffer, msg_size, true, th_data->wait_for_ack); +#else + write_message(type, buffer, msg_size); +#endif _writer.unlock(); } -void Logger::write_excluded_optional_topics(LogType type, bool acked) +void Logger::write_excluded_optional_topics(LogType type) { for (int i = 0; i < _num_excluded_optional_topic_ids; ++i) { orb_id_t meta = get_orb_meta((ORB_ID)_excluded_optional_topic_ids[i]); if (meta) { - write_info_multiple(type, "excluded_optional_topics", meta->o_name, false, acked); + write_info_multiple(type, "excluded_optional_topics", meta->o_name, false); } } } -void Logger::write_header(LogType type, bool acked) +void Logger::write_header(LogType type) { ulog_file_header_s header = {}; header.magic[0] = 'U'; @@ -2110,7 +2127,13 @@ void Logger::write_header(LogType type, bool acked) header.magic[7] = 0x01; //file version 1 header.timestamp = hrt_absolute_time(); _writer.lock(); - write_message(type, &header, sizeof(header), acked); + +#ifdef LOGGER_PARALLEL_LOGGING + thread_data_t *th_data = (thread_data_t *) pthread_getspecific(pthread_data_key); + write_message(type, &header, sizeof(header), true, th_data->wait_for_ack); +#else + write_message(type, &header, sizeof(header)); +#endif // write the Flags message: this MUST be written right after the ulog header ulog_message_flag_bits_s flag_bits{}; @@ -2120,57 +2143,61 @@ void Logger::write_header(LogType type, bool acked) flag_bits.msg_size = sizeof(flag_bits) - ULOG_MSG_HEADER_LEN; flag_bits.msg_type = static_cast(ULogMessageType::FLAG_BITS); - write_message(type, &flag_bits, sizeof(flag_bits), acked); +#ifdef LOGGER_PARALLEL_LOGGING + write_message(type, &flag_bits, sizeof(flag_bits), true, th_data->wait_for_ack); +#else + write_message(type, &flag_bits, sizeof(flag_bits)); +#endif _writer.unlock(); } -void Logger::write_version(LogType type, bool acked) +void Logger::write_version(LogType type) { - write_info(type, "ver_sw", px4_firmware_version_string(), acked); - write_info(type, "ver_sw_release", px4_firmware_version(), acked); + write_info(type, "ver_sw", px4_firmware_version_string()); + write_info(type, "ver_sw_release", px4_firmware_version()); uint32_t vendor_version = px4_firmware_vendor_version(); if (vendor_version > 0) { - write_info(type, "ver_vendor_sw_release", vendor_version, acked); + write_info(type, "ver_vendor_sw_release", vendor_version); } - write_info(type, "ver_hw", px4_board_name(), acked); + write_info(type, "ver_hw", px4_board_name()); const char *board_sub_type = px4_board_sub_type(); if (board_sub_type && board_sub_type[0]) { - write_info(type, "ver_hw_subtype", board_sub_type, acked); + write_info(type, "ver_hw_subtype", board_sub_type); } - write_info(type, "sys_name", "PX4", acked); - write_info(type, "sys_os_name", px4_os_name(), acked); + write_info(type, "sys_name", "PX4"); + write_info(type, "sys_os_name", px4_os_name()); const char *os_version = px4_os_version_string(); const char *git_branch = px4_firmware_git_branch(); if (git_branch && git_branch[0]) { - write_info(type, "ver_sw_branch", git_branch, acked); + write_info(type, "ver_sw_branch", git_branch); } if (os_version) { - write_info(type, "sys_os_ver", os_version, acked); + write_info(type, "sys_os_ver", os_version); } const char *oem_version = px4_firmware_oem_version_string(); if (oem_version && oem_version[0]) { - write_info(type, "ver_oem", oem_version, acked); + write_info(type, "ver_oem", oem_version); } - write_info(type, "sys_os_ver_release", px4_os_version(), acked); - write_info(type, "sys_toolchain", px4_toolchain_name(), acked); - write_info(type, "sys_toolchain_ver", px4_toolchain_version(), acked); + write_info(type, "sys_os_ver_release", px4_os_version()); + write_info(type, "sys_toolchain", px4_toolchain_name()); + write_info(type, "sys_toolchain_ver", px4_toolchain_version()); const char *ecl_version = px4_ecl_lib_version_string(); if (ecl_version && ecl_version[0]) { - write_info(type, "sys_lib_ecl_ver", ecl_version, acked); + write_info(type, "sys_lib_ecl_ver", ecl_version); } char revision = 'U'; @@ -2179,12 +2206,12 @@ void Logger::write_version(LogType type, bool acked) if (board_mcu_version(&revision, &chip_name, nullptr) >= 0) { char mcu_ver[64]; snprintf(mcu_ver, sizeof(mcu_ver), "%s, rev. %c", chip_name, revision); - write_info(type, "sys_mcu", mcu_ver, acked); + write_info(type, "sys_mcu", mcu_ver); } // data versioning: increase this on every larger data change (format/semantic) // 1: switch to FIFO drivers (disabled on-chip DLPF) - write_info(type, "ver_data_format", static_cast(1), acked); + write_info(type, "ver_data_format", static_cast(1)); #ifndef BOARD_HAS_NO_UUID @@ -2192,23 +2219,23 @@ void Logger::write_version(LogType type, bool acked) if (_param_sdlog_uuid.get() == 1) { char px4_uuid_string[PX4_GUID_FORMAT_SIZE]; board_get_px4_guid_formated(px4_uuid_string, sizeof(px4_uuid_string)); - write_info(type, "sys_uuid", px4_uuid_string, acked); + write_info(type, "sys_uuid", px4_uuid_string); } #endif /* BOARD_HAS_NO_UUID */ - write_info(type, "time_ref_utc", _param_sdlog_utc_offset.get() * 60, acked); + write_info(type, "time_ref_utc", _param_sdlog_utc_offset.get() * 60); if (_replay_file_name) { - write_info(type, "replay", _replay_file_name, acked); + write_info(type, "replay", _replay_file_name); } if (type == LogType::Mission) { - write_info(type, "log_type", "mission", acked); + write_info(type, "log_type", "mission"); } } -void Logger::write_parameter_defaults(LogType type, bool acked) +void Logger::write_parameter_defaults(LogType type) { _writer.lock(); ulog_message_parameter_default_s msg = {}; @@ -2217,6 +2244,9 @@ void Logger::write_parameter_defaults(LogType type, bool acked) msg.msg_type = static_cast(ULogMessageType::PARAMETER_DEFAULT); int param_idx = 0; param_t param = 0; +#ifdef LOGGER_PARALLEL_LOGGING + thread_data_t *th_data = (thread_data_t *) pthread_getspecific(pthread_data_key); +#endif do { // skip over all parameters which are not used @@ -2279,20 +2309,32 @@ void Logger::write_parameter_defaults(LogType type, bool acked) if (memcmp(&value, &default_value, value_size) != 0) { memcpy(&buffer[msg_size - value_size], default_value, value_size); msg.default_types = ulog_parameter_default_type_t::current_setup | ulog_parameter_default_type_t::system; - write_message(type, buffer, msg_size, acked); +#ifdef LOGGER_PARALLEL_LOGGING + write_message(type, buffer, msg_size, true, th_data->wait_for_ack); +#else + write_message(type, buffer, msg_size); +#endif } } else { if (memcmp(&value, &default_value, value_size) != 0) { memcpy(&buffer[msg_size - value_size], default_value, value_size); msg.default_types = ulog_parameter_default_type_t::current_setup; - write_message(type, buffer, msg_size, acked); +#ifdef LOGGER_PARALLEL_LOGGING + write_message(type, buffer, msg_size, true, th_data->wait_for_ack); +#else + write_message(type, buffer, msg_size); +#endif } if (memcmp(&value, &system_default_value, value_size) != 0) { memcpy(&buffer[msg_size - value_size], system_default_value, value_size); msg.default_types = ulog_parameter_default_type_t::system; - write_message(type, buffer, msg_size, acked); +#ifdef LOGGER_PARALLEL_LOGGING + write_message(type, buffer, msg_size, true, th_data->wait_for_ack); +#else + write_message(type, buffer, msg_size); +#endif } } } @@ -2302,7 +2344,7 @@ void Logger::write_parameter_defaults(LogType type, bool acked) _writer.notify(); } -void Logger::write_parameters(LogType type, bool acked) +void Logger::write_parameters(LogType type) { _writer.lock(); ulog_message_parameter_s msg = {}; @@ -2312,6 +2354,10 @@ void Logger::write_parameters(LogType type, bool acked) int param_idx = 0; param_t param = 0; +#ifdef LOGGER_PARALLEL_LOGGING + thread_data_t *th_data = (thread_data_t *) pthread_getspecific(pthread_data_key); +#endif + do { // skip over all parameters which are not used do { @@ -2363,7 +2409,11 @@ void Logger::write_parameters(LogType type, bool acked) msg.msg_size = msg_size - ULOG_MSG_HEADER_LEN; - write_message(type, buffer, msg_size, acked); +#ifdef LOGGER_PARALLEL_LOGGING + write_message(type, buffer, msg_size, true, th_data->wait_for_ack); +#else + write_message(type, buffer, msg_size); +#endif } } while ((param != PARAM_INVALID) && (param_idx < (int) param_count())); @@ -2381,6 +2431,10 @@ void Logger::write_changed_parameters(LogType type) int param_idx = 0; param_t param = 0; +#ifdef LOGGER_PARALLEL_LOGGING + thread_data_t *th_data = (thread_data_t *) pthread_getspecific(pthread_data_key); +#endif + do { // skip over all parameters which are not used do { @@ -2434,7 +2488,11 @@ void Logger::write_changed_parameters(LogType type) // msg_size is now 1 (msg_type) + 2 (msg_size) + 1 (key_len) + key_len + value_size msg.msg_size = msg_size - ULOG_MSG_HEADER_LEN; +#ifdef LOGGER_PARALLEL_LOGGING + write_message(type, buffer, msg_size, true, th_data->wait_for_ack); +#else write_message(type, buffer, msg_size); +#endif } } while ((param != PARAM_INVALID) && (param_idx < (int) param_count())); @@ -2442,7 +2500,7 @@ void Logger::write_changed_parameters(LogType type) _writer.notify(); } -void Logger::write_events_file(LogType type, bool acked) +void Logger::write_events_file(LogType type) { int fd = open(PX4_ROOTFSDIR "/etc/extras/all_events.json.xz", O_RDONLY); @@ -2454,11 +2512,11 @@ void Logger::write_events_file(LogType type, bool acked) return; } - write_info_multiple(type, "metadata_events", fd, acked); + write_info_multiple(type, "metadata_events", fd); close(fd); - write_info(type, "metadata_events_sha256", component_information::all_events_sha256, acked); + write_info(type, "metadata_events_sha256", component_information::all_events_sha256); } void Logger::ack_vehicle_command(vehicle_command_s *cmd, uint32_t result) diff --git a/src/modules/logger/logger.h b/src/modules/logger/logger.h index 7dc9daf66379..e215401d0861 100644 --- a/src/modules/logger/logger.h +++ b/src/modules/logger/logger.h @@ -69,6 +69,14 @@ namespace px4 namespace logger { +#ifdef LOGGER_PARALLEL_LOGGING +typedef struct thread_data { + bool wait_for_ack{false}; +} thread_data_t; + +pthread_key_t pthread_data_key; +#endif + static constexpr uint8_t MSG_ID_INVALID = UINT8_MAX; struct LoggerSubscription : public uORB::SubscriptionInterval { @@ -190,13 +198,13 @@ class Logger : public ModuleBase, public ModuleParams /** * Write an ADD_LOGGED_MSG to the log for a all current subscriptions and instances */ - void write_all_add_logged_msg(LogType type, bool acked = false); + void write_all_add_logged_msg(LogType type); /** * Write an ADD_LOGGED_MSG to the log for a given subscription and instance. * _writer.lock() must be held when calling this. */ - void write_add_logged_msg(LogType type, LoggerSubscription &subscription, bool acked = false); + void write_add_logged_msg(LogType type, LoggerSubscription &subscription); /** * Create logging directory @@ -228,7 +236,7 @@ class Logger : public ModuleBase, public ModuleParams void mav_start_steps(); static void *mav_start_steps_helper(void *); - pthread_t _mav_start_thread = 0; + #endif /** check if mavlink logging can be started */ @@ -244,24 +252,24 @@ class Logger : public ModuleBase, public ModuleParams /** * write the file header with file magic and timestamp. */ - void write_header(LogType type, bool acked = false); + void write_header(LogType type); /// Array to store written formats for nested definitions (only) using WrittenFormats = Array < const orb_metadata *, 20 >; void write_format(LogType type, const orb_metadata &meta, WrittenFormats &written_formats, ulog_message_format_s &msg, - int subscription_index, int level = 1, bool acked = false); - void write_formats(LogType type, bool acked = false); + int subscription_index, int level = 1); + void write_formats(LogType type); /** * write performance counters */ - void write_perf_data(PrintLoadReason reason, bool acked = false); + void write_perf_data(PrintLoadReason reason); /** * write bootup console output */ - void write_console_output(bool acked = false); + void write_console_output(); /** * callback to write the performance counters @@ -273,25 +281,25 @@ class Logger : public ModuleBase, public ModuleParams */ static void print_load_callback(void *user); - void write_version(LogType type, bool acked = false); + void write_version(LogType type); - void write_excluded_optional_topics(LogType type, bool acked = false); + void write_excluded_optional_topics(LogType type); - void write_info(LogType type, const char *name, const char *value, bool acked = false); - void write_info_multiple(LogType type, const char *name, const char *value, bool is_continued, bool acked = false); - void write_info_multiple(LogType type, const char *name, int fd, bool acked = false); - void write_info(LogType type, const char *name, int32_t value, bool acked = false); - void write_info(LogType type, const char *name, uint32_t value, bool acked = false); + void write_info(LogType type, const char *name, const char *value); + void write_info_multiple(LogType type, const char *name, const char *value, bool is_continued); + void write_info_multiple(LogType type, const char *name, int fd); + void write_info(LogType type, const char *name, int32_t value); + void write_info(LogType type, const char *name, uint32_t value); /** generic common template method for write_info variants */ template - void write_info_template(LogType type, const char *name, T value, const char *type_str, bool acked = false); + void write_info_template(LogType type, const char *name, T value, const char *type_str); - void write_parameters(LogType type, bool acked = false); - void write_parameter_defaults(LogType type, bool acked = false); + void write_parameters(LogType type); + void write_parameter_defaults(LogType type); void write_changed_parameters(LogType type); - void write_events_file(LogType type, bool acked = false); + void write_events_file(LogType type); inline bool copy_if_updated(int sub_idx, void *buffer, bool try_to_subscribe); @@ -300,7 +308,7 @@ class Logger : public ModuleBase, public ModuleParams * Must be called with _writer.lock() held. * @return true if data written, false otherwise (on overflow) */ - bool write_message(LogType type, void *ptr, size_t size, bool acked = false); + bool write_message(LogType type, void *ptr, size_t size, bool reliable = false, bool wait = false); /** * Add topic subscriptions from SD file if it exists, otherwise add topics based on the configured profile. @@ -351,6 +359,12 @@ class Logger : public ModuleBase, public ModuleParams void adjust_subscription_updates(); +#ifdef LOGGER_PARALLEL_LOGGING + pthread_t _mav_start_thread {0}; + thread_data_t _thread_main_data; + thread_data_t _thread_mav_start_sender_data; +#endif + uint8_t *_msg_buffer{nullptr}; int _msg_buffer_len{0}; diff --git a/src/modules/logger/parallel_mavlink_logging.md b/src/modules/logger/parallel_mavlink_logging.md index 3d31b52cd916..d506a53aba65 100644 --- a/src/modules/logger/parallel_mavlink_logging.md +++ b/src/modules/logger/parallel_mavlink_logging.md @@ -7,7 +7,7 @@ Starting flight logging over mavlink is a slow operation. In case logging is sta To speed up logging startup and reduce that blackout period in the beginning of the log, the parallel data logging option is implemented. The trick here is that the actual uorb data logging is started as soon as possible when logging is started and the static definitions/configs are sent at the same time through another channel. The receiver end reads both streams and store them to two separate files and in the end of logging it combines them into one ulog file by appending the topic data file in the end of static definition data file. -This new protocol breaks the backward compatibility, so BOTH px4 logger and the reveicer MUST or MUST NOT implement the parallel logging to make it work! +This new protocol is not backward compatible, so BOTH px4 logger and the receiver MUST or MUST NOT implement the parallel logging to make it work! ### Original protocol @@ -59,13 +59,13 @@ MAVLINK_MSG_LOGGING_DATA_ACKED MAVLINK_MSG_ ``` -### Parallel protocol -Logger spawns new thread for sending Static definitions data and continues to send dynamic topic data at the same time. Static data is published into **ulog_stream_acked** topic and the dynamic data into **ulog_stream** topic. The thread sending dynamic data does not need to wait anything, but continuously sending the data without waiting any ack. The static data sender thread publishes one msg at a time and waits for ack until it can publish next one. +### Parallel logging protocol +Logger spawns new thread for sending Static definitions data (reliable transfer enabled) and continues to send dynamic topic data at the same time. Static data is published into **ulog_stream_acked** topic and the dynamic data into **ulog_stream** topic. The thread sending dynamic data does not need to wait anything, but continuously sending the data without waiting any ack. The static data sender thread publishes one message at a time and waits for ack until publishing next one. -mavlink_uorb reads both **ulog_stream** and **ulog_stream_acked** streams and sends either **MAVLINK_MSG_LOGGING_DATA** or **MAVLINK_MSG_LOGGING_DATA_ACKED** msgs accordingly. Also it listens to **MAVLINK_MSG_LOGGING_DATA_ACK** messages and publish to **ulog_stream_ack** if one received. -Sending **MAVLINK_MSG_LOGGING_DATA_ACKED** raises a flag to wait for ack until next _acked message is sent, but **MAVLINK_MSG_LOGGING_DATA** messages are always sent without any blocking. +mavlink_uorb reads both **ulog_stream** and **ulog_stream_acked** streams and sends either **MAVLINK_MSG_LOGGING_DATA** or **MAVLINK_MSG_LOGGING_DATA_ACKED** mavlink msgs accordingly. Also it listens to **MAVLINK_MSG_LOGGING_DATA_ACK** messages and publish to **ulog_stream_ack** if one received. +Sending **MAVLINK_MSG_LOGGING_DATA_ACKED** raises a flag to wait for ack. A **MAVLINK_MSG_LOGGING_DATA_ACK** message with expected sequence number shall be received before next _acked message can be sent, but the **MAVLINK_MSG_LOGGING_DATA** messages are always sent in parallel of that without any blocking. -Receiver listens to both **MAVLINK_MSG_LOGGING_DATA** and **MAVLINK_MSG_LOGGING_DATA_ACKED** messages and write them to separate files accordingly: _DATA into .data file and _DATA_ACKED into .ulg file. When logging is stopped, the receiver append the .data file content into the end of .ulg file. +Receiver listens to both **MAVLINK_MSG_LOGGING_DATA** and **MAVLINK_MSG_LOGGING_DATA_ACKED** messages and write them to separate files accordingly: _DATA into .data file and _DATA_ACKED into .ulg file. For each **MAVLINK_MSG_LOGGING_DATA_ACKED** message it sends back a **MAVLINK_MSG_LOGGING_DATA_ACK** message. When logging is stopped, the receiver append the .data file content into the end of .ulg file and removes the .data file. ``` +----------------------------------------------+ @@ -77,7 +77,7 @@ Receiver listens to both **MAVLINK_MSG_LOGGING_DATA** and **MAVLINK_MSG_LOGGING_ | | ^ | Publish | Publish | Subs | | | - V V | + v v | +----------------------------------------------+ | MAvlink_ulog | | | @@ -85,11 +85,11 @@ Receiver listens to both **MAVLINK_MSG_LOGGING_DATA** and **MAVLINK_MSG_LOGGING_ | | +----------------------------------------------+ | | ^ - Send Send Recv -MAVLINK_MSG_ MAVLINK_MSG_ MAVLINK_MSG_ -LOGGING_DATA LOGGING_DATA_ACKED LOGGING_DATA_ACK - | | | - V V | +MAVLINK_MSG_ MAVLINK_MSG_ | +LOGGING_DATA LOGGING_DATA_ACKED | + | | MAVLINK_MSG_ + | | LOGGING_DATA_ACK + v v | +----------------------------------------------+ | Receiver | |-----------+ +--------------------------| @@ -98,18 +98,21 @@ LOGGING_DATA LOGGING_DATA_ACKED LOGGING_DATA_ACK | | | | | | - V V + v v +------------+ +------------+ | .data file | | .ulg file | +| DATA | | DEFS | +------------+ +------------+ | | | ---- Stop logging---- | | | - +------------+--------------+ - | - V - +-----------+ - | .ulg file | - +-----------+ + +-------------------------->+ + | + v + +------------+ + | .ulg file | + | DEFS | + | DATA | + +------------+ ``` diff --git a/src/modules/mavlink/CMakeLists.txt b/src/modules/mavlink/CMakeLists.txt index 599ec7e63e71..45f7737e6e03 100644 --- a/src/modules/mavlink/CMakeLists.txt +++ b/src/modules/mavlink/CMakeLists.txt @@ -31,10 +31,6 @@ # ############################################################################ -if(CONFIG_MAVLINK_PARALLEL_LOGGING) - set(MAVLINK_PARALLEL_COMPILE_FLAG "-DMAVLINK_PARALLEL_LOGGING") -endif() - set(MAVLINK_GIT_DIR "${CMAKE_CURRENT_LIST_DIR}/mavlink") set(MAVLINK_LIBRARY_DIR "${CMAKE_BINARY_DIR}/mavlink") file(RELATIVE_PATH MAVLINK_GIT_DIR_RELATIVE ${CMAKE_SOURCE_DIR} ${MAVLINK_GIT_DIR}) @@ -109,7 +105,6 @@ px4_add_module( MAIN mavlink COMPILE_FLAGS -Wno-enum-compare # ROTATION <-> MAV_SENSOR_ROTATION - ${MAVLINK_PARALLEL_COMPILE_FLAG} #-DDEBUG_BUILD SRCS mavlink.c diff --git a/src/modules/mavlink/Kconfig b/src/modules/mavlink/Kconfig index 7adde83e84cb..8fb9219d9d61 100644 --- a/src/modules/mavlink/Kconfig +++ b/src/modules/mavlink/Kconfig @@ -24,10 +24,3 @@ menuconfig USER_MAVLINK depends on BOARD_PROTECTED && MODULES_MAVLINK ---help--- Put mavlink in userspace memory - -menuconfig MAVLINK_PARALLEL_LOGGING - bool "Custom mavlink logging protocol" - default n - depends on MODULES_MAVLINK - ---help--- - Custom mavlink logging protocol for speed up logging start phase diff --git a/src/modules/mavlink/mavlink_ulog.cpp b/src/modules/mavlink/mavlink_ulog.cpp index 161ee0eae3eb..bc7f05d33205 100644 --- a/src/modules/mavlink/mavlink_ulog.cpp +++ b/src/modules/mavlink/mavlink_ulog.cpp @@ -60,15 +60,12 @@ MavlinkULog::MavlinkULog(int datarate, float max_rate_factor, uint8_t target_sys } -#ifdef MAVLINK_PARALLEL_LOGGING - - PX4_INFO("[Mavlink_ulog] Parallel logging enabled"); + PX4_INFO("[Mavlink_ulog] Parallel logging supported"); while (_ulog_stream_acked_sub.update()) { } -#endif _waiting_for_initial_ack = true; _last_sent_time = hrt_absolute_time(); //(ab)use this timestamp during initialization _next_rate_check = _last_sent_time + _rate_calculation_delta_t; @@ -104,54 +101,11 @@ int MavlinkULog::handle_update(mavlink_channel_t channel) return 0; } -#ifdef MAVLINK_PARALLEL_LOGGING - - while ((_current_num_msgs < _max_num_messages) && _ulog_stream_sub.updated()) { - const unsigned last_generation = _ulog_stream_sub.get_last_generation(); - _ulog_stream_sub.update(); - - if (_ulog_stream_sub.get_last_generation() != last_generation + 1) { - perf_count(_msg_missed_ulog_stream_perf); - } - - const ulog_stream_s &ulog_data = _ulog_stream_sub.get(); - - if (ulog_data.timestamp > 0) { - mavlink_logging_data_t msg; - msg.sequence = ulog_data.msg_sequence; - msg.length = ulog_data.length; - msg.first_message_offset = ulog_data.first_message_offset; - msg.target_system = _target_system; - msg.target_component = _target_component; - memcpy(msg.data, ulog_data.data, sizeof(msg.data)); - mavlink_msg_logging_data_send_struct(channel, &msg); - } - - ++_current_num_msgs; - } - - //need to update the rate? - hrt_abstime t = hrt_absolute_time(); - - if (t > _next_rate_check) { - if (_current_num_msgs < _max_num_messages) { - _current_rate_factor = _max_rate_factor * (float)_current_num_msgs / _max_num_messages; - - } else { - _current_rate_factor = _max_rate_factor; - } - - _current_num_msgs = 0; - _next_rate_check = t + _rate_calculation_delta_t * 1.e6f; - PX4_DEBUG("current rate=%.3f (max=%i msgs in %.3fs)", (double)_current_rate_factor, _max_num_messages, - (double)_rate_calculation_delta_t); - } - -#endif - // check if we're waiting for an ACK + bool check_for_updates = true; + if (_last_sent_time) { - bool check_for_updates = false; + check_for_updates = false; if (_ack_received) { _last_sent_time = 0; @@ -167,11 +121,7 @@ int MavlinkULog::handle_update(mavlink_channel_t channel) PX4_DEBUG("re-sending ulog mavlink message (try=%i)", _sent_tries); _last_sent_time = hrt_absolute_time(); -#ifdef MAVLINK_PARALLEL_LOGGING const ulog_stream_s &ulog_data = _ulog_stream_acked_sub.get(); -#else - const ulog_stream_s &ulog_data = _ulog_stream_sub.get(); -#endif mavlink_logging_data_acked_t msg; msg.sequence = ulog_data.msg_sequence; @@ -184,54 +134,58 @@ int MavlinkULog::handle_update(mavlink_channel_t channel) } } } - - if (!check_for_updates) { - return 0; - } } -#ifdef MAVLINK_PARALLEL_LOGGING - - if (_ulog_stream_acked_sub.updated()) { - _ulog_stream_acked_sub.update(); - - const ulog_stream_s &ulog_data = _ulog_stream_acked_sub.get(); - - if (ulog_data.timestamp > 0) { - _sent_tries = 1; - _last_sent_time = hrt_absolute_time(); - lock(); - _wait_for_ack_sequence = ulog_data.msg_sequence; - _ack_received = false; - unlock(); + while ((_current_num_msgs < _max_num_messages) && (_ulog_stream_sub.updated() || _ulog_stream_acked_sub.updated())) { + if (_ulog_stream_sub.updated()) { + const unsigned last_generation = _ulog_stream_sub.get_last_generation(); + _ulog_stream_sub.update(); - mavlink_logging_data_acked_t msg; - msg.sequence = ulog_data.msg_sequence; - msg.length = ulog_data.length; - msg.first_message_offset = ulog_data.first_message_offset; - msg.target_system = _target_system; - msg.target_component = _target_component; - memcpy(msg.data, ulog_data.data, sizeof(msg.data)); - mavlink_msg_logging_data_acked_send_struct(channel, &msg); + if (_ulog_stream_sub.get_last_generation() != last_generation + 1) { + perf_count(_msg_missed_ulog_stream_perf); + } - } + const ulog_stream_s &ulog_data = _ulog_stream_sub.get(); - } + if (ulog_data.timestamp > 0) { + if (ulog_data.flags & ulog_stream_s::FLAGS_NEED_ACK) { + _sent_tries = 1; + _last_sent_time = hrt_absolute_time(); + lock(); + _wait_for_ack_sequence = ulog_data.msg_sequence; + _ack_received = false; + unlock(); -#else + mavlink_logging_data_acked_t msg; + msg.sequence = ulog_data.msg_sequence; + msg.length = ulog_data.length; + msg.first_message_offset = ulog_data.first_message_offset; + msg.target_system = _target_system; + msg.target_component = _target_component; + memcpy(msg.data, ulog_data.data, sizeof(msg.data)); + mavlink_msg_logging_data_acked_send_struct(channel, &msg); - while ((_current_num_msgs < _max_num_messages) && _ulog_stream_sub.updated()) { - const unsigned last_generation = _ulog_stream_sub.get_last_generation(); - _ulog_stream_sub.update(); + } else { + mavlink_logging_data_t msg; + msg.sequence = ulog_data.msg_sequence; + msg.length = ulog_data.length; + msg.first_message_offset = ulog_data.first_message_offset; + msg.target_system = _target_system; + msg.target_component = _target_component; + memcpy(msg.data, ulog_data.data, sizeof(msg.data)); + mavlink_msg_logging_data_send_struct(channel, &msg); + } + } - if (_ulog_stream_sub.get_last_generation() != last_generation + 1) { - perf_count(_msg_missed_ulog_stream_perf); + ++_current_num_msgs; } - const ulog_stream_s &ulog_data = _ulog_stream_sub.get(); + if (check_for_updates && _ulog_stream_acked_sub.updated()) { + _ulog_stream_acked_sub.update(); - if (ulog_data.timestamp > 0) { - if (ulog_data.flags & ulog_stream_s::FLAGS_NEED_ACK) { + const ulog_stream_s &ulog_data = _ulog_stream_acked_sub.get(); + + if (ulog_data.timestamp > 0) { _sent_tries = 1; _last_sent_time = hrt_absolute_time(); lock(); @@ -248,19 +202,10 @@ int MavlinkULog::handle_update(mavlink_channel_t channel) memcpy(msg.data, ulog_data.data, sizeof(msg.data)); mavlink_msg_logging_data_acked_send_struct(channel, &msg); - } else { - mavlink_logging_data_t msg; - msg.sequence = ulog_data.msg_sequence; - msg.length = ulog_data.length; - msg.first_message_offset = ulog_data.first_message_offset; - msg.target_system = _target_system; - msg.target_component = _target_component; - memcpy(msg.data, ulog_data.data, sizeof(msg.data)); - mavlink_msg_logging_data_send_struct(channel, &msg); } - } - ++_current_num_msgs; + ++_current_num_msgs; + } } //need to update the rate? @@ -280,7 +225,6 @@ int MavlinkULog::handle_update(mavlink_channel_t channel) (double)(_rate_calculation_delta_t / 1e6)); } -#endif return 0; } @@ -335,6 +279,7 @@ void MavlinkULog::handle_ack(mavlink_logging_ack_t ack) lock(); if (_instance) { // make sure stop() was not called right before + //PX4_INFO("<< %u (wait: %u)", ack.sequence, _wait_for_ack_sequence); if (_wait_for_ack_sequence == ack.sequence) { _ack_received = true; publish_ack(ack.sequence); diff --git a/src/modules/mavlink/mavlink_ulog.h b/src/modules/mavlink/mavlink_ulog.h index 15f4d8f70a7d..9e534075920a 100644 --- a/src/modules/mavlink/mavlink_ulog.h +++ b/src/modules/mavlink/mavlink_ulog.h @@ -52,9 +52,6 @@ #include #include -#ifdef MAVLINK_PARALLEL_LOGGING -#include -#endif #include "mavlink_bridge_header.h" using namespace time_literals; @@ -128,9 +125,8 @@ class MavlinkULog static constexpr hrt_abstime _rate_calculation_delta_t = 100_ms; ///< rate update interval uORB::SubscriptionData _ulog_stream_sub{ORB_ID(ulog_stream)}; -#ifdef MAVLINK_PARALLEL_LOGGING uORB::SubscriptionData _ulog_stream_acked_sub {ORB_ID(ulog_stream_acked)}; -#endif + uORB::Publication _ulog_stream_ack_pub {ORB_ID(ulog_stream_ack)}; uint16_t _wait_for_ack_sequence; uint8_t _sent_tries = 0;