diff --git a/packages/core/src/monitor/deal-monitor-alert-tick.js b/packages/core/src/monitor/deal-monitor-alert-tick.js index 17fe591..225706f 100644 --- a/packages/core/src/monitor/deal-monitor-alert-tick.js +++ b/packages/core/src/monitor/deal-monitor-alert-tick.js @@ -41,8 +41,8 @@ export async function dealMonitorAlertTick (context) { const currentTime = Date.now() for (const offeredAggregate of offeredAggregates.ok) { const offerTime = (new Date(offeredAggregate.insertedAt)).getTime() - // Monitor if offer time + monitor threshold is bigger than current time - if (offerTime + context.aggregateMonitorThresholdMs > currentTime) { + // Monitor if current time is bigger than offer time + monitor threshold + if (currentTime > (offerTime + context.aggregateMonitorThresholdMs)) { offeredAggregatesToMonitor.push(offeredAggregate) } } diff --git a/packages/core/test/monitor/handle-deal-monitor-cront-tick.test.js b/packages/core/test/monitor/handle-deal-monitor-cront-tick.test.js index 1be8216..a714af2 100644 --- a/packages/core/test/monitor/handle-deal-monitor-cront-tick.test.js +++ b/packages/core/test/monitor/handle-deal-monitor-cront-tick.test.js @@ -87,10 +87,10 @@ test('handles deal monitor tick with aggregates in warn type', async t => { const tickRes = await dealMonitorAlertTick.dealMonitorAlertTick({ ...context, - // do not wait minPieceCriticalThresholdMs: threshold * 10, minPieceWarnThresholdMs: 0, - aggregateMonitorThresholdMs: threshold + // do not wait + aggregateMonitorThresholdMs: 0 }) t.assert(tickRes.ok) @@ -148,7 +148,7 @@ test('handles deal monitor tick with aggregates in critical type', async t => { // should do critical check first minPieceCriticalThresholdMs: 0, minPieceWarnThresholdMs: 0, - aggregateMonitorThresholdMs: threshold + aggregateMonitorThresholdMs: 0 }) t.assert(tickRes.ok) @@ -158,6 +158,61 @@ test('handles deal monitor tick with aggregates in critical type', async t => { t.assert(tickRes.ok?.alerts[0].aggregate.equals(aggregate.link)) }) +test('handles deal monitor tick ignoring aggregates within a threshold', async t => { + const context = await getContext(t.context) + const storefront = await Signer.generate() + const group = storefront.did() + const { pieces, aggregate } = await randomAggregate(10, 128) + const threshold = 1000 + const pieceInsertTime = Date.now() - threshold + const buffer = { + pieces: pieces.map((p) => ({ + piece: p.link, + insertedAt: new Date( + pieceInsertTime + ).toISOString(), + policy: 0, + })), + group, + } + const block = await CBOR.write(buffer) + // Store aggregate record into store + const offer = pieces.map((p) => p.link) + const piecesBlock = await CBOR.write(offer) + + // Store aggregate in aggregator + const aggregatePutRes = await context.aggregatorAggregateStore.put({ + aggregate: aggregate.link, + pieces: piecesBlock.cid, + buffer: block.cid, + group, + insertedAt: new Date().toISOString(), + minPieceInsertedAt: new Date().toISOString(), + }) + t.assert(aggregatePutRes.ok) + + // Propagate aggregate to dealer + const putRes = await context.dealerAggregateStore.put({ + aggregate: aggregate.link, + pieces: piecesBlock.cid, + status: 'offered', + insertedAt: new Date().toISOString(), + updatedAt: new Date().toISOString(), + }) + t.assert(putRes.ok) + + const tickRes = await dealMonitorAlertTick.dealMonitorAlertTick({ + ...context, + minPieceCriticalThresholdMs: threshold * 10, + minPieceWarnThresholdMs: 0, + // wait some time + aggregateMonitorThresholdMs: threshold + }) + + t.assert(tickRes.ok) + t.is(tickRes.ok?.alerts.length, 0) +}) + /** * @param {import('../helpers/context.js').DbContext} context */