Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: enable strict type checking #80

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions api/test/test-helpers.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import { AssertionError } from 'node:assert'

/**
* @param {Response} res
* @param {number} status
*/
export const assertResponseStatus = async (res, status) => {
if (res.status !== status) {
throw new AssertionError({
Expand Down
14 changes: 12 additions & 2 deletions backend/bin/deal-observer-backend.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { fetchDealWithHighestActivatedEpoch, countStoredActiveDeals, observeBuil
import { indexPieces } from '../lib/piece-indexer.js'
import { findAndSubmitUnsubmittedDeals, submitDealsToSparkApi } from '../lib/spark-api-submit-deals.js'
import { getDealPayloadCid } from '../lib/piece-indexer-service.js'
/** @import {Queryable} from '@filecoin-station/deal-observer-db' */

const {
INFLUXDB_TOKEN,
Expand All @@ -37,6 +38,10 @@ assert(finalityEpochs <= maxPastEpochs)
const pgPool = await createPgPool()
const { recordTelemetry } = createInflux(INFLUXDB_TOKEN)

/**
* @param {(method:string,params:any[]) => Promise<any>} makeRpcRequest
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The RPC call can return multiple different types of responses which are usually validated using TypeBox right after they return.
The parameters also differ in each request which is why the any type was used.

* @param {Queryable} pgPool
*/
const observeActorEventsLoop = async (makeRpcRequest, pgPool) => {
const LOOP_NAME = 'Observe actor events'
while (true) {
Expand All @@ -46,7 +51,7 @@ const observeActorEventsLoop = async (makeRpcRequest, pgPool) => {
const lastInsertedDeal = await fetchDealWithHighestActivatedEpoch(pgPool)
const startEpoch = Math.max(
currentChainHead.Height - maxPastEpochs,
(lastInsertedDeal?.activated_at_epoch + 1) || 0
lastInsertedDeal ? (lastInsertedDeal.activated_at_epoch ?? -1) + 1 : 0
)
const endEpoch = currentChainHead.Height - finalityEpochs

Expand All @@ -57,7 +62,7 @@ const observeActorEventsLoop = async (makeRpcRequest, pgPool) => {
const numberOfStoredDeals = await countStoredActiveDeals(pgPool)
if (INFLUXDB_TOKEN) {
recordTelemetry('observed_deals_stats', point => {
point.intField('last_searched_epoch', newLastInsertedDeal.activated_at_epoch)
point.intField('last_searched_epoch', newLastInsertedDeal?.activated_at_epoch || 0)
point.intField('number_of_stored_active_deals', numberOfStoredDeals)
})
}
Expand Down Expand Up @@ -126,6 +131,11 @@ const sparkApiSubmitDealsLoop = async (pgPool, { sparkApiBaseUrl, sparkApiToken,
}
}

/**
* @param {(method:string,params:object) => object} makeRpcRequest
* @param {(providerId:string,pieceCid:string) => Promise<string|null>} getDealPayloadCid
* @param {*} pgPool
*/
export const pieceIndexerLoop = async (makeRpcRequest, getDealPayloadCid, pgPool) => {
const LOOP_NAME = 'Piece Indexer'
while (true) {
Expand Down
6 changes: 3 additions & 3 deletions backend/lib/deal-observer.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { convertBlockEventToActiveDealDbEntry } from './utils.js'
/**
* @param {number} blockHeight
* @param {Queryable} pgPool
* @param {(method:string,params:object) => object} makeRpcRequest
* @param {(method:string,params:any[]) => Promise<any>} makeRpcRequest
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the code we are validating the response from the RPC request with typebox, so there is no need to infer a type with the function makeRpcRequest.

* @returns {Promise<void>}
*/
export async function observeBuiltinActorEvents (blockHeight, pgPool, makeRpcRequest) {
Expand Down Expand Up @@ -94,11 +94,11 @@ export async function storeActiveDeals (activeDeals, pgPool) {
/**
* @param {Queryable} pgPool
* @param {string} query
* @param {Array} args
* @param {Array<number | string>} args
* @returns {Promise<Array<Static <typeof ActiveDealDbEntry>>>}
*/
export async function loadDeals (pgPool, query, args = []) {
const result = (await pgPool.query(query, args)).rows.map(deal => {
const result = (await pgPool.query(query, args)).rows.map((/** @type {any} */ deal) => {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The deal is validated shortly after this call using typebox. This means that it is safe to use any type.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bajtos what do you think about introducing a new type, called ToBeParsed or similar, which equals any? This way we don't have any any, and can forbid it, but at the same time, signal that this type needs to be parsed, and will be parsed.

// SQL used null, typebox needs undefined for null values
Object.keys(deal).forEach(key => {
if (deal[key] === null) {
Expand Down
9 changes: 8 additions & 1 deletion backend/lib/rpc-service/data-types.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,17 @@ const RpcRespone = Type.Object({
result: Type.Any()
})

// Schema for the Filecoin.ChainHead RPC response. Only `Height` is consumed
// by this backend; Blocks and Cids are deliberately left as Any until
// explicit types are needed (low priority per the PR discussion).
const ChainHead = Type.Object({
  Height: Type.Number(),
  Blocks: Type.Any(),
  Cids: Type.Any()
})

export {
ClaimEvent,
Entry,
RawActorEvent,
BlockEvent,
RpcRespone
RpcRespone,
ChainHead
}
16 changes: 11 additions & 5 deletions backend/lib/rpc-service/service.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { encode as cborEncode } from '@ipld/dag-cbor'
import { rawEventEntriesToEvent } from './utils.js'
import { Value } from '@sinclair/typebox/value'
import * as util from 'node:util'
import { ClaimEvent, RawActorEvent, BlockEvent, RpcRespone } from './data-types.js'
import { ClaimEvent, RawActorEvent, BlockEvent, RpcRespone, ChainHead } from './data-types.js'
import pRetry from 'p-retry'
/** @import { Static } from '@sinclair/typebox' */

Expand Down Expand Up @@ -40,8 +40,9 @@ export const rpcRequest = async (method, params) => {
}
}
/**
* @param {object} actorEventFilter
* @param {{fromHeight:number,toHeight:number,fields: any}} actorEventFilter
* Returns actor events filtered by the given actorEventFilter
* @param {(method: string, params: any[]) => Promise<any>} makeRpcRequest
* @returns {Promise<Array<Static<typeof BlockEvent>>>}
*/
export async function getActorEvents (actorEventFilter, makeRpcRequest) {
Expand All @@ -52,7 +53,7 @@ export async function getActorEvents (actorEventFilter, makeRpcRequest) {
}
// TODO: handle reverted events
// https://github.com/filecoin-station/deal-observer/issues/22
const typedRawEventEntries = rawEvents.map((rawEvent) => Value.Parse(RawActorEvent, rawEvent))
const typedRawEventEntries = rawEvents.map((/** @type {any} */ rawEvent) => Value.Parse(RawActorEvent, rawEvent))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rawEvents are parsed right after this point, so it is safe to use any.

// An emitted event contains the height at which it was emitted, the emitter and the event itself
const emittedEvents = []
for (const typedEventEntries of typedRawEventEntries) {
Expand Down Expand Up @@ -81,10 +82,15 @@ export async function getActorEvents (actorEventFilter, makeRpcRequest) {

/**
 * Fetch the current chain head from the RPC API and validate the response
 * against the ChainHead schema (callers rely on `Height`).
 * @param {(method: string, params: any[]) => Promise<any>} makeRpcRequest
 * @returns {Promise<Static<typeof ChainHead>>}
 * @throws {Error} when the RPC response does not match the ChainHead schema
 */
export async function getChainHead (makeRpcRequest) {
  const result = await makeRpcRequest('Filecoin.ChainHead', [])
  try {
    return Value.Parse(ChainHead, result)
  } catch (e) {
    // Keep the original parse failure attached as `cause` for debugging
    throw new Error(util.format('Failed to parse chain head: %o', result), { cause: e })
  }
}

/**
Expand Down
5 changes: 5 additions & 0 deletions backend/lib/rpc-service/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ import { base64pad } from 'multiformats/bases/base64'
import { decode as cborDecode } from '@ipld/dag-cbor'
import * as util from 'node:util'

/**
 * Decode a base64pad-encoded string containing CBOR bytes.
 * @param {string} data base64pad-encoded CBOR payload
 * @returns {any} the decoded JavaScript value (shape depends on the payload)
 */
const decodeCborInBase64 = (data) => cborDecode(base64pad.baseDecode(data))
Expand All @@ -14,6 +18,7 @@ const decodeCborInBase64 = (data) => {
*/
const rawEventEntriesToEvent = (rawEventEntries) => {
  // Each event is defined by a list of event entries which will be parsed into a typed event
/** @type {Record<string, any>} */
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do not yet know the event type at this point, which is why I used any as a type declaration here.

const event = {}
let eventType
for (const entry of rawEventEntries) {
Expand Down
8 changes: 4 additions & 4 deletions backend/lib/spark-api-submit-deals.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import * as Sentry from '@sentry/node'
*
* @param {PgPool} pgPool
* @param {number} batchSize
* @param {(eligibleDeals: Array) => Promise<{ingested: number; skipped: number}>} submitDeals
* @param {(eligibleDeals: Array<any>) => Promise<{ingested: number; skipped: number}>} submitDeals
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest following up the explicit typing for eligible deals with another PR, as this would make the existing PR a lot bigger. WDYT?

* @returns {Promise<{submitted: number; ingested: number; skipped: number;}>} Number of deals submitted, ingested and skipped
*/
export const findAndSubmitUnsubmittedDeals = async (pgPool, batchSize, submitDeals) => {
Expand Down Expand Up @@ -45,7 +45,7 @@ export const findAndSubmitUnsubmittedDeals = async (pgPool, batchSize, submitDea
*
* @param {PgPool} pgPool
* @param {number} batchSize
* @returns {AsyncGenerator<Array>}
* @returns {AsyncGenerator<Array<any>>}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as here.

*/
const findUnsubmittedDeals = async function * (pgPool, batchSize) {
const client = await pgPool.connect()
Expand Down Expand Up @@ -82,7 +82,7 @@ const findUnsubmittedDeals = async function * (pgPool, batchSize) {
* Mark deals as submitted.
*
* @param {Queryable} pgPool
* @param {Array} eligibleDeals
* @param {Array<any>} eligibleDeals
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as here.

*/
const markDealsAsSubmitted = async (pgPool, eligibleDeals) => {
await pgPool.query(`
Expand Down Expand Up @@ -112,7 +112,7 @@ const markDealsAsSubmitted = async (pgPool, eligibleDeals) => {
*
* @param {string} sparkApiBaseURL
* @param {string} sparkApiToken
* @param {Array} deals
* @param {Array<any>} deals
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as here.

* @returns {Promise<{ingested: number; skipped: number}>}
*/
export const submitDealsToSparkApi = async (sparkApiBaseURL, sparkApiToken, deals) => {
Expand Down
4 changes: 4 additions & 0 deletions backend/lib/telemetry.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ import createDebug from 'debug'

const debug = createDebug('spark:deal-observer:telemetry')

/**
* @param {string | undefined} token
* @returns {{influx: InfluxDB,recordTelemetry: (name: string, fn: (p: Point) => void) => void}}
*/
export const createInflux = token => {
const influx = new InfluxDB({
url: 'https://eu-central-1-1.aws.cloud2.influxdata.com',
Expand Down
3 changes: 3 additions & 0 deletions backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
"test": "node --test --test-reporter=spec --test-concurrency=1"
},
"devDependencies": {
"@types/debug": "^4.1.12",
"@types/pg-cursor": "^2.7.2",
"@types/slug": "^5.0.9",
"standard": "^17.1.2"
},
"dependencies": {
Expand Down
14 changes: 11 additions & 3 deletions backend/test/deal-observer.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,15 @@ import { after, before, beforeEach, describe, it } from 'node:test'
import { createPgPool, migrateWithPgClient } from '@filecoin-station/deal-observer-db'
import { fetchDealWithHighestActivatedEpoch, countStoredActiveDeals, loadDeals, storeActiveDeals } from '../lib/deal-observer.js'
import { Value } from '@sinclair/typebox/value'
import { BlockEvent } from '../lib/rpc-service/data-types.js'
import { BlockEvent, ClaimEvent } from '../lib/rpc-service/data-types.js'
import { convertBlockEventToActiveDealDbEntry } from '../lib/utils.js'
/** @import {PgPool} from '@filecoin-station/deal-observer-db' */
/** @import { Static } from '@sinclair/typebox' */

describe('deal-observer-backend', () => {
/**
* @type {PgPool}
*/
let pgPool
before(async () => {
pgPool = await createPgPool()
Expand Down Expand Up @@ -74,12 +79,15 @@ describe('deal-observer-backend', () => {
})

it('check number of stored deals', async () => {
/**
* @param {Static<typeof ClaimEvent>} eventData
*/
const storeBlockEvent = async (eventData) => {
const event = Value.Parse(BlockEvent, { height: 1, event: eventData, emitter: 'f06' })
const dbEntry = convertBlockEventToActiveDealDbEntry(event)
await storeActiveDeals([dbEntry], pgPool)
}
const data = {
const data = Value.Parse(ClaimEvent, {
id: 1,
provider: 2,
client: 3,
Expand All @@ -89,7 +97,7 @@ describe('deal-observer-backend', () => {
termMin: 12340,
termMax: 12340,
sector: 6n
}
})
assert.strictEqual(await countStoredActiveDeals(pgPool), 0n)
await storeBlockEvent(data)
assert.strictEqual(await countStoredActiveDeals(pgPool), 1n)
Expand Down
16 changes: 15 additions & 1 deletion backend/test/piece-indexer.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,29 @@ import assert from 'assert'
import { minerPeerIds } from './test_data/minerInfo.js'
import { payloadCIDs } from './test_data/payloadCIDs.js'
import { indexPieces } from '../lib/piece-indexer.js'
/** @import {PgPool} from '@filecoin-station/deal-observer-db' */

describe('deal-observer-backend piece indexer', () => {
/**
 * Stubbed RPC dispatcher for tests: serves canned fixture data for the
 * methods the piece indexer calls. Unknown methods are logged and resolve
 * to undefined.
 * @param {string} method
 * @param {any[]} params
 * @returns {Promise<any>}
 */
const makeRpcRequest = async (method, params) => {
  switch (method) {
    case 'Filecoin.ChainHead':
      return parse(JSON.stringify(chainHeadTestData))
    case 'Filecoin.GetActorEventsRaw':
      return parse(JSON.stringify(rawActorEventTestData)).filter((/** @type {{ height: number; }} */ e) => e.height >= params[0].fromHeight && e.height <= params[0].toHeight)
    case 'Filecoin.StateMinerInfo':
      return minerPeerIds.get(params[0])
    default:
      console.error('Unknown method')
  }
}
/**
* @type {PgPool}
*/
let pgPool
before(async () => {
pgPool = await createPgPool()
Expand All @@ -46,6 +55,11 @@ describe('deal-observer-backend piece indexer', () => {

it('piece indexer loop function fetches deals where there exists no payload yet and updates the database entry', async (t) => {
const getDealPayloadCidCalls = []
/**
* @param {number} providerId
* @param {string} pieceCid
* @returns {Promise<string | undefined>}
*/
const getDealPayloadCid = async (providerId, pieceCid) => {
getDealPayloadCidCalls.push({ providerId, pieceCid })
const payloadCid = payloadCIDs.get(JSON.stringify({ minerId: providerId, pieceCid }))
Expand Down
10 changes: 8 additions & 2 deletions backend/test/rpc-client.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,17 @@ import { ClaimEvent } from '../lib/rpc-service/data-types.js'
import { Value } from '@sinclair/typebox/value'

describe('RpcApiClient', () => {
/**
* @param {string} method
* @param {any[]} params
* @returns
*/
const makeRpcRequest = async (method, params) => {
switch (method) {
case 'Filecoin.ChainHead':
return parse(JSON.stringify(chainHeadTestData))
case 'Filecoin.GetActorEventsRaw':
return parse(JSON.stringify(rawActorEventTestData)).filter(e => e.height >= params[0].fromHeight && e.height <= params[0].toHeight)
return parse(JSON.stringify(rawActorEventTestData)).filter((/** @type {{ height: number; }} */ e) => e.height >= params[0].fromHeight && e.height <= params[0].toHeight)
default:
console.error('Unknown method')
}
Expand All @@ -23,7 +28,8 @@ describe('RpcApiClient', () => {
const chainHead = await getChainHead(makeRpcRequest)
assert(chainHead)
const expected = parse(JSON.stringify(chainHeadTestData))
assert.deepStrictEqual(chainHead, expected)
assert(chainHead.Height)
assert.deepStrictEqual(expected.Height, chainHead.Height)
})

for (let blockHeight = 4622129; blockHeight < 4622129 + 11; blockHeight++) {
Expand Down
28 changes: 20 additions & 8 deletions backend/test/spark-api-submit-deals.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,13 @@ import { after, before, beforeEach, describe, it, mock } from 'node:test'
import { createPgPool, migrateWithPgClient } from '@filecoin-station/deal-observer-db'
import { calculateActiveDealEpochs, daysAgo, daysFromNow, today } from './test-helpers.js'
import { findAndSubmitUnsubmittedDeals } from '../lib/spark-api-submit-deals.js'
/** @import {PgPool} from '@filecoin-station/deal-observer-db' */
/** @import {Queryable} from '@filecoin-station/deal-observer-db' */

describe('Submit deals to spark-api', () => {
/**
* @type {PgPool}
*/
let pgPool

before(async () => {
Expand Down Expand Up @@ -91,6 +96,18 @@ describe('Submit deals to spark-api', () => {
})
})

/**
* @param {Queryable} pgPool
* @param {Object} activeDeal
* @param {string} activeDeal.createdAt
* @param {string} activeDeal.startsAt
* @param {string} activeDeal.expiresAt
* @param {number} [activeDeal.minerId=2]
* @param {number} [activeDeal.clientId=3]
* @param {string} [activeDeal.pieceCid='cidone']
* @param {string | null} [activeDeal.payloadCid=null]
* @returns {Promise<void>}
*/
const givenActiveDeal = async (pgPool, { createdAt, startsAt, expiresAt, minerId = 2, clientId = 3, pieceCid = 'cidone', payloadCid = null }) => {
const { activatedAtEpoch, termStart, termMin, termMax } = calculateActiveDealEpochs(createdAt, startsAt, expiresAt)
await pgPool.query(
Expand All @@ -103,12 +120,7 @@ const givenActiveDeal = async (pgPool, { createdAt, startsAt, expiresAt, minerId

// TODO: allow callers of this helper to define how many deals should be reported as skipped
/**
 * Build a node:test mock of the submit-deals function: reports every
 * submitted deal as ingested and none as skipped, while recording calls.
 * @returns {import('node:test').Mock<(deals: Array<any>) => Promise<{ingested: number, skipped: number}>>}
 */
const createSubmitEligibleDealsMock = () => {
  return mock.fn(async (deals) => {
    return { ingested: deals.length, skipped: 0 }
  })
}
4 changes: 2 additions & 2 deletions backend/test/test-helpers.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ export const getLocalDayAsISOString = (d) => {

export const today = () => getLocalDayAsISOString(new Date())
export const yesterday = () => getLocalDayAsISOString(new Date(Date.now() - 24 * 60 * 60 * 1000))
// Helpers to build local-day ISO strings relative to now, used by tests.
export const daysAgo = (/** @type {number} */ n) => getLocalDayAsISOString(new Date(Date.now() - n * 24 * 60 * 60 * 1000))
export const daysFromNow = (/** @type {number} */ n) => getLocalDayAsISOString(new Date(Date.now() + n * 24 * 60 * 60 * 1000))

/**
* Calculates activated at, term start, term min, and term max.
Expand Down
Loading