Skip to content

Commit

Permalink
refactor portal client and add solana support
Browse files Browse the repository at this point in the history
  • Loading branch information
belopash committed Jan 21, 2025
1 parent e26ce7f commit 6b2753f
Show file tree
Hide file tree
Showing 11 changed files with 686 additions and 360 deletions.
14 changes: 7 additions & 7 deletions common/config/rush/pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 3 additions & 8 deletions evm/evm-processor/src/ds-archive/portal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ export class EvmPortal implements DataSource<Block, DataRequest> {
range: {from: height, to: height},
request: {includeAllBlocks: true},
})
let blocks = await this.client.finalizedQuery(query)
let blocks = await this.client.getFinalizedQuery(query)
assert(blocks.length == 1)
return blocks[0].header.hash
}
Expand All @@ -77,15 +77,12 @@ export class EvmPortal implements DataSource<Block, DataRequest> {
requests: RangeRequest<DataRequest>[],
stopOnHead?: boolean | undefined
): AsyncIterable<Batch<Block>> {
let height = new Throttler(() => this.client.getFinalizedHeight(), 20_000)

let top = await height.call()
for (let req of requests) {
let lastBlock = req.range.from - 1
let endBlock = req.range.to || Infinity
let query = makeQuery(req)

for await (let batch of this.client.finalizedStream(query, stopOnHead)) {
for await (let {finalizedHead, blocks: batch} of this.client.getFinalizedStream(query, {stopOnHead})) {
assert(batch.length > 0, 'boundary blocks are expected to be included')
lastBlock = last(batch).header.number

Expand All @@ -102,10 +99,8 @@ export class EvmPortal implements DataSource<Block, DataRequest> {

yield {
blocks,
isHead: lastBlock > top,
isHead: lastBlock >= finalizedHead.height,
}

top = await height.get()
}

// stream ended before requested range,
Expand Down
109 changes: 56 additions & 53 deletions evm/evm-processor/src/processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ export interface RpcEndpointSettings {
requestTimeout?: number
/**
* Maximum number of retry attempts.
*
* By default, retries all "retryable" errors indefinitely.
*
* By default, retries all "retryable" errors indefinitely.
*/
retryAttempts?: number
/**
Expand All @@ -51,7 +51,6 @@ export interface RpcEndpointSettings {
headers?: Record<string, string>
}


export interface RpcDataIngestionSettings {
/**
* By default, `debug_traceBlockByHash` is used to obtain call traces,
Expand Down Expand Up @@ -93,7 +92,7 @@ export interface RpcDataIngestionSettings {
/**
* Flags to switch off the data consistency checks
*/
validationFlags?: RpcValidationFlags
validationFlags?: RpcValidationFlags
}


Expand All @@ -120,7 +119,7 @@ export interface PortalSettings {
requestTimeout?: number

retryAttempts?: number

bufferThreshold?: number

newBlockTimeout?: number
Expand Down Expand Up @@ -209,7 +208,7 @@ export class EvmBatchProcessor<F extends FieldSelection = {}> {
private blockRange?: Range
private fields?: FieldSelection
private finalityConfirmation?: number
private archive?: GatewaySettings & {type: 'gateway'} | PortalSettings & {type: 'portal'}
private archive?: (GatewaySettings & {type: 'gateway'}) | (PortalSettings & {type: 'portal'})
private rpcIngestSettings?: RpcDataIngestionSettings
private rpcEndpoint?: RpcEndpointSettings
private running = false
Expand Down Expand Up @@ -372,7 +371,7 @@ export class EvmBatchProcessor<F extends FieldSelection = {}> {
private add(request: DataRequest, range?: Range): void {
this.requests.push({
range: range || {from: 0},
request
request,
})
}

Expand Down Expand Up @@ -477,11 +476,11 @@ export class EvmBatchProcessor<F extends FieldSelection = {}> {
url: this.rpcEndpoint.url,
headers: this.rpcEndpoint.headers,
maxBatchCallSize: this.rpcEndpoint.maxBatchCallSize ?? 100,
requestTimeout: this.rpcEndpoint.requestTimeout ?? 30_000,
requestTimeout: this.rpcEndpoint.requestTimeout ?? 30_000,
capacity: this.rpcEndpoint.capacity ?? 10,
rateLimit: this.rpcEndpoint.rateLimit,
retryAttempts: this.rpcEndpoint.retryAttempts ?? Number.MAX_SAFE_INTEGER,
log: this.getLogger().child('rpc', {rpcUrl: this.rpcEndpoint.url})
log: this.getLogger().child('rpc', {rpcUrl: this.rpcEndpoint.url}),
})
this.getPrometheusServer().addChainRpcMetrics(() => client.getMetrics())
return client
Expand All @@ -493,7 +492,7 @@ export class EvmBatchProcessor<F extends FieldSelection = {}> {
return {
get client() {
return self.getChainRpcClient()
}
},
}
}

Expand All @@ -511,7 +510,7 @@ export class EvmBatchProcessor<F extends FieldSelection = {}> {
headPollInterval: this.rpcIngestSettings?.headPollInterval,
newHeadTimeout: this.rpcIngestSettings?.newHeadTimeout,
validationFlags: this.rpcIngestSettings?.validationFlags,
log: this.getLogger().child('rpc', {rpcUrl: this.getChainRpcClient().url})
log: this.getLogger().child('rpc', {rpcUrl: this.getChainRpcClient().url}),
})
}

Expand All @@ -521,34 +520,38 @@ export class EvmBatchProcessor<F extends FieldSelection = {}> {

let log = this.getLogger().child('archive')

let http = new HttpClient({
headers: {
'x-squid-id': this.getSquidId()
},
agent: new HttpAgent({
keepAlive: true
}),
log
let headers = {
'x-squid-id': this.getSquidId(),
}
let agent = new HttpAgent({
keepAlive: true,
})

return archive.type === 'gateway'
? new EvmArchive(
new ArchiveClient({
http,
http: new HttpClient({
headers,
agent,
log,
}),
url: archive.url,
queryTimeout: archive.requestTimeout,
log,
})
)
: new EvmPortal(
new PortalClient({
http,
http: new HttpClient({
headers,
agent,
log,
httpTimeout: archive.requestTimeout,
retryAttempts: archive.retryAttempts,
}),
url: archive.url,
requestTimeout: archive.requestTimeout,
retryAttempts: archive.retryAttempts,
bufferThreshold: archive.bufferThreshold,
newBlockTimeout: archive.newBlockTimeout,
log,
minBytes: archive.bufferThreshold,
maxIdleTime: archive.newBlockTimeout,
})
)
}
Expand Down Expand Up @@ -593,40 +596,40 @@ export class EvmBatchProcessor<F extends FieldSelection = {}> {
let log = this.getLogger()

runProgram(async () => {
let chain = this.getChain()
let mappingLog = log.child('mapping')

if (this.archive == null && this.rpcEndpoint == null) {
throw new Error(
'No data source where specified. ' +
'Use .setArchive() to specify Subsquid Archive and/or .setRpcEndpoint() to specify RPC endpoint.'
)
}
let chain = this.getChain()
let mappingLog = log.child('mapping')

if (this.archive == null && this.rpcEndpoint == null) {
throw new Error(
'No data source where specified. ' +
'Use .setArchive() to specify Subsquid Archive and/or .setRpcEndpoint() to specify RPC endpoint.'
)
}

if (this.archive == null && this.rpcIngestSettings?.disabled) {
throw new Error('Subsquid Archive is required when RPC data ingestion is disabled')
}
if (this.archive == null && this.rpcIngestSettings?.disabled) {
throw new Error('Subsquid Archive is required when RPC data ingestion is disabled')
}

return new Runner({
database,
requests: this.getBatchRequests(),
archive: this.archive ? this.getArchiveDataSource() : undefined,
return new Runner({
database,
requests: this.getBatchRequests(),
archive: this.archive ? this.getArchiveDataSource() : undefined,
hotDataSource: this.rpcEndpoint && !this.rpcIngestSettings?.disabled
? this.getHotDataSource()
: undefined,
allBlocksAreFinal: this.finalityConfirmation === 0,
prometheus: this.getPrometheusServer(),
log,
process(store, batch) {
return handler({
_chain: chain,
log: mappingLog,
store,
blocks: batch.blocks as any,
allBlocksAreFinal: this.finalityConfirmation === 0,
prometheus: this.getPrometheusServer(),
log,
process(store, batch) {
return handler({
_chain: chain,
log: mappingLog,
store,
blocks: batch.blocks as any,
isHead: batch.isHead
})
})
}
}).run()
}).run()
}, err => log.fatal(err))
}
}
Expand Down
3 changes: 2 additions & 1 deletion solana/solana-stream/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
"@subsquid/util-internal-processor-tools": "^4.2.1",
"@subsquid/util-internal-range": "^0.3.0",
"@subsquid/util-internal-validation": "^0.7.0",
"bs58": "^5.0.0"
"bs58": "^5.0.0",
"@subsquid/portal-client": "~0.0.0"
},
"devDependencies": {
"@types/node": "^18.18.14",
Expand Down
Loading

0 comments on commit 6b2753f

Please sign in to comment.