Skip to content

Commit

Permalink
Implement FreezeDetector class (used to detect freezes of Handling th…
Browse files Browse the repository at this point in the history
…read),

Move Sleep and Time Diff functions to Utilities,
Extend Thread Structure with  functions used for maniuplation with it
  • Loading branch information
NTX authored and NTX committed Mar 10, 2012
1 parent 897a153 commit 2956f6a
Show file tree
Hide file tree
Showing 12 changed files with 195 additions and 42 deletions.
56 changes: 54 additions & 2 deletions Application/Application.cpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,44 @@
#include "Application.h"

void* CallFreezeDetector(void* obj)
{
((FreezeDetector*)obj)->_process(NULL);
return NULL;
}

void FreezeDetector::Run()
{
_detectorThread = Thread::CreateThread(CallFreezeDetector, this);
}

void* FreezeDetector::_process(void*)
{
_active = true;
_detectorThread->status = THREAD_ACTIVE;
while (!_exit)
{
if (_maxDiffTime < _diff)
{
sLog->outError("Freeze Detected !! Diff Time: %lu", _diff);
assert(false);
}
if (_pause)
{
Thread::SuspendThisThread();
_pause = false;
}
usleep(_maxDiffTime - _diff);
}
_detectorThread->status = THREAD_EXIT;
_active = false;
return NULL;
}

Application::Application(int argc, char* argv[], const char *conf):
debug(false), control(false), daemonize(false), libLoaded(false), terminate(false)
debug(false), control(false), daemonize(false), libLoaded(false), terminate(false),
_diffTime(0)
{
gettimeofday(&_lastUpdate, 0);
if (argc > 0)
{
ApplicationAddress = argv[0];
Expand Down Expand Up @@ -64,7 +99,7 @@ debug(false), control(false), daemonize(false), libLoaded(false), terminate(fals
sLog->outString("Revision hash: %s Revision Date: %s", __ShortCommitHash, __CommitDate);
sLog->outString("Built: %s %s", __DATE__, __TIME__);
sLog->outString();
sLog->outString("Using Protocol: %s", StringConfigs[CONFIG_STRING_PROTOCOL_NAME].c_str());
sLog->outString("Initialized in: %lu ms", getMsTimeDiffToNow(_lastUpdate));
sLog->outString();
}

Expand Down Expand Up @@ -96,9 +131,14 @@ void Application::LoadConfigs()
LoadBoolConfig("Server.Daemonize", CONFIG_BOOL_DAEMONIZE, false);
LoadStringConfig("Server.PidFile", CONFIG_STRING_PID_FILE, "/var/lock/ikaros.pid");
LoadStringConfig("Server.WorkingDirectory", CONFIG_STRING_WORKING_DIRECTORY, ApplicationAddress.substr(0, ApplicationAddress.find_last_of('/')));

LoadBoolConfig("Server.Log", CONFIG_BOOL_LOG, true);
LoadIntConfig("Server.LogLevel", CONFIG_INT_LOG_LEVEL, 0);

LoadBoolConfig("Server.EnableFreezeDetector", CONFIG_BOOL_ENABLE_FREEZE_DETECTOR, true);
LoadIntConfig("Server.FreezeDetectorMaxDiff", CONFIG_INT_FREEZE_DETECTOR_MAX_DIFF, 30000);


LoadIntConfig("Protocol.BindPort", CONFIG_INT_BIND_PORT, 3535);
LoadStringConfig("Protocol.BindIp", CONFIG_STRING_BIND_IP, "127.0.0.1");
LoadStringConfig("Protocol.ProtocolName", CONFIG_STRING_PROTOCOL_NAME);
Expand Down Expand Up @@ -135,6 +175,9 @@ void Application::outDebugParams() const
sLog->outString("CONFIG_STRING_WORKING_DIRECTORY='%s'", StringConfigs[CONFIG_STRING_WORKING_DIRECTORY].c_str());
sLog->outString("CONFIG_BOOL_LOG=%s", toString(BoolConfigs[CONFIG_BOOL_LOG]));
sLog->outString("CONFIG_INT_LOG_LEVEL=%d", IntConfigs[CONFIG_INT_LOG_LEVEL]);
sLog->outString("CONFIG_BOOL_ENABLE_FREEZE_DETECTOR=%s", toString(BoolConfigs[CONFIG_BOOL_ENABLE_FREEZE_DETECTOR]));
sLog->outString("CONFIG_INT_FREEZE_DETECTOR_MAX_DIFF=%d", IntConfigs[CONFIG_INT_FREEZE_DETECTOR_MAX_DIFF]);

sLog->outString("Control=%s", toString(control));
sLog->outString("Debug=%s", toString(debug));
sLog->outString();
Expand Down Expand Up @@ -291,6 +334,8 @@ void Application::_initGlobals()
socketMgr = new ConnectionMgr();
threadMgr = new ThreadMgr();
handler = new PacketHandler();
if (BoolConfigs[CONFIG_BOOL_ENABLE_FREEZE_DETECTOR])
freezeDetector = new FreezeDetector(IntConfigs[CONFIG_INT_FREEZE_DETECTOR_MAX_DIFF], handler->_diffTime);
}

void Application::_uninitGlobals()
Expand All @@ -300,6 +345,8 @@ void Application::_uninitGlobals()
delete socketMgr;
delete threadMgr;
delete handler;
if (freezeDetector)
delete freezeDetector;
delete sLog;
}

Expand Down Expand Up @@ -389,7 +436,10 @@ uint32 Application::Update()
selectTimeout.tv_sec = 0;
selectTimeout.tv_usec = 500000;

gettimeofday(&_lastUpdate, 0);

int ProcessQueue = threadMgr->CreateThread("Queue", &CallProcessQueue, handler);
freezeDetector->Run();
if (ProcessQueue == -1)
{
sLog->outError("[Recv Thread] Executing Packet Handling Queue Failed errno: %d",errno);
Expand All @@ -403,6 +453,8 @@ uint32 Application::Update()
selectTimeout.tv_usec = 500000;
nfds = ((socketMgr->GetHighestFd() > ServerSocket) ? socketMgr->GetHighestFd() : ServerSocket) + 1;
r = select(nfds, &test, NULL, NULL, &selectTimeout);
_diffTime = getMsTimeDiffToNow(_lastUpdate);
gettimeofday(&_lastUpdate, 0);
if (r == 0)
continue;

Expand Down
37 changes: 34 additions & 3 deletions Application/Application.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ enum IntOptions
{
CONFIG_INT_BIND_PORT,
CONFIG_INT_LOG_LEVEL,
CONFIG_INT_FREEZE_DETECTOR_MAX_DIFF,
CONFIG_INT_MAX
};

Expand All @@ -65,6 +66,7 @@ enum BoolOptions
{
CONFIG_BOOL_DAEMONIZE,
CONFIG_BOOL_LOG,
CONFIG_BOOL_ENABLE_FREEZE_DETECTOR,
CONFIG_BOOL_MAX
};

Expand All @@ -75,6 +77,34 @@ class ConnectionMgr;

extern SimpleLog* sLog;

void* CallFreezeDetector(void* obj);

class FreezeDetector
{
friend void* CallFreezeDetector(void* obj);

public:
FreezeDetector(uint64 maxDiffTime, uint64& diff):
_maxDiffTime(maxDiffTime), _diff(diff), _exit(false), _pause(false), _active(false), _detectorThread(NULL) { }

void Run();
void Exit() { _exit = true; }
void Pause() { _pause = true; }
void Continue() { _detectorThread->Continue(); }

int GetStatus() const { if (_detectorThread) return _detectorThread->status; return 0; }

private:
uint64 _maxDiffTime;
uint64& _diff;
bool _exit;
bool _pause;
bool _active;
void* _process(void*);
Thread* _detectorThread;
};


class Application : public Daemon
{
public:
Expand All @@ -90,6 +120,7 @@ class Application : public Daemon
ConnectionMgr *socketMgr;
ThreadMgr *threadMgr;
PacketHandler *handler;
FreezeDetector *freezeDetector;

bool LoadLibrary();

Expand All @@ -98,8 +129,6 @@ class Application : public Daemon
inline bool Exiting() const { return terminate; }
void Terminate();

pthread_mutex_t* GetLogWriteMutex() { return &logMutex; }

protected:
Options FileOptions;
Options RunOptions;
Expand All @@ -115,7 +144,9 @@ class Application : public Daemon
string StringConfigs[CONFIG_STRING_MAX];

string ApplicationAddress;
pthread_mutex_t logMutex;

uint64 _diffTime;
timeval _lastUpdate;

private:
void ParseParams();
Expand Down
5 changes: 3 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ set (MultiServer_LINK_FLAGS ${CMAKE_CXX_FLAGS})

add_definitions(${CMAKE_CXX_FLAGS})


set (STAT_CORE
Utilities/Utils.cpp
DataArrays/binData.cpp
Expand All @@ -96,7 +97,6 @@ set (STAT_CORE
set (STAT_MAIN main.cpp)



add_library(Core STATIC ${STAT_CORE})
target_link_libraries(Core ${CMAKE_DL_LIBS} Core)

Expand All @@ -112,7 +112,8 @@ if (Protocols)
endif (Protocols)

file(MAKE_DIRECTORY ${InstallDirectory})

install(TARGETS MultiServer DESTINATION ${InstallDirectory})
if (NOT EXISTS "${InstallDirectory}config.conf")
install(FILES config.conf DESTINATION ${InstallDirectory})
install(FILES config.conf DESTINATION ${InstallDirectory})
endif (NOT EXISTS "${InstallDirectory}config.conf")
39 changes: 8 additions & 31 deletions Handling/Handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,38 +6,8 @@ void SigContHandler(int)

}

uint64 getMsTimeDiff(timeval a, timeval b)
{
return (b.tv_sec - a.tv_sec) * 1000 + (b.tv_usec - a.tv_usec) / 1000;
}

uint64 getMsTimeDiffToNow(timeval a)
{
timeval b;
gettimeofday(&b, 0);
return (b.tv_sec - a.tv_sec) * 1000 + (b.tv_usec - a.tv_usec) / 1000;
}

int nanosleep(uint64 time)
{
timespec t;
t.tv_sec = 0;
t.tv_nsec = time;
return nanosleep(&t, NULL);
}

inline int usleep(uint64 time)
{
return nanosleep(time * IN_MICROSECONDS);
}

inline int msleep(uint64 time)
{
return nanosleep(time * IN_MILLISECONDS);
}

PacketHandler::PacketHandler():
process(true)
_diffTime(0), process(true)
{

}
Expand Down Expand Up @@ -101,11 +71,14 @@ void PacketHandler::ProcessQueue()
sigset_t suspendSig;
sigemptyset(&suspendSig);
int sig = SIGCONT;

timeval workBegan;
gettimeofday(&workBegan, 0);

while (process)
{
UpdateDelayed();
_diffTime = getMsTimeDiffToNow(workBegan);
gettimeofday(&workBegan, 0);
if (!queue.size() && !delayedQueue.size())
{
Expand All @@ -114,8 +87,11 @@ void PacketHandler::ProcessQueue()
sigaddset(&suspendSig, SIGINT);
sigaddset(&suspendSig, SIGTERM);
app->threadMgr->SetThreadStatus(GetThisThread(), THREAD_SUSPENDED);
app->freezeDetector->Pause();
sigwait(&suspendSig, &sig);
app->threadMgr->SetThreadStatus(GetThisThread(), THREAD_ACTIVE);
_diffTime = getMsTimeDiffToNow(workBegan);
app->freezeDetector->Continue();
//sLog->outDebug("Process Queue was suspended %lu milliseconds", getMsTimeDiffToNow(workBegan));
}

Expand Down Expand Up @@ -147,6 +123,7 @@ void PacketHandler::ProcessQueue()
if (delayedQueue.size())
msleep(500);
}
app->freezeDetector->Exit();
sLog->outControl("[PacketHandler] Process Queue exit");
ThreadMgr::Exit(NULL);
}
2 changes: 2 additions & 0 deletions Handling/Handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class PacketHandler
public:
typedef std::queue<Event> EventQueue;
typedef std::list<DelayedEvent> DelayedEventQueue;

PacketHandler();
~PacketHandler();

Expand All @@ -78,6 +79,7 @@ class PacketHandler
void UpdateDelayed();
void Terminate();

uint64 _diffTime;
private:
SignalHandler* sigHandler;

Expand Down
1 change: 1 addition & 0 deletions Protocol/FTP/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ include_directories(
)

add_library(${LibraryName} SHARED Base.cpp)

install(TARGETS ${LibraryName} DESTINATION ${InstallDirectory})

if (EXISTS "${CMAKE_CURRENT_SOURCE_DIR}/${ConfigName}")
Expand Down
6 changes: 4 additions & 2 deletions Protocol/FTP/FTP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ void* SendData(void* args)
size_t dataLength = sendStat->l;
FTP::SendCommandResponse(sock, 150);

int sendReturn = 0, sleepTime = intConfigs[CONFIG_INT_SEND_WAIT_TIME], errSleepTime = intConfigs[CONFIG_INT_SEND_ERROR_WAIT_TIME];
int sendReturn = 0;
uint64 sleepTime = intConfigs[CONFIG_INT_SEND_WAIT_TIME], errSleepTime = intConfigs[CONFIG_INT_SEND_ERROR_WAIT_TIME];
size_t sended = 0, sendingPart = intConfigs[CONFIG_INT_MAX_DATA_SEGMENT_SIZE];
bool interrupt = false;

Expand Down Expand Up @@ -163,7 +164,8 @@ void* RecvData(void* args)
const char* fileName = recvStat->f;
FTP::SendCommandResponse(sock, 150);

int recvReturn = 0, sleepTime = intConfigs[CONFIG_INT_RECV_WAIT_TIME], errSleepTime = intConfigs[CONFIG_INT_RECV_ERROR_WAIT_TIME];
int recvReturn = 0;
uint64 sleepTime = intConfigs[CONFIG_INT_RECV_WAIT_TIME], errSleepTime = intConfigs[CONFIG_INT_RECV_ERROR_WAIT_TIME];
size_t recieved = 0, recievePart = intConfigs[CONFIG_INT_MAX_DATA_SEGMENT_SIZE];
bool interrupt = false;
BinnaryData* recvData = new BinnaryData();
Expand Down
1 change: 1 addition & 0 deletions Protocol/FTP/FTP.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "Includes.cpp"
#include "defines.h"
#include "Utils.h"

#include "ConfigMgr.cpp"

Expand Down
1 change: 1 addition & 0 deletions Protocol/Includes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "Application.h"
#include "Socket.h"
#include "SimpleLog.h"
#include "Utils.cpp"

extern Application* app;

Expand Down
Loading

0 comments on commit 2956f6a

Please sign in to comment.