Skip to content

Commit

Permalink
Sender
Browse files Browse the repository at this point in the history
- Calling `sq(ubyte[])` now will wake up the condition variable

Receiver

- Calling `rq(ubyte[])` now will wake up the condition variable
- Updated `recvHandlerFunc()` to use the condition variable
- Callind `end()` will wakeup the sleeping thread
  • Loading branch information
deavmi committed Oct 23, 2023
1 parent ea2b070 commit c25c28c
Showing 1 changed file with 20 additions and 44 deletions.
64 changes: 20 additions & 44 deletions source/birchwood/client/receiver.d
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import core.thread : Thread, dur;

import std.container.slist : SList;
import core.sync.mutex : Mutex;
import core.sync.condition : Condition;

import eventy : EventyEvent = Event;

Expand Down Expand Up @@ -39,11 +40,10 @@ public final class ReceiverThread : Thread
private Mutex recvQueueLock;

/**
* The libsnooze event to await on which
* when we wake up signals a new message
* to be processed and received
* Condition variable for waking
* up receive queue reader
*/
private Event receiveEvent;
private Condition recvQueueCond;

/**
* The associated IRC client
Expand All @@ -64,9 +64,8 @@ public final class ReceiverThread : Thread
{
super(&recvHandlerFunc);
this.client = client;
this.receiveEvent = new Event();
this.recvQueueLock = new Mutex();
this.receiveEvent.ensure(this);
this.recvQueueCond = new Condition(this.recvQueueLock);
}

/**
Expand All @@ -84,14 +83,11 @@ public final class ReceiverThread : Thread
/* Add to queue */
recvQueue.insertAfter(recvQueue[], encodedMessage);

/* Wake the sleeping message handler */
recvQueueCond.notify();

/* Unlock queue */
recvQueueLock.unlock();

/**
* Wake up all threads waiting on this event
* (if any, and if so it would only be the receiver)
*/
receiveEvent.notifyAll();
}

/**
Expand All @@ -108,34 +104,12 @@ public final class ReceiverThread : Thread
{
while(client.isRunning())
{
// TODO: We could look at libsnooze wait starvation or mutex racing (future thought)

try
{
receiveEvent.wait();
}
catch(InterruptedException e)
{
version(unittest)
{
writeln("wait() interrupted");
}
continue;
}
catch(FatalException e)
{
// TODO: This should crash and end
version(unittest)
{
writeln("wait() had a FATAL error!!!!!!!!!!!");
}
continue;
}


/* Lock the receieve queue */
/* Lock the queue */
recvQueueLock.lock();

/* Sleep till woken (new message) */
recvQueueCond.wait(); // TODO: Check SyncError?

/* Parsed messages */
SList!(Message) currentMessageQueue;

Expand Down Expand Up @@ -237,14 +211,16 @@ public final class ReceiverThread : Thread
*/
public void end()
{
// TODO: See above notes about libsnooze behaviour due
// ... to usage in our context
receiveEvent.notifyAll();
/* Lock the queue */
recvQueueLock.lock();

/* Wake up sleeping thread (so it can exit) */
recvQueueCond.notify();

/* Unlock the queue */
recvQueueLock.unlock();

// Wait on the manager thread to end
join();

// Dispose the eventy event (TODO: We could do this then join for same effect)
receiveEvent.dispose();
}
}

0 comments on commit c25c28c

Please sign in to comment.