Skip to content

Commit

Permalink
Add Priority Connection Queue to Worker (#4386)
Browse files Browse the repository at this point in the history
* Support Priority Work on Connections

* few nits

* Make CodeCheck builds happy (hopefully)

* Name tweak

* Add Priority Connection Queue to Worker (WIP)

* wip

* Fix openssl3

* revert spinquic

* Fix start comparison

* More fixes

* slight improvement

* slight change

* slight refactoring

* more refactoring

* Trying to debug failures

* tmp

* basic tests

* 3 test cases

* rollback QuicWorkerQueueConnection

* priority queueing to follow normal queueing

* cleanup and refactoring

* try disabling high priority GetParam

* sleep for flush Connections from worker queue

* empirical solution

* priority connection re-entry

---------

Co-authored-by: ami-GS <[email protected]>
  • Loading branch information
nibanks and ami-GS authored Aug 5, 2024
1 parent 9cb53ee commit c689d51
Show file tree
Hide file tree
Showing 12 changed files with 416 additions and 28 deletions.
4 changes: 2 additions & 2 deletions src/core/connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -735,7 +735,7 @@ QuicConnQueuePriorityOper(
// The connection needs to be queued on the worker because this was the
// first operation in our OperQ.
//
QuicWorkerQueueConnection(Connection->Worker, Connection); // TODO - Support priority connections on worker?
QuicWorkerQueuePriorityConnection(Connection->Worker, Connection);
}
}

Expand All @@ -751,7 +751,7 @@ QuicConnQueueHighestPriorityOper(
// The connection needs to be queued on the worker because this was the
// first operation in our OperQ.
//
QuicWorkerQueueConnection(Connection->Worker, Connection);
QuicWorkerQueuePriorityConnection(Connection->Worker, Connection);
}
}

Expand Down
1 change: 1 addition & 0 deletions src/core/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,7 @@ typedef struct QUIC_CONNECTION {
//
BOOLEAN WorkerProcessing : 1;
BOOLEAN HasQueuedWork : 1;
BOOLEAN HasPriorityWork : 1;

//
// Set of current reasons sending more packets is currently blocked.
Expand Down
3 changes: 2 additions & 1 deletion src/core/operation.h
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,8 @@ QuicOperationEnqueue(

//
// Enqueues an operation into the priority part of the queue. Returns TRUE if
// the queue was previously empty and not already being processed.
// the priority queue was previously empty and not already being processed. Note
// that the regular queue might not have been empty.
//
_IRQL_requires_max_(DISPATCH_LEVEL)
BOOLEAN
Expand Down
69 changes: 63 additions & 6 deletions src/core/worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ QuicWorkerInitialize(
CxPlatEventInitialize(&Worker->Done, TRUE, FALSE);
CxPlatEventInitialize(&Worker->Ready, FALSE, FALSE);
CxPlatListInitializeHead(&Worker->Connections);
Worker->PriorityConnectionsTail = &Worker->Connections.Flink;
CxPlatListInitializeHead(&Worker->Operations);
CxPlatPoolInitialize(FALSE, sizeof(QUIC_STREAM), QUIC_POOL_STREAM, &Worker->StreamPool);
CxPlatPoolInitialize(FALSE, sizeof(QUIC_RECV_CHUNK)+QUIC_DEFAULT_STREAM_RECV_BUFFER_SIZE, QUIC_POOL_SBUF, &Worker->DefaultReceiveBufferPool);
Expand Down Expand Up @@ -169,6 +170,7 @@ QuicWorkerUninitialize(
CxPlatEventUninitialize(Worker->Ready);

CXPLAT_TEL_ASSERT(CxPlatListIsEmpty(&Worker->Connections));
Worker->PriorityConnectionsTail = NULL;
CXPLAT_TEL_ASSERT(CxPlatListIsEmpty(&Worker->Operations));

CxPlatPoolUninitialize(&Worker->StreamPool);
Expand Down Expand Up @@ -222,10 +224,10 @@ QuicWorkerQueueConnection(
{
CXPLAT_DBG_ASSERT(Connection->Worker != NULL);
BOOLEAN ConnectionQueued = FALSE;
BOOLEAN WakeWorkerThread = FALSE;

CxPlatDispatchLockAcquire(&Worker->Lock);

BOOLEAN WakeWorkerThread;
if (!Connection->WorkerProcessing && !Connection->HasQueuedWork) {
WakeWorkerThread = QuicWorkerIsIdle(Worker);
Connection->Stats.Schedule.LastQueueTime = CxPlatTimeUs32();
Expand All @@ -237,20 +239,61 @@ QuicWorkerQueueConnection(
QuicConnAddRef(Connection, QUIC_CONN_REF_WORKER);
CxPlatListInsertTail(&Worker->Connections, &Connection->WorkerLink);
ConnectionQueued = TRUE;
} else {
WakeWorkerThread = FALSE;
}

Connection->HasQueuedWork = TRUE;

CxPlatDispatchLockRelease(&Worker->Lock);

if (ConnectionQueued) {
if (WakeWorkerThread) {
QuicWorkerThreadWake(Worker);
}
QuicPerfCounterIncrement(QUIC_PERF_COUNTER_CONN_QUEUE_DEPTH);
}
}

if (WakeWorkerThread) {
QuicWorkerThreadWake(Worker);
_IRQL_requires_max_(DISPATCH_LEVEL)
void
QuicWorkerQueuePriorityConnection(
_In_ QUIC_WORKER* Worker,
_In_ QUIC_CONNECTION* Connection
)
{
CXPLAT_DBG_ASSERT(Connection->Worker != NULL);
BOOLEAN ConnectionQueued = FALSE;
BOOLEAN WakeWorkerThread = FALSE;

CxPlatDispatchLockAcquire(&Worker->Lock);

if (!Connection->WorkerProcessing && !Connection->HasPriorityWork) {
if (!Connection->HasQueuedWork) { // Not already queued for normal priority work
WakeWorkerThread = QuicWorkerIsIdle(Worker);
Connection->Stats.Schedule.LastQueueTime = CxPlatTimeUs32();
QuicTraceEvent(
ConnScheduleState,
"[conn][%p] Scheduling: %u",
Connection,
QUIC_SCHEDULE_QUEUED);
QuicConnAddRef(Connection, QUIC_CONN_REF_WORKER);
ConnectionQueued = TRUE;
} else { // Moving from normal priority to high priority
CxPlatListEntryRemove(&Connection->WorkerLink);
}
CxPlatListInsertTail(*Worker->PriorityConnectionsTail, &Connection->WorkerLink);
Worker->PriorityConnectionsTail = &Connection->WorkerLink.Flink;
Connection->HasPriorityWork = TRUE;
}

Connection->HasQueuedWork = TRUE;

CxPlatDispatchLockRelease(&Worker->Lock);

if (ConnectionQueued) {
if (WakeWorkerThread) {
QuicWorkerThreadWake(Worker);
}
QuicPerfCounterIncrement(QUIC_PERF_COUNTER_CONN_QUEUE_DEPTH);
}
}

Expand Down Expand Up @@ -365,9 +408,13 @@ QuicWorkerGetNextConnection(
Connection =
CXPLAT_CONTAINING_RECORD(
CxPlatListRemoveHead(&Worker->Connections), QUIC_CONNECTION, WorkerLink);
if (Worker->PriorityConnectionsTail == &Connection->WorkerLink.Flink) {
Worker->PriorityConnectionsTail = &Worker->Connections.Flink;
}
CXPLAT_DBG_ASSERT(!Connection->WorkerProcessing);
CXPLAT_DBG_ASSERT(Connection->HasQueuedWork);
Connection->HasQueuedWork = FALSE;
Connection->HasPriorityWork = FALSE;
Connection->WorkerProcessing = TRUE;
QuicPerfCounterDecrement(QUIC_PERF_COUNTER_CONN_QUEUE_DEPTH);
}
Expand Down Expand Up @@ -517,7 +564,14 @@ QuicWorkerProcessConnection(
if (!Connection->State.UpdateWorker) {
if (Connection->HasQueuedWork) {
Connection->Stats.Schedule.LastQueueTime = CxPlatTimeUs32();
CxPlatListInsertTail(&Worker->Connections, &Connection->WorkerLink);
if (&Connection->OperQ.List.Flink != Connection->OperQ.PriorityTail) {
// priority operations are still pending
CxPlatListInsertTail(*Worker->PriorityConnectionsTail, &Connection->WorkerLink);
Worker->PriorityConnectionsTail = &Connection->WorkerLink.Flink;
Connection->HasPriorityWork = TRUE;
} else {
CxPlatListInsertTail(&Worker->Connections, &Connection->WorkerLink);
}
QuicTraceEvent(
ConnScheduleState,
"[conn][%p] Scheduling: %u",
Expand Down Expand Up @@ -577,6 +631,9 @@ QuicWorkerLoopCleanup(
QUIC_CONNECTION* Connection =
CXPLAT_CONTAINING_RECORD(
CxPlatListRemoveHead(&Worker->Connections), QUIC_CONNECTION, WorkerLink);
if (Worker->PriorityConnectionsTail == &Connection->WorkerLink.Flink) {
Worker->PriorityConnectionsTail = &Worker->Connections.Flink;
}
if (!Connection->State.ExternalOwner) {
//
// If there is no external owner, shut down the connection so
Expand Down
12 changes: 12 additions & 0 deletions src/core/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ typedef struct QUIC_CACHEALIGN QUIC_WORKER {
// Queue of connections with operations to be processed.
//
CXPLAT_LIST_ENTRY Connections;
CXPLAT_LIST_ENTRY** PriorityConnectionsTail;

//
// Queue of stateless operations to be processed.
Expand Down Expand Up @@ -184,6 +185,17 @@ QuicWorkerQueueConnection(
_In_ QUIC_CONNECTION* Connection
);

//
// Queues a priority connection onto the worker, and kicks the worker thread if
// necessary.
//
_IRQL_requires_max_(DISPATCH_LEVEL)
void
QuicWorkerQueuePriorityConnection(
_In_ QUIC_WORKER* Worker,
_In_ QUIC_CONNECTION* Connection
);

//
// Queues the operation onto the worker, and kicks the worker thread if
// necessary.
Expand Down
46 changes: 32 additions & 14 deletions src/plugins/dbg/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,23 +110,41 @@ EXT_COMMAND(
//

Dml("\n<u>OPERATIONS</u>\n"
"\n");
"\n"
"\tWorker Processing %s\n"
"\tHas Queued Work %s\n"
"\tHas Priority Work %s\n",
Conn.WorkerProcessing() ? "TRUE" : "FALSE",
Conn.HasQueuedWork() ? "TRUE" : "FALSE",
Conn.HasPriorityWork() ? "TRUE" : "FALSE");

bool HasAtLeastOneOperation = false;
auto Operations = Conn.GetOperQueue().GetOperations();
while (!CheckControlC()) {
auto OperLinkAddr = Operations.Next();
if (OperLinkAddr == 0) {
break;
if (Operations.IsEmpty()) {
Dml("\t\tNo Operations Queued\n");
} else {
bool IsHighPriority = true;
bool IsFirstOperation = true;
UINT64 PriorityTail;
ReadPointerAtAddr(Conn.GetOperQueue().GetPriorityTail(), &PriorityTail);
while (!CheckControlC()) {
auto OperLinkAddr = Operations.Next();
if (OperLinkAddr == 0) {
break;
}

if (PriorityTail == OperLinkAddr) {
IsHighPriority = false;
Dml("\n\tNORMAL PRIORITY:\n\n");
}

if (IsFirstOperation && IsHighPriority) {
Dml("\n\tHIGH PRIORITY:\n\n");
}

auto Operation = Operation::FromLink(OperLinkAddr);
Dml("\t\t%s\n", Operation.TypeStr());
IsFirstOperation = false;
}

auto Operation = Operation::FromLink(OperLinkAddr);
Dml("\t%s\n", Operation.TypeStr());
HasAtLeastOneOperation = true;
}

if (!HasAtLeastOneOperation) {
Dml("\tNo Operations Queued\n");
}

//
Expand Down
16 changes: 16 additions & 0 deletions src/plugins/dbg/quictypes.h
Original file line number Diff line number Diff line change
Expand Up @@ -1123,6 +1123,10 @@ struct OperQueue : Struct {
LinkedList GetOperations() {
return LinkedList(AddrOf("List"));
}

ULONG64 GetPriorityTail() {
return ReadPointer("PriorityTail");
}
};

struct StreamSet : Struct {
Expand Down Expand Up @@ -1216,6 +1220,18 @@ struct Connection : Struct {
}
}

BYTE WorkerProcessing() {
return ReadType<BYTE>("WorkerProcessing");
}

BYTE HasQueuedWork() {
return ReadType<BYTE>("HasQueuedWork");
}

BYTE HasPriorityWork() {
return ReadType<BYTE>("HasPriorityWork");
}

IpAddress GetLocalAddress() {
return IpAddress(AddrOf("LocalAddress")); // TODO - Broken
}
Expand Down
9 changes: 8 additions & 1 deletion src/test/MsQuicTests.h
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,10 @@ void
QuicTestOperationPriority(
);

void
QuicTestConnectionPriority(
);

void
QuicTestConnectionStreamStartSendPriority(
);
Expand Down Expand Up @@ -1305,4 +1309,7 @@ typedef struct {
#define IOCTL_QUIC_RUN_OPERATION_PRIORITY \
QUIC_CTL_CODE(122, METHOD_BUFFERED, FILE_WRITE_DATA)

#define QUIC_MAX_IOCTL_FUNC_CODE 122
#define IOCTL_QUIC_RUN_CONNECTION_PRIORITY \
QUIC_CTL_CODE(123, METHOD_BUFFERED, FILE_WRITE_DATA)

#define QUIC_MAX_IOCTL_FUNC_CODE 123
9 changes: 9 additions & 0 deletions src/test/bin/quic_gtest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2229,6 +2229,15 @@ TEST(Basic, OperationPriority) {
}
}

TEST(Basic, ConnectionPriority) {
TestLogger Logger("ConnectionPriority");
if (TestingKernelMode) {
ASSERT_TRUE(DriverClient.Run(IOCTL_QUIC_RUN_CONNECTION_PRIORITY));
} else {
QuicTestConnectionPriority();
}
}

TEST(Drill, VarIntEncoder) {
TestLogger Logger("QuicDrillTestVarIntEncoder");
if (TestingKernelMode) {
Expand Down
5 changes: 5 additions & 0 deletions src/test/bin/winkernel/control.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,7 @@ size_t QUIC_IOCTL_BUFFER_SIZES[] =
sizeof(BOOLEAN),
0,
0,
0,
};

CXPLAT_STATIC_ASSERT(
Expand Down Expand Up @@ -1457,6 +1458,10 @@ QuicTestCtlEvtIoDeviceControl(
QuicTestCtlRun(QuicTestOperationPriority());
break;

case IOCTL_QUIC_RUN_CONNECTION_PRIORITY:
QuicTestCtlRun(QuicTestConnectionPriority());
break;

default:
Status = STATUS_NOT_IMPLEMENTED;
break;
Expand Down
Loading

0 comments on commit c689d51

Please sign in to comment.