Skip to content

Commit

Permalink
Multiple things
Browse files Browse the repository at this point in the history
Fixed up the full and partial workshop jobs
Added level request processing job
Updated zeeplevel parsing
  • Loading branch information
Thundernerd committed Sep 2, 2024
1 parent f681014 commit 0262585
Show file tree
Hide file tree
Showing 18 changed files with 629 additions and 250 deletions.
3 changes: 3 additions & 0 deletions Jobs/JobScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using Hangfire.States;
using TNRD.Zeepkist.GTR.Backend.Levels.Jobs;
using TNRD.Zeepkist.GTR.Backend.Levels.Points.Jobs;
using TNRD.Zeepkist.GTR.Backend.Levels.Requests.Jobs;
using TNRD.Zeepkist.GTR.Backend.PersonalBests.Jobs;
using TNRD.Zeepkist.GTR.Backend.Users.Points.Jobs;
using TNRD.Zeepkist.GTR.Backend.WorldRecords.Jobs;
Expand Down Expand Up @@ -41,11 +42,13 @@ public void ScheduleRecurringJobs()
#if DEBUG
ScheduleRecurringJob<FixWorldRecordsJob>(Cron.Never());
ScheduleRecurringJob<FixPersonalBestsJob>(Cron.Never());
ScheduleRecurringJob<ProcessLevelRequestsJob>(Cron.Never());
ScheduleRecurringJob<FullWorkshopScanJob>(Cron.Never());
ScheduleRecurringJob<PartialWorkshopScanJob>(Cron.Never());
ScheduleRecurringJob<CalculateLevelPointsJob>(Cron.Never());
ScheduleRecurringJob<CalculateUserPointsJob>(Cron.Never());
#else
ScheduleRecurringJob<ProcessLevelRequestsJob>(Cron.MinuteInterval(5));
ScheduleRecurringJob<FullWorkshopScanJob>(Cron.Never());
ScheduleRecurringJob<PartialWorkshopScanJob>(Cron.Never());
ScheduleRecurringJob<CalculateLevelPointsJob>(Cron.Never());
Expand Down
19 changes: 19 additions & 0 deletions Levels/Items/LevelItemsService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ namespace TNRD.Zeepkist.GTR.Backend.Levels.Items;
public interface ILevelItemsService
{
IEnumerable<LevelItem> GetAll();
IEnumerable<LevelItem> GetForPublishedFileDetails(PublishedFileDetails publishedFileDetails);
bool ExistsForLevel(int levelId);
bool Exists(PublishedFileDetails publishedFileDetails, ZeepLevel zeepLevel, string hash);

Expand All @@ -19,6 +20,8 @@ void Create(
WorkshopLevel workshopLevel,
ZeepLevel zeepLevel,
Level level);

void MarkDeleted(LevelItem levelItem);
}

public class LevelItemsService : ILevelItemsService
Expand All @@ -45,6 +48,16 @@ public IEnumerable<LevelItem> GetAll()
return _repository.GetAll();
}

public IEnumerable<LevelItem> GetForPublishedFileDetails(PublishedFileDetails publishedFileDetails)
{
if (ulong.TryParse(publishedFileDetails.PublishedFileId, out ulong publishedFileId))
{
return _repository.GetAll(item => item.Deleted == false && item.WorkshopId == publishedFileId);
}

return Enumerable.Empty<LevelItem>();
}

public bool ExistsForLevel(int levelId)
{
return _repository.Exists(x => x.IdLevel == levelId);
Expand Down Expand Up @@ -128,4 +141,10 @@ public void Create(

_repository.Insert(item);
}

public void MarkDeleted(LevelItem levelItem)
{
levelItem.Deleted = true;
_repository.Update(levelItem);
}
}
7 changes: 7 additions & 0 deletions Levels/Jobs/DownloadResult.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
using FluentResults;
using TNRD.Zeepkist.GTR.Backend.Steam.Resources;
using TNRD.Zeepkist.GTR.Backend.Workshop;

namespace TNRD.Zeepkist.GTR.Backend.Levels.Jobs;

public record DownloadResult(IEnumerable<PublishedFileDetails> PublishedFileDetails, Result<WorkshopDownloads> Result);
82 changes: 8 additions & 74 deletions Levels/Jobs/FullWorkshopScanJob.cs
Original file line number Diff line number Diff line change
@@ -1,43 +1,20 @@
using JetBrains.Annotations;
using Microsoft.Extensions.Options;
using TNRD.Zeepkist.GTR.Backend.Hashing;
using TNRD.Zeepkist.GTR.Backend.Levels.Items;
using TNRD.Zeepkist.GTR.Backend.Levels.Metadata;
using TNRD.Zeepkist.GTR.Backend.Steam;
using TNRD.Zeepkist.GTR.Backend.Steam.Resources;
using TNRD.Zeepkist.GTR.Backend.Workshop;
using TNRD.Zeepkist.GTR.Backend.Zeeplevel;

namespace TNRD.Zeepkist.GTR.Backend.Levels.Jobs;

public class FullWorkshopScanJob : WorkshopScanJob
{
private static bool IsRunning = false;

private readonly IPublishedFileServiceApi _publishedFileServiceApi;
private readonly SteamOptions _steamOptions;
public static bool IsRunning { get; private set; }

public FullWorkshopScanJob(
ILogger<FullWorkshopScanJob> logger,
IHashService hashService,
ILevelService levelService,
ILevelMetadataService levelMetadataService,
ILevelItemsService levelItemsService,
IWorkshopService workshopService,
IZeeplevelService zeeplevelService,
IPublishedFileServiceApi publishedFileServiceApi,
IOptions<SteamOptions> steamOptions)
: base(
logger,
hashService,
levelService,
levelMetadataService,
levelItemsService,
workshopService,
zeeplevelService)
WorkshopLister workshopLister,
WorkshopDownloader workshopDownloader,
WorkshopProcessor workshopProcessor,
IWorkshopService workshopService)
: base(logger, workshopLister, workshopDownloader, workshopProcessor, workshopService)
{
_publishedFileServiceApi = publishedFileServiceApi;
_steamOptions = steamOptions.Value;
}

[UsedImplicitly]
Expand All @@ -51,51 +28,8 @@ public async Task ExecuteAsync()

IsRunning = true;
Logger.LogInformation("Starting full workshop scan");

string cursor = "*";
int attempts = 0;

while (true)
{
Logger.LogInformation("Scanning page {Cursor}", cursor);
QueryFilesResult result
= await _publishedFileServiceApi.QueryFiles(_steamOptions.ApiKey, cursor);

string currentCursor = cursor;
cursor = result.Response.NextCursor;

if (cursor == currentCursor && result.Response.PublishedFileDetails == null)
{
Logger.LogInformation("Reached end of listing {Cursor}", cursor);
break; // No more pages
}

try
{
await ProcessPage(result);
attempts = 0;
}
catch (Exception e)
{
Logger.LogError(e, "Failed to process page {Cursor}", cursor);
if (attempts >= 3)
{
Logger.LogError("Failed to process page {Cursor} after 3 attempts", cursor);
break;
}

Logger.LogInformation("Changing cursor back from {Current} to {Previous}", cursor, currentCursor);
cursor = currentCursor;
int delay = 5 * ++attempts;
Logger.LogInformation("Waiting {Delay} minutes", delay);
await Task.Delay(TimeSpan.FromMinutes(delay));
continue;
}

if (cursor == currentCursor)
break;
}

await Run(WorkshopLister.QueryType.Normal);
Logger.LogInformation("Finished full workshop scan");
IsRunning = false;
}
}
69 changes: 18 additions & 51 deletions Levels/Jobs/PartialWorkshopScanJob.cs
Original file line number Diff line number Diff line change
@@ -1,77 +1,44 @@
using JetBrains.Annotations;
using Microsoft.Extensions.Options;
using TNRD.Zeepkist.GTR.Backend.Hashing;
using TNRD.Zeepkist.GTR.Backend.Levels.Items;
using TNRD.Zeepkist.GTR.Backend.Levels.Metadata;
using TNRD.Zeepkist.GTR.Backend.Steam;
using TNRD.Zeepkist.GTR.Backend.Steam.Resources;
using TNRD.Zeepkist.GTR.Backend.Workshop;
using TNRD.Zeepkist.GTR.Backend.Zeeplevel;

namespace TNRD.Zeepkist.GTR.Backend.Levels.Jobs;

public class PartialWorkshopScanJob : WorkshopScanJob
{
private readonly IPublishedFileServiceApi _publishedFileServiceApi;
private readonly SteamOptions _steamOptions;

public PartialWorkshopScanJob(
ILogger<PartialWorkshopScanJob> logger,
IHashService hashService,
ILevelService levelService,
ILevelMetadataService levelMetadataService,
ILevelItemsService levelItemsService,
IWorkshopService workshopService,
IZeeplevelService zeeplevelService,
IPublishedFileServiceApi publishedFileServiceApi,
IOptions<SteamOptions> steamOptions)
: base(
logger,
hashService,
levelService,
levelMetadataService,
levelItemsService,
workshopService,
zeeplevelService)
WorkshopLister workshopLister,
WorkshopDownloader workshopDownloader,
WorkshopProcessor workshopProcessor,
IWorkshopService workshopService)
: base(logger, workshopLister, workshopDownloader, workshopProcessor, workshopService)
{
_publishedFileServiceApi = publishedFileServiceApi;
_steamOptions = steamOptions.Value;
}

[UsedImplicitly]
public async Task ExecuteAsync()
{
if (FullWorkshopScanJob.IsRunning)
{
Logger.LogInformation("Skipping partial workshop scan because full workshop scan is running");
return;
}

await ScanByPublicationDate();
await ScanByUpdatedDate();
}

private async Task ScanByPublicationDate()
{
string cursor = "*";

for (int i = 0; i < 5; i++)
{
QueryFilesResult result
= await _publishedFileServiceApi.QueryFilesByPublicationDate(_steamOptions.ApiKey, cursor);

cursor = result.Response.NextCursor;

await ProcessPage(result);
}
Logger.LogInformation("Starting partial workshop scan by publication date");
await Run(WorkshopLister.QueryType.ByPublicationDate, 5);
Logger.LogInformation("Finished partial workshop scan by publication date");
}

private async Task ScanByUpdatedDate()
{
string cursor = "*";

for (int i = 0; i < 5; i++)
{
QueryFilesResult result
= await _publishedFileServiceApi.QueryFilesByUpdatedDate(_steamOptions.ApiKey, cursor);

cursor = result.Response.NextCursor;

await ProcessPage(result);
}
Logger.LogInformation("Starting partial workshop scan by updated date");
await Run(WorkshopLister.QueryType.ByUpdatedDate, 5);
Logger.LogInformation("Finished partial workshop scan by updated date");
}
}
65 changes: 65 additions & 0 deletions Levels/Jobs/WorkshopDownloader.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
using FluentResults;
using TNRD.Zeepkist.GTR.Backend.Steam.Resources;
using TNRD.Zeepkist.GTR.Backend.Workshop;

namespace TNRD.Zeepkist.GTR.Backend.Levels.Jobs;

public class WorkshopDownloader
{
private const int ItemsPerChunk = 10;
private const int MaxConcurrency = 25;

private readonly ILogger<WorkshopDownloader> _logger;
private readonly IWorkshopService _workshopService;

public WorkshopDownloader(ILogger<WorkshopDownloader> logger, IWorkshopService workshopService)
{
_logger = logger;
_workshopService = workshopService;
}

public async Task<DownloadResult[]> Download(IEnumerable<PublishedFileDetails> publishedFileDetails)
{
List<PublishedFileDetails[]> chunks = publishedFileDetails.Chunk(ItemsPerChunk).ToList();
SemaphoreSlim semaphore = new(MaxConcurrency, MaxConcurrency);
List<Task<DownloadResult>> tasks = new();

for (int i = 0; i < chunks.Count; i++)
{
PublishedFileDetails[] chunk = chunks[i];
int index = i;
tasks.Add(DownloadWorkshopItems(chunk, semaphore, index));
}

_logger.LogInformation("Waiting for all ({Amount}) tasks to complete (this may take a while)", chunks.Count);
DownloadResult[] results = await Task.WhenAll(tasks);
_logger.LogInformation("All tasks completed");
return results;
}

private async Task<DownloadResult> DownloadWorkshopItems(
IEnumerable<PublishedFileDetails> publishedFileDetails,
SemaphoreSlim semaphore,
int index)
{
await semaphore.WaitAsync();
try
{
_logger.LogInformation("Starting workshop download {Index}", index);
Result<WorkshopDownloads> result =
await _workshopService.DownloadWorkshopItems(
publishedFileDetails.Select(x => ulong.Parse(x.PublishedFileId)));
_logger.LogInformation("Finished workshop download {Index}", index);
return new DownloadResult(publishedFileDetails, result);
}
catch (Exception e)
{
_logger.LogError(e, "Failed to download workshop items");
return new DownloadResult(publishedFileDetails, Result.Fail(new ExceptionalError(e)));
}
finally
{
semaphore.Release();
}
}
}
Loading

0 comments on commit 0262585

Please sign in to comment.