Skip to content

Commit

Permalink
use ConcurrentDictionary instead of lock primitives
Browse files Browse the repository at this point in the history
  • Loading branch information
scottfavre authored and aloneguid committed Nov 14, 2023
1 parent 288fa54 commit 3eb923a
Showing 1 changed file with 7 additions and 24 deletions.
31 changes: 7 additions & 24 deletions src/Parquet/Serialization/ParquetSerializer.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
Expand All @@ -16,11 +17,8 @@ namespace Parquet.Serialization {
/// Comes as a rewrite of ParquetConvert/ClrBridge/MSILGenerator and supports nested types as well.
/// </summary>
public static class ParquetSerializer {

private static readonly object _striperLock = new();
private static readonly object _assemblerLock = new();
private static readonly Dictionary<Type, object> _typeToStriper = new();
private static readonly Dictionary<Type, object> _typeToAssembler = new();
private static readonly ConcurrentDictionary<Type, object> _typeToStriper = new();
private static readonly ConcurrentDictionary<Type, object> _typeToAssembler = new();

private static async Task SerializeRowGroupAsync<T>(ParquetWriter writer, Striper<T> striper,
IEnumerable<T> objectInstances,
Expand Down Expand Up @@ -54,16 +52,8 @@ public static async Task<ParquetSchema> SerializeAsync<T>(IEnumerable<T> objectI
ParquetSerializerOptions? options = null,
CancellationToken cancellationToken = default) {

Striper<T> striper;

lock(_striperLock) {
if(_typeToStriper.TryGetValue(typeof(T), out object? boxedStriper)) {
striper = (Striper<T>)boxedStriper;
} else {
striper = new Striper<T>(typeof(T).GetParquetSchema(false));
_typeToStriper[typeof(T)] = striper;
}
}
object boxedStriper = _typeToStriper.GetOrAdd(typeof(T), _ => new Striper<T>(typeof(T).GetParquetSchema(false)));
var striper = (Striper<T>)boxedStriper;

bool append = options != null && options.Append;
using(ParquetWriter writer = await ParquetWriter.CreateAsync(striper.Schema, destination,
Expand Down Expand Up @@ -161,16 +151,9 @@ public static async Task<IList<T>> DeserializeAsync<T>(Stream source,

private static Assembler<T> GetAssembler<T>() where T : new() {

Assembler<T> asm;
object boxedAssemblyer = _typeToAssembler.GetOrAdd(typeof(T), _ => new Assembler<T>(typeof(T).GetParquetSchema(true)));

lock(_assemblerLock) {
if(_typeToAssembler.TryGetValue(typeof(T), out object? boxedAssembler)) {
asm = (Assembler<T>)boxedAssembler;
} else {
asm = new Assembler<T>(typeof(T).GetParquetSchema(true));
_typeToAssembler[typeof(T)] = asm;
}
}
var asm = (Assembler<T>)boxedAssemblyer;

return asm;
}
Expand Down

0 comments on commit 3eb923a

Please sign in to comment.