From b472b7ad20680a43ca8b6c4db2347b8a85541819 Mon Sep 17 00:00:00 2001 From: belopash Date: Mon, 27 Jan 2025 14:33:26 +0300 Subject: [PATCH] a bit more refactoring --- evm/evm-processor/src/ds-archive/portal.ts | 2 +- .../substrate-processor/src/ds-portal.ts | 8 +- util/portal-client/src/client.test.ts | 9 +- util/portal-client/src/client.ts | 244 ++++++++++-------- 4 files changed, 148 insertions(+), 115 deletions(-) diff --git a/evm/evm-processor/src/ds-archive/portal.ts b/evm/evm-processor/src/ds-archive/portal.ts index 447e9f1a..6be6df06 100644 --- a/evm/evm-processor/src/ds-archive/portal.ts +++ b/evm/evm-processor/src/ds-archive/portal.ts @@ -99,7 +99,7 @@ export class EvmPortal implements DataSource { yield { blocks, - isHead: lastBlock >= finalizedHead.height, + isHead: lastBlock >= (finalizedHead?.height ?? -1), } } diff --git a/substrate/substrate-processor/src/ds-portal.ts b/substrate/substrate-processor/src/ds-portal.ts index f6be8cc2..e0282100 100644 --- a/substrate/substrate-processor/src/ds-portal.ts +++ b/substrate/substrate-processor/src/ds-portal.ts @@ -35,7 +35,7 @@ function getFields(fields: FieldSelection | undefined): FieldSelection { address: true, }), extrinsic: mergeFields(DEFAULT_FIELDS.extrinsic, fields?.extrinsic, { - index: true + index: true, }), } } @@ -98,7 +98,9 @@ export class SubstratePortal implements DataSource { let endBlock = req.range.to || Infinity let query = makeQuery(req) - for await (let {finalizedHead, blocks: batch} of this.client.getFinalizedStream(query, {stopOnHead})) { + for await (let {finalizedHead, blocks: batch} of this.client.getFinalizedStream(query, { + stopOnHead, + })) { assert(batch.length > 0, 'boundary blocks are expected to be included') lastBlock = last(batch).header.number @@ -119,7 +121,7 @@ export class SubstratePortal implements DataSource { yield { blocks, - isHead: lastBlock >= finalizedHead.height, + isHead: lastBlock >= (finalizedHead?.height ?? -1), } } diff --git a/util/portal-client/src/client.test.ts b/util/portal-client/src/client.test.ts index e38a7cd4..508dea30 100644 --- a/util/portal-client/src/client.test.ts +++ b/util/portal-client/src/client.test.ts @@ -7,7 +7,7 @@ async function main() { http: new HttpClient({ retryAttempts: Infinity, }), - minBytes: 50 * 1024 * 1024 + minBytes: 50 * 1024 * 1024, }) let from = await portal.getFinalizedHeight().then((h) => h - 1_000_000) @@ -43,15 +43,14 @@ async function main() { { address: ['0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48'], topic0: ['0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'], - } - ] + }, + ], } - for await (let {blocks, finalizedHead} of portal.getFinalizedStream(query, {stopOnHead: true})) { console.log( `[${new Date().toISOString()}] progress: ${blocks[blocks.length - 1].header.number} / ${ - finalizedHead.height + finalizedHead?.height ?? -1 }` + `, blocks: ${blocks.length}` ) } diff --git a/util/portal-client/src/client.ts b/util/portal-client/src/client.ts index 7f26b6c5..9a32a6af 100644 --- a/util/portal-client/src/client.ts +++ b/util/portal-client/src/client.ts @@ -54,7 +54,7 @@ export interface HashAndHeight { } export interface PortalStreamData { - finalizedHead: HashAndHeight + finalizedHead?: HashAndHeight blocks: B[] } @@ -143,23 +143,23 @@ export class PortalClient { request, stopOnHead, }, - async (query, options) => { + async (q, o) => { // NOTE: we emulate the same behaviour as will be implemented for hot blocks stream, // but unfortunately we don't have any information about finalized block hash at the moment let finalizedHead = { - height: await this.getFinalizedHeight(options), + height: await this.getFinalizedHeight(o), hash: '', } let res = await this.client .request('POST', this.getDatasetUrl('finalized-stream'), { - ...options, - json: query, + ...o, + json: q, stream: true, }) .catch( withErrorContext({ - query, + query: q, }) ) @@ -191,67 +191,12 @@ function createReadablePortalStream( options?: PortalRequestOptions ) => Promise<{finalizedHead: HashAndHeight; stream: ReadableStream} | undefined> ): ReadableStream> { - let {headPollInterval, stopOnHead, maxBytes, minBytes, request, maxIdleTime, maxWaitTime} = options - maxBytes = Math.max(maxBytes, minBytes) + let {headPollInterval, stopOnHead, request, ...bufferOptions} = options let abortStream = new AbortController() - let buffer: {data: PortalStreamData; bytes: number} | undefined - let state: 'open' | 'failed' | 'closed' = 'open' - let error: unknown - - let readyFuture: Future = createFuture() - let takeFuture: Future = createFuture() - let putFuture: Future = createFuture() - - async function take() { - let waitTimeout = setTimeout(() => { - readyFuture.resolve() - }, maxWaitTime) - readyFuture.promise().finally(() => clearTimeout(waitTimeout)) - - await Promise.all([readyFuture.promise(), putFuture.promise()]) - - if (state === 'failed') { - throw error - } - - let value = buffer?.data - buffer = undefined - - takeFuture.resolve() - - if (state === 'closed') { - return {value, done: value == null} - } else { - if (value == null) { - throw new Error('buffer is empty') - } - - takeFuture = createFuture() - putFuture = createFuture() - readyFuture = createFuture() - - return {value, done: false} - } - } - - function close() { - if (state !== 'open') return - state = 'closed' - readyFuture.resolve() - putFuture.resolve() - takeFuture.resolve() - } - - function fail(err: unknown) { - if (state !== 'open') return - state = 'failed' - error = err - readyFuture.resolve() - putFuture.resolve() - takeFuture.resolve() - } + let finalizedHead: HashAndHeight | undefined + let buffer = new PortalStreamBuffer(bufferOptions) async function ingest() { let abortSignal = abortStream.signal @@ -264,9 +209,6 @@ function createReadablePortalStream( let reader: ReadableStreamDefaultReader | undefined - let lastChunkTimestamp = Date.now() - let idleInterval: ReturnType | undefined - try { let res = await requestStream( { @@ -283,61 +225,33 @@ function createReadablePortalStream( if (stopOnHead) break await wait(headPollInterval, abortSignal) } else { - let {finalizedHead, stream} = res - reader = stream.getReader() + finalizedHead = res.finalizedHead + reader = res.stream.getReader() while (true) { let data = await withAbort(reader.read(), abortSignal) if (data.done) break if (data.value.length == 0) continue - lastChunkTimestamp = Date.now() - if (idleInterval == null) { - idleInterval = setInterval(() => { - if (Date.now() - lastChunkTimestamp >= maxIdleTime) { - readyFuture.resolve() - } - }, Math.ceil(maxIdleTime / 3)) - readyFuture.promise().finally(() => clearInterval(idleInterval)) - takeFuture.promise().finally(() => (idleInterval = undefined)) - } - - if (buffer == null) { - buffer = { - data: {finalizedHead, blocks: []}, - bytes: 0, - } - } else { - buffer.data.finalizedHead = finalizedHead - } + let blocks: B[] = [] + let bytes = 0 for (let line of data.value) { let block = JSON.parse(line) as B - buffer.bytes += line.length - buffer.data.blocks.push(block) + blocks.push(block) + bytes += line.length fromBlock = block.header.number + 1 } - if (buffer.bytes >= minBytes) { - readyFuture.resolve() - } - - putFuture.resolve() - - if (buffer.bytes >= maxBytes) { - await withAbort(takeFuture.promise(), abortSignal) - } + await withAbort(buffer.put(blocks, bytes), abortSignal) } } - if (buffer != null) { - readyFuture.resolve() - } + buffer.ready() } finally { reader?.cancel().catch(() => {}) - clearInterval(idleInterval) } } } catch (err) { @@ -351,15 +265,21 @@ function createReadablePortalStream( return new ReadableStream({ start() { - ingest().then(close, fail) + ingest().then( + () => buffer.close(), + (err) => buffer.fail(err) + ) }, async pull(controller) { try { - let result = await take() + let result = await buffer.take() if (result.done) { controller.close() } else { - controller.enqueue(result.value) + controller.enqueue({ + finalizedHead, + blocks: result.value || [], + }) } } catch (err) { controller.error(err) @@ -371,6 +291,118 @@ function createReadablePortalStream( }) } +class PortalStreamBuffer { + private buffer: {blocks: B[]; bytes: number} | undefined + private state: 'open' | 'failed' | 'closed' = 'open' + private error: unknown + + private readyFuture: Future = createFuture() + private takeFuture: Future = createFuture() + private putFuture: Future = createFuture() + + private lastChunkTimestamp = Date.now() + private idleInterval: ReturnType | undefined + + private minBytes: number + private maxBytes: number + private maxIdleTime: number + private maxWaitTime: number + + constructor(options: {maxWaitTime: number; maxBytes: number; maxIdleTime: number; minBytes: number}) { + this.maxWaitTime = options.maxWaitTime + this.minBytes = options.minBytes + this.maxBytes = Math.max(options.maxBytes, options.minBytes) + this.maxIdleTime = options.maxIdleTime + } + + async take() { + let waitTimeout = setTimeout(() => { + this.readyFuture.resolve() + }, this.maxWaitTime) + this.readyFuture.promise().finally(() => clearTimeout(waitTimeout)) + + await Promise.all([this.readyFuture.promise(), this.putFuture.promise()]) + + if (this.state === 'failed') { + throw this.error + } + + let value = this.buffer?.blocks + this.buffer = undefined + + this.takeFuture.resolve() + + if (this.state === 'closed') { + return {value, done: value == null} + } else { + if (value == null) { + throw new Error('buffer is empty') + } + + this.takeFuture = createFuture() + this.putFuture = createFuture() + this.readyFuture = createFuture() + + return {value, done: false} + } + } + + async put(blocks: B[], bytes: number) { + this.lastChunkTimestamp = Date.now() + if (this.idleInterval == null) { + this.idleInterval = setInterval(() => { + if (Date.now() - this.lastChunkTimestamp >= this.maxIdleTime) { + this.readyFuture.resolve() + } + }, Math.ceil(this.maxIdleTime / 3)) + this.readyFuture.promise().finally(() => clearInterval(this.idleInterval)) + this.takeFuture.promise().finally(() => (this.idleInterval = undefined)) + } + + if (this.buffer == null) { + this.buffer = { + blocks: [], + bytes: 0, + } + } + + this.buffer.bytes += bytes + this.buffer.blocks.push(...blocks) + + if (this.buffer.bytes >= this.minBytes) { + this.readyFuture.resolve() + } + + this.putFuture.resolve() + + if (this.buffer.bytes >= this.maxBytes) { + await this.takeFuture.promise() + } + } + + ready() { + if (this.buffer == null) return + this.readyFuture.resolve() + } + + close() { + if (this.state !== 'open') return + this.state = 'closed' + this.readyFuture.resolve() + this.putFuture.resolve() + this.takeFuture.resolve() + } + + fail(err: unknown) { + if (this.state !== 'open') return + this.state = 'failed' + this.error = err + this.readyFuture.resolve() + this.putFuture.resolve() + this.takeFuture.resolve() + } +} + function withAbort(promise: Promise, signal: AbortSignal): Promise { return new Promise((resolve, reject) => { if (signal.aborted) {