Skip to content

Commit

Permalink
a bit more refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
belopash committed Jan 27, 2025
1 parent 6169159 commit b472b7a
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 115 deletions.
2 changes: 1 addition & 1 deletion evm/evm-processor/src/ds-archive/portal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ export class EvmPortal implements DataSource<Block, DataRequest> {

yield {
blocks,
isHead: lastBlock >= finalizedHead.height,
isHead: lastBlock >= (finalizedHead?.height ?? -1),
}
}

Expand Down
8 changes: 5 additions & 3 deletions substrate/substrate-processor/src/ds-portal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ function getFields(fields: FieldSelection | undefined): FieldSelection {
address: true,
}),
extrinsic: mergeFields(DEFAULT_FIELDS.extrinsic, fields?.extrinsic, {
index: true
index: true,
}),
}
}
Expand Down Expand Up @@ -98,7 +98,9 @@ export class SubstratePortal implements DataSource<Block, DataRequest> {
let endBlock = req.range.to || Infinity
let query = makeQuery(req)

for await (let {finalizedHead, blocks: batch} of this.client.getFinalizedStream<ArchiveBlock>(query, {stopOnHead})) {
for await (let {finalizedHead, blocks: batch} of this.client.getFinalizedStream<ArchiveBlock>(query, {
stopOnHead,
})) {
assert(batch.length > 0, 'boundary blocks are expected to be included')
lastBlock = last(batch).header.number

Expand All @@ -119,7 +121,7 @@ export class SubstratePortal implements DataSource<Block, DataRequest> {

yield {
blocks,
isHead: lastBlock >= finalizedHead.height,
isHead: lastBlock >= (finalizedHead?.height ?? -1),
}
}

Expand Down
9 changes: 4 additions & 5 deletions util/portal-client/src/client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ async function main() {
http: new HttpClient({
retryAttempts: Infinity,
}),
minBytes: 50 * 1024 * 1024
minBytes: 50 * 1024 * 1024,
})

let from = await portal.getFinalizedHeight().then((h) => h - 1_000_000)
Expand Down Expand Up @@ -43,15 +43,14 @@ async function main() {
{
address: ['0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48'],
topic0: ['0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'],
}
]
},
],
}


for await (let {blocks, finalizedHead} of portal.getFinalizedStream(query, {stopOnHead: true})) {
console.log(
`[${new Date().toISOString()}] progress: ${blocks[blocks.length - 1].header.number} / ${
finalizedHead.height
finalizedHead?.height ?? -1
}` + `, blocks: ${blocks.length}`
)
}
Expand Down
244 changes: 138 additions & 106 deletions util/portal-client/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ export interface HashAndHeight {
}

export interface PortalStreamData<B extends Block> {
finalizedHead: HashAndHeight
finalizedHead?: HashAndHeight
blocks: B[]
}

Expand Down Expand Up @@ -143,23 +143,23 @@ export class PortalClient {
request,
stopOnHead,
},
async (query, options) => {
async (q, o) => {
// NOTE: we emulate the same behaviour as will be implemented for hot blocks stream,
// but unfortunately we don't have any information about finalized block hash at the moment
let finalizedHead = {
height: await this.getFinalizedHeight(options),
height: await this.getFinalizedHeight(o),
hash: '',
}

let res = await this.client
.request<Readable>('POST', this.getDatasetUrl('finalized-stream'), {
...options,
json: query,
...o,
json: q,
stream: true,
})
.catch(
withErrorContext({
query,
query: q,
})
)

Expand Down Expand Up @@ -191,67 +191,12 @@ function createReadablePortalStream<B extends Block>(
options?: PortalRequestOptions
) => Promise<{finalizedHead: HashAndHeight; stream: ReadableStream<string[]>} | undefined>
): ReadableStream<PortalStreamData<B>> {
let {headPollInterval, stopOnHead, maxBytes, minBytes, request, maxIdleTime, maxWaitTime} = options
maxBytes = Math.max(maxBytes, minBytes)
let {headPollInterval, stopOnHead, request, ...bufferOptions} = options

let abortStream = new AbortController()

let buffer: {data: PortalStreamData<B>; bytes: number} | undefined
let state: 'open' | 'failed' | 'closed' = 'open'
let error: unknown

let readyFuture: Future<void> = createFuture()
let takeFuture: Future<void> = createFuture()
let putFuture: Future<void> = createFuture()

async function take() {
let waitTimeout = setTimeout(() => {
readyFuture.resolve()
}, maxWaitTime)
readyFuture.promise().finally(() => clearTimeout(waitTimeout))

await Promise.all([readyFuture.promise(), putFuture.promise()])

if (state === 'failed') {
throw error
}

let value = buffer?.data
buffer = undefined

takeFuture.resolve()

if (state === 'closed') {
return {value, done: value == null}
} else {
if (value == null) {
throw new Error('buffer is empty')
}

takeFuture = createFuture()
putFuture = createFuture()
readyFuture = createFuture()

return {value, done: false}
}
}

function close() {
if (state !== 'open') return
state = 'closed'
readyFuture.resolve()
putFuture.resolve()
takeFuture.resolve()
}

function fail(err: unknown) {
if (state !== 'open') return
state = 'failed'
error = err
readyFuture.resolve()
putFuture.resolve()
takeFuture.resolve()
}
let finalizedHead: HashAndHeight | undefined
let buffer = new PortalStreamBuffer<B>(bufferOptions)

async function ingest() {
let abortSignal = abortStream.signal
Expand All @@ -264,9 +209,6 @@ function createReadablePortalStream<B extends Block>(

let reader: ReadableStreamDefaultReader<string[]> | undefined

let lastChunkTimestamp = Date.now()
let idleInterval: ReturnType<typeof setInterval> | undefined

try {
let res = await requestStream(
{
Expand All @@ -283,61 +225,33 @@ function createReadablePortalStream<B extends Block>(
if (stopOnHead) break
await wait(headPollInterval, abortSignal)
} else {
let {finalizedHead, stream} = res
reader = stream.getReader()
finalizedHead = res.finalizedHead
reader = res.stream.getReader()

while (true) {
let data = await withAbort(reader.read(), abortSignal)
if (data.done) break
if (data.value.length == 0) continue

lastChunkTimestamp = Date.now()
if (idleInterval == null) {
idleInterval = setInterval(() => {
if (Date.now() - lastChunkTimestamp >= maxIdleTime) {
readyFuture.resolve()
}
}, Math.ceil(maxIdleTime / 3))
readyFuture.promise().finally(() => clearInterval(idleInterval))
takeFuture.promise().finally(() => (idleInterval = undefined))
}

if (buffer == null) {
buffer = {
data: {finalizedHead, blocks: []},
bytes: 0,
}
} else {
buffer.data.finalizedHead = finalizedHead
}
let blocks: B[] = []
let bytes = 0

for (let line of data.value) {
let block = JSON.parse(line) as B

buffer.bytes += line.length
buffer.data.blocks.push(block)
blocks.push(block)
bytes += line.length

fromBlock = block.header.number + 1
}

if (buffer.bytes >= minBytes) {
readyFuture.resolve()
}

putFuture.resolve()

if (buffer.bytes >= maxBytes) {
await withAbort(takeFuture.promise(), abortSignal)
}
await withAbort(buffer.put(blocks, bytes), abortSignal)
}
}

if (buffer != null) {
readyFuture.resolve()
}
buffer.ready()
} finally {
reader?.cancel().catch(() => {})
clearInterval(idleInterval)
}
}
} catch (err) {
Expand All @@ -351,15 +265,21 @@ function createReadablePortalStream<B extends Block>(

return new ReadableStream({
start() {
ingest().then(close, fail)
ingest().then(
() => buffer.close(),
(err) => buffer.fail(err)
)
},
async pull(controller) {
try {
let result = await take()
let result = await buffer.take()
if (result.done) {
controller.close()
} else {
controller.enqueue(result.value)
controller.enqueue({
finalizedHead,
blocks: result.value || [],
})
}
} catch (err) {
controller.error(err)
Expand All @@ -371,6 +291,118 @@ function createReadablePortalStream<B extends Block>(
})
}

class PortalStreamBuffer<B extends Block> {
private buffer: {blocks: B[]; bytes: number} | undefined
private state: 'open' | 'failed' | 'closed' = 'open'
private error: unknown

private readyFuture: Future<void> = createFuture()
private takeFuture: Future<void> = createFuture()
private putFuture: Future<void> = createFuture()

private lastChunkTimestamp = Date.now()
private idleInterval: ReturnType<typeof setInterval> | undefined

private minBytes: number
private maxBytes: number
private maxIdleTime: number
private maxWaitTime: number

constructor(options: {maxWaitTime: number; maxBytes: number; maxIdleTime: number; minBytes: number}) {
this.maxWaitTime = options.maxWaitTime
this.minBytes = options.minBytes
this.maxBytes = Math.max(options.maxBytes, options.minBytes)
this.maxIdleTime = options.maxIdleTime
}

async take() {
let waitTimeout = setTimeout(() => {
this.readyFuture.resolve()
}, this.maxWaitTime)
this.readyFuture.promise().finally(() => clearTimeout(waitTimeout))

await Promise.all([this.readyFuture.promise(), this.putFuture.promise()])

if (this.state === 'failed') {
throw this.error
}

let value = this.buffer?.blocks
this.buffer = undefined

this.takeFuture.resolve()

if (this.state === 'closed') {
return {value, done: value == null}
} else {
if (value == null) {
throw new Error('buffer is empty')
}

this.takeFuture = createFuture()
this.putFuture = createFuture()
this.readyFuture = createFuture()

return {value, done: false}
}
}

async put(blocks: B[], bytes: number) {
this.lastChunkTimestamp = Date.now()
if (this.idleInterval == null) {
this.idleInterval = setInterval(() => {
if (Date.now() - this.lastChunkTimestamp >= this.maxIdleTime) {
this.readyFuture.resolve()
}
}, Math.ceil(this.maxIdleTime / 3))
this.readyFuture.promise().finally(() => clearInterval(this.idleInterval))
this.takeFuture.promise().finally(() => (this.idleInterval = undefined))
}

if (this.buffer == null) {
this.buffer = {
blocks: [],
bytes: 0,
}
}

this.buffer.bytes += bytes
this.buffer.blocks.push(...blocks)

if (this.buffer.bytes >= this.minBytes) {
this.readyFuture.resolve()
}

this.putFuture.resolve()

if (this.buffer.bytes >= this.maxBytes) {
await this.takeFuture.promise()
}
}

ready() {
if (this.buffer == null) return
this.readyFuture.resolve()
}

close() {
if (this.state !== 'open') return
this.state = 'closed'
this.readyFuture.resolve()
this.putFuture.resolve()
this.takeFuture.resolve()
}

fail(err: unknown) {
if (this.state !== 'open') return
this.state = 'failed'
this.error = err
this.readyFuture.resolve()
this.putFuture.resolve()
this.takeFuture.resolve()
}
}

function withAbort<T>(promise: Promise<T>, signal: AbortSignal): Promise<T> {
return new Promise<T>((resolve, reject) => {
if (signal.aborted) {
Expand Down

0 comments on commit b472b7a

Please sign in to comment.