From e0ca321f7f1c7895101b650ce9f8129432c0b6c3 Mon Sep 17 00:00:00 2001 From: Dimitar Krastev Date: Wed, 8 Jan 2025 05:17:54 +0200 Subject: [PATCH] Fixed lifetime issue of startup mutex and condition variable in PfRingDevice. (#1679) * Fixed lifetime issue of startup mutex and condition variable. * Variable name fixup. --- Pcap++/header/PfRingDevice.h | 10 +++++- Pcap++/src/PfRingDevice.cpp | 64 +++++++++++++++++++++++------------- 2 files changed, 51 insertions(+), 23 deletions(-) diff --git a/Pcap++/header/PfRingDevice.h b/Pcap++/header/PfRingDevice.h index 304f40c3c8..795ed42a9b 100644 --- a/Pcap++/header/PfRingDevice.h +++ b/Pcap++/header/PfRingDevice.h @@ -7,6 +7,7 @@ #include "SystemUtils.h" #include "Packet.h" #include +#include #include /// @file @@ -47,6 +48,13 @@ namespace pcpp void clear(); }; + struct StartupBlock + { + std::mutex Mutex; + std::condition_variable Cond; + int State = 0; + }; + pfring** m_PfRingDescriptors; uint8_t m_NumOfOpenedRxChannels; std::string m_DeviceName; @@ -64,7 +72,7 @@ namespace pcpp PfRingDevice(const char* deviceName); bool initCoreConfigurationByCoreMask(CoreMask coreMask); - void captureThreadMain(std::condition_variable* startCond, std::mutex* startMutex, const int* startState); + void captureThreadMain(std::shared_ptr startupBlock); int openSingleRxChannel(const char* deviceName, pfring** ring); diff --git a/Pcap++/src/PfRingDevice.cpp b/Pcap++/src/PfRingDevice.cpp index 8a945a9c20..55134abbe1 100644 --- a/Pcap++/src/PfRingDevice.cpp +++ b/Pcap++/src/PfRingDevice.cpp @@ -449,9 +449,7 @@ namespace pcpp return false; } - std::mutex mutex; - std::condition_variable cond; - int startThread = 0; + std::shared_ptr startupBlock = std::make_shared(); m_StopThread = false; int rxChannel = 0; @@ -468,7 +466,7 @@ namespace pcpp // create a new thread m_CoreConfiguration[coreId].Channel = m_PfRingDescriptors[rxChannel++]; m_CoreConfiguration[coreId].RxThread = - std::thread(&pcpp::PfRingDevice::captureThreadMain, this, &cond, &mutex, &startThread); + std::thread(&pcpp::PfRingDevice::captureThreadMain, this, startupBlock); // set affinity to cores cpu_set_t cpuset; @@ -479,14 +477,21 @@ namespace pcpp if (err != 0) { PCPP_LOG_ERROR("Error while binding thread to core " << coreId << ": errno=" << err); - startThread = 1; + { + std::unique_lock lock(startupBlock->Mutex); + startupBlock->State = 1; + } + startupBlock->Cond.notify_all(); clearCoreConfiguration(); return false; } } - startThread = 2; - cond.notify_all(); + { + std::unique_lock lock(startupBlock->Mutex); + startupBlock->State = 2; + } + startupBlock->Cond.notify_all(); return true; } @@ -517,28 +522,35 @@ namespace pcpp m_ReentrantMode = false; - std::mutex mutex; - std::condition_variable cond; - int startThread = 0; + std::shared_ptr startupBlock = std::make_shared(); cpu_set_t cpuset; CPU_ZERO(&cpuset); CPU_SET(0, &cpuset); m_CoreConfiguration[0].IsInUse = true; m_CoreConfiguration[0].Channel = m_PfRingDescriptors[0]; - m_CoreConfiguration[0].RxThread = - std::thread(&pcpp::PfRingDevice::captureThreadMain, this, &cond, &mutex, &startThread); + m_CoreConfiguration[0].RxThread = std::thread(&pcpp::PfRingDevice::captureThreadMain, this, startupBlock); m_CoreConfiguration[0].IsAffinitySet = false; int err = pthread_setaffinity_np(m_CoreConfiguration[0].RxThread.native_handle(), sizeof(cpu_set_t), &cpuset); if (err != 0) { - startThread = 1; + { + std::unique_lock lock(startupBlock->Mutex); + startupBlock->State = 1; + } + startupBlock->Cond.notify_all(); + m_CoreConfiguration[0].RxThread.join(); + PCPP_LOG_ERROR("Error while binding thread to core 0: errno=" << err); clearCoreConfiguration(); return false; } - startThread = 2; - cond.notify_all(); + + { + std::unique_lock lock(startupBlock->Mutex); + startupBlock->State = 2; + } + startupBlock->Cond.notify_all(); PCPP_LOG_DEBUG("Capturing started for device [" << m_DeviceName << "]"); return true; @@ -559,19 +571,27 @@ namespace pcpp PCPP_LOG_DEBUG("All capturing threads stopped"); } - void PfRingDevice::captureThreadMain(std::condition_variable* startCond, std::mutex* startMutex, - const int* startState) + void PfRingDevice::captureThreadMain(std::shared_ptr startupBlock) { - while (*startState == 0) + if (startupBlock == nullptr) { - std::unique_lock lock(*startMutex); - startCond->wait_for(lock, std::chrono::milliseconds(100)); + PCPP_LOG_ERROR("Capture thread started without a startup block. Exiting capture thread"); + return; } - if (*startState == 1) + { - return; + std::unique_lock lock(startupBlock->Mutex); + startupBlock->Cond.wait(lock, [&] { return startupBlock->State != 0; }); + + if (startupBlock->State == 1) + { + return; + } } + // Startup is complete. The block is no longer needed. + startupBlock = nullptr; + int coreId = this->getCurrentCoreId().Id; pfring* ring = nullptr;