Skip to content

Commit

Permalink
Improved thread parameter safety.
Browse files Browse the repository at this point in the history
  • Loading branch information
NTDLS committed Oct 7, 2024
1 parent 04ce02c commit 9cc3e08
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 31 deletions.
20 changes: 10 additions & 10 deletions NTDLS.Katzebase.Engine/Interactions/Management/IndexManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -330,19 +330,19 @@ private Dictionary<uint, DocumentPointer> MatchSchemaDocumentsByIndexingConditio

var allThreadResultsForSingleCondition = new Dictionary<uint, DocumentPointer>();

var childPool = _core.ThreadPool.Indexing.CreateChildPool(_core.Settings.IndexingChildThreadPoolQueueDepth);
var childPool = _core.ThreadPool.Indexing.CreateChildPool<uint>(_core.Settings.IndexingChildThreadPoolQueueDepth);

foreach (var indexPartition in indexPartitions)
{
var ptThreadQueue = transaction.Instrumentation.CreateToken(PerformanceCounter.ThreadQueue);
childPool.Enqueue(() =>
childPool.Enqueue(indexPartition, (uint threadIndexPartition) =>
{
#region Thread.

try
{
Dictionary<uint, DocumentPointer> singleThreadResults = new();
string pageDiskPath = indexLookup.IndexSelection.PhysicalIndex.GetPartitionPagesFileName(physicalSchema, indexPartition);
string pageDiskPath = indexLookup.IndexSelection.PhysicalIndex.GetPartitionPagesFileName(physicalSchema, threadIndexPartition);
var physicalIndexPages = _core.IO.GetPBuf<PhysicalIndexPages>(transaction, pageDiskPath, LockOperation.Read);

List<PhysicalIndexLeaf> workingPhysicalIndexLeaves = [physicalIndexPages.Root];
Expand Down Expand Up @@ -1009,14 +1009,14 @@ private void RemoveDocumentsFromIndex(Transaction transaction, PhysicalSchema ph
}

var ptThreadQueue = transaction.Instrumentation.CreateToken(PerformanceCounter.ThreadQueue);
childPool.Enqueue(indexPartition, (uint indexPartition) =>
childPool.Enqueue(indexPartition, (uint threadIndexPartition) =>
{
#region Thread.

transaction.EnsureActive();

string pageDiskPath = physicalIndex.GetPartitionPagesFileName(
physicalSchema, indexPartition);
physicalSchema, threadIndexPartition);

var physicalIndexPages = _core.IO.GetPBuf<PhysicalIndexPages>(transaction, pageDiskPath, LockOperation.Write);

Expand Down Expand Up @@ -1107,7 +1107,7 @@ private void RebuildIndex(Transaction transaction, PhysicalSchema physicalSchema
(physicalSchema, indexPartition), physicalIndexPages);
}

var childPool = _core.ThreadPool.Indexing.CreateChildPool(_core.Settings.IndexingChildThreadPoolQueueDepth);
var childPool = _core.ThreadPool.Indexing.CreateChildPool<PhysicalDocumentPageCatalogItem>(_core.Settings.IndexingChildThreadPoolQueueDepth);

var syncObjects = new object[physicalIndex.Partitions];
for (uint indexPartition = 0; indexPartition < physicalIndex.Partitions; indexPartition++)
Expand All @@ -1123,7 +1123,7 @@ private void RebuildIndex(Transaction transaction, PhysicalSchema physicalSchema
}

var ptThreadQueue = transaction.Instrumentation.CreateToken(PerformanceCounter.ThreadQueue);
childPool.Enqueue(() =>
childPool.Enqueue(physicalDocumentPageCatalogItem, (PhysicalDocumentPageCatalogItem threadPhysicalDocumentPageCatalogItem) =>
{
#region Thread.

Expand All @@ -1132,9 +1132,9 @@ private void RebuildIndex(Transaction transaction, PhysicalSchema physicalSchema
transaction.EnsureActive();

var physicalDocumentPageMap = _core.Documents.AcquireDocumentPageMap(transaction,
physicalSchema, physicalDocumentPageCatalogItem.PageNumber, LockOperation.Read);
physicalSchema, threadPhysicalDocumentPageCatalogItem.PageNumber, LockOperation.Read);

var documentPointers = physicalDocumentPageMap.DocumentIDs.Select(o => new DocumentPointer(physicalDocumentPageCatalogItem.PageNumber, o));
var documentPointers = physicalDocumentPageMap.DocumentIDs.Select(o => new DocumentPointer(threadPhysicalDocumentPageCatalogItem.PageNumber, o));

foreach (var documentPointer in documentPointers)
{
Expand Down Expand Up @@ -1165,7 +1165,7 @@ private void RebuildIndex(Transaction transaction, PhysicalSchema physicalSchema
}
catch (Exception ex)
{
LogManager.Error($"Failed to insert document into index for process id {transaction.ProcessId}, page number: {physicalDocumentPageCatalogItem.PageNumber}.", ex);
LogManager.Error($"Failed to insert document into index for process id {transaction.ProcessId}, page number: {threadPhysicalDocumentPageCatalogItem.PageNumber}.", ex);
throw;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public static SchemaIntersectionRowCollection GatherIntersectedRows(EngineCore c
{
var resultingRowCollection = GatherPrimarySchemaRows(core, transaction, schemaMappings, query, gatherDocumentPointersForSchemaAliases);

var childPool = core.ThreadPool.Intersection.CreateChildPool(core.Settings.IntersectionChildThreadPoolQueueDepth);
var childPool = core.ThreadPool.Intersection.CreateChildPool<SchemaIntersectionRow>(core.Settings.IntersectionChildThreadPoolQueueDepth);

bool rowLimitExceeded = false;

Expand All @@ -161,14 +161,12 @@ public static SchemaIntersectionRowCollection GatherIntersectedRows(EngineCore c
transaction.EnsureActive();

var ptThreadQueue = transaction.Instrumentation.CreateToken(PerformanceCounter.ThreadQueue);
childPool.Enqueue(() =>
childPool.Enqueue(templateRow.Clone(), (SchemaIntersectionRow threadTemplateRowClone) =>
{
#region Thread.

transaction.EnsureActive();

var templateRowClone = templateRow.Clone();

IEnumerable<DocumentPointer>? documentPointers = null;

#region Index optimization.
Expand All @@ -181,7 +179,7 @@ public static SchemaIntersectionRowCollection GatherIntersectedRows(EngineCore c
var rightHandDocumentIdentifiers = schemaMap.Value.Optimization.Conditions.FlattenToRightDocumentIdentifiers();
foreach (var documentIdentifier in rightHandDocumentIdentifiers)
{
if (!templateRowClone.SchemaElements.TryGetValue(documentIdentifier.SchemaAlias, out var schemaElements))
if (!threadTemplateRowClone.SchemaElements.TryGetValue(documentIdentifier.SchemaAlias, out var schemaElements))
{
throw new KbEngineException($"Schema not found in query: [{documentIdentifier.SchemaAlias}].");
}
Expand Down Expand Up @@ -210,11 +208,11 @@ public static SchemaIntersectionRowCollection GatherIntersectedRows(EngineCore c
{
var physicalDocument = core.Documents.AcquireDocument(transaction, schemaMap.Value.PhysicalSchema, documentPointer, LockOperation.Read);

templateRowClone.SchemaElements[schemaMap.Value.Prefix.ToLowerInvariant()] = physicalDocument.Elements;
threadTemplateRowClone.SchemaElements[schemaMap.Value.Prefix.ToLowerInvariant()] = physicalDocument.Elements;

if (IsJoinExpressionMatch(transaction, query, schemaMap.Value.Conditions, templateRowClone))
if (IsJoinExpressionMatch(transaction, query, schemaMap.Value.Conditions, threadTemplateRowClone))
{
var newRow = templateRowClone.Clone();
var newRow = threadTemplateRowClone.Clone();
newRow.MatchedSchemas.Add(schemaMap.Key);

if (gatherDocumentPointersForSchemaAliases?.Contains(schemaMap.Value.Prefix, StringComparer.InvariantCultureIgnoreCase) == true)
Expand Down Expand Up @@ -273,23 +271,26 @@ public static SchemaIntersectionRowCollection GatherIntersectedRows(EngineCore c
//Now that we have finished joining all schemas, we can now apply the WHERE clause.
var primarySchema = schemaMappings.First();

var matchChildPool = core.ThreadPool.Intersection.CreateChildPool<SchemaIntersectionRow>(core.Settings.IntersectionChildThreadPoolQueueDepth);


foreach (var resultingRow in resultingRowCollection)
{
var ptThreadQueue = transaction.Instrumentation.CreateToken(PerformanceCounter.ThreadQueue);
childPool.Enqueue(() =>
matchChildPool.Enqueue(resultingRow, (SchemaIntersectionRow threadResultingRow) =>
{
#region Thread.

var schemaElements = resultingRow.SchemaElements.Flatten();
resultingRow.MatchedByWhereClause = IsWhereClauseMatch(transaction, query, primarySchema.Value.Conditions, schemaElements);
var schemaElements = threadResultingRow.SchemaElements.Flatten();
threadResultingRow.MatchedByWhereClause = IsWhereClauseMatch(transaction, query, primarySchema.Value.Conditions, schemaElements);

#endregion
});
ptThreadQueue?.StopAndAccumulate();
}

var ptThreadCompletion_Removal = transaction.Instrumentation.CreateToken(PerformanceCounter.ThreadCompletion);
childPool.WaitForCompletion();
matchChildPool.WaitForCompletion();
ptThreadCompletion_Removal?.StopAndAccumulate();

//Remove all rows that were not matched by the where clause.
Expand Down Expand Up @@ -421,7 +422,7 @@ private static SchemaIntersectionRowCollection GatherPrimarySchemaRows(EngineCor

var schemaIntersectionRowCollection = new SchemaIntersectionRowCollection();

var childPool = core.ThreadPool.Lookup.CreateChildPool(core.Settings.LookupChildThreadPoolQueueDepth);
var childPool = core.ThreadPool.Lookup.CreateChildPool<DocumentPointer>(core.Settings.LookupChildThreadPoolQueueDepth);

bool rowLimitExceeded = false;

Expand All @@ -430,13 +431,13 @@ private static SchemaIntersectionRowCollection GatherPrimarySchemaRows(EngineCor
transaction.EnsureActive();

var ptThreadQueue = transaction.Instrumentation.CreateToken(PerformanceCounter.ThreadQueue);
childPool.Enqueue(() =>
childPool.Enqueue(documentPointer, (DocumentPointer threadDocumentPointer) =>
{
#region Thread.

transaction.EnsureActive();

var physicalDocument = core.Documents.AcquireDocument(transaction, primarySchema.Value.PhysicalSchema, documentPointer, LockOperation.Read);
var physicalDocument = core.Documents.AcquireDocument(transaction, primarySchema.Value.PhysicalSchema, threadDocumentPointer, LockOperation.Read);

//Had to remove this match because the where clause can contain conditions comprised of values from joins.
//TODO: We use condition groups to determine if we can do early elimination of primary schema results.
Expand All @@ -450,7 +451,7 @@ private static SchemaIntersectionRowCollection GatherPrimarySchemaRows(EngineCor
if (gatherDocumentPointersForSchemaAliases?.Contains(primarySchema.Value.Prefix, StringComparer.InvariantCultureIgnoreCase) == true)
{
//Keep track of document pointers for this schema if we are to do so as denoted by gatherDocumentPointersForSchemaAliases.
schemaIntersectionRow.DocumentPointers.Add(primarySchema.Value.Prefix.ToLowerInvariant(), documentPointer);
schemaIntersectionRow.DocumentPointers.Add(primarySchema.Value.Prefix.ToLowerInvariant(), threadDocumentPointer);
}

//Found a document that matched the where clause, add row to the results collection.
Expand Down Expand Up @@ -498,7 +499,7 @@ private static MaterializedRowCollection MaterializeRowValues(EngineCore core, T
{
#region No Grouping.

var childPool = core.ThreadPool.Materialization.CreateChildPool(core.Settings.MaterializationChildThreadPoolQueueDepth);
var childPool = core.ThreadPool.Materialization.CreateChildPool<SchemaIntersectionRow>(core.Settings.MaterializationChildThreadPoolQueueDepth);

bool rowLimitExceeded = false;

Expand All @@ -507,20 +508,20 @@ private static MaterializedRowCollection MaterializeRowValues(EngineCore core, T
transaction.EnsureActive();

var ptThreadQueue = transaction.Instrumentation.CreateToken(PerformanceCounter.ThreadQueue);
childPool.Enqueue(() =>
childPool.Enqueue(row, (SchemaIntersectionRow threadRow) =>
{
#region Thread.

transaction.EnsureActive();

var materializedRow = new MaterializedRow();
var flattenedSchemaElements = row.SchemaElements.Flatten();
var flattenedSchemaElements = threadRow.SchemaElements.Flatten();

foreach (var field in query.SelectFields)
{
if (field.Expression is QueryFieldDocumentIdentifier fieldDocumentIdentifier)
{
if (!row.SchemaElements.TryGetValue(fieldDocumentIdentifier.SchemaAlias, out var schemaElements))
if (!threadRow.SchemaElements.TryGetValue(fieldDocumentIdentifier.SchemaAlias, out var schemaElements))
{
throw new KbEngineException($"Schema not found in query: [{fieldDocumentIdentifier.SchemaAlias}].");
}
Expand Down Expand Up @@ -556,7 +557,7 @@ private static MaterializedRowCollection MaterializeRowValues(EngineCore core, T
{
if (field.Expression is QueryFieldDocumentIdentifier fieldDocumentIdentifier)
{
if (!row.SchemaElements.TryGetValue(fieldDocumentIdentifier.SchemaAlias, out var schemaElements))
if (!threadRow.SchemaElements.TryGetValue(fieldDocumentIdentifier.SchemaAlias, out var schemaElements))
{
throw new KbEngineException($"Schema not found in query: [{fieldDocumentIdentifier.SchemaAlias}].");
}
Expand Down

0 comments on commit 9cc3e08

Please sign in to comment.