diff --git a/src/__tests__/unit/evaluation-options.test.ts b/src/__tests__/unit/evaluation-options.test.ts index f610be7d..38877c16 100644 --- a/src/__tests__/unit/evaluation-options.test.ts +++ b/src/__tests__/unit/evaluation-options.test.ts @@ -27,6 +27,7 @@ describe('Evaluation options evaluator', () => { saveState: false }, throwOnInternalWriteError: true, + transactionsPagesPerBatch: null, unsafeClient: 'throw', updateCacheForEachInteraction: false, useKVStorage: false, @@ -65,6 +66,7 @@ describe('Evaluation options evaluator', () => { saveState: false }, throwOnInternalWriteError: true, + transactionsPagesPerBatch: null, unsafeClient: 'throw', updateCacheForEachInteraction: false, useKVStorage: false, @@ -98,6 +100,7 @@ describe('Evaluation options evaluator', () => { saveState: false }, throwOnInternalWriteError: true, + transactionsPagesPerBatch: null, unsafeClient: 'allow', updateCacheForEachInteraction: false, useKVStorage: false, @@ -128,6 +131,7 @@ describe('Evaluation options evaluator', () => { saveState: false }, throwOnInternalWriteError: true, + transactionsPagesPerBatch: null, unsafeClient: 'allow', updateCacheForEachInteraction: false, useKVStorage: false, @@ -158,6 +162,7 @@ describe('Evaluation options evaluator', () => { saveState: false }, throwOnInternalWriteError: true, + transactionsPagesPerBatch: null, unsafeClient: 'throw', updateCacheForEachInteraction: false, useKVStorage: false, @@ -188,6 +193,7 @@ describe('Evaluation options evaluator', () => { saveState: false }, throwOnInternalWriteError: true, + transactionsPagesPerBatch: null, unsafeClient: 'skip', updateCacheForEachInteraction: false, useKVStorage: false, diff --git a/src/contract/Contract.ts b/src/contract/Contract.ts index bf698589..dc4650a5 100644 --- a/src/contract/Contract.ts +++ b/src/contract/Contract.ts @@ -112,11 +112,22 @@ export interface Contract { */ readState( sortKeyOrBlockHeight?: string | number, - caller?: string, - interactions?: GQLNodeInterface[] + interactions?: GQLNodeInterface[], + signal?: AbortSignal ): Promise>>; - readStateFor(sortKey: string, interactions: GQLNodeInterface[]): Promise>>; + /** + * Reads state in batches - i.e. it first loads max. 5k interactions, evaluates them, then reads another 5k..and so on. + * + * Consider this as an experimental feature + */ + readStateBatch(pagesPerBatch: number, signal: AbortSignal): Promise>>; + + readStateFor( + sortKey: string, + interactions: GQLNodeInterface[], + signal?: AbortSignal + ): Promise>>; /** * Returns the "view" of the state, computed by the SWC - @@ -138,7 +149,8 @@ export interface Contract { input: Input, tags?: Tags, transfer?: ArTransfer, - caller?: string + caller?: string, + signal?: AbortSignal ): Promise>; /** @@ -155,7 +167,8 @@ export interface Contract { */ viewStateForTx( input: Input, - transaction: GQLNodeInterface + transaction: GQLNodeInterface, + signal?: AbortSignal ): Promise>; /** @@ -177,7 +190,11 @@ export interface Contract { vrf?: boolean ): Promise>; - applyInput(input: Input, transaction: GQLNodeInterface): Promise>; + applyInput( + input: Input, + transaction: GQLNodeInterface, + signal?: AbortSignal + ): Promise>; /** * Writes a new "interaction" transaction - i.e. such transaction that stores input for the contract. @@ -189,7 +206,7 @@ export interface Contract { /** * Returns the full call tree report the last - * interaction with contract (eg. after reading state) + * interaction with contract (e.g. after reading state) */ getCallStack(): ContractCallRecord; diff --git a/src/contract/EvaluationOptionsEvaluator.ts b/src/contract/EvaluationOptionsEvaluator.ts index 477a2ce3..47d69184 100644 --- a/src/contract/EvaluationOptionsEvaluator.ts +++ b/src/contract/EvaluationOptionsEvaluator.ts @@ -107,13 +107,15 @@ export class EvaluationOptionsEvaluator { remoteStateSyncSource: () => this.rootOptions['remoteStateSyncSource'], useKVStorage: (foreignOptions) => foreignOptions['useKVStorage'], useConstructor: (foreignOptions) => foreignOptions['useConstructor'], - whitelistSources: () => this.rootOptions['whitelistSources'] + whitelistSources: () => this.rootOptions['whitelistSources'], + transactionsPagesPerBatch: () => this.rootOptions['transactionsPagesPerBatch'] }; private readonly notConflictingEvaluationOptions: (keyof EvaluationOptions)[] = [ 'useKVStorage', 'sourceType', - 'useConstructor' + 'useConstructor', + 'transactionsPagesPerBatch' ]; /** diff --git a/src/contract/HandlerBasedContract.ts b/src/contract/HandlerBasedContract.ts index 95c79e29..226584e9 100644 --- a/src/contract/HandlerBasedContract.ts +++ b/src/contract/HandlerBasedContract.ts @@ -3,13 +3,17 @@ import { SortKeyCacheResult } from '../cache/SortKeyCache'; import { ContractCallRecord, InteractionCall } from '../core/ContractCallRecord'; import { ExecutionContext } from '../core/ExecutionContext'; import { + AbortError, ContractInteraction, HandlerApi, InteractionData, InteractionResult, InteractionType -} from '../core/modules/impl/HandlerExecutorFactory'; -import { LexicographicalInteractionsSorter } from '../core/modules/impl/LexicographicalInteractionsSorter'; +} from "../core/modules/impl/HandlerExecutorFactory"; +import { + genesisSortKey, + LexicographicalInteractionsSorter +} from '../core/modules/impl/LexicographicalInteractionsSorter'; import { InteractionsSorter } from '../core/modules/InteractionsSorter'; import { DefaultEvaluationOptions, EvalStateResult, EvaluationOptions } from '../core/modules/StateEvaluator'; import { WARP_TAGS } from '../core/KnownTags'; @@ -38,9 +42,9 @@ import { Mutex } from 'async-mutex'; import { Tag, Transaction, TransactionStatusResponse } from '../utils/types/arweave-types'; import { InteractionState } from './states/InteractionState'; import { ContractInteractionState } from './states/ContractInteractionState'; -import { Crypto } from 'warp-isomorphic'; +import { Buffer, Crypto } from 'warp-isomorphic'; import { VrfPluginFunctions } from '../core/WarpPlugin'; -import { createData, tagsExceedLimit, DataItem, Signer } from 'warp-arbundles'; +import { createData, DataItem, Signer, tagsExceedLimit } from 'warp-arbundles'; /** * An implementation of {@link Contract} that is backwards compatible with current style @@ -134,8 +138,8 @@ export class HandlerBasedContract implements Contract { async readState( sortKeyOrBlockHeight?: string | number, - caller?: string, - interactions?: GQLNodeInterface[] + interactions?: GQLNodeInterface[], + signal?: AbortSignal ): Promise>> { this.logger.info('Read state for', { contractTxId: this._contractTxId, @@ -162,7 +166,13 @@ export class HandlerBasedContract implements Contract { const initBenchmark = Benchmark.measure(); this.maybeResetRootContract(); - const executionContext = await this.createExecutionContext(this._contractTxId, sortKey, false, interactions); + const executionContext = await this.createExecutionContext( + this._contractTxId, + sortKey, + false, + interactions, + signal + ); this.logger.info('Execution Context', { srcTxId: executionContext.contractDefinition?.srcTxId, missingInteractions: executionContext.sortedInteractions?.length, @@ -200,27 +210,86 @@ export class HandlerBasedContract implements Contract { async readStateFor( sortKey: string, - interactions: GQLNodeInterface[] + interactions: GQLNodeInterface[], + signal?: AbortSignal ): Promise>> { - return this.readState(sortKey, undefined, interactions); + return this.readState(sortKey, interactions, signal); + } + + async readStateBatch(pagesPerBatch = 1, signal: AbortSignal): Promise>> { + if (!this.isRoot()) { + throw new Error('readStateBatch is only allowed for root contract calls'); + } + if (pagesPerBatch < 1) { + throw new Error('At least one page per batch is required'); + } + if (signal.aborted) { + throw new AbortError('readStateBatch aborted'); + } + + const contractTxId = this._contractTxId; + const { interactionsLoader, stateEvaluator } = this.warp; + let cachedState = await stateEvaluator.latestAvailableState(contractTxId); + + const evaluationOptions = { + ...this._evaluationOptions, + transactionsPagesPerBatch: pagesPerBatch + }; + + let interactions: GQLNodeInterface[]; + let batchesLoaded = 0; + do { + const batchBenchmark = Benchmark.measure(); + this.logger.debug(`Loading ${++batchesLoaded}`, evaluationOptions); + interactions = await interactionsLoader.load(contractTxId, cachedState?.sortKey, undefined, evaluationOptions); + if (signal.aborted) { + throw new AbortError('readStateBatch aborted'); + } + if (interactions.length == 0) { + break; + } + this.logger.debug(`Evaluating ${interactions.length} in ${batchesLoaded}`); + cachedState = await this.readStateFor(cachedState?.sortKey || genesisSortKey, interactions, signal); + if (signal.aborted) { + throw new AbortError('readStateBatch aborted'); + } + this.logger.debug( + `Batch ${batchesLoaded} evaluated in ${batchBenchmark.elapsed()} at sortKey ${cachedState.sortKey}` + ); + } while (interactions.length > 0); + + return cachedState; } async viewState( input: Input, tags: Tags = [], transfer: ArTransfer = emptyTransfer, - caller?: string + caller?: string, + signal?: AbortSignal ): Promise> { this.logger.info('View state for', this._contractTxId); - return await this.callContract(input, 'view', caller, undefined, tags, transfer); + return await this.callContract( + input, + 'view', + caller, + undefined, + tags, + transfer, + false, + false, + true, + signal + ); } async viewStateForTx( input: Input, - interactionTx: GQLNodeInterface + interactionTx: GQLNodeInterface, + signal?: AbortSignal ): Promise> { this.logger.info(`View state for ${this._contractTxId}`); - return await this.doApplyInputOnTx(input, interactionTx, 'view'); + return await this.doApplyInputOnTx(input, interactionTx, 'view', signal); } async dryWrite( @@ -234,9 +303,13 @@ export class HandlerBasedContract implements Contract { return await this.callContract(input, 'write', caller, undefined, tags, transfer, undefined, vrf); } - async applyInput(input: Input, transaction: GQLNodeInterface): Promise> { + async applyInput( + input: Input, + transaction: GQLNodeInterface, + signal?: AbortSignal + ): Promise> { this.logger.info(`Apply-input from transaction ${transaction.id} for ${this._contractTxId}`); - return await this.doApplyInputOnTx(input, transaction, 'write'); + return await this.doApplyInputOnTx(input, transaction, 'write', signal); } async writeInteraction( @@ -509,7 +582,8 @@ export class HandlerBasedContract implements Contract { contractTxId: string, upToSortKey?: string, forceDefinitionLoad = false, - interactions?: GQLNodeInterface[] + interactions?: GQLNodeInterface[], + signal?: AbortSignal ): Promise>> { const { definitionLoader, interactionsLoader, stateEvaluator } = this.warp; let cachedState: SortKeyCacheResult>; @@ -531,14 +605,17 @@ export class HandlerBasedContract implements Contract { this.logger.debug('Cached state', cachedState, upToSortKey); - if (cachedState && cachedState.sortKey == upToSortKey) { + if ( + (cachedState && cachedState.sortKey == upToSortKey) || + (upToSortKey == genesisSortKey && interactions?.length) + ) { this.logger.debug('State fully cached, not loading interactions.'); if (forceDefinitionLoad || evolvedSrcTxId || interactions?.length) { contractDefinition = await definitionLoader.load(contractTxId, evolvedSrcTxId); + contractEvaluationOptions = this.resolveEvaluationOptions(contractDefinition.manifest?.evaluationOptions); + this.warp.executorFactory.checkWhiteListContractSources(contractDefinition, contractEvaluationOptions); if (interactions?.length) { - sortedInteractions = (await this._sorter.sort(interactions.map((i) => ({ node: i, cursor: null })))).map( - (i) => i.node - ); + sortedInteractions = await this.getSortedInteractions(interactions); } } } else { @@ -616,10 +693,15 @@ export class HandlerBasedContract implements Contract { evaluationOptions: contractEvaluationOptions || this.evaluationOptions(), handler, cachedState, - requestedSortKey: upToSortKey + requestedSortKey: upToSortKey, + signal }; } + private async getSortedInteractions(interactions: GQLNodeInterface[]) { + return (await this._sorter.sort(interactions.map((i) => ({ node: i, cursor: null })))).map((i) => i.node); + } + private resolveEvaluationOptions(rootManifestEvalOptions: EvaluationOptions) { if (this.isRoot()) { this._eoEvaluator = new EvaluationOptionsEvaluator(this.evaluationOptions(), rootManifestEvalOptions); @@ -661,12 +743,13 @@ export class HandlerBasedContract implements Contract { private async createExecutionContextFromTx( contractTxId: string, - transaction: GQLNodeInterface + transaction: GQLNodeInterface, + signal?: AbortSignal ): Promise>> { const caller = transaction.owner.address; const sortKey = transaction.sortKey; - const baseContext = await this.createExecutionContext(contractTxId, sortKey, true); + const baseContext = await this.createExecutionContext(contractTxId, sortKey, true, undefined, signal); return { ...baseContext, @@ -695,7 +778,8 @@ export class HandlerBasedContract implements Contract { transfer: ArTransfer = emptyTransfer, strict = false, vrf = false, - sign = true + sign = true, + signal?: AbortSignal ): Promise> { this.logger.info('Call contract input', input); this.maybeResetRootContract(); @@ -704,7 +788,7 @@ export class HandlerBasedContract implements Contract { } const { arweave, stateEvaluator } = this.warp; // create execution context - let executionContext = await this.createExecutionContext(this._contractTxId, sortKey, true); + let executionContext = await this.createExecutionContext(this._contractTxId, sortKey, true, undefined, signal); const currentBlockData = this.warp.environment == 'mainnet' && !(this.warp.interactionsLoader.type() === 'arweave') @@ -791,12 +875,13 @@ export class HandlerBasedContract implements Contract { private async doApplyInputOnTx( input: Input, interactionTx: GQLNodeInterface, - interactionType: InteractionType + interactionType: InteractionType, + signal?: AbortSignal ): Promise> { this.maybeResetRootContract(); let evalStateResult: SortKeyCacheResult>; - const executionContext = await this.createExecutionContextFromTx(this._contractTxId, interactionTx); + const executionContext = await this.createExecutionContextFromTx(this._contractTxId, interactionTx, signal); if (!this.isRoot() && this.interactionState().has(this.txId(), interactionTx.sortKey)) { evalStateResult = new SortKeyCacheResult>( diff --git a/src/core/ExecutionContext.ts b/src/core/ExecutionContext.ts index 99d7c64a..d1fcf0b6 100644 --- a/src/core/ExecutionContext.ts +++ b/src/core/ExecutionContext.ts @@ -43,4 +43,5 @@ export type ExecutionContext = { caller?: string; // note: this is only set for "viewState" and "write" operations cachedState?: SortKeyCacheResult>; requestedSortKey?: string; + signal?: AbortSignal; }; diff --git a/src/core/modules/InteractionsLoader.ts b/src/core/modules/InteractionsLoader.ts index 24590592..c7906365 100644 --- a/src/core/modules/InteractionsLoader.ts +++ b/src/core/modules/InteractionsLoader.ts @@ -27,7 +27,8 @@ export interface InteractionsLoader extends GwTypeAware, WarpAware { contractTxId: string, fromSortKey?: string, toSortKey?: string, - evaluationOptions?: EvaluationOptions + evaluationOptions?: EvaluationOptions, + signal?: AbortSignal ): Promise; clearCache(): void; diff --git a/src/core/modules/StateEvaluator.ts b/src/core/modules/StateEvaluator.ts index 0785a810..98fe638d 100644 --- a/src/core/modules/StateEvaluator.ts +++ b/src/core/modules/StateEvaluator.ts @@ -152,6 +152,8 @@ export class DefaultEvaluationOptions implements EvaluationOptions { useConstructor = false; whitelistSources = []; + + transactionsPagesPerBatch = null; } // an interface for the contract EvaluationOptions - can be used to change the behaviour of some features. @@ -242,6 +244,8 @@ export interface EvaluationOptions { remoteStateSyncSource: string; whitelistSources: string[]; + + transactionsPagesPerBatch: number; } // https://github.com/nodejs/node/issues/40678 duh... diff --git a/src/core/modules/impl/ArweaveGQLTxsFetcher.ts b/src/core/modules/impl/ArweaveGQLTxsFetcher.ts index ff5dcc5d..e7d1f414 100644 --- a/src/core/modules/impl/ArweaveGQLTxsFetcher.ts +++ b/src/core/modules/impl/ArweaveGQLTxsFetcher.ts @@ -5,6 +5,7 @@ import { sleep } from '../../../utils/utils'; import { Benchmark } from '../../../logging/Benchmark'; import { Warp } from '../../Warp'; import { WARP_TAGS } from '../../KnownTags'; +import { AbortError } from "./HandlerExecutorFactory"; const TRANSACTIONS_QUERY = `query Transactions($tags: [TagFilter!]!, $blockFilter: BlockFilter!, $first: Int!, $after: String) { transactions(tags: $tags, block: $blockFilter, first: $first, sort: HEIGHT_ASC, after: $after) { @@ -117,7 +118,6 @@ export class ArweaveGQLTxsFetcher { async transaction(transactionId: string): Promise { const response = await this.fetch(TRANSACTION_QUERY, { id: transactionId }); - return response.transaction; } @@ -138,10 +138,26 @@ export class ArweaveGQLTxsFetcher { return response.edges[0].node; } - async transactions(variables: ArweaveTransactionQuery): Promise { + async transactions( + variables: ArweaveTransactionQuery, + pagesPerBatch: number, + signal?: AbortSignal + ): Promise { let pageResult = (await this.fetch(TRANSACTIONS_QUERY, variables)).transactions; const edges: GQLEdgeInterface[] = [...pageResult.edges]; - while (pageResult.pageInfo.hasNextPage) { + let pagesLoaded = 1; + if (pagesLoaded >= pagesPerBatch) { + return edges; + } + if (signal?.aborted) { + throw new AbortError(`Abort signal in ${ArweaveGQLTxsFetcher.name}`); + } + + while (pageResult.pageInfo.hasNextPage && pagesLoaded < pagesPerBatch) { + if (signal?.aborted) { + throw new AbortError(`Abort signal in ${ArweaveGQLTxsFetcher.name}`); + } + const cursor = pageResult.edges[MAX_REQUEST - 1].cursor; const newVariables = { @@ -151,6 +167,7 @@ export class ArweaveGQLTxsFetcher { pageResult = (await this.fetch(TRANSACTIONS_QUERY, newVariables)).transactions; edges.push(...pageResult.edges); + pagesLoaded++; } return edges; } diff --git a/src/core/modules/impl/ArweaveGatewayBundledInteractionLoader.ts b/src/core/modules/impl/ArweaveGatewayBundledInteractionLoader.ts index e60ba210..cc4f0328 100644 --- a/src/core/modules/impl/ArweaveGatewayBundledInteractionLoader.ts +++ b/src/core/modules/impl/ArweaveGatewayBundledInteractionLoader.ts @@ -59,9 +59,13 @@ export class ArweaveGatewayBundledInteractionLoader implements InteractionsLoade contractId: string, fromSortKey?: string, toSortKey?: string, - evaluationOptions?: EvaluationOptions + evaluationOptions?: EvaluationOptions, + signal?: AbortSignal ): Promise { this.logger.debug('Loading interactions for', { contractId, fromSortKey, toSortKey }); + if (evaluationOptions?.transactionsPagesPerBatch) { + throw new Error(`Loading in batches not yet implemented for ${ArweaveGatewayBundledInteractionLoader.name}`); + } const fromBlockHeight = this.sorter.extractBlockHeight(fromSortKey) || 0; const toBlockHeight = this.sorter.extractBlockHeight(toSortKey) || (await this.currentBlockHeight()); @@ -89,14 +93,15 @@ export class ArweaveGatewayBundledInteractionLoader implements InteractionsLoade }; const loadingBenchmark = Benchmark.measure(); - let interactions = await this.arweaveFetcher.transactions(mainTransactionsQuery); + let interactions = await this.arweaveFetcher.transactions(mainTransactionsQuery, Number.MAX_SAFE_INTEGER, signal); if (evaluationOptions.internalWrites) { interactions = await this.appendInternalWriteInteractions( contractId, fromBlockHeight, toBlockHeight, - interactions + interactions, + signal ); } loadingBenchmark.stop(); @@ -227,7 +232,8 @@ export class ArweaveGatewayBundledInteractionLoader implements InteractionsLoade contractId: string, fromBlockHeight: number, toBlockHeight: number, - interactions: GQLEdgeInterface[] + interactions: GQLEdgeInterface[], + signal: AbortSignal ) { const innerWritesVariables: GqlReqVariables = { tags: [ @@ -242,7 +248,11 @@ export class ArweaveGatewayBundledInteractionLoader implements InteractionsLoade }, first: MAX_REQUEST }; - const innerWritesInteractions = await this.arweaveFetcher.transactions(innerWritesVariables); + const innerWritesInteractions = await this.arweaveFetcher.transactions( + innerWritesVariables, + Number.MAX_SAFE_INTEGER, + signal + ); this.logger.debug('Inner writes interactions length:', innerWritesInteractions.length); interactions = interactions.concat(innerWritesInteractions); return interactions; diff --git a/src/core/modules/impl/ArweaveGatewayInteractionsLoader.ts b/src/core/modules/impl/ArweaveGatewayInteractionsLoader.ts index 73defff6..4b28331c 100644 --- a/src/core/modules/impl/ArweaveGatewayInteractionsLoader.ts +++ b/src/core/modules/impl/ArweaveGatewayInteractionsLoader.ts @@ -34,12 +34,15 @@ export class ArweaveGatewayInteractionsLoader implements InteractionsLoader { contractId: string, fromSortKey?: string, toSortKey?: string, - evaluationOptions?: EvaluationOptions + evaluationOptions?: EvaluationOptions, + signal?: AbortSignal ): Promise { this.logger.debug('Loading interactions for', { contractId, fromSortKey, toSortKey }); const fromBlockHeight = this.sorter.extractBlockHeight(fromSortKey); - const toBlockHeight = this.sorter.extractBlockHeight(toSortKey); + let toBlockHeight = this.sorter.extractBlockHeight(toSortKey); + const pagesPerBatch = evaluationOptions?.transactionsPagesPerBatch || Number.MAX_SAFE_INTEGER; + this.logger.debug('Pages per batch', pagesPerBatch); const mainTransactionsQuery: ArweaveTransactionQuery = { tags: [ @@ -60,12 +63,29 @@ export class ArweaveGatewayInteractionsLoader implements InteractionsLoader { }; const loadingBenchmark = Benchmark.measure(); - let interactions = (await this.arweaveTransactionQuery.transactions(mainTransactionsQuery)).filter( - bundledTxsFilter - ); + let interactions = ( + await this.arweaveTransactionQuery.transactions(mainTransactionsQuery, pagesPerBatch, signal) + ).filter(bundledTxsFilter); loadingBenchmark.stop(); + if (evaluationOptions?.transactionsPagesPerBatch && interactions.length > 0) { + interactions = await this.sorter.sort(interactions); + toBlockHeight = interactions[interactions.length - 1].node.block.height; + } if (evaluationOptions.internalWrites) { + const pagesPerBatchIw = (function () { + if (evaluationOptions?.transactionsPagesPerBatch) { + if (interactions.length > 0) { + // note: the limit in this case is the block height of the last 'direct' interaction + return Number.MAX_SAFE_INTEGER; + } else { + return evaluationOptions?.transactionsPagesPerBatch; + } + } else { + return Number.MAX_SAFE_INTEGER; + } + })(); + const innerWritesVariables: ArweaveTransactionQuery = { tags: [ { @@ -79,9 +99,9 @@ export class ArweaveGatewayInteractionsLoader implements InteractionsLoader { }, first: MAX_REQUEST }; - const innerWritesInteractions = (await this.arweaveTransactionQuery.transactions(innerWritesVariables)).filter( - bundledTxsFilter - ); + const innerWritesInteractions = ( + await this.arweaveTransactionQuery.transactions(innerWritesVariables, pagesPerBatchIw, signal) + ).filter(bundledTxsFilter); this.logger.debug('Inner writes interactions length:', innerWritesInteractions.length); interactions = interactions.concat(innerWritesInteractions); diff --git a/src/core/modules/impl/DefaultStateEvaluator.ts b/src/core/modules/impl/DefaultStateEvaluator.ts index 5217eb33..b4f46eeb 100644 --- a/src/core/modules/impl/DefaultStateEvaluator.ts +++ b/src/core/modules/impl/DefaultStateEvaluator.ts @@ -9,7 +9,7 @@ import { Benchmark } from '../../../logging/Benchmark'; import { LoggerFactory } from '../../../logging/LoggerFactory'; import { indent } from '../../../utils/utils'; import { EvalStateResult, StateEvaluator, CustomEvent } from '../StateEvaluator'; -import { ContractInteraction, HandlerApi, InteractionResult } from './HandlerExecutorFactory'; +import { AbortError, ContractInteraction, HandlerApi, InteractionResult } from "./HandlerExecutorFactory"; import { TagsParser } from './TagsParser'; import { VrfPluginFunctions } from '../../WarpPlugin'; import { BasicSortKeyCache } from '../../../cache/BasicSortKeyCache'; @@ -55,7 +55,7 @@ export abstract class DefaultStateEvaluator implements StateEvaluator { executionContext: ExecutionContext> ): Promise>> { const { ignoreExceptions, stackTrace, internalWrites } = executionContext.evaluationOptions; - const { contract, contractDefinition, sortedInteractions, warp } = executionContext; + const { contract, contractDefinition, sortedInteractions, warp, signal } = executionContext; let currentState = baseState.state; let currentSortKey = null; @@ -89,6 +89,9 @@ export abstract class DefaultStateEvaluator implements StateEvaluator { if (shouldBreakAfterEvolve) { break; } + if (signal?.aborted) { + throw new AbortError(`Abort signal in ${DefaultStateEvaluator.name}`); + } const missingInteraction = missingInteractions[i]; currentSortKey = missingInteraction.sortKey; @@ -151,7 +154,7 @@ export abstract class DefaultStateEvaluator implements StateEvaluator { let newState: EvalStateResult = null; let writingContractState: SortKeyCacheResult> = null; try { - writingContractState = await writingContract.readState(missingInteraction.sortKey); + writingContractState = await writingContract.readState(missingInteraction.sortKey, undefined, signal); newState = contract.interactionState().get(contract.txId(), missingInteraction.sortKey); } catch (e) { // ppe: not sure why we're not handling all ContractErrors here... diff --git a/src/core/modules/impl/HandlerExecutorFactory.ts b/src/core/modules/impl/HandlerExecutorFactory.ts index bbb4aa61..1b3904d4 100644 --- a/src/core/modules/impl/HandlerExecutorFactory.ts +++ b/src/core/modules/impl/HandlerExecutorFactory.ts @@ -36,6 +36,13 @@ export class NonWhitelistedSourceError extends Error { } } +export class AbortError extends Error { + constructor(readonly error: T) { + super(error.toString()); + this.name = KnownErrors.AbortError; + } +} + /** * A factory that produces handlers that are compatible with the "current" style of * writing SW contracts (i.e. using "handle" function). diff --git a/src/core/modules/impl/WarpGatewayInteractionsLoader.ts b/src/core/modules/impl/WarpGatewayInteractionsLoader.ts index b7aa0e8d..69ba37f0 100644 --- a/src/core/modules/impl/WarpGatewayInteractionsLoader.ts +++ b/src/core/modules/impl/WarpGatewayInteractionsLoader.ts @@ -6,6 +6,7 @@ import { getJsonResponse, stripTrailingSlash } from '../../../utils/utils'; import { GW_TYPE, InteractionsLoader } from '../InteractionsLoader'; import { EvaluationOptions } from '../StateEvaluator'; import { Warp } from '../../Warp'; +import { AbortError } from "./HandlerExecutorFactory"; export type ConfirmationStatus = | { @@ -65,7 +66,8 @@ export class WarpGatewayInteractionsLoader implements InteractionsLoader { contractId: string, fromSortKey?: string, toSortKey?: string, - evaluationOptions?: EvaluationOptions + evaluationOptions?: EvaluationOptions, + signal?: AbortSignal ): Promise { this.logger.debug('Loading interactions: for ', { contractId, fromSortKey, toSortKey }); @@ -73,6 +75,8 @@ export class WarpGatewayInteractionsLoader implements InteractionsLoader { let page = 0; let limit = 0; let items = 0; + const pagesPerBatch = evaluationOptions?.transactionsPagesPerBatch || Number.MAX_SAFE_INTEGER; + this.logger.debug('Pages per batch:', pagesPerBatch); const effectiveSourceType = evaluationOptions ? evaluationOptions.sourceType : this.source; const benchmarkTotalTime = Benchmark.measure(); @@ -80,6 +84,9 @@ export class WarpGatewayInteractionsLoader implements InteractionsLoader { do { const benchmarkRequestTime = Benchmark.measure(); + if (signal?.aborted) { + throw new AbortError(`Abort signal in ${WarpGatewayInteractionsLoader.name}`); + } const url = `${baseUrl}/gateway/v2/interactions-sort-key`; @@ -109,7 +116,7 @@ export class WarpGatewayInteractionsLoader implements InteractionsLoader { items = response.paging.items; this.logger.debug(`Loaded interactions length: ${interactions.length}, from: ${fromSortKey}, to: ${toSortKey}`); - } while (items == limit); // note: items < limit means that we're on the last page + } while (items == limit && page < pagesPerBatch); // note: items < limit means that we're on the last page this.logger.debug('All loaded interactions:', { from: fromSortKey, diff --git a/src/core/modules/impl/handler/AbstractContractHandler.ts b/src/core/modules/impl/handler/AbstractContractHandler.ts index 2867a127..b3ad6e40 100644 --- a/src/core/modules/impl/handler/AbstractContractHandler.ts +++ b/src/core/modules/impl/handler/AbstractContractHandler.ts @@ -63,7 +63,7 @@ export abstract class AbstractContractHandler implements HandlerApi(input, this.swGlobal._activeTx); + const result = await calleeContract.applyInput(input, this.swGlobal._activeTx, executionContext.signal); this.logger.debug('Cache result?:', !this.swGlobal._activeTx.dry); const shouldAutoThrow = @@ -117,7 +117,7 @@ export abstract class AbstractContractHandler implements HandlerApi(input, this.swGlobal._activeTx); + return await childContract.viewStateForTx(input, this.swGlobal._activeTx, executionContext.signal); }; } @@ -130,14 +130,14 @@ export abstract class AbstractContractHandler implements HandlerApi extends AbstractContractHandler { diff --git a/tools/contract-abort.ts b/tools/contract-abort.ts new file mode 100644 index 00000000..eb559667 --- /dev/null +++ b/tools/contract-abort.ts @@ -0,0 +1,52 @@ +/* eslint-disable */ +import {defaultCacheOptions, LoggerFactory, WarpFactory} from '../src'; + +const logger = LoggerFactory.INST.create('Contract'); + + +LoggerFactory.INST.logLevel('error'); +LoggerFactory.INST.logLevel('debug', 'HandlerBasedContract'); +LoggerFactory.INST.logLevel('debug', 'CacheableStateEvaluator'); + + +async function main() { + const heapUsedBefore = Math.round((process.memoryUsage().heapUsed / 1024 / 1024) * 100) / 100; + const rssUsedBefore = Math.round((process.memoryUsage().rss / 1024 / 1024) * 100) / 100; + + const warp = WarpFactory + .forMainnet({ ...defaultCacheOptions, inMemory: true }); + + try { + + const signal = AbortSignal.timeout(2000); + const contract = warp + .contract("Daj-MNSnH55TDfxqC7v4eq0lKzVIwh98srUaWqyuZtY") + .setEvaluationOptions({ + maxCallDepth: 5, + maxInteractionEvaluationTimeSeconds: 10000, + allowBigInt: true, + unsafeClient: 'skip', + }); + const result = await contract.readStateBatch(1, signal); + console.dir(result.cachedValue.state, {depth: null}); + } catch (e) { + console.error(e); + } + + const heapUsedAfter = Math.round((process.memoryUsage().heapUsed / 1024 / 1024) * 100) / 100; + const rssUsedAfter = Math.round((process.memoryUsage().rss / 1024 / 1024) * 100) / 100; + logger.warn('Heap used in MB', { + usedBefore: heapUsedBefore, + usedAfter: heapUsedAfter + }); + + logger.info('RSS used in MB', { + usedBefore: rssUsedBefore, + usedAfter: rssUsedAfter + }); + + return; +} + + +main().catch((e) => console.error(e));