Documentation sync with github
NO CODE CHANGES
OrionEdwards-Gallagher committed Feb 13, 2020
1 parent 0ed6f9c commit ed77960
Showing 1 changed file with 116 additions and 95 deletions.
211 changes: 116 additions & 95 deletions src/SerialQueue.cs
@@ -28,7 +28,10 @@

namespace Dispatch
{
/// <summary>Implements a serial queue</summary>
/// <summary>
/// This class is the main purpose of the library.
/// It represents a serial queue which will run all its callbacks sequentially and safely
/// (like a thread) but whose execution actually is performed on the OS threadpool.</summary>
public class SerialQueue : IDispatchQueue
{
enum AsyncState
@@ -54,8 +57,12 @@ enum AsyncState
volatile AsyncState m_asyncState = AsyncState.Idle; // acquire m_schedulerLock
bool m_isDisposed = false; // acquire m_schedulerLock

/// <summary>Constructs a new SerialQueue backed by the given ThreadPool</summary>
/// <summary>Constructs a new SerialQueue backed by a custom ThreadPool implementation.
/// This primarily exists to enable unit testing; however, if you have a custom ThreadPool you could use it here</summary>
/// <param name="threadpool">The threadpool to queue async actions to</param>
/// <param name="name">An optional friendly name for this queue</param>
/// <param name="features">You may opt-out of certain features in order to reduce overhead.
/// You shouldn't need to do this except in extreme situations as shown by profiling.</param>
public SerialQueue(IThreadPool threadpool, string? name = null, SerialQueueFeatures features = SerialQueueFeatures.All)
{
m_threadPool = threadpool ?? throw new ArgumentNullException(nameof(threadpool));
@@ -66,10 +73,14 @@ public SerialQueue(IThreadPool threadpool, string? name = null, SerialQueueFeatu
m_syncContext = new DispatchQueueSynchronizationContext(this);
}

/// <summary>Constructs a new SerialQueue backed by the default TaskThreadPool</summary>
/// <summary>Constructs a new SerialQueue backed by the default TaskThreadPool.
/// This is the default constructor which is intended for normal use</summary>
/// <param name="name">An optional friendly name for this queue</param>
/// <param name="features">You may opt-out of certain features in order to reduce overhead.
/// You shouldn't need to do this except in extreme situations as shown by profiling.</param>
public SerialQueue(string? name = null, SerialQueueFeatures features = SerialQueueFeatures.All) : this(TaskThreadPool.Default, name, features) { }
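// Usage sketch (illustrative only, not part of this file): constructing queues with
// either constructor. The queue names below are arbitrary examples.
//
//   var queue = new SerialQueue("network-io");                       // default TaskThreadPool, all features
//   var testQueue = new SerialQueue(TaskThreadPool.Default, "test"); // explicit threadpool, e.g. in unit tests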

/// <summary>Returns the name (if one is set)</summary>
/// <summary>Returns the friendly name (if one is set)</summary>
public string? Name { get; }

/// <summary>Returns the enabled features this serial queue has</summary>
@@ -91,6 +102,96 @@ public void VerifyQueue()
throw new InvalidOperationException("On the wrong queue");
}
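// Usage sketch (illustrative only, not part of this file): guarding a method that must
// only be called from code already running on this queue. m_queue and m_counter are
// hypothetical fields used for illustration.
//
//   void UpdateSharedState()
//   {
//       m_queue.VerifyQueue(); // throws InvalidOperationException if called from the wrong queue
//       m_counter++;
//   }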

/// <summary>Schedules the given action to run asynchronously on the queue when it is available</summary>
/// <param name="action">The function to run</param>
/// <returns>A disposable token which you can use to cancel the async action if it has not run yet.
/// It is always safe to dispose this token, even if the async action has already run</returns>
public virtual IDisposable DispatchAsync(Action action)
{
lock (m_schedulerLock)
{
if (m_isDisposed)
throw new ObjectDisposedException(nameof(SerialQueue), "Cannot call DispatchAsync on a disposed queue");

m_asyncActions.Add(action);

if (m_asyncState == AsyncState.Idle)
{
// even though we don't hold m_schedulerLock when the async state is later reset,
// that should be OK as the only "contention" happens up here while we do hold it
m_asyncState = AsyncState.Scheduled;
m_threadPool.QueueWorkItem(ProcessAsync);
}
}

return new AnonymousDisposable(() => {
// we can't "take it out" of the threadpool as not all threadpools support that
lock (m_schedulerLock)
m_asyncActions.Remove(action);
});
}
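// Usage sketch (illustrative only, not part of this file): scheduling an async action
// and cancelling it if it has not started yet.
//
//   var queue = new SerialQueue("example");
//   IDisposable token = queue.DispatchAsync(() => Console.WriteLine("ran on the queue"));
//   // ... later: disposing is always safe, and cancels the action if it hasn't run yet
//   token.Dispose();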

/// <summary>Runs the given action on the queue.
/// Blocks until the action is fully complete.
/// If the queue is not currently busy processing asynchronous actions (a very common state), this should have the same performance characteristics as a simple lock, so it is often nice and convenient.
/// The SerialQueue guarantees the action will run on the calling thread (it will NOT thread-jump).
/// Other implementations of IDispatchQueue reserve the right to run the action on a different thread (e.g. WPF Dispatcher)</summary>
/// <param name="action">The function to run.</param>
public virtual void DispatchSync(Action action)
{
var prevStack = s_queueStack.Value.ToArray(); // there might be a more optimal way of doing this, it seems to be fast enough
s_queueStack.Value.Push(this);

bool schedulerLockTaken = false;
try
{
Monitor.Enter(m_schedulerLock, ref schedulerLockTaken);
Debug.Assert(schedulerLockTaken);

if (m_isDisposed)
throw new ObjectDisposedException(nameof(SerialQueue), "Cannot call DispatchSync on a disposed queue");

if (m_asyncState == AsyncState.Idle || prevStack.Contains(this)) // either queue is empty or it's a nested call
{
Monitor.Exit(m_schedulerLock);
schedulerLockTaken = false;

// process the action
lock (m_executionLock)
action(); // DO NOT CATCH EXCEPTIONS. We're executing synchronously so just let it throw
return;
}

// if there is any async stuff scheduled we must also schedule
// else m_asyncState == AsyncState.Scheduled, OR we fell through from Processing
var asyncReady = new ManualResetEvent(false);
var syncDone = new ManualResetEvent(false);
DispatchAsync(() => {
asyncReady.Set();
syncDone.WaitOne();
});
Monitor.Exit(m_schedulerLock);
schedulerLockTaken = false;

try
{
asyncReady.WaitOne();
action(); // DO NOT CATCH EXCEPTIONS. We're executing synchronously so just let it throw
}
finally
{
syncDone.Set(); // tell the dispatchAsync it can release the lock
}
}
finally
{
if (schedulerLockTaken)
Monitor.Exit(m_schedulerLock);

s_queueStack.Value.Pop(); // technically we leak the queue stack threadlocal, but it's probably OK. Windows will free it when the thread exits
}
}
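// Usage sketch (illustrative only, not part of this file): DispatchSync used like a lock
// to read state that is otherwise only touched by actions running on the queue.
// m_items is a hypothetical collection owned by the queue.
//
//   int count = 0;
//   queue.DispatchSync(() => count = m_items.Count);
//   Console.WriteLine($"items: {count}");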

/// <summary>Schedules the given action to run asynchronously on the queue after dueTime.</summary>
/// <remarks>The function is not guaranteed to run at dueTime as the queue may be busy, it will run when next able.</remarks>
/// <param name="dueTime">Delay before running the action</param>
@@ -137,35 +238,6 @@ public virtual IDisposable DispatchAfter(TimeSpan dueTime, Action action)
}
});
}
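// Usage sketch (illustrative only, not part of this file): scheduling a delayed action,
// assuming the returned token cancels it as with DispatchAsync. SaveSnapshot is a
// hypothetical callback.
//
//   IDisposable timer = queue.DispatchAfter(TimeSpan.FromSeconds(30), () => SaveSnapshot());
//   timer.Dispose(); // cancels the pending action if it hasn't started yet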

/// <summary>Schedules the given action to run asynchronously on the queue when it is available</summary>
/// <param name="action">The function to run</param>
/// <returns>A disposable token which you can use to cancel the async action if it has not run yet.
/// It is always safe to dispose this token, even if the async action has already run</returns>
public virtual IDisposable DispatchAsync(Action action)
{
lock (m_schedulerLock)
{
if (m_isDisposed)
throw new ObjectDisposedException(nameof(SerialQueue), "Cannot call DispatchAsync on a disposed queue");

m_asyncActions.Add(action);

if (m_asyncState == AsyncState.Idle)
{
// even though we don't hold m_schedulerLock when the async state is later reset,
// that should be OK as the only "contention" happens up here while we do hold it
m_asyncState = AsyncState.Scheduled;
m_threadPool.QueueWorkItem(ProcessAsync);
}
}

return new AnonymousDisposable(() => {
// we can't "take it out" of the threadpool as not all threadpools support that
lock (m_schedulerLock)
m_asyncActions.Remove(action);
});
}

/// <summary>Internal function which runs on the threadpool to execute the actual async actions</summary>
protected virtual void ProcessAsync()
@@ -235,65 +307,6 @@ protected virtual void ProcessAsync()
}
}

/// <summary>Runs the given action on the queue.
/// Blocks until the action is fully complete.
/// This implementation will not switch threads to run the function</summary>
/// <param name="action">The function to run.</param>
public virtual void DispatchSync(Action action)
{
var prevStack = s_queueStack.Value.ToArray(); // there might be a more optimal way of doing this, it seems to be fast enough
s_queueStack.Value.Push(this);

bool schedulerLockTaken = false;
try
{
Monitor.Enter(m_schedulerLock, ref schedulerLockTaken);
Debug.Assert(schedulerLockTaken);

if (m_isDisposed)
throw new ObjectDisposedException(nameof(SerialQueue), "Cannot call DispatchSync on a disposed queue");

if(m_asyncState == AsyncState.Idle || prevStack.Contains(this)) // either queue is empty or it's a nested call
{
Monitor.Exit(m_schedulerLock);
schedulerLockTaken = false;

// process the action
lock (m_executionLock)
action(); // DO NOT CATCH EXCEPTIONS. We're executing synchronously so just let it throw
return;
}

// if there is any async stuff scheduled we must also schedule
// else m_asyncState == AsyncState.Scheduled, OR we fell through from Processing
var asyncReady = new ManualResetEvent(false);
var syncDone = new ManualResetEvent(false);
DispatchAsync(() => {
asyncReady.Set();
syncDone.WaitOne();
});
Monitor.Exit(m_schedulerLock);
schedulerLockTaken = false;

try
{
asyncReady.WaitOne();
action(); // DO NOT CATCH EXCEPTIONS. We're executing synchronously so just let it throw
}
finally
{
syncDone.Set(); // tell the dispatchAsync it can release the lock
}
}
finally
{
if (schedulerLockTaken)
Monitor.Exit(m_schedulerLock);

s_queueStack.Value.Pop(); // technically we leak the queue stack threadlocal, but it's probably OK. Windows will free it when the thread exits
}
}

/// <summary>Shuts down the queue. All unstarted async actions will be dropped,
/// and any future attempts to call one of the Dispatch functions will throw an
/// ObjectDisposedException</summary>
@@ -321,8 +334,12 @@ protected virtual void Dispose(bool disposing)
}
}
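// Usage sketch (illustrative only, not part of this file), assuming SerialQueue exposes the
// standard public Dispose() alongside the protected Dispose(bool) shown here. DoWork is a
// hypothetical callback.
//
//   using (var queue = new SerialQueue("short-lived"))
//   {
//       queue.DispatchAsync(() => DoWork()); // dropped if not started before disposal
//   } // after this point, any Dispatch* call throws ObjectDisposedException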

/// <summary>Enables capture of a serial queue as a SynchronizationContext for async/await.
/// You shouldn't need to interact with this class yourself</summary>
public class DispatchQueueSynchronizationContext : SynchronizationContext
{
/// <summary>Constructs a new DispatchQueueSynchronizationContext wrapping the given queue</summary>
/// <param name="queue">Queue to post actions to</param>
public DispatchQueueSynchronizationContext(IDispatchQueue queue)
=> Queue = queue;

Expand All @@ -335,15 +352,19 @@ public override void Send(SendOrPostCallback d, object state)
=> Queue.DispatchSync(() => d(state));
}
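// Usage sketch (illustrative only, not part of this file): Send forwards to DispatchSync as
// shown above, and Post is assumed to forward to DispatchAsync analogously, so code that
// captures this context resumes its continuations on the queue.
//
//   var context = new DispatchQueueSynchronizationContext(queue);
//   context.Send(_ => Console.WriteLine("runs via DispatchSync, blocking the caller"), null);
//   context.Post(_ => Console.WriteLine("runs via DispatchAsync"), null);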

// Use these to turn on and off various features of the serial queue for performance reasons
/// <summary>
/// Use these to turn on and off various features of the serial queue for performance reasons
/// </summary>
[Flags]
public enum SerialQueueFeatures
{
/// <summary>Only basic functionality</summary>
None = 0,
// Note: if there is a need for it, we could put the Verify/Re-entrant DispatchSync behaviour behind a feature
// which could improve performance significantly

/// <summary>If enabled, you may use this queue with async/await</summary>
SynchronizationContext = 1,

/// <summary>All features enabled</summary>
All = SynchronizationContext
}
}
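// Usage sketch (illustrative only, not part of this file): opting out of the
// SynchronizationContext feature for a hot-path queue, which the comments above suggest
// doing only when profiling shows the overhead matters.
//
//   var fastQueue = new SerialQueue("hot-path", SerialQueueFeatures.None);
//   var normalQueue = new SerialQueue("general"); // defaults to SerialQueueFeatures.All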
