From c606615dd6f649680b359cad91919833219012c7 Mon Sep 17 00:00:00 2001 From: Nikolas Haimerl Date: Tue, 28 Jan 2025 15:55:04 +0100 Subject: [PATCH 01/12] extracted loop --- backend/bin/deal-observer-backend.js | 47 ++++++---------------------- backend/lib/telemetry.js | 1 - 2 files changed, 9 insertions(+), 39 deletions(-) diff --git a/backend/bin/deal-observer-backend.js b/backend/bin/deal-observer-backend.js index 84e84dd..0659f7e 100644 --- a/backend/bin/deal-observer-backend.js +++ b/backend/bin/deal-observer-backend.js @@ -1,12 +1,10 @@ import { createPgPool } from '@filecoin-station/deal-observer-db' import * as Sentry from '@sentry/node' -import timers from 'node:timers/promises' -import slug from 'slug' import '../lib/instrument.js' import { createInflux } from '../lib/telemetry.js' -import { getChainHead, rpcRequest } from '../lib/rpc-service/service.js' -import { fetchDealWithHighestActivatedEpoch, observeBuiltinActorEvents } from '../lib/deal-observer.js' +import { rpcRequest } from '../lib/rpc-service/service.js' import assert from 'node:assert' +import { dealObserverLoop } from '../lib/loops.js' const { INFLUXDB_TOKEN } = process.env if (!INFLUXDB_TOKEN) { @@ -19,42 +17,15 @@ const finalityEpochs = 940 const maxPastEpochs = 1999 assert(finalityEpochs <= maxPastEpochs) const pgPool = await createPgPool() - -const LOOP_NAME = 'Built-in actor events' const { recordTelemetry } = createInflux(INFLUXDB_TOKEN) -const dealObserverLoop = async (makeRpcRequest, pgPool) => { - while (true) { - const start = Date.now() - try { - const currentChainHead = await getChainHead(makeRpcRequest) - const currentFinalizedChainHead = currentChainHead.Height - finalityEpochs - // If the storage is empty we start 2000 blocks into the past as that is the furthest we can go with the public glif rpc endpoints. - const lastInsertedDeal = await fetchDealWithHighestActivatedEpoch(pgPool) - const lastEpochStored = lastInsertedDeal ? lastInsertedDeal.height : currentChainHead.Height - maxPastEpochs - for (let epoch = lastEpochStored + 1; epoch <= currentFinalizedChainHead; epoch++) { - await observeBuiltinActorEvents(epoch, pgPool, makeRpcRequest) - } - } catch (e) { - console.error(e) - Sentry.captureException(e) - } - const dt = Date.now() - start - console.log(`Loop "${LOOP_NAME}" took ${dt}ms`) - - if (INFLUXDB_TOKEN) { - recordTelemetry(`loop_${slug(LOOP_NAME, '_')}`, point => { - point.intField('interval_ms', LOOP_INTERVAL) - point.intField('duration_ms', dt) - }) - } - if (dt < LOOP_INTERVAL) { - await timers.setTimeout(LOOP_INTERVAL - dt) - } - } -} - await dealObserverLoop( rpcRequest, - pgPool + pgPool, + recordTelemetry, + Sentry, + maxPastEpochs, + finalityEpochs, + LOOP_INTERVAL, + INFLUXDB_TOKEN ) diff --git a/backend/lib/telemetry.js b/backend/lib/telemetry.js index db3dd08..3b7c95a 100644 --- a/backend/lib/telemetry.js +++ b/backend/lib/telemetry.js @@ -17,7 +17,6 @@ export const createInflux = token => { setInterval(() => { writeClient.flush().catch(console.error) }, 10_000).unref() - return { influx, From c7c07c9f6af4c63795f71bba426b4b09e8dc3e5b Mon Sep 17 00:00:00 2001 From: Nikolas Haimerl Date: Tue, 28 Jan 2025 16:20:33 +0100 Subject: [PATCH 02/12] add test for observer loop --- backend/test/deal-observer.test.js | 46 ++++++++++++++++++++++++++++++ backend/test/rpc-client.test.js | 13 +-------- 2 files changed, 47 insertions(+), 12 deletions(-) diff --git a/backend/test/deal-observer.test.js b/backend/test/deal-observer.test.js index d21fd29..9bc876e 100644 --- a/backend/test/deal-observer.test.js +++ b/backend/test/deal-observer.test.js @@ -5,6 +5,8 @@ import { fetchDealWithHighestActivatedEpoch, storeActiveDeals } from '../lib/dea import { ActiveDealDbEntry } from '@filecoin-station/deal-observer-db/lib/types.js' import { Value } from '@sinclair/typebox/value' import { BlockEvent } from '../lib/rpc-service/data-types.js' +import { dealObserverLoop } from '../lib/loops.js' +import { makeRpcRequest } from './utils.js' describe('deal-observer-backend', () => { let pgPool @@ -74,3 +76,47 @@ describe('deal-observer-backend', () => { assert.deepStrictEqual(expected, actual) }) }) + +describe('deal-observer-backend binary', () => { + let pgPool + before(async () => { + pgPool = await createPgPool() + await migrateWithPgClient(pgPool) + }) + beforeEach(async () => { + await pgPool.query('DELETE FROM active_deals') + }) + after(async () => { + await pgPool.query('DELETE FROM active_deals') + await pgPool.end() + }) + it('then deal observer loop fetches new active deals and stores them in storage', async () => { + const worker = async () => { + await dealObserverLoop( + makeRpcRequest, + pgPool, + undefined, + undefined, + // The testdata has a total amount of 11 blocks + 11, + 0, + 10_000, + undefined + ) + } + // Timeout to kill the worker after 20 seconds + setTimeout(() => { + console.log('Condition not met, killing the worker.') + process.exit(1) + }, 20000) + worker() + while (true) { + const rows = (await pgPool.query('SELECT * FROM active_deals')).rows + console.log(`Rows: ${rows.length}`) + // The test data contains a total of 360 deals + if (rows.length === 360) { + process.exit(0) + } + } + }) +}) diff --git a/backend/test/rpc-client.test.js b/backend/test/rpc-client.test.js index 9ab57e6..aa5f660 100644 --- a/backend/test/rpc-client.test.js +++ b/backend/test/rpc-client.test.js @@ -1,24 +1,13 @@ import assert from 'node:assert' import { describe, it } from 'node:test' import { chainHeadTestData } from './test_data/chainHead.js' -import { rawActorEventTestData } from './test_data/rawActorEvent.js' import { parse } from '@ipld/dag-json' import { getActorEvents, getActorEventsFilter, getChainHead, rpcRequest } from '../lib/rpc-service/service.js' import { ClaimEvent } from '../lib/rpc-service/data-types.js' import { Value } from '@sinclair/typebox/value' +import { makeRpcRequest } from './utils.js' describe('RpcApiClient', () => { - const makeRpcRequest = async (method, params) => { - switch (method) { - case 'Filecoin.ChainHead': - return parse(JSON.stringify(chainHeadTestData)) - case 'Filecoin.GetActorEventsRaw': - return parse(JSON.stringify(rawActorEventTestData)).filter(e => e.height >= params[0].fromHeight && e.height <= params[0].toHeight) - default: - console.error('Unknown method') - } - } - it('retrieves the chainHead', async () => { const chainHead = await getChainHead(makeRpcRequest) assert(chainHead) From d3306ef03901aa90ffc7288689c28e49ed2f5890 Mon Sep 17 00:00:00 2001 From: Nikolas Haimerl Date: Tue, 28 Jan 2025 16:21:26 +0100 Subject: [PATCH 03/12] add new files --- backend/lib/loops.js | 50 +++++++++++++++++++++++++++++++++++++++++++ backend/test/utils.js | 14 ++++++++++++ 2 files changed, 64 insertions(+) create mode 100644 backend/lib/loops.js create mode 100644 backend/test/utils.js diff --git a/backend/lib/loops.js b/backend/lib/loops.js new file mode 100644 index 0000000..dbadd0c --- /dev/null +++ b/backend/lib/loops.js @@ -0,0 +1,50 @@ +import { fetchDealWithHighestActivatedEpoch, observeBuiltinActorEvents } from './deal-observer.js' +import { getChainHead } from './rpc-service/service.js' +import timers from 'node:timers/promises' +import slug from 'slug' +/** @import {Queryable} from '@filecoin-station/deal-observer-db' */ +/** @import {Point} from '@influxdata/influxdb-client' */ + +/** + * @param {(method:string,params:object) => object} makeRpcRequest + * @param {Queryable} pgPool + * @param {(name: string, fn: (p: Point) => void) => void} recordTelemetry + * @param {import("@sentry/node")} Sentry + * @param {number} maxPastEpochs + * @param {number} finalityEpochs + * @param {number} loopInterval + * @param {string | undefined} influxToken + * @returns {Promise} + * */ +export const dealObserverLoop = async (makeRpcRequest, pgPool, recordTelemetry, Sentry, maxPastEpochs, finalityEpochs, loopInterval, influxToken) => { + const LOOP_NAME = 'Built-in actor events' + + while (true) { + const start = Date.now() + try { + const currentChainHead = await getChainHead(makeRpcRequest) + const currentFinalizedChainHead = currentChainHead.Height - finalityEpochs + // If the storage is empty we start 2000 blocks into the past as that is the furthest we can go with the public glif rpc endpoints. + const lastInsertedDeal = await fetchDealWithHighestActivatedEpoch(pgPool) + const lastEpochStored = lastInsertedDeal ? lastInsertedDeal.activated_at_epoch : currentChainHead.Height - maxPastEpochs + for (let epoch = lastEpochStored + 1; epoch <= currentFinalizedChainHead; epoch++) { + await observeBuiltinActorEvents(epoch, pgPool, makeRpcRequest) + } + } catch (e) { + console.error(e) + Sentry.captureException(e) + } + const dt = Date.now() - start + console.log(`Loop "${LOOP_NAME}" took ${dt}ms`) + + if (influxToken) { + recordTelemetry(`loop_${slug(LOOP_NAME, '_')}`, point => { + point.intField('interval_ms', loopInterval) + point.intField('duration_ms', dt) + }) + } + if (dt < loopInterval) { + await timers.setTimeout(loopInterval - dt) + } + } +} diff --git a/backend/test/utils.js b/backend/test/utils.js new file mode 100644 index 0000000..f877949 --- /dev/null +++ b/backend/test/utils.js @@ -0,0 +1,14 @@ +import { parse } from '@ipld/dag-json' +import { chainHeadTestData } from './test_data/chainHead.js' +import { rawActorEventTestData } from './test_data/rawActorEvent.js' + +export const makeRpcRequest = async (method, params) => { + switch (method) { + case 'Filecoin.ChainHead': + return parse(JSON.stringify(chainHeadTestData)) + case 'Filecoin.GetActorEventsRaw': + return parse(JSON.stringify(rawActorEventTestData)).filter(e => e.height >= params[0].fromHeight && e.height <= params[0].toHeight) + default: + console.error('Unknown method') + } +} From c2b998224e2c7b66c0245d04aa3bcc6e92662cf8 Mon Sep 17 00:00:00 2001 From: Nikolas Haimerl Date: Tue, 28 Jan 2025 16:22:43 +0100 Subject: [PATCH 04/12] fmt --- backend/lib/telemetry.js | 1 + 1 file changed, 1 insertion(+) diff --git a/backend/lib/telemetry.js b/backend/lib/telemetry.js index 3b7c95a..db3dd08 100644 --- a/backend/lib/telemetry.js +++ b/backend/lib/telemetry.js @@ -17,6 +17,7 @@ export const createInflux = token => { setInterval(() => { writeClient.flush().catch(console.error) }, 10_000).unref() + return { influx, From 19a5fcc50048cd527c8f9c39e8082fb6f164b17e Mon Sep 17 00:00:00 2001 From: Nikolas Haimerl Date: Wed, 29 Jan 2025 07:28:40 +0100 Subject: [PATCH 05/12] add abort signal --- backend/bin/deal-observer-backend.js | 5 ++- backend/lib/loops.js | 4 +-- backend/test/deal-observer.test.js | 53 ++++++++++++---------------- 3 files changed, 29 insertions(+), 33 deletions(-) diff --git a/backend/bin/deal-observer-backend.js b/backend/bin/deal-observer-backend.js index 0659f7e..8bef3f8 100644 --- a/backend/bin/deal-observer-backend.js +++ b/backend/bin/deal-observer-backend.js @@ -18,6 +18,8 @@ const maxPastEpochs = 1999 assert(finalityEpochs <= maxPastEpochs) const pgPool = await createPgPool() const { recordTelemetry } = createInflux(INFLUXDB_TOKEN) +const controller = new AbortController() +const { signal } = controller await dealObserverLoop( rpcRequest, @@ -27,5 +29,6 @@ await dealObserverLoop( maxPastEpochs, finalityEpochs, LOOP_INTERVAL, - INFLUXDB_TOKEN + INFLUXDB_TOKEN, + signal ) diff --git a/backend/lib/loops.js b/backend/lib/loops.js index dbadd0c..d8507b2 100644 --- a/backend/lib/loops.js +++ b/backend/lib/loops.js @@ -16,10 +16,10 @@ import slug from 'slug' * @param {string | undefined} influxToken * @returns {Promise} * */ -export const dealObserverLoop = async (makeRpcRequest, pgPool, recordTelemetry, Sentry, maxPastEpochs, finalityEpochs, loopInterval, influxToken) => { +export const dealObserverLoop = async (makeRpcRequest, pgPool, recordTelemetry, Sentry, maxPastEpochs, finalityEpochs, loopInterval, influxToken, signal) => { const LOOP_NAME = 'Built-in actor events' - while (true) { + while (!signal?.aborted) { const start = Date.now() try { const currentChainHead = await getChainHead(makeRpcRequest) diff --git a/backend/test/deal-observer.test.js b/backend/test/deal-observer.test.js index 9bc876e..efc61e2 100644 --- a/backend/test/deal-observer.test.js +++ b/backend/test/deal-observer.test.js @@ -1,5 +1,5 @@ import assert from 'node:assert' -import { after, before, beforeEach, describe, it } from 'node:test' +import { after, before, beforeEach, it, describe } from 'node:test' import { createPgPool, migrateWithPgClient } from '@filecoin-station/deal-observer-db' import { fetchDealWithHighestActivatedEpoch, storeActiveDeals } from '../lib/deal-observer.js' import { ActiveDealDbEntry } from '@filecoin-station/deal-observer-db/lib/types.js' @@ -88,35 +88,28 @@ describe('deal-observer-backend binary', () => { }) after(async () => { await pgPool.query('DELETE FROM active_deals') - await pgPool.end() }) - it('then deal observer loop fetches new active deals and stores them in storage', async () => { - const worker = async () => { - await dealObserverLoop( - makeRpcRequest, - pgPool, - undefined, - undefined, - // The testdata has a total amount of 11 blocks - 11, - 0, - 10_000, - undefined - ) - } - // Timeout to kill the worker after 20 seconds - setTimeout(() => { - console.log('Condition not met, killing the worker.') - process.exit(1) - }, 20000) - worker() - while (true) { - const rows = (await pgPool.query('SELECT * FROM active_deals')).rows - console.log(`Rows: ${rows.length}`) - // The test data contains a total of 360 deals - if (rows.length === 360) { - process.exit(0) - } - } + it('then deal observer loop fetches new active deals and stores them in storage', async (t) => { + const controller = new AbortController() + const { signal } = controller + dealObserverLoop( + makeRpcRequest, + pgPool, + undefined, + undefined, + // The testdata has a total amount of 11 blocks + 11, + 0, + 1, + undefined, + signal + ) + // Timeout to kill the worker after 1 second + let rows + setTimeout(() => { if (!signal.aborted) { throw new Error(`Test timed out. Rows inserted ${rows.length}`) } }, 1000) + do { + rows = (await pgPool.query('SELECT * FROM active_deals')).rows + } while (rows.length !== 360) + controller.abort() }) }) From 3ef6e2342ea7645fc7c65fcf615b6e59d9562f4f Mon Sep 17 00:00:00 2001 From: Nikolas Haimerl Date: Wed, 29 Jan 2025 07:29:53 +0100 Subject: [PATCH 06/12] renamed loop name --- backend/lib/deal-observer-loop.js | 50 +++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) create mode 100644 backend/lib/deal-observer-loop.js diff --git a/backend/lib/deal-observer-loop.js b/backend/lib/deal-observer-loop.js new file mode 100644 index 0000000..d8507b2 --- /dev/null +++ b/backend/lib/deal-observer-loop.js @@ -0,0 +1,50 @@ +import { fetchDealWithHighestActivatedEpoch, observeBuiltinActorEvents } from './deal-observer.js' +import { getChainHead } from './rpc-service/service.js' +import timers from 'node:timers/promises' +import slug from 'slug' +/** @import {Queryable} from '@filecoin-station/deal-observer-db' */ +/** @import {Point} from '@influxdata/influxdb-client' */ + +/** + * @param {(method:string,params:object) => object} makeRpcRequest + * @param {Queryable} pgPool + * @param {(name: string, fn: (p: Point) => void) => void} recordTelemetry + * @param {import("@sentry/node")} Sentry + * @param {number} maxPastEpochs + * @param {number} finalityEpochs + * @param {number} loopInterval + * @param {string | undefined} influxToken + * @returns {Promise} + * */ +export const dealObserverLoop = async (makeRpcRequest, pgPool, recordTelemetry, Sentry, maxPastEpochs, finalityEpochs, loopInterval, influxToken, signal) => { + const LOOP_NAME = 'Built-in actor events' + + while (!signal?.aborted) { + const start = Date.now() + try { + const currentChainHead = await getChainHead(makeRpcRequest) + const currentFinalizedChainHead = currentChainHead.Height - finalityEpochs + // If the storage is empty we start 2000 blocks into the past as that is the furthest we can go with the public glif rpc endpoints. + const lastInsertedDeal = await fetchDealWithHighestActivatedEpoch(pgPool) + const lastEpochStored = lastInsertedDeal ? lastInsertedDeal.activated_at_epoch : currentChainHead.Height - maxPastEpochs + for (let epoch = lastEpochStored + 1; epoch <= currentFinalizedChainHead; epoch++) { + await observeBuiltinActorEvents(epoch, pgPool, makeRpcRequest) + } + } catch (e) { + console.error(e) + Sentry.captureException(e) + } + const dt = Date.now() - start + console.log(`Loop "${LOOP_NAME}" took ${dt}ms`) + + if (influxToken) { + recordTelemetry(`loop_${slug(LOOP_NAME, '_')}`, point => { + point.intField('interval_ms', loopInterval) + point.intField('duration_ms', dt) + }) + } + if (dt < loopInterval) { + await timers.setTimeout(loopInterval - dt) + } + } +} From 955a44baad3b5ad5b0bb81b3ee305661fffd7fc9 Mon Sep 17 00:00:00 2001 From: Nikolas Haimerl Date: Wed, 29 Jan 2025 07:30:39 +0100 Subject: [PATCH 07/12] renamed loop name --- backend/bin/deal-observer-backend.js | 2 +- backend/lib/loops.js | 50 ---------------------------- backend/test/deal-observer.test.js | 2 +- 3 files changed, 2 insertions(+), 52 deletions(-) delete mode 100644 backend/lib/loops.js diff --git a/backend/bin/deal-observer-backend.js b/backend/bin/deal-observer-backend.js index 8bef3f8..35af41b 100644 --- a/backend/bin/deal-observer-backend.js +++ b/backend/bin/deal-observer-backend.js @@ -4,7 +4,7 @@ import '../lib/instrument.js' import { createInflux } from '../lib/telemetry.js' import { rpcRequest } from '../lib/rpc-service/service.js' import assert from 'node:assert' -import { dealObserverLoop } from '../lib/loops.js' +import { dealObserverLoop } from '../lib/deal-observer-loop.js' const { INFLUXDB_TOKEN } = process.env if (!INFLUXDB_TOKEN) { diff --git a/backend/lib/loops.js b/backend/lib/loops.js deleted file mode 100644 index d8507b2..0000000 --- a/backend/lib/loops.js +++ /dev/null @@ -1,50 +0,0 @@ -import { fetchDealWithHighestActivatedEpoch, observeBuiltinActorEvents } from './deal-observer.js' -import { getChainHead } from './rpc-service/service.js' -import timers from 'node:timers/promises' -import slug from 'slug' -/** @import {Queryable} from '@filecoin-station/deal-observer-db' */ -/** @import {Point} from '@influxdata/influxdb-client' */ - -/** - * @param {(method:string,params:object) => object} makeRpcRequest - * @param {Queryable} pgPool - * @param {(name: string, fn: (p: Point) => void) => void} recordTelemetry - * @param {import("@sentry/node")} Sentry - * @param {number} maxPastEpochs - * @param {number} finalityEpochs - * @param {number} loopInterval - * @param {string | undefined} influxToken - * @returns {Promise} - * */ -export const dealObserverLoop = async (makeRpcRequest, pgPool, recordTelemetry, Sentry, maxPastEpochs, finalityEpochs, loopInterval, influxToken, signal) => { - const LOOP_NAME = 'Built-in actor events' - - while (!signal?.aborted) { - const start = Date.now() - try { - const currentChainHead = await getChainHead(makeRpcRequest) - const currentFinalizedChainHead = currentChainHead.Height - finalityEpochs - // If the storage is empty we start 2000 blocks into the past as that is the furthest we can go with the public glif rpc endpoints. - const lastInsertedDeal = await fetchDealWithHighestActivatedEpoch(pgPool) - const lastEpochStored = lastInsertedDeal ? lastInsertedDeal.activated_at_epoch : currentChainHead.Height - maxPastEpochs - for (let epoch = lastEpochStored + 1; epoch <= currentFinalizedChainHead; epoch++) { - await observeBuiltinActorEvents(epoch, pgPool, makeRpcRequest) - } - } catch (e) { - console.error(e) - Sentry.captureException(e) - } - const dt = Date.now() - start - console.log(`Loop "${LOOP_NAME}" took ${dt}ms`) - - if (influxToken) { - recordTelemetry(`loop_${slug(LOOP_NAME, '_')}`, point => { - point.intField('interval_ms', loopInterval) - point.intField('duration_ms', dt) - }) - } - if (dt < loopInterval) { - await timers.setTimeout(loopInterval - dt) - } - } -} diff --git a/backend/test/deal-observer.test.js b/backend/test/deal-observer.test.js index efc61e2..7d5671a 100644 --- a/backend/test/deal-observer.test.js +++ b/backend/test/deal-observer.test.js @@ -5,7 +5,7 @@ import { fetchDealWithHighestActivatedEpoch, storeActiveDeals } from '../lib/dea import { ActiveDealDbEntry } from '@filecoin-station/deal-observer-db/lib/types.js' import { Value } from '@sinclair/typebox/value' import { BlockEvent } from '../lib/rpc-service/data-types.js' -import { dealObserverLoop } from '../lib/loops.js' +import { dealObserverLoop } from '../lib/deal-observer-loop.js' import { makeRpcRequest } from './utils.js' describe('deal-observer-backend', () => { From 7fe1deea644bed3480c2c6a1a8e8e90583efb8d5 Mon Sep 17 00:00:00 2001 From: Nikolas Haimerl Date: Wed, 29 Jan 2025 14:23:26 +0100 Subject: [PATCH 08/12] promise wait all --- backend/test/deal-observer.test.js | 39 ------------------------------ 1 file changed, 39 deletions(-) diff --git a/backend/test/deal-observer.test.js b/backend/test/deal-observer.test.js index 7d5671a..8a610e6 100644 --- a/backend/test/deal-observer.test.js +++ b/backend/test/deal-observer.test.js @@ -5,8 +5,6 @@ import { fetchDealWithHighestActivatedEpoch, storeActiveDeals } from '../lib/dea import { ActiveDealDbEntry } from '@filecoin-station/deal-observer-db/lib/types.js' import { Value } from '@sinclair/typebox/value' import { BlockEvent } from '../lib/rpc-service/data-types.js' -import { dealObserverLoop } from '../lib/deal-observer-loop.js' -import { makeRpcRequest } from './utils.js' describe('deal-observer-backend', () => { let pgPool @@ -76,40 +74,3 @@ describe('deal-observer-backend', () => { assert.deepStrictEqual(expected, actual) }) }) - -describe('deal-observer-backend binary', () => { - let pgPool - before(async () => { - pgPool = await createPgPool() - await migrateWithPgClient(pgPool) - }) - beforeEach(async () => { - await pgPool.query('DELETE FROM active_deals') - }) - after(async () => { - await pgPool.query('DELETE FROM active_deals') - }) - it('then deal observer loop fetches new active deals and stores them in storage', async (t) => { - const controller = new AbortController() - const { signal } = controller - dealObserverLoop( - makeRpcRequest, - pgPool, - undefined, - undefined, - // The testdata has a total amount of 11 blocks - 11, - 0, - 1, - undefined, - signal - ) - // Timeout to kill the worker after 1 second - let rows - setTimeout(() => { if (!signal.aborted) { throw new Error(`Test timed out. Rows inserted ${rows.length}`) } }, 1000) - do { - rows = (await pgPool.query('SELECT * FROM active_deals')).rows - } while (rows.length !== 360) - controller.abort() - }) -}) From 776b8e7c4a96a75d4221a7133a87db0ee4b2b2a6 Mon Sep 17 00:00:00 2001 From: Nikolas Haimerl Date: Wed, 29 Jan 2025 14:26:11 +0100 Subject: [PATCH 09/12] promise wait all --- backend/test/deal-observer-loop.test.js | 47 +++++++++++++++++++++++++ 1 file changed, 47 insertions(+) create mode 100644 backend/test/deal-observer-loop.test.js diff --git a/backend/test/deal-observer-loop.test.js b/backend/test/deal-observer-loop.test.js new file mode 100644 index 0000000..4e8bc18 --- /dev/null +++ b/backend/test/deal-observer-loop.test.js @@ -0,0 +1,47 @@ +import { createPgPool, migrateWithPgClient } from '@filecoin-station/deal-observer-db' +import { dealObserverLoop } from '../lib/deal-observer-loop.js' +import { makeRpcRequest } from './utils.js' +import { before, beforeEach, it, describe, after } from 'node:test' + +describe('dealObserverLoop', () => { + let pgPool + before(async () => { + pgPool = await createPgPool() + await migrateWithPgClient(pgPool) + }) + + after(async () => { + await pgPool.end() + }) + + beforeEach(async () => { + await pgPool.query('DELETE FROM active_deals') + }) + + it('then deal observer loop fetches new active deals and stores them in storage', async (t) => { + const controller = new AbortController() + const { signal } = controller + let rows + const failOnTimeout = async () => { await setTimeout(() => { if (!signal.aborted) { throw new Error(`Test timed out. Rows inserted ${rows.length}`) } }, 1000) } + const waitForDealCount = async (targetCount) => { + while (true) { + const { rows } = await pgPool.query('SELECT COUNT(*) FROM active_deals') + if (parseInt(rows[0].count) === 360) break + } + controller.abort() + } + await Promise.all([failOnTimeout(), waitForDealCount(360), + dealObserverLoop( + makeRpcRequest, + pgPool, + undefined, + undefined, + // The testdata has a total amount of 11 blocks + 11, + 0, + 1, + undefined, + signal + )]) + }) +}) From 0eacf826dbdfbd7d22d45d2422184cd06b7cedd2 Mon Sep 17 00:00:00 2001 From: Nikolas Haimerl Date: Wed, 29 Jan 2025 14:28:57 +0100 Subject: [PATCH 10/12] remove utils --- backend/test/deal-observer-loop.test.js | 14 +++++++++++++- backend/test/rpc-client.test.js | 13 ++++++++++++- backend/test/utils.js | 14 -------------- 3 files changed, 25 insertions(+), 16 deletions(-) delete mode 100644 backend/test/utils.js diff --git a/backend/test/deal-observer-loop.test.js b/backend/test/deal-observer-loop.test.js index 4e8bc18..5f623b7 100644 --- a/backend/test/deal-observer-loop.test.js +++ b/backend/test/deal-observer-loop.test.js @@ -1,10 +1,22 @@ import { createPgPool, migrateWithPgClient } from '@filecoin-station/deal-observer-db' import { dealObserverLoop } from '../lib/deal-observer-loop.js' -import { makeRpcRequest } from './utils.js' import { before, beforeEach, it, describe, after } from 'node:test' +import { rawActorEventTestData } from './test_data/rawActorEvent.js' +import { chainHeadTestData } from './test_data/chainHead.js' +import { parse } from '@ipld/dag-json' describe('dealObserverLoop', () => { let pgPool + const makeRpcRequest = async (method, params) => { + switch (method) { + case 'Filecoin.ChainHead': + return parse(JSON.stringify(chainHeadTestData)) + case 'Filecoin.GetActorEventsRaw': + return parse(JSON.stringify(rawActorEventTestData)).filter(e => e.height >= params[0].fromHeight && e.height <= params[0].toHeight) + default: + console.error('Unknown method') + } + } before(async () => { pgPool = await createPgPool() await migrateWithPgClient(pgPool) diff --git a/backend/test/rpc-client.test.js b/backend/test/rpc-client.test.js index aa5f660..b5223bc 100644 --- a/backend/test/rpc-client.test.js +++ b/backend/test/rpc-client.test.js @@ -5,9 +5,20 @@ import { parse } from '@ipld/dag-json' import { getActorEvents, getActorEventsFilter, getChainHead, rpcRequest } from '../lib/rpc-service/service.js' import { ClaimEvent } from '../lib/rpc-service/data-types.js' import { Value } from '@sinclair/typebox/value' -import { makeRpcRequest } from './utils.js' +import { rawActorEventTestData } from './test_data/rawActorEvent.js' describe('RpcApiClient', () => { + const makeRpcRequest = async (method, params) => { + switch (method) { + case 'Filecoin.ChainHead': + return parse(JSON.stringify(chainHeadTestData)) + case 'Filecoin.GetActorEventsRaw': + return parse(JSON.stringify(rawActorEventTestData)).filter(e => e.height >= params[0].fromHeight && e.height <= params[0].toHeight) + default: + console.error('Unknown method') + } + } + it('retrieves the chainHead', async () => { const chainHead = await getChainHead(makeRpcRequest) assert(chainHead) diff --git a/backend/test/utils.js b/backend/test/utils.js deleted file mode 100644 index f877949..0000000 --- a/backend/test/utils.js +++ /dev/null @@ -1,14 +0,0 @@ -import { parse } from '@ipld/dag-json' -import { chainHeadTestData } from './test_data/chainHead.js' -import { rawActorEventTestData } from './test_data/rawActorEvent.js' - -export const makeRpcRequest = async (method, params) => { - switch (method) { - case 'Filecoin.ChainHead': - return parse(JSON.stringify(chainHeadTestData)) - case 'Filecoin.GetActorEventsRaw': - return parse(JSON.stringify(rawActorEventTestData)).filter(e => e.height >= params[0].fromHeight && e.height <= params[0].toHeight) - default: - console.error('Unknown method') - } -} From 2b6d9598f1634e171fbdecf9b4cfc95211e3e550 Mon Sep 17 00:00:00 2001 From: Nikolas Haimerl Date: Wed, 29 Jan 2025 14:29:55 +0100 Subject: [PATCH 11/12] throw error on unexpected method --- backend/test/deal-observer-loop.test.js | 2 +- backend/test/rpc-client.test.js | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/backend/test/deal-observer-loop.test.js b/backend/test/deal-observer-loop.test.js index 5f623b7..53b0ebf 100644 --- a/backend/test/deal-observer-loop.test.js +++ b/backend/test/deal-observer-loop.test.js @@ -14,7 +14,7 @@ describe('dealObserverLoop', () => { case 'Filecoin.GetActorEventsRaw': return parse(JSON.stringify(rawActorEventTestData)).filter(e => e.height >= params[0].fromHeight && e.height <= params[0].toHeight) default: - console.error('Unknown method') + throw new Error(`Unsupported RPC API method: "${method}"`) } } before(async () => { diff --git a/backend/test/rpc-client.test.js b/backend/test/rpc-client.test.js index b5223bc..34b1c1d 100644 --- a/backend/test/rpc-client.test.js +++ b/backend/test/rpc-client.test.js @@ -15,7 +15,7 @@ describe('RpcApiClient', () => { case 'Filecoin.GetActorEventsRaw': return parse(JSON.stringify(rawActorEventTestData)).filter(e => e.height >= params[0].fromHeight && e.height <= params[0].toHeight) default: - console.error('Unknown method') + throw new Error(`Unsupported RPC API method: "${method}"`) } } From 97e17394216468f284da74caa8df2bb1e3b8b1d1 Mon Sep 17 00:00:00 2001 From: Nikolas Haimerl Date: Wed, 29 Jan 2025 14:53:23 +0100 Subject: [PATCH 12/12] formatting --- backend/test/deal-observer-loop.test.js | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/backend/test/deal-observer-loop.test.js b/backend/test/deal-observer-loop.test.js index 53b0ebf..842c44e 100644 --- a/backend/test/deal-observer-loop.test.js +++ b/backend/test/deal-observer-loop.test.js @@ -33,15 +33,15 @@ describe('dealObserverLoop', () => { it('then deal observer loop fetches new active deals and stores them in storage', async (t) => { const controller = new AbortController() const { signal } = controller - let rows - const failOnTimeout = async () => { await setTimeout(() => { if (!signal.aborted) { throw new Error(`Test timed out. Rows inserted ${rows.length}`) } }, 1000) } const waitForDealCount = async (targetCount) => { while (true) { - const { rows } = await pgPool.query('SELECT COUNT(*) FROM active_deals') - if (parseInt(rows[0].count) === 360) break + const { rows } = (await pgPool.query('SELECT COUNT(*) FROM active_deals')) + console.log('Current deal count:', rows[0].count) + if (parseInt(rows[0].count) === targetCount) break } controller.abort() } + const failOnTimeout = async () => { await setTimeout(() => { if (!signal.aborted) { throw new Error('Test timed out') } }, 2000) } await Promise.all([failOnTimeout(), waitForDealCount(360), dealObserverLoop( makeRpcRequest, @@ -51,7 +51,7 @@ describe('dealObserverLoop', () => { // The testdata has a total amount of 11 blocks 11, 0, - 1, + 100, undefined, signal )])