Skip to content

Commit

Permalink
Switched to AsyncKeyedLock
Browse files Browse the repository at this point in the history
  • Loading branch information
MarkCiliaVincenti authored and aivit committed Apr 20, 2023
1 parent d34013f commit 14068aa
Show file tree
Hide file tree
Showing 11 changed files with 38 additions and 132 deletions.
5 changes: 2 additions & 3 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@
<PropertyGroup>
<ManagePackageVersionsCentrally>true</ManagePackageVersionsCentrally>
</PropertyGroup>

<!-- Run "dotnet list package (dash,dash)outdated" to see the latest versions of each package.-->

<ItemGroup Label="Package Dependencies">
<PackageVersion Include="AsyncKeyedLock" Version="6.2.0" />
<PackageVersion Include="AutoFixture.AutoMoq" Version="4.18.0" />
<PackageVersion Include="AutoFixture.Xunit2" Version="4.18.0" />
<PackageVersion Include="AutoFixture" Version="4.18.0" />
Expand Down Expand Up @@ -87,4 +86,4 @@
<PackageVersion Include="Xunit.SkippableFact" Version="1.4.13" />
<PackageVersion Include="xunit" Version="2.4.2" />
</ItemGroup>
</Project>
</Project>
27 changes: 5 additions & 22 deletions Jellyfin.Api/Controllers/DynamicHlsController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Logging;
using Nikse.SubtitleEdit.Core.CDG;

namespace Jellyfin.Api.Controllers;

Expand Down Expand Up @@ -303,9 +304,7 @@ public async Task<ActionResult> GetLiveHlsStream(

if (!System.IO.File.Exists(playlistPath))
{
var transcodingLock = _transcodingJobHelper.GetTranscodingLock(playlistPath);
await transcodingLock.WaitAsync(cancellationToken).ConfigureAwait(false);
try
using (await MediaBrowser.Common.Concurrency.AsyncKeyedLock.Locker.LockAsync(playlistPath, cancellationToken).ConfigureAwait(false))
{
if (!System.IO.File.Exists(playlistPath))
{
Expand Down Expand Up @@ -335,10 +334,6 @@ public async Task<ActionResult> GetLiveHlsStream(
}
}
}
finally
{
transcodingLock.Release();
}
}

job ??= _transcodingJobHelper.OnTranscodeBeginRequest(playlistPath, TranscodingJobType);
Expand Down Expand Up @@ -1458,18 +1453,13 @@ private async Task<ActionResult> GetDynamicSegment(StreamingRequestDto streaming
return await GetSegmentResult(state, playlistPath, segmentPath, segmentExtension, segmentId, job, cancellationToken).ConfigureAwait(false);
}

var transcodingLock = _transcodingJobHelper.GetTranscodingLock(playlistPath);
await transcodingLock.WaitAsync(cancellationToken).ConfigureAwait(false);
var released = false;
var startTranscoding = false;

try
using (await MediaBrowser.Common.Concurrency.AsyncKeyedLock.Locker.LockAsync(playlistPath, cancellationToken).ConfigureAwait(false))
{
var startTranscoding = false;

if (System.IO.File.Exists(segmentPath))
{
job = _transcodingJobHelper.OnTranscodeBeginRequest(playlistPath, TranscodingJobType);
transcodingLock.Release();
released = true;
_logger.LogDebug("returning {0} [it exists, try 2]", segmentPath);
return await GetSegmentResult(state, playlistPath, segmentPath, segmentExtension, segmentId, job, cancellationToken).ConfigureAwait(false);
}
Expand Down Expand Up @@ -1542,13 +1532,6 @@ await _transcodingJobHelper.KillTranscodingJobs(streamingRequest.DeviceId, strea
}
}
}
finally
{
if (!released)
{
transcodingLock.Release();
}
}

_logger.LogDebug("returning {0} [general case]", segmentPath);
job ??= _transcodingJobHelper.OnTranscodeBeginRequest(playlistPath, TranscodingJobType);
Expand Down
8 changes: 1 addition & 7 deletions Jellyfin.Api/Helpers/FileStreamResponseHelpers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,7 @@ public static async Task<ActionResult> GetTranscodedFile(
return new OkResult();
}

var transcodingLock = transcodingJobHelper.GetTranscodingLock(outputPath);
await transcodingLock.WaitAsync(cancellationTokenSource.Token).ConfigureAwait(false);
try
using (await MediaBrowser.Common.Concurrency.AsyncKeyedLock.Locker.LockAsync(outputPath, cancellationTokenSource.Token).ConfigureAwait(false))
{
TranscodingJobDto? job;
if (!File.Exists(outputPath))
Expand All @@ -111,9 +109,5 @@ public static async Task<ActionResult> GetTranscodedFile(
var stream = new ProgressiveFileStream(outputPath, job, transcodingJobHelper);
return new FileStreamResult(stream, contentType);
}
finally
{
transcodingLock.Release();
}
}
}
34 changes: 0 additions & 34 deletions Jellyfin.Api/Helpers/TranscodingJobHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,6 @@ public class TranscodingJobHelper : IDisposable
/// </summary>
private static readonly List<TranscodingJobDto> _activeTranscodingJobs = new List<TranscodingJobDto>();

/// <summary>
/// The transcoding locks.
/// </summary>
private static readonly Dictionary<string, SemaphoreSlim> _transcodingLocks = new Dictionary<string, SemaphoreSlim>();

private readonly IAttachmentExtractor _attachmentExtractor;
private readonly IApplicationPaths _appPaths;
private readonly EncodingHelper _encodingHelper;
Expand Down Expand Up @@ -284,11 +279,6 @@ private async Task KillTranscodingJob(TranscodingJobDto job, bool closeLiveStrea
}
}

lock (_transcodingLocks)
{
_transcodingLocks.Remove(job.Path!);
}

lock (job.ProcessLock!)
{
#pragma warning disable CA1849 // Can't await in lock block
Expand Down Expand Up @@ -759,11 +749,6 @@ public void OnTranscodeFailedToStart(string path, TranscodingJobType type, Strea
}
}

lock (_transcodingLocks)
{
_transcodingLocks.Remove(path);
}

if (!string.IsNullOrWhiteSpace(state.Request.DeviceId))
{
_sessionManager.ClearTranscodingInfo(state.Request.DeviceId);
Expand Down Expand Up @@ -855,25 +840,6 @@ private void OnTranscodeBeginRequest(TranscodingJobDto job)
}
}

/// <summary>
/// Gets the transcoding lock.
/// </summary>
/// <param name="outputPath">The output path of the transcoded file.</param>
/// <returns>A <see cref="SemaphoreSlim"/>.</returns>
public SemaphoreSlim GetTranscodingLock(string outputPath)
{
lock (_transcodingLocks)
{
if (!_transcodingLocks.TryGetValue(outputPath, out SemaphoreSlim? result))
{
result = new SemaphoreSlim(1, 1);
_transcodingLocks[outputPath] = result;
}

return result;
}
}

private void OnPlaybackProgress(object? sender, PlaybackProgressEventArgs e)
{
if (!string.IsNullOrWhiteSpace(e.PlaySessionId))
Expand Down
19 changes: 19 additions & 0 deletions MediaBrowser.Common/Concurrency/AsyncKeyedLock.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using AsyncKeyedLock;

namespace MediaBrowser.Common.Concurrency
{
/// <summary>
/// Class AsyncKeyedLock.
/// </summary>
public static class AsyncKeyedLock
{
/// <summary>
/// Gets the AsyncKeyedLock.
/// </summary>
public static AsyncKeyedLocker<string> Locker => new(o =>
{
o.PoolSize = 20;
o.PoolInitialFill = 1;
});
}
}
1 change: 1 addition & 0 deletions MediaBrowser.Common/MediaBrowser.Common.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="AsyncKeyedLock" />
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" />
<PackageReference Include="Microsoft.SourceLink.GitHub" PrivateAssets="All" />
Expand Down
33 changes: 3 additions & 30 deletions MediaBrowser.MediaEncoding/Attachments/AttachmentExtractor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,6 @@ public class AttachmentExtractor : IAttachmentExtractor, IDisposable
private readonly IMediaEncoder _mediaEncoder;
private readonly IMediaSourceManager _mediaSourceManager;

private readonly ConcurrentDictionary<string, SemaphoreSlim> _semaphoreLocks =
new ConcurrentDictionary<string, SemaphoreSlim>();

private bool _disposed = false;

public AttachmentExtractor(
Expand Down Expand Up @@ -87,11 +84,7 @@ public async Task ExtractAllAttachments(
string outputPath,
CancellationToken cancellationToken)
{
var semaphore = _semaphoreLocks.GetOrAdd(outputPath, key => new SemaphoreSlim(1, 1));

await semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);

try
using (await Common.Concurrency.AsyncKeyedLock.Locker.LockAsync(outputPath, cancellationToken).ConfigureAwait(false))
{
if (!Directory.Exists(outputPath))
{
Expand All @@ -102,10 +95,6 @@ await ExtractAllAttachmentsInternal(
cancellationToken).ConfigureAwait(false);
}
}
finally
{
semaphore.Release();
}
}

public async Task ExtractAllAttachmentsExternal(
Expand All @@ -114,11 +103,7 @@ public async Task ExtractAllAttachmentsExternal(
string outputPath,
CancellationToken cancellationToken)
{
var semaphore = _semaphoreLocks.GetOrAdd(outputPath, key => new SemaphoreSlim(1, 1));

await semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);

try
using (await Common.Concurrency.AsyncKeyedLock.Locker.LockAsync(outputPath, cancellationToken).ConfigureAwait(false))
{
if (!File.Exists(Path.Join(outputPath, id)))
{
Expand All @@ -134,10 +119,6 @@ await ExtractAllAttachmentsInternal(
}
}
}
finally
{
semaphore.Release();
}
}

private async Task ExtractAllAttachmentsInternal(
Expand Down Expand Up @@ -265,11 +246,7 @@ private async Task ExtractAttachment(
string outputPath,
CancellationToken cancellationToken)
{
var semaphore = _semaphoreLocks.GetOrAdd(outputPath, key => new SemaphoreSlim(1, 1));

await semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);

try
using (await Common.Concurrency.AsyncKeyedLock.Locker.LockAsync(outputPath, cancellationToken).ConfigureAwait(false))
{
if (!File.Exists(outputPath))
{
Expand All @@ -280,10 +257,6 @@ await ExtractAttachmentInternal(
cancellationToken).ConfigureAwait(false);
}
}
finally
{
semaphore.Release();
}
}

private async Task ExtractAttachmentInternal(
Expand Down
40 changes: 4 additions & 36 deletions MediaBrowser.MediaEncoding/Subtitles/SubtitleEncoder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,6 @@ public sealed class SubtitleEncoder : ISubtitleEncoder
private readonly IHttpClientFactory _httpClientFactory;
private readonly IMediaSourceManager _mediaSourceManager;

/// <summary>
/// The _semaphoreLocks.
/// </summary>
private readonly ConcurrentDictionary<string, SemaphoreSlim> _semaphoreLocks =
new ConcurrentDictionary<string, SemaphoreSlim>();

public SubtitleEncoder(
ILogger<SubtitleEncoder> logger,
IApplicationPaths appPaths,
Expand Down Expand Up @@ -350,16 +344,6 @@ private ISubtitleWriter GetWriter(string format)
throw new ArgumentException("Unsupported format: " + format);
}

/// <summary>
/// Gets the lock.
/// </summary>
/// <param name="filename">The filename.</param>
/// <returns>System.Object.</returns>
private SemaphoreSlim GetLock(string filename)
{
return _semaphoreLocks.GetOrAdd(filename, _ => new SemaphoreSlim(1, 1));
}

/// <summary>
/// Converts the text subtitle to SRT.
/// </summary>
Expand All @@ -370,21 +354,13 @@ private SemaphoreSlim GetLock(string filename)
/// <returns>Task.</returns>
private async Task ConvertTextSubtitleToSrt(MediaStream subtitleStream, MediaSourceInfo mediaSource, string outputPath, CancellationToken cancellationToken)
{
var semaphore = GetLock(outputPath);

await semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);

try
using (await Common.Concurrency.AsyncKeyedLock.Locker.LockAsync(outputPath, cancellationToken).ConfigureAwait(false))
{
if (!File.Exists(outputPath))
{
await ConvertTextSubtitleToSrtInternal(subtitleStream, mediaSource, outputPath, cancellationToken).ConfigureAwait(false);
}
}
finally
{
semaphore.Release();
}
}

/// <summary>
Expand Down Expand Up @@ -524,14 +500,10 @@ private async Task ExtractTextSubtitle(
string outputPath,
CancellationToken cancellationToken)
{
var semaphore = GetLock(outputPath);

await semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);

var subtitleStreamIndex = EncodingHelper.FindIndex(mediaSource.MediaStreams, subtitleStream);

try
using (await Common.Concurrency.AsyncKeyedLock.Locker.LockAsync(outputPath, cancellationToken).ConfigureAwait(false))
{
var subtitleStreamIndex = EncodingHelper.FindIndex(mediaSource.MediaStreams, subtitleStream);

if (!File.Exists(outputPath))
{
var args = _mediaEncoder.GetInputArgument(mediaSource.Path, mediaSource);
Expand All @@ -549,10 +521,6 @@ await ExtractTextSubtitleInternal(
cancellationToken).ConfigureAwait(false);
}
}
finally
{
semaphore.Release();
}
}

private async Task ExtractTextSubtitleInternal(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Globalization;
using System.IO;
using System.Threading;
using MediaBrowser.MediaEncoding.Subtitles;
using Microsoft.Extensions.Logging.Abstractions;
using Xunit;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Globalization;
using System.IO;
using System.Threading;
using MediaBrowser.MediaEncoding.Subtitles;
using Microsoft.Extensions.Logging.Abstractions;
using Xunit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Globalization;
using System.IO;
using System.Text;
using System.Threading;
using MediaBrowser.MediaEncoding.Subtitles;
using MediaBrowser.Model.MediaInfo;
using Microsoft.Extensions.Logging.Abstractions;
Expand Down

0 comments on commit 14068aa

Please sign in to comment.