Skip to content

Commit

Permalink
Fixed lifetime issue of startup mutex and condition variable.
Browse files Browse the repository at this point in the history
  • Loading branch information
Dimi1010 committed Jan 4, 2025
1 parent 7898a5d commit 7ada5c4
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 23 deletions.
10 changes: 9 additions & 1 deletion Pcap++/header/PfRingDevice.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "SystemUtils.h"
#include "Packet.h"
#include <thread>
#include <mutex>
#include <condition_variable>

/// @file
Expand Down Expand Up @@ -47,6 +48,13 @@ namespace pcpp
void clear();
};

struct StartupBlock
{
std::mutex Mutex;
std::condition_variable Cond;
int State = 0;
};

pfring** m_PfRingDescriptors;
uint8_t m_NumOfOpenedRxChannels;
std::string m_DeviceName;
Expand All @@ -64,7 +72,7 @@ namespace pcpp
PfRingDevice(const char* deviceName);

bool initCoreConfigurationByCoreMask(CoreMask coreMask);
void captureThreadMain(std::condition_variable* startCond, std::mutex* startMutex, const int* startState);
void captureThreadMain(std::shared_ptr<StartupBlock> startupBlock);

int openSingleRxChannel(const char* deviceName, pfring** ring);

Expand Down
64 changes: 42 additions & 22 deletions Pcap++/src/PfRingDevice.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -449,9 +449,7 @@ namespace pcpp
return false;
}

std::mutex mutex;
std::condition_variable cond;
int startThread = 0;
std::shared_ptr<StartupBlock> startupBlock = std::make_shared<StartupBlock>();

m_StopThread = false;
int rxChannel = 0;
Expand All @@ -468,7 +466,7 @@ namespace pcpp
// create a new thread
m_CoreConfiguration[coreId].Channel = m_PfRingDescriptors[rxChannel++];
m_CoreConfiguration[coreId].RxThread =
std::thread(&pcpp::PfRingDevice::captureThreadMain, this, &cond, &mutex, &startThread);
std::thread(&pcpp::PfRingDevice::captureThreadMain, this, startupBlock);

// set affinity to cores
cpu_set_t cpuset;
Expand All @@ -479,14 +477,21 @@ namespace pcpp
if (err != 0)
{
PCPP_LOG_ERROR("Error while binding thread to core " << coreId << ": errno=" << err);
startThread = 1;
{
std::unique_lock<std::mutex> lock(startupBlock->Mutex);
startupBlock->State = 1;
}
startupBlock->Cond.notify_all();
clearCoreConfiguration();
return false;
}
}

startThread = 2;
cond.notify_all();
{
std::unique_lock<std::mutex> lock(startupBlock->Mutex);
startupBlock->State = 2;
}
startupBlock->Cond.notify_all();

return true;
}
Expand Down Expand Up @@ -517,28 +522,35 @@ namespace pcpp

m_ReentrantMode = false;

std::mutex mutex;
std::condition_variable cond;
int startThread = 0;
std::shared_ptr<StartupBlock> startupBlock = std::make_shared<StartupBlock>();

cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(0, &cpuset);
m_CoreConfiguration[0].IsInUse = true;
m_CoreConfiguration[0].Channel = m_PfRingDescriptors[0];
m_CoreConfiguration[0].RxThread =
std::thread(&pcpp::PfRingDevice::captureThreadMain, this, &cond, &mutex, &startThread);
m_CoreConfiguration[0].RxThread = std::thread(&pcpp::PfRingDevice::captureThreadMain, this, startupBlock);
m_CoreConfiguration[0].IsAffinitySet = false;
int err = pthread_setaffinity_np(m_CoreConfiguration[0].RxThread.native_handle(), sizeof(cpu_set_t), &cpuset);
if (err != 0)
{
startThread = 1;
{
std::unique_lock<std::mutex> lock(startupBlock->Mutex);
startupBlock->State = 1;
}
startupBlock->Cond.notify_all();
m_CoreConfiguration[0].RxThread.join();

PCPP_LOG_ERROR("Error while binding thread to core 0: errno=" << err);
clearCoreConfiguration();
return false;
}
startThread = 2;
cond.notify_all();

{
std::unique_lock<std::mutex> lock(startupBlock->Mutex);
startupBlock->State = 2;
}
startupBlock->Cond.notify_all();

PCPP_LOG_DEBUG("Capturing started for device [" << m_DeviceName << "]");
return true;
Expand All @@ -559,19 +571,27 @@ namespace pcpp
PCPP_LOG_DEBUG("All capturing threads stopped");
}

void PfRingDevice::captureThreadMain(std::condition_variable* startCond, std::mutex* startMutex,
const int* startState)
void PfRingDevice::captureThreadMain(std::shared_ptr<StartupBlock> startupBlock)
{
while (*startState == 0)
if (StartupBlock == nullptr)
{
std::unique_lock<std::mutex> lock(*startMutex);
startCond->wait_for(lock, std::chrono::milliseconds(100));
PCPP_LOG_ERROR("Capture thread started without a startup block. Exiting capture thread");
return;
}
if (*startState == 1)

{
return;
std::unique_lock<std::mutex> lock(startupBlock->Mutex);
startupBlock->Cond.wait(lock, [&] { return startupBlock->State != 0; });

if (startupBlock->State == 1)
{
return;
}
}

// Startup is complete. The block is no longer needed.
startupBlock = nullptr;

int coreId = this->getCurrentCoreId().Id;
pfring* ring = nullptr;

Expand Down

0 comments on commit 7ada5c4

Please sign in to comment.