From ea2c1240a296d48f3ec7aeb78933fc7643550133 Mon Sep 17 00:00:00 2001 From: nigiri <168690269+0xnigir1@users.noreply.github.com> Date: Tue, 31 Dec 2024 10:35:13 -0300 Subject: [PATCH 1/4] feat: transaction manager w/callback pattern --- packages/repository/src/external.ts | 3 + .../applicationPayoutRepository.interface.ts | 11 ++- .../applicationRepository.interface.ts | 10 ++- .../donationRepository.interface.ts | 11 ++- packages/repository/src/interfaces/index.ts | 1 + .../interfaces/projectRepository.interface.ts | 29 ++++++-- .../interfaces/roundRepository.interface.ts | 25 +++++-- .../transactionManager.interface.ts | 20 ++++++ .../kysely/application.repository.ts | 16 ++--- .../kysely/applicationPayout.repository.ts | 16 +++-- .../kysely/donation.repository.ts | 19 ++--- .../src/repositories/kysely/index.ts | 1 + .../repositories/kysely/project.repository.ts | 47 ++++++------- .../repositories/kysely/round.repository.ts | 69 +++++++++---------- .../repositories/kysely/transactionManager.ts | 12 ++++ packages/repository/src/types/index.ts | 1 + .../repository/src/types/transaction.types.ts | 8 +++ 17 files changed, 193 insertions(+), 106 deletions(-) create mode 100644 packages/repository/src/interfaces/transactionManager.interface.ts create mode 100644 packages/repository/src/repositories/kysely/transactionManager.ts create mode 100644 packages/repository/src/types/transaction.types.ts diff --git a/packages/repository/src/external.ts b/packages/repository/src/external.ts index 9966150..ed021da 100644 --- a/packages/repository/src/external.ts +++ b/packages/repository/src/external.ts @@ -70,4 +70,7 @@ export { export type { StrategyProcessingCheckpoint, NewStrategyProcessingCheckpoint } from "./internal.js"; +export type { ITransactionManager, TransactionConnection } from "./internal.js"; +export { KyselyTransactionManager } from "./internal.js"; + export { createKyselyPostgresDb as createKyselyDatabase } from "./internal.js"; diff --git a/packages/repository/src/interfaces/applicationPayoutRepository.interface.ts b/packages/repository/src/interfaces/applicationPayoutRepository.interface.ts index 6499187..2c66693 100644 --- a/packages/repository/src/interfaces/applicationPayoutRepository.interface.ts +++ b/packages/repository/src/interfaces/applicationPayoutRepository.interface.ts @@ -1,10 +1,17 @@ import { NewApplicationPayout } from "../types/applicationPayout.types.js"; +import { TransactionConnection } from "../types/transaction.types.js"; -export interface IApplicationPayoutRepository { +export interface IApplicationPayoutRepository< + TxConnection extends TransactionConnection = TransactionConnection, +> { /** * Inserts a new application payout into the database. * @param applicationPayout - The new application payout to insert. + * @param tx Optional transaction connection * @returns A promise that resolves when the application payout is inserted. */ - insertApplicationPayout(applicationPayout: NewApplicationPayout): Promise; + insertApplicationPayout( + applicationPayout: NewApplicationPayout, + tx?: TxConnection, + ): Promise; } diff --git a/packages/repository/src/interfaces/applicationRepository.interface.ts b/packages/repository/src/interfaces/applicationRepository.interface.ts index 2eb2f60..69b3d09 100644 --- a/packages/repository/src/interfaces/applicationRepository.interface.ts +++ b/packages/repository/src/interfaces/applicationRepository.interface.ts @@ -1,6 +1,7 @@ import { Address, ChainId } from "@grants-stack-indexer/shared"; import { Application, NewApplication, PartialApplication } from "../types/application.types.js"; +import { TransactionConnection } from "../types/transaction.types.js"; export interface IApplicationReadRepository { /** @@ -65,22 +66,27 @@ export interface IApplicationReadRepository { getApplicationsByRoundId(chainId: ChainId, roundId: string): Promise; } -export interface IApplicationRepository extends IApplicationReadRepository { +export interface IApplicationRepository< + TxConnection extends TransactionConnection = TransactionConnection, +> extends IApplicationReadRepository { /** * Inserts a new application into the repository. * @param application The new application to insert. + * @param tx Optional transaction connection * @returns A promise that resolves when the insertion is complete. */ - insertApplication(application: NewApplication): Promise; + insertApplication(application: NewApplication, tx?: TxConnection): Promise; /** * Updates an existing application in the repository. * @param where An object containing the (id, chainId, and roundId) of the application to update. * @param application The partial application data to update. + * @param tx Optional transaction connection * @returns A promise that resolves when the update is complete. */ updateApplication( where: { id: string; chainId: ChainId; roundId: string }, application: PartialApplication, + tx?: TxConnection, ): Promise; } diff --git a/packages/repository/src/interfaces/donationRepository.interface.ts b/packages/repository/src/interfaces/donationRepository.interface.ts index 4ddccc5..45f2b00 100644 --- a/packages/repository/src/interfaces/donationRepository.interface.ts +++ b/packages/repository/src/interfaces/donationRepository.interface.ts @@ -1,17 +1,22 @@ import { NewDonation } from "../internal.js"; +import { TransactionConnection } from "../types/transaction.types.js"; -export interface IDonationRepository { +export interface IDonationRepository< + TxConnection extends TransactionConnection = TransactionConnection, +> { /** * Insert a single donation * @param donation The donation to insert + * @param tx Optional transaction connection * @returns A promise that resolves when the donation is inserted */ - insertDonation(donation: NewDonation): Promise; + insertDonation(donation: NewDonation, tx?: TxConnection): Promise; /** * Insert many donations * @param donations The donations to insert + * @param tx Optional transaction connection * @returns A promise that resolves when the donations are inserted */ - insertManyDonations(donations: NewDonation[]): Promise; + insertManyDonations(donations: NewDonation[], tx?: TxConnection): Promise; } diff --git a/packages/repository/src/interfaces/index.ts b/packages/repository/src/interfaces/index.ts index f8818a5..391b7c9 100644 --- a/packages/repository/src/interfaces/index.ts +++ b/packages/repository/src/interfaces/index.ts @@ -6,3 +6,4 @@ export * from "./applicationPayoutRepository.interface.js"; export * from "./strategyRepository.interface.js"; export * from "./eventsRepository.interface.js"; export * from "./strategyProcessingCheckpointRepository.interface.js"; +export * from "./transactionManager.interface.js"; diff --git a/packages/repository/src/interfaces/projectRepository.interface.ts b/packages/repository/src/interfaces/projectRepository.interface.ts index 4e70c98..6f5732c 100644 --- a/packages/repository/src/interfaces/projectRepository.interface.ts +++ b/packages/repository/src/interfaces/projectRepository.interface.ts @@ -9,6 +9,7 @@ import { Project, ProjectRoleNames, } from "../types/project.types.js"; +import { TransactionConnection } from "../types/transaction.types.js"; export interface IProjectReadRepository { /** @@ -67,28 +68,37 @@ export interface IProjectReadRepository { getProjectByAnchorOrThrow(chainId: ChainId, anchorAddress: Address): Promise; } -export interface IProjectRepository extends IProjectReadRepository { +export interface IProjectRepository< + TxConnection extends TransactionConnection = TransactionConnection, +> extends IProjectReadRepository { /** * Inserts a new project into the repository. * @param project The new project to be inserted. + * @param tx Optional transaction connection * @returns A promise that resolves when the insertion is complete. */ - insertProject(project: NewProject): Promise; + insertProject(project: NewProject, tx?: TxConnection): Promise; /** * Updates an existing project in the repository. * @param where An object containing the id and chainId to identify the project to update. * @param project The partial project data to update. + * @param tx Optional transaction connection * @returns A promise that resolves when the update is complete. */ - updateProject(where: { id: string; chainId: ChainId }, project: PartialProject): Promise; + updateProject( + where: { id: string; chainId: ChainId }, + project: PartialProject, + tx?: TxConnection, + ): Promise; /** * Inserts a new project role into the repository. * @param projectRole The new project role to be inserted. + * @param tx Optional transaction connection * @returns A promise that resolves when the insertion is complete. */ - insertProjectRole(projectRole: NewProjectRole): Promise; + insertProjectRole(projectRole: NewProjectRole, tx?: TxConnection): Promise; /** * Deletes multiple project roles based on the provided criteria. @@ -96,6 +106,7 @@ export interface IProjectRepository extends IProjectReadRepository { * @param projectId The project ID of the roles to delete. * @param role The role type to delete. * @param address Optional address to further filter the roles to delete. + * @param tx Optional transaction connection * @returns A promise that resolves when the deletion is complete. */ deleteManyProjectRoles( @@ -103,19 +114,25 @@ export interface IProjectRepository extends IProjectReadRepository { projectId: string, role: ProjectRoleNames, address?: Address, + tx?: TxConnection, ): Promise; /** * Inserts a new pending project role into the repository. * @param pendingProjectRole The new pending project role to be inserted. + * @param tx Optional transaction connection * @returns A promise that resolves when the insertion is complete. */ - insertPendingProjectRole(pendingProjectRole: NewPendingProjectRole): Promise; + insertPendingProjectRole( + pendingProjectRole: NewPendingProjectRole, + tx?: TxConnection, + ): Promise; /** * Deletes multiple pending project roles based on their IDs. * @param ids An array of IDs of the pending project roles to delete. + * @param tx Optional transaction connection * @returns A promise that resolves when the deletion is complete. */ - deleteManyPendingProjectRoles(ids: number[]): Promise; + deleteManyPendingProjectRoles(ids: number[], tx?: TxConnection): Promise; } diff --git a/packages/repository/src/interfaces/roundRepository.interface.ts b/packages/repository/src/interfaces/roundRepository.interface.ts index e94fe2c..1284e86 100644 --- a/packages/repository/src/interfaces/roundRepository.interface.ts +++ b/packages/repository/src/interfaces/roundRepository.interface.ts @@ -10,6 +10,7 @@ import { RoundRole, RoundRoleNames, } from "../types/round.types.js"; +import { TransactionConnection } from "../types/transaction.types.js"; export interface IRoundReadRepository { /** @@ -94,23 +95,28 @@ export interface IRoundReadRepository { getPendingRoundRoles(chainId: ChainId, role: RoundRoleNames): Promise; } -export interface IRoundRepository extends IRoundReadRepository { +export interface IRoundRepository< + TxConnection extends TransactionConnection = TransactionConnection, +> extends IRoundReadRepository { /** * Inserts a new round into the repository. * @param round The new round to insert. + * @param tx Optional transaction connection * @returns A promise that resolves when the insertion is complete. */ - insertRound(round: NewRound): Promise; + insertRound(round: NewRound, tx?: TxConnection): Promise; /** * Updates an existing round in the repository. * @param where An object containing the id and chainId of the round to update. * @param round The partial round data to update. + * @param tx Optional transaction connection * @returns A promise that resolves when the update is complete. */ updateRound( where: { id: string; chainId: ChainId } | { chainId: ChainId; strategyAddress: Address }, round: PartialRound, + tx?: TxConnection, ): Promise; /** @@ -118,6 +124,7 @@ export interface IRoundRepository extends IRoundReadRepository { * @param where An object containing the chainId and roundId of the round to update. * @param amount The amount to increment by. * @param amountInUsd The amount in USD to increment by. + * @param tx Optional transaction connection * @returns A promise that resolves when the increment is complete. */ incrementRoundFunds( @@ -127,12 +134,14 @@ export interface IRoundRepository extends IRoundReadRepository { }, amount: bigint, amountInUsd: string, + tx?: TxConnection, ): Promise; /** * Increments the total distributed amount for a specific round. * @param where An object containing the chainId and roundId of the round to update. * @param amount The amount to increment by. + * @param tx Optional transaction connection * @returns A promise that resolves when the increment is complete. */ incrementRoundTotalDistributed( @@ -141,14 +150,16 @@ export interface IRoundRepository extends IRoundReadRepository { roundId: string; }, amount: bigint, + tx?: TxConnection, ): Promise; /** * Inserts a new round role into the repository. * @param roundRole The new round role to insert. + * @param tx Optional transaction connection * @returns A promise that resolves when the insertion is complete. */ - insertRoundRole(roundRole: NewRoundRole): Promise; + insertRoundRole(roundRole: NewRoundRole, tx?: TxConnection): Promise; /** * Deletes multiple round roles based on chain ID, round ID, role, and address. @@ -156,6 +167,7 @@ export interface IRoundRepository extends IRoundReadRepository { * @param roundId The round ID of the roles to delete. * @param role The role name of the roles to delete. * @param address The address associated with the roles to delete. + * @param tx Optional transaction connection * @returns A promise that resolves when the deletion is complete. */ deleteManyRoundRolesByRoleAndAddress( @@ -163,19 +175,22 @@ export interface IRoundRepository extends IRoundReadRepository { roundId: string, role: RoundRoleNames, address: Address, + tx?: TxConnection, ): Promise; /** * Inserts a new pending round role into the repository. * @param pendingRoundRole The new pending round role to insert. + * @param tx Optional transaction connection * @returns A promise that resolves when the insertion is complete. */ - insertPendingRoundRole(pendingRoundRole: NewPendingRoundRole): Promise; + insertPendingRoundRole(pendingRoundRole: NewPendingRoundRole, tx?: TxConnection): Promise; /** * Deletes multiple pending round roles by their IDs. * @param ids An array of IDs of the pending round roles to delete. + * @param tx Optional transaction connection * @returns A promise that resolves when the deletion is complete. */ - deleteManyPendingRoundRoles(ids: number[]): Promise; + deleteManyPendingRoundRoles(ids: number[], tx?: TxConnection): Promise; } diff --git a/packages/repository/src/interfaces/transactionManager.interface.ts b/packages/repository/src/interfaces/transactionManager.interface.ts new file mode 100644 index 0000000..7da171f --- /dev/null +++ b/packages/repository/src/interfaces/transactionManager.interface.ts @@ -0,0 +1,20 @@ +import { TransactionConnection } from "../internal.js"; + +/** + * The ITransactionManager interface provides a generic transaction management solution using a callback pattern. + * + * The generic type parameter TxConnection extends TransactionConnection to allow for different transaction + * connection implementations while maintaining type safety. + */ +export interface ITransactionManager< + TxConnection extends TransactionConnection = TransactionConnection, +> { + /* + * Provides a transaction connection to the given function. + * If the function throws an error, the transaction will be rolled back. + * If the function returns a promise, the transaction will be committed after the promise is resolved. + * + * Note: only DB calls that use the provided transaction connection will be executed in the transaction. + */ + runInTransaction(fn: (tx: TxConnection) => Promise): Promise; +} diff --git a/packages/repository/src/repositories/kysely/application.repository.ts b/packages/repository/src/repositories/kysely/application.repository.ts index 48d6f8f..4d1e202 100644 --- a/packages/repository/src/repositories/kysely/application.repository.ts +++ b/packages/repository/src/repositories/kysely/application.repository.ts @@ -11,7 +11,7 @@ import { PartialApplication, } from "../../internal.js"; -export class KyselyApplicationRepository implements IApplicationRepository { +export class KyselyApplicationRepository implements IApplicationRepository> { constructor( private readonly db: Kysely, private readonly schemaName: string, @@ -96,25 +96,23 @@ export class KyselyApplicationRepository implements IApplicationRepository { } /* @inheritdoc */ - async insertApplication(application: NewApplication): Promise { + async insertApplication(application: NewApplication, tx?: Kysely): Promise { const _application = this.formatApplication(application); + const queryBuilder = (tx || this.db).withSchema(this.schemaName); - await this.db - .withSchema(this.schemaName) - .insertInto("applications") - .values(_application) - .execute(); + await queryBuilder.insertInto("applications").values(_application).execute(); } /* @inheritdoc */ async updateApplication( where: { id: string; chainId: ChainId; roundId: string }, application: PartialApplication, + tx?: Kysely, ): Promise { const _application = this.formatApplication(application); + const queryBuilder = (tx || this.db).withSchema(this.schemaName); - await this.db - .withSchema(this.schemaName) + await queryBuilder .updateTable("applications") .set(_application) .where("id", "=", where.id) diff --git a/packages/repository/src/repositories/kysely/applicationPayout.repository.ts b/packages/repository/src/repositories/kysely/applicationPayout.repository.ts index b5d0196..a384e8a 100644 --- a/packages/repository/src/repositories/kysely/applicationPayout.repository.ts +++ b/packages/repository/src/repositories/kysely/applicationPayout.repository.ts @@ -2,18 +2,20 @@ import { Kysely } from "kysely"; import { Database, IApplicationPayoutRepository, NewApplicationPayout } from "../../internal.js"; -export class KyselyApplicationPayoutRepository implements IApplicationPayoutRepository { +export class KyselyApplicationPayoutRepository + implements IApplicationPayoutRepository> +{ constructor( private readonly db: Kysely, private readonly schemaName: string, ) {} /** @inheritdoc */ - async insertApplicationPayout(applicationPayout: NewApplicationPayout): Promise { - await this.db - .withSchema(this.schemaName) - .insertInto("applicationsPayouts") - .values(applicationPayout) - .execute(); + async insertApplicationPayout( + applicationPayout: NewApplicationPayout, + tx?: Kysely, + ): Promise { + const queryBuilder = (tx || this.db).withSchema(this.schemaName); + await queryBuilder.insertInto("applicationsPayouts").values(applicationPayout).execute(); } } diff --git a/packages/repository/src/repositories/kysely/donation.repository.ts b/packages/repository/src/repositories/kysely/donation.repository.ts index 85a762d..bf4a001 100644 --- a/packages/repository/src/repositories/kysely/donation.repository.ts +++ b/packages/repository/src/repositories/kysely/donation.repository.ts @@ -1,18 +1,18 @@ import { Kysely } from "kysely"; -import { IDonationRepository } from "../../interfaces/donationRepository.interface.js"; -import { Database, NewDonation } from "../../internal.js"; +import { Database, IDonationRepository, NewDonation } from "../../internal.js"; -export class KyselyDonationRepository implements IDonationRepository { +export class KyselyDonationRepository implements IDonationRepository> { constructor( private readonly db: Kysely, private readonly schemaName: string, ) {} /** @inheritdoc */ - async insertDonation(donation: NewDonation): Promise { - await this.db - .withSchema(this.schemaName) + async insertDonation(donation: NewDonation, tx?: Kysely): Promise { + const queryBuilder = (tx || this.db).withSchema(this.schemaName); + + await queryBuilder .insertInto("donations") .values(donation) .onConflict((c) => { @@ -22,9 +22,10 @@ export class KyselyDonationRepository implements IDonationRepository { } /** @inheritdoc */ - async insertManyDonations(donations: NewDonation[]): Promise { - await this.db - .withSchema(this.schemaName) + async insertManyDonations(donations: NewDonation[], tx?: Kysely): Promise { + const queryBuilder = (tx || this.db).withSchema(this.schemaName); + + await queryBuilder .insertInto("donations") .values(donations) .onConflict((c) => { diff --git a/packages/repository/src/repositories/kysely/index.ts b/packages/repository/src/repositories/kysely/index.ts index fcdd4e3..40ed093 100644 --- a/packages/repository/src/repositories/kysely/index.ts +++ b/packages/repository/src/repositories/kysely/index.ts @@ -6,3 +6,4 @@ export * from "./applicationPayout.repository.js"; export * from "./strategyRegistry.repository.js"; export * from "./eventRegistry.repository.js"; export * from "./strategyProcessingCheckpoint.repository.js"; +export * from "./transactionManager.js"; diff --git a/packages/repository/src/repositories/kysely/project.repository.ts b/packages/repository/src/repositories/kysely/project.repository.ts index abba90f..58cfb46 100644 --- a/packages/repository/src/repositories/kysely/project.repository.ts +++ b/packages/repository/src/repositories/kysely/project.repository.ts @@ -15,7 +15,7 @@ import { ProjectRoleNames, } from "../../internal.js"; -export class KyselyProjectRepository implements IProjectRepository { +export class KyselyProjectRepository implements IProjectRepository> { constructor( private readonly db: Kysely, private readonly schemaName: string, @@ -73,17 +73,19 @@ export class KyselyProjectRepository implements IProjectRepository { } /* @inheritdoc */ - async insertProject(project: NewProject): Promise { - await this.db.withSchema(this.schemaName).insertInto("projects").values(project).execute(); + async insertProject(project: NewProject, tx?: Kysely): Promise { + const queryBuilder = (tx || this.db).withSchema(this.schemaName); + await queryBuilder.insertInto("projects").values(project).execute(); } /* @inheritdoc */ async updateProject( where: { id: string; chainId: ChainId }, project: PartialProject, + tx?: Kysely, ): Promise { - await this.db - .withSchema(this.schemaName) + const queryBuilder = (tx || this.db).withSchema(this.schemaName); + await queryBuilder .updateTable("projects") .set(project) .where("id", "=", where.id) @@ -94,12 +96,9 @@ export class KyselyProjectRepository implements IProjectRepository { // ============================ PROJECT ROLES ============================ /* @inheritdoc */ - async insertProjectRole(projectRole: NewProjectRole): Promise { - await this.db - .withSchema(this.schemaName) - .insertInto("projectRoles") - .values(projectRole) - .execute(); + async insertProjectRole(projectRole: NewProjectRole, tx?: Kysely): Promise { + const queryBuilder = (tx || this.db).withSchema(this.schemaName); + await queryBuilder.insertInto("projectRoles").values(projectRole).execute(); } /* @inheritdoc */ @@ -108,9 +107,10 @@ export class KyselyProjectRepository implements IProjectRepository { projectId: string, role: ProjectRoleNames, address?: Address, + tx?: Kysely, ): Promise { - const query = this.db - .withSchema(this.schemaName) + const queryBuilder = (tx || this.db).withSchema(this.schemaName); + const query = queryBuilder .deleteFrom("projectRoles") .where("chainId", "=", chainId) .where("projectId", "=", projectId) @@ -149,20 +149,17 @@ export class KyselyProjectRepository implements IProjectRepository { } /* @inheritdoc */ - async insertPendingProjectRole(pendingProjectRole: NewPendingProjectRole): Promise { - await this.db - .withSchema(this.schemaName) - .insertInto("pendingProjectRoles") - .values(pendingProjectRole) - .execute(); + async insertPendingProjectRole( + pendingProjectRole: NewPendingProjectRole, + tx?: Kysely, + ): Promise { + const queryBuilder = (tx || this.db).withSchema(this.schemaName); + await queryBuilder.insertInto("pendingProjectRoles").values(pendingProjectRole).execute(); } /* @inheritdoc */ - async deleteManyPendingProjectRoles(ids: number[]): Promise { - await this.db - .withSchema(this.schemaName) - .deleteFrom("pendingProjectRoles") - .where("id", "in", ids) - .execute(); + async deleteManyPendingProjectRoles(ids: number[], tx?: Kysely): Promise { + const queryBuilder = (tx || this.db).withSchema(this.schemaName); + await queryBuilder.deleteFrom("pendingProjectRoles").where("id", "in", ids).execute(); } } diff --git a/packages/repository/src/repositories/kysely/round.repository.ts b/packages/repository/src/repositories/kysely/round.repository.ts index 400ab72..356a840 100644 --- a/packages/repository/src/repositories/kysely/round.repository.ts +++ b/packages/repository/src/repositories/kysely/round.repository.ts @@ -17,7 +17,7 @@ import { RoundRoleNames, } from "../../internal.js"; -export class KyselyRoundRepository implements IRoundRepository { +export class KyselyRoundRepository implements IRoundRepository> { constructor( private readonly db: Kysely, private readonly schemaName: string, @@ -114,19 +114,21 @@ export class KyselyRoundRepository implements IRoundRepository { } /* @inheritdoc */ - async insertRound(round: NewRound): Promise { - await this.db.withSchema(this.schemaName).insertInto("rounds").values(round).execute(); + async insertRound(round: NewRound, tx?: Kysely): Promise { + const _round = this.formatRound(round); + const queryBuilder = (tx || this.db).withSchema(this.schemaName); + await queryBuilder.insertInto("rounds").values(_round).execute(); } /* @inheritdoc */ async updateRound( where: { id: string; chainId: ChainId } | { chainId: ChainId; strategyAddress: Address }, round: PartialRound, + tx?: Kysely, ): Promise { const _round = this.formatRound(round); - - const query = this.db - .withSchema(this.schemaName) + const queryBuilder = (tx || this.db).withSchema(this.schemaName); + const query = queryBuilder .updateTable("rounds") .set(_round) .where("chainId", "=", where.chainId); @@ -140,15 +142,13 @@ export class KyselyRoundRepository implements IRoundRepository { /* @inheritdoc */ async incrementRoundFunds( - where: { - chainId: ChainId; - roundId: string; - }, + where: { chainId: ChainId; roundId: string }, amount: bigint, amountInUsd: string, + tx?: Kysely, ): Promise { - await this.db - .withSchema(this.schemaName) + const queryBuilder = (tx || this.db).withSchema(this.schemaName); + await queryBuilder .updateTable("rounds") .set((eb) => ({ fundedAmount: eb("fundedAmount", "+", amount), @@ -161,14 +161,12 @@ export class KyselyRoundRepository implements IRoundRepository { /* @inheritdoc */ async incrementRoundTotalDistributed( - where: { - chainId: ChainId; - roundId: string; - }, + where: { chainId: ChainId; roundId: string }, amount: bigint, + tx?: Kysely, ): Promise { - await this.db - .withSchema(this.schemaName) + const queryBuilder = (tx || this.db).withSchema(this.schemaName); + await queryBuilder .updateTable("rounds") .set((eb) => ({ totalDistributed: eb("totalDistributed", "+", amount), @@ -186,12 +184,9 @@ export class KyselyRoundRepository implements IRoundRepository { } /* @inheritdoc */ - async insertRoundRole(roundRole: NewRoundRole): Promise { - await this.db - .withSchema(this.schemaName) - .insertInto("roundRoles") - .values(roundRole) - .execute(); + async insertRoundRole(roundRole: NewRoundRole, tx?: Kysely): Promise { + const queryBuilder = (tx || this.db).withSchema(this.schemaName); + await queryBuilder.insertInto("roundRoles").values(roundRole).execute(); } /* @inheritdoc */ @@ -200,9 +195,10 @@ export class KyselyRoundRepository implements IRoundRepository { roundId: string, role: RoundRoleNames, address: Address, + tx?: Kysely, ): Promise { - await this.db - .withSchema(this.schemaName) + const queryBuilder = (tx || this.db).withSchema(this.schemaName); + await queryBuilder .deleteFrom("roundRoles") .where("chainId", "=", chainId) .where("roundId", "=", roundId) @@ -228,21 +224,18 @@ export class KyselyRoundRepository implements IRoundRepository { } /* @inheritdoc */ - async insertPendingRoundRole(pendingRoundRole: NewPendingRoundRole): Promise { - await this.db - .withSchema(this.schemaName) - .insertInto("pendingRoundRoles") - .values(pendingRoundRole) - .execute(); + async insertPendingRoundRole( + pendingRoundRole: NewPendingRoundRole, + tx?: Kysely, + ): Promise { + const queryBuilder = (tx || this.db).withSchema(this.schemaName); + await queryBuilder.insertInto("pendingRoundRoles").values(pendingRoundRole).execute(); } /* @inheritdoc */ - async deleteManyPendingRoundRoles(ids: number[]): Promise { - await this.db - .withSchema(this.schemaName) - .deleteFrom("pendingRoundRoles") - .where("id", "in", ids) - .execute(); + async deleteManyPendingRoundRoles(ids: number[], tx?: Kysely): Promise { + const queryBuilder = (tx || this.db).withSchema(this.schemaName); + await queryBuilder.deleteFrom("pendingRoundRoles").where("id", "in", ids).execute(); } /** diff --git a/packages/repository/src/repositories/kysely/transactionManager.ts b/packages/repository/src/repositories/kysely/transactionManager.ts new file mode 100644 index 0000000..bb4f619 --- /dev/null +++ b/packages/repository/src/repositories/kysely/transactionManager.ts @@ -0,0 +1,12 @@ +import { Kysely } from "kysely"; + +import { Database, ITransactionManager } from "../../internal.js"; + +export class KyselyTransactionManager implements ITransactionManager> { + constructor(private readonly db: Kysely) {} + + /** @inheritdoc */ + async runInTransaction(fn: (tx: Kysely) => Promise): Promise { + return this.db.transaction().execute(fn); + } +} diff --git a/packages/repository/src/types/index.ts b/packages/repository/src/types/index.ts index 64da8f9..0a7e401 100644 --- a/packages/repository/src/types/index.ts +++ b/packages/repository/src/types/index.ts @@ -7,3 +7,4 @@ export * from "./applicationPayout.types.js"; export * from "./strategy.types.js"; export * from "./event.types.js"; export * from "./strategyProcessingCheckpoint.types.js"; +export * from "./transaction.types.js"; diff --git a/packages/repository/src/types/transaction.types.ts b/packages/repository/src/types/transaction.types.ts new file mode 100644 index 0000000..89d2347 --- /dev/null +++ b/packages/repository/src/types/transaction.types.ts @@ -0,0 +1,8 @@ +// packages/repository/src/types/transaction.types.ts +import { Kysely } from "kysely"; + +import { Database } from "../internal.js"; + +export type KyselyTransaction = Kysely; + +export type TransactionConnection = KyselyTransaction; From 93c6b91afda64b0756069dc902fcc170da8bf281 Mon Sep 17 00:00:00 2001 From: nigiri <168690269+0xnigir1@users.noreply.github.com> Date: Tue, 31 Dec 2024 12:23:49 -0300 Subject: [PATCH 2/4] feat: use transaction manager in data loader --- .../services/sharedDependencies.service.ts | 4 + .../unit/sharedDependencies.service.spec.ts | 2 + .../data-flow/src/data-loader/dataLoader.ts | 50 +++----- .../handlers/application.handlers.ts | 12 +- .../handlers/applicationPayout.handlers.ts | 4 +- .../data-loader/handlers/donation.handlers.ts | 8 +- .../data-loader/handlers/project.handlers.ts | 28 ++--- .../data-loader/handlers/round.handlers.ts | 44 ++++--- .../data-flow/src/data-loader/types/index.ts | 3 +- .../src/interfaces/dataLoader.interface.ts | 4 +- packages/data-flow/src/orchestrator.ts | 16 +-- .../data-flow/src/retroactiveProcessor.ts | 13 +- packages/data-flow/src/types/index.ts | 14 +-- .../test/data-loader/dataLoader.spec.ts | 48 +++++--- .../handlers/application.handlers.spec.ts | 12 +- .../applicationPayout.handlers.spec.ts | 42 +++++++ .../handlers/donation.handlers.spec.ts | 54 +++++++++ .../handlers/project.handlers.spec.ts | 36 ++++-- .../handlers/round.handlers.spec.ts | 36 ++++-- .../data-flow/test/unit/orchestrator.spec.ts | 113 ++++++++---------- .../test/unit/retroactiveProcessor.spec.ts | 50 ++------ 21 files changed, 337 insertions(+), 256 deletions(-) create mode 100644 packages/data-flow/test/data-loader/handlers/applicationPayout.handlers.spec.ts create mode 100644 packages/data-flow/test/data-loader/handlers/donation.handlers.spec.ts diff --git a/apps/processing/src/services/sharedDependencies.service.ts b/apps/processing/src/services/sharedDependencies.service.ts index d2d79ea..fc29306 100644 --- a/apps/processing/src/services/sharedDependencies.service.ts +++ b/apps/processing/src/services/sharedDependencies.service.ts @@ -15,6 +15,7 @@ import { KyselyRoundRepository, KyselyStrategyProcessingCheckpointRepository, KyselyStrategyRegistryRepository, + KyselyTransactionManager, } from "@grants-stack-indexer/repository"; import { ILogger, Logger } from "@grants-stack-indexer/shared"; @@ -50,6 +51,8 @@ export class SharedDependenciesService { logger, ); + const transactionManager = new KyselyTransactionManager(kyselyDatabase); + const projectRepository = new KyselyProjectRepository(kyselyDatabase, env.DATABASE_SCHEMA); const roundRepository = new KyselyRoundRepository(kyselyDatabase, env.DATABASE_SCHEMA); const applicationRepository = new KyselyApplicationRepository( @@ -97,6 +100,7 @@ export class SharedDependenciesService { donationRepository, metadataProvider, applicationPayoutRepository, + transactionManager, }, registriesRepositories: { eventRegistryRepository, diff --git a/apps/processing/test/unit/sharedDependencies.service.spec.ts b/apps/processing/test/unit/sharedDependencies.service.spec.ts index df37a6e..cdd3c75 100644 --- a/apps/processing/test/unit/sharedDependencies.service.spec.ts +++ b/apps/processing/test/unit/sharedDependencies.service.spec.ts @@ -42,6 +42,7 @@ vi.mock("@grants-stack-indexer/repository", () => ({ })), KyselyEventRegistryRepository: vi.fn(), KyselyStrategyProcessingCheckpointRepository: vi.fn(), + KyselyTransactionManager: vi.fn(), })); vi.mock("@grants-stack-indexer/pricing", () => ({ @@ -145,6 +146,7 @@ describe("SharedDependenciesService", () => { expect(dependencies.core).toHaveProperty("donationRepository"); expect(dependencies.core).toHaveProperty("metadataProvider"); expect(dependencies.core).toHaveProperty("applicationPayoutRepository"); + expect(dependencies.core).toHaveProperty("transactionManager"); // Verify registries expect(dependencies.registriesRepositories).toHaveProperty("eventRegistryRepository"); diff --git a/packages/data-flow/src/data-loader/dataLoader.ts b/packages/data-flow/src/data-loader/dataLoader.ts index 2f07d1a..d87b0cc 100644 --- a/packages/data-flow/src/data-loader/dataLoader.ts +++ b/packages/data-flow/src/data-loader/dataLoader.ts @@ -5,10 +5,11 @@ import { IDonationRepository, IProjectRepository, IRoundRepository, + ITransactionManager, } from "@grants-stack-indexer/repository"; -import { ILogger, stringify } from "@grants-stack-indexer/shared"; +import { ILogger } from "@grants-stack-indexer/shared"; -import { ExecutionResult, IDataLoader, InvalidChangeset } from "../internal.js"; +import { IDataLoader, InvalidChangeset } from "../internal.js"; import { createApplicationHandlers, createApplicationPayoutHandlers, @@ -42,6 +43,7 @@ export class DataLoader implements IDataLoader { donation: IDonationRepository; applicationPayout: IApplicationPayoutRepository; }, + private readonly transactionManager: ITransactionManager, private readonly logger: ILogger, ) { this.handlers = { @@ -54,40 +56,26 @@ export class DataLoader implements IDataLoader { } /** @inheritdoc */ - public async applyChanges(changesets: Changeset[]): Promise { - const result: ExecutionResult = { - changesets: [], - numExecuted: 0, - numSuccessful: 0, - numFailed: 0, - errors: [], - }; - + public async applyChanges(changesets: Changeset[]): Promise { const invalidTypes = changesets.filter((changeset) => !this.handlers[changeset.type]); if (invalidTypes.length > 0) { throw new InvalidChangeset(invalidTypes.map((changeset) => changeset.type)); } - //TODO: research how to manage transactions so we can rollback on error - for (const changeset of changesets) { - result.numExecuted++; - try { - //TODO: inside each handler, we should add zod validation that the args match the expected type - await this.handlers[changeset.type](changeset as never); - result.changesets.push(changeset.type); - result.numSuccessful++; - } catch (error) { - result.numFailed++; - result.errors.push( - `Failed to apply changeset ${changeset.type}: ${ - error instanceof Error ? error.message : String(error) - }`, - ); - this.logger.error(`${stringify(error, Object.getOwnPropertyNames(error))}`); - break; - } - } + await this.transactionManager.runInTransaction(async (tx) => { + this.logger.debug("Starting transaction..."); + for (const changeset of changesets) { + try { + //TODO: inside each handler, we should add zod validation that the args match the expected type + await this.handlers[changeset.type](changeset as never, tx); + } catch (error) { + this.logger.debug( + `Error applying changeset ${changeset.type}. Rolling back transaction with ${changesets.length} changesets`, + ); - return result; + throw error; + } + } + }); } } diff --git a/packages/data-flow/src/data-loader/handlers/application.handlers.ts b/packages/data-flow/src/data-loader/handlers/application.handlers.ts index 20afd8a..7167efc 100644 --- a/packages/data-flow/src/data-loader/handlers/application.handlers.ts +++ b/packages/data-flow/src/data-loader/handlers/application.handlers.ts @@ -19,12 +19,16 @@ export type ApplicationHandlers = { export const createApplicationHandlers = ( repository: IApplicationRepository, ): ApplicationHandlers => ({ - InsertApplication: (async (changeset): Promise => { - await repository.insertApplication(changeset.args); + InsertApplication: (async (changeset, txConnection): Promise => { + await repository.insertApplication(changeset.args, txConnection); }) satisfies ChangesetHandler<"InsertApplication">, - UpdateApplication: (async (changeset): Promise => { + UpdateApplication: (async (changeset, txConnection): Promise => { const { chainId, roundId, applicationId, application } = changeset.args; - await repository.updateApplication({ chainId, roundId, id: applicationId }, application); + await repository.updateApplication( + { chainId, roundId, id: applicationId }, + application, + txConnection, + ); }) satisfies ChangesetHandler<"UpdateApplication">, }); diff --git a/packages/data-flow/src/data-loader/handlers/applicationPayout.handlers.ts b/packages/data-flow/src/data-loader/handlers/applicationPayout.handlers.ts index f14fc24..a96de33 100644 --- a/packages/data-flow/src/data-loader/handlers/applicationPayout.handlers.ts +++ b/packages/data-flow/src/data-loader/handlers/applicationPayout.handlers.ts @@ -22,7 +22,7 @@ export type ApplicationPayoutHandlers = { export const createApplicationPayoutHandlers = ( repository: IApplicationPayoutRepository, ): ApplicationPayoutHandlers => ({ - InsertApplicationPayout: (async (changeset): Promise => { - await repository.insertApplicationPayout(changeset.args.applicationPayout); + InsertApplicationPayout: (async (changeset, txConnection): Promise => { + await repository.insertApplicationPayout(changeset.args.applicationPayout, txConnection); }) satisfies ChangesetHandler<"InsertApplicationPayout">, }); diff --git a/packages/data-flow/src/data-loader/handlers/donation.handlers.ts b/packages/data-flow/src/data-loader/handlers/donation.handlers.ts index b42d9e0..841fbd3 100644 --- a/packages/data-flow/src/data-loader/handlers/donation.handlers.ts +++ b/packages/data-flow/src/data-loader/handlers/donation.handlers.ts @@ -17,11 +17,11 @@ export type DonationHandlers = { * @returns An object containing all application-related handlers */ export const createDonationHandlers = (repository: IDonationRepository): DonationHandlers => ({ - InsertDonation: (async (changeset): Promise => { - await repository.insertDonation(changeset.args.donation); + InsertDonation: (async (changeset, txConnection): Promise => { + await repository.insertDonation(changeset.args.donation, txConnection); }) satisfies ChangesetHandler<"InsertDonation">, - InsertManyDonations: (async (changeset): Promise => { - await repository.insertManyDonations(changeset.args.donations); + InsertManyDonations: (async (changeset, txConnection): Promise => { + await repository.insertManyDonations(changeset.args.donations, txConnection); }) satisfies ChangesetHandler<"InsertManyDonations">, }); diff --git a/packages/data-flow/src/data-loader/handlers/project.handlers.ts b/packages/data-flow/src/data-loader/handlers/project.handlers.ts index b32a9db..01cb06f 100644 --- a/packages/data-flow/src/data-loader/handlers/project.handlers.ts +++ b/packages/data-flow/src/data-loader/handlers/project.handlers.ts @@ -17,38 +17,38 @@ export type ProjectHandlers = { * @returns An object containing all project-related handlers */ export const createProjectHandlers = (repository: IProjectRepository): ProjectHandlers => ({ - InsertProject: (async (changeset): Promise => { + InsertProject: (async (changeset, txConnection): Promise => { const { project } = changeset.args; - await repository.insertProject(project); + await repository.insertProject(project, txConnection); }) satisfies ChangesetHandler<"InsertProject">, - UpdateProject: (async (changeset): Promise => { + UpdateProject: (async (changeset, txConnection): Promise => { const { chainId, projectId, project } = changeset.args; - await repository.updateProject({ id: projectId, chainId }, project); + await repository.updateProject({ id: projectId, chainId }, project, txConnection); }) satisfies ChangesetHandler<"UpdateProject">, - InsertPendingProjectRole: (async (changeset): Promise => { + InsertPendingProjectRole: (async (changeset, txConnection): Promise => { const { pendingProjectRole } = changeset.args; - await repository.insertPendingProjectRole(pendingProjectRole); + await repository.insertPendingProjectRole(pendingProjectRole, txConnection); }) satisfies ChangesetHandler<"InsertPendingProjectRole">, - DeletePendingProjectRoles: (async (changeset): Promise => { + DeletePendingProjectRoles: (async (changeset, txConnection): Promise => { const { ids } = changeset.args; - await repository.deleteManyPendingProjectRoles(ids); + await repository.deleteManyPendingProjectRoles(ids, txConnection); }) satisfies ChangesetHandler<"DeletePendingProjectRoles">, - InsertProjectRole: (async (changeset): Promise => { + InsertProjectRole: (async (changeset, txConnection): Promise => { const { projectRole } = changeset.args; - await repository.insertProjectRole(projectRole); + await repository.insertProjectRole(projectRole, txConnection); }) satisfies ChangesetHandler<"InsertProjectRole">, - DeleteAllProjectRolesByRole: (async (changeset): Promise => { + DeleteAllProjectRolesByRole: (async (changeset, txConnection): Promise => { const { chainId, projectId, role } = changeset.args.projectRole; - await repository.deleteManyProjectRoles(chainId, projectId, role); + await repository.deleteManyProjectRoles(chainId, projectId, role, undefined, txConnection); }) satisfies ChangesetHandler<"DeleteAllProjectRolesByRole">, - DeleteAllProjectRolesByRoleAndAddress: (async (changeset): Promise => { + DeleteAllProjectRolesByRoleAndAddress: (async (changeset, txConnection): Promise => { const { chainId, projectId, role, address } = changeset.args.projectRole; - await repository.deleteManyProjectRoles(chainId, projectId, role, address); + await repository.deleteManyProjectRoles(chainId, projectId, role, address, txConnection); }) satisfies ChangesetHandler<"DeleteAllProjectRolesByRoleAndAddress">, }); diff --git a/packages/data-flow/src/data-loader/handlers/round.handlers.ts b/packages/data-flow/src/data-loader/handlers/round.handlers.ts index cb6c831..d877850 100644 --- a/packages/data-flow/src/data-loader/handlers/round.handlers.ts +++ b/packages/data-flow/src/data-loader/handlers/round.handlers.ts @@ -17,24 +17,28 @@ export type RoundHandlers = { * @returns An object containing all round-related handlers */ export const createRoundHandlers = (repository: IRoundRepository): RoundHandlers => ({ - InsertRound: (async (changeset): Promise => { + InsertRound: (async (changeset, txConnection): Promise => { const { round } = changeset.args; - await repository.insertRound(round); + await repository.insertRound(round, txConnection); }) satisfies ChangesetHandler<"InsertRound">, - UpdateRound: (async (changeset): Promise => { + UpdateRound: (async (changeset, txConnection): Promise => { const { chainId, roundId, round } = changeset.args; - await repository.updateRound({ id: roundId, chainId }, round); + await repository.updateRound({ id: roundId, chainId }, round, txConnection); }) satisfies ChangesetHandler<"UpdateRound">, - UpdateRoundByStrategyAddress: (async (changeset): Promise => { + UpdateRoundByStrategyAddress: (async (changeset, txConnection): Promise => { const { chainId, strategyAddress, round } = changeset.args; if (round) { - await repository.updateRound({ strategyAddress, chainId: chainId }, round); + await repository.updateRound( + { strategyAddress, chainId: chainId }, + round, + txConnection, + ); } }) satisfies ChangesetHandler<"UpdateRoundByStrategyAddress">, - IncrementRoundFundedAmount: (async (changeset): Promise => { + IncrementRoundFundedAmount: (async (changeset, txConnection): Promise => { const { chainId, roundId, fundedAmount, fundedAmountInUsd } = changeset.args; await repository.incrementRoundFunds( { @@ -43,10 +47,11 @@ export const createRoundHandlers = (repository: IRoundRepository): RoundHandlers }, fundedAmount, fundedAmountInUsd, + txConnection, ); }) satisfies ChangesetHandler<"IncrementRoundFundedAmount">, - IncrementRoundTotalDistributed: (async (changeset): Promise => { + IncrementRoundTotalDistributed: (async (changeset, txConnection): Promise => { const { chainId, roundId, amount } = changeset.args; await repository.incrementRoundTotalDistributed( { @@ -54,26 +59,33 @@ export const createRoundHandlers = (repository: IRoundRepository): RoundHandlers roundId, }, amount, + txConnection, ); }) satisfies ChangesetHandler<"IncrementRoundTotalDistributed">, - InsertPendingRoundRole: (async (changeset): Promise => { + InsertPendingRoundRole: (async (changeset, txConnection): Promise => { const { pendingRoundRole } = changeset.args; - await repository.insertPendingRoundRole(pendingRoundRole); + await repository.insertPendingRoundRole(pendingRoundRole, txConnection); }) satisfies ChangesetHandler<"InsertPendingRoundRole">, - DeletePendingRoundRoles: (async (changeset): Promise => { + DeletePendingRoundRoles: (async (changeset, txConnection): Promise => { const { ids } = changeset.args; - await repository.deleteManyPendingRoundRoles(ids); + await repository.deleteManyPendingRoundRoles(ids, txConnection); }) satisfies ChangesetHandler<"DeletePendingRoundRoles">, - InsertRoundRole: (async (changeset): Promise => { + InsertRoundRole: (async (changeset, txConnection): Promise => { const { roundRole } = changeset.args; - await repository.insertRoundRole(roundRole); + await repository.insertRoundRole(roundRole, txConnection); }) satisfies ChangesetHandler<"InsertRoundRole">, - DeleteAllRoundRolesByRoleAndAddress: (async (changeset): Promise => { + DeleteAllRoundRolesByRoleAndAddress: (async (changeset, txConnection): Promise => { const { chainId, roundId, role, address } = changeset.args.roundRole; - await repository.deleteManyRoundRolesByRoleAndAddress(chainId, roundId, role, address); + await repository.deleteManyRoundRolesByRoleAndAddress( + chainId, + roundId, + role, + address, + txConnection, + ); }) satisfies ChangesetHandler<"DeleteAllRoundRolesByRoleAndAddress">, }); diff --git a/packages/data-flow/src/data-loader/types/index.ts b/packages/data-flow/src/data-loader/types/index.ts index c8cae2b..9637860 100644 --- a/packages/data-flow/src/data-loader/types/index.ts +++ b/packages/data-flow/src/data-loader/types/index.ts @@ -1,7 +1,8 @@ -import { Changeset } from "@grants-stack-indexer/repository"; +import { Changeset, TransactionConnection } from "@grants-stack-indexer/repository"; export type ChangesetHandler = ( changeset: Extract, + txConnection?: TransactionConnection, ) => Promise; export type ChangesetHandlers = { diff --git a/packages/data-flow/src/interfaces/dataLoader.interface.ts b/packages/data-flow/src/interfaces/dataLoader.interface.ts index 16a28fb..3b78046 100644 --- a/packages/data-flow/src/interfaces/dataLoader.interface.ts +++ b/packages/data-flow/src/interfaces/dataLoader.interface.ts @@ -1,7 +1,5 @@ import type { Changeset } from "@grants-stack-indexer/repository"; -import type { ExecutionResult } from "../internal.js"; - export interface IDataLoader { /** * Applies the changesets to the database. @@ -9,5 +7,5 @@ export interface IDataLoader { * @returns The execution result. * @throws {InvalidChangeset} if there are changesets with invalid types. */ - applyChanges(changesets: Changeset[]): Promise; + applyChanges(changesets: Changeset[]): Promise; } diff --git a/packages/data-flow/src/orchestrator.ts b/packages/data-flow/src/orchestrator.ts index 89c9bcf..0265750 100644 --- a/packages/data-flow/src/orchestrator.ts +++ b/packages/data-flow/src/orchestrator.ts @@ -94,6 +94,7 @@ export class Orchestrator { donation: this.dependencies.donationRepository, applicationPayout: this.dependencies.applicationPayoutRepository, }, + this.dependencies.transactionManager, this.logger, ); this.eventsQueue = new Queue>(fetchLimit); @@ -145,20 +146,7 @@ export class Orchestrator { } const changesets = await this.eventsProcessor.processEvent(event); - const executionResult = await this.dataLoader.applyChanges(changesets); - - if (executionResult.numFailed > 0) { - //TODO: should we retry the failed changesets? - this.logger.error( - `Failed to apply changesets. ${executionResult.errors.join("\n")} Event: ${stringify( - event, - )}`, - { - className: Orchestrator.name, - chainId: this.chainId, - }, - ); - } + await this.dataLoader.applyChanges(changesets); } catch (error: unknown) { // TODO: improve error handling, retries and notify if ( diff --git a/packages/data-flow/src/retroactiveProcessor.ts b/packages/data-flow/src/retroactiveProcessor.ts index 1553701..ddb25c6 100644 --- a/packages/data-flow/src/retroactiveProcessor.ts +++ b/packages/data-flow/src/retroactiveProcessor.ts @@ -100,6 +100,7 @@ export class RetroactiveProcessor { donation: this.dependencies.donationRepository, applicationPayout: this.dependencies.applicationPayoutRepository, }, + this.dependencies.transactionManager, this.logger, ); } @@ -208,17 +209,7 @@ export class RetroactiveProcessor { event.strategyId = strategyId; const changesets = await this.eventsProcessor.processEvent(event); - const executionResult = await this.dataLoader.applyChanges(changesets); - - if (executionResult.numFailed > 0) { - this.logger.error( - `Failed to apply changesets. ${executionResult.errors.join("\n")} Event: ${stringify(event)}`, - { - className: RetroactiveProcessor.name, - chainId: this.chainId, - }, - ); - } + await this.dataLoader.applyChanges(changesets); } catch (error) { if (error instanceof InvalidEvent || error instanceof UnsupportedEventException) { // Expected errors that we can safely ignore diff --git a/packages/data-flow/src/types/index.ts b/packages/data-flow/src/types/index.ts index a83c8d8..563071a 100644 --- a/packages/data-flow/src/types/index.ts +++ b/packages/data-flow/src/types/index.ts @@ -1,24 +1,13 @@ import { ProcessorDependencies } from "@grants-stack-indexer/processors"; import { - Changeset, IApplicationPayoutRepository, IApplicationRepository, IDonationRepository, IProjectRepository, IRoundRepository, + ITransactionManager, } from "@grants-stack-indexer/repository"; -/** - * The result of the execution of the changesets. - */ -export type ExecutionResult = { - changesets: Changeset["type"][]; - numExecuted: number; - numSuccessful: number; - numFailed: number; - errors: string[]; -}; - /** * The core dependencies for the data flow * @@ -35,4 +24,5 @@ export type CoreDependencies = Pick< applicationRepository: IApplicationRepository; donationRepository: IDonationRepository; applicationPayoutRepository: IApplicationPayoutRepository; + transactionManager: ITransactionManager; }; diff --git a/packages/data-flow/test/data-loader/dataLoader.spec.ts b/packages/data-flow/test/data-loader/dataLoader.spec.ts index 167284b..f0f62e7 100644 --- a/packages/data-flow/test/data-loader/dataLoader.spec.ts +++ b/packages/data-flow/test/data-loader/dataLoader.spec.ts @@ -1,4 +1,4 @@ -import { beforeEach, describe, expect, it, vi } from "vitest"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { Changeset, @@ -7,6 +7,8 @@ import { IDonationRepository, IProjectRepository, IRoundRepository, + ITransactionManager, + TransactionConnection, } from "@grants-stack-indexer/repository"; import { ILogger } from "@grants-stack-indexer/shared"; @@ -14,6 +16,7 @@ import { DataLoader } from "../../src/data-loader/dataLoader.js"; import { InvalidChangeset } from "../../src/internal.js"; describe("DataLoader", () => { + let dataLoader: DataLoader; const mockProjectRepository = { insertProject: vi.fn(), updateProject: vi.fn(), @@ -44,8 +47,14 @@ describe("DataLoader", () => { info: vi.fn(), warn: vi.fn(), }; - const createDataLoader = (): DataLoader => - new DataLoader( + + const mockTx = { query: vi.fn() } as unknown as TransactionConnection; + const mockTransactionManager = { + runInTransaction: async (fn) => await fn(mockTx), + } as ITransactionManager; + + beforeEach(() => { + dataLoader = new DataLoader( { project: mockProjectRepository, round: mockRoundRepository, @@ -53,16 +62,17 @@ describe("DataLoader", () => { donation: mockDonationRepository, applicationPayout: mockApplicationPayoutRepository, }, + mockTransactionManager, logger, ); + }); - beforeEach(() => { + afterEach(() => { vi.clearAllMocks(); }); describe("applyChanges", () => { it("successfully process multiple changesets", async () => { - const dataLoader = createDataLoader(); const changesets = [ { type: "InsertProject", @@ -74,18 +84,21 @@ describe("DataLoader", () => { } as unknown as Changeset, ]; - const result = await dataLoader.applyChanges(changesets); + await dataLoader.applyChanges(changesets); - expect(result.numExecuted).toBe(2); - expect(result.numSuccessful).toBe(2); - expect(result.numFailed).toBe(0); - expect(result.errors).toHaveLength(0); expect(mockProjectRepository.insertProject).toHaveBeenCalledTimes(1); + expect(mockProjectRepository.insertProject).toHaveBeenCalledWith( + { id: "1", name: "Test Project" }, + mockTx, + ); expect(mockRoundRepository.insertRound).toHaveBeenCalledTimes(1); + expect(mockRoundRepository.insertRound).toHaveBeenCalledWith( + { id: "1", name: "Test Round" }, + mockTx, + ); }); it("throw InvalidChangeset when encountering unknown changeset type", async () => { - const dataLoader = createDataLoader(); const changesets = [ { type: "UnknownType", @@ -97,8 +110,7 @@ describe("DataLoader", () => { ); }); - it("stops processing changesets on first error", async () => { - const dataLoader = createDataLoader(); + it("throws an error if the database operation fails", async () => { const error = new Error("Database error"); vi.spyOn(mockProjectRepository, "insertProject").mockRejectedValueOnce(error); @@ -113,13 +125,11 @@ describe("DataLoader", () => { } as unknown as Changeset, ]; - const result = await dataLoader.applyChanges(changesets); + await expect(dataLoader.applyChanges(changesets)).rejects.toThrow(error); - expect(result.numExecuted).toBe(1); - expect(result.numSuccessful).toBe(0); - expect(result.numFailed).toBe(1); - expect(result.errors).toHaveLength(1); - expect(result.errors[0]).toContain("Database error"); + expect(logger.debug).toHaveBeenCalledWith( + `Error applying changeset InsertProject. Rolling back transaction with 2 changesets`, + ); expect(mockRoundRepository.insertRound).not.toHaveBeenCalled(); }); }); diff --git a/packages/data-flow/test/data-loader/handlers/application.handlers.spec.ts b/packages/data-flow/test/data-loader/handlers/application.handlers.spec.ts index 86dad31..1c853d6 100644 --- a/packages/data-flow/test/data-loader/handlers/application.handlers.spec.ts +++ b/packages/data-flow/test/data-loader/handlers/application.handlers.spec.ts @@ -1,6 +1,10 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; -import { IApplicationRepository, NewApplication } from "@grants-stack-indexer/repository"; +import { + IApplicationRepository, + NewApplication, + TransactionConnection, +} from "@grants-stack-indexer/repository"; import { ChainId } from "@grants-stack-indexer/shared"; import { createApplicationHandlers } from "../../../src/data-loader/handlers/application.handlers.js"; @@ -10,6 +14,7 @@ describe("Application Handlers", () => { insertApplication: vi.fn(), updateApplication: vi.fn(), } as unknown as IApplicationRepository; + const mockTxConnection = { query: vi.fn() } as unknown as TransactionConnection; const handlers = createApplicationHandlers(mockRepository); @@ -24,7 +29,7 @@ describe("Application Handlers", () => { args: application, }); - expect(mockRepository.insertApplication).toHaveBeenCalledWith(application); + expect(mockRepository.insertApplication).toHaveBeenCalledWith(application, undefined); }); it("handle UpdateApplication changeset", async () => { @@ -38,11 +43,12 @@ describe("Application Handlers", () => { }, } as const; - await handlers.UpdateApplication(update); + await handlers.UpdateApplication(update, mockTxConnection); expect(mockRepository.updateApplication).toHaveBeenCalledWith( { chainId: 1, roundId: "round1", id: "app1" }, { status: "APPROVED" }, + mockTxConnection, ); }); }); diff --git a/packages/data-flow/test/data-loader/handlers/applicationPayout.handlers.spec.ts b/packages/data-flow/test/data-loader/handlers/applicationPayout.handlers.spec.ts new file mode 100644 index 0000000..0d17dfd --- /dev/null +++ b/packages/data-flow/test/data-loader/handlers/applicationPayout.handlers.spec.ts @@ -0,0 +1,42 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; + +import { + IApplicationPayoutRepository, + NewApplicationPayout, + TransactionConnection, +} from "@grants-stack-indexer/repository"; + +import { createApplicationPayoutHandlers } from "../../../src/data-loader/handlers/applicationPayout.handlers.js"; + +describe("ApplicationPayout Handlers", () => { + const mockRepository = { + insertApplicationPayout: vi.fn(), + } as IApplicationPayoutRepository; + const mockTxConnection = { query: vi.fn() } as unknown as TransactionConnection; + + const handlers = createApplicationPayoutHandlers(mockRepository); + + beforeEach(() => { + vi.clearAllMocks(); + }); + + it("handle InsertApplication changeset", async () => { + const applicationPayout = { + id: "1", + name: "Test Application", + } as unknown as NewApplicationPayout; + + await handlers.InsertApplicationPayout( + { + type: "InsertApplicationPayout", + args: { applicationPayout }, + }, + mockTxConnection, + ); + + expect(mockRepository.insertApplicationPayout).toHaveBeenCalledWith( + applicationPayout, + mockTxConnection, + ); + }); +}); diff --git a/packages/data-flow/test/data-loader/handlers/donation.handlers.spec.ts b/packages/data-flow/test/data-loader/handlers/donation.handlers.spec.ts new file mode 100644 index 0000000..deb562d --- /dev/null +++ b/packages/data-flow/test/data-loader/handlers/donation.handlers.spec.ts @@ -0,0 +1,54 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; + +import { + IDonationRepository, + NewDonation, + TransactionConnection, +} from "@grants-stack-indexer/repository"; + +import { createDonationHandlers } from "../../../src/data-loader/handlers/donation.handlers.js"; + +describe("Donation Handlers", () => { + const mockRepository = { + insertDonation: vi.fn(), + insertManyDonations: vi.fn(), + } as IDonationRepository; + const mockTxConnection = { query: vi.fn() } as unknown as TransactionConnection; + + const handlers = createDonationHandlers(mockRepository); + + beforeEach(() => { + vi.clearAllMocks(); + }); + + it("handle InsertDonation changeset", async () => { + const donation = { id: "1", name: "Test Donation" } as unknown as NewDonation; + await handlers.InsertDonation({ + type: "InsertDonation", + args: { donation }, + }); + + expect(mockRepository.insertDonation).toHaveBeenCalledWith(donation, undefined); + }); + + it("handle InsertManyDonations changeset", async () => { + const donations = [ + { id: "1", name: "Test Donation" }, + { id: "2", name: "Test Donation 2" }, + { id: "3", name: "Test Donation 3" }, + ] as unknown as NewDonation[]; + + await handlers.InsertManyDonations( + { + type: "InsertManyDonations", + args: { donations }, + }, + mockTxConnection, + ); + + expect(mockRepository.insertManyDonations).toHaveBeenCalledWith( + donations, + mockTxConnection, + ); + }); +}); diff --git a/packages/data-flow/test/data-loader/handlers/project.handlers.spec.ts b/packages/data-flow/test/data-loader/handlers/project.handlers.spec.ts index 5ef0f45..087ece9 100644 --- a/packages/data-flow/test/data-loader/handlers/project.handlers.spec.ts +++ b/packages/data-flow/test/data-loader/handlers/project.handlers.spec.ts @@ -1,6 +1,10 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; -import { IProjectRepository, NewProject } from "@grants-stack-indexer/repository"; +import { + IProjectRepository, + NewProject, + TransactionConnection, +} from "@grants-stack-indexer/repository"; import { Address, ChainId } from "@grants-stack-indexer/shared"; import { createProjectHandlers } from "../../../src/data-loader/handlers/project.handlers.js"; @@ -14,6 +18,7 @@ describe("Project Handlers", () => { insertProjectRole: vi.fn(), deleteManyProjectRoles: vi.fn(), } as unknown as IProjectRepository; + const mockTxConnection = { query: vi.fn() } as unknown as TransactionConnection; const handlers = createProjectHandlers(mockRepository); @@ -39,12 +44,15 @@ describe("Project Handlers", () => { projectType: "canonical", } as NewProject; - await handlers.InsertProject({ - type: "InsertProject", - args: { project }, - }); + await handlers.InsertProject( + { + type: "InsertProject", + args: { project }, + }, + mockTxConnection, + ); - expect(mockRepository.insertProject).toHaveBeenCalledWith(project); + expect(mockRepository.insertProject).toHaveBeenCalledWith(project, mockTxConnection); }); it("handle UpdateProject changeset", async () => { @@ -65,6 +73,7 @@ describe("Project Handlers", () => { expect(mockRepository.updateProject).toHaveBeenCalledWith( { id: "project-1", chainId: 1 }, { name: "Updated Project", updatedAtBlock: 200n }, + undefined, ); }); @@ -81,7 +90,10 @@ describe("Project Handlers", () => { args: { pendingProjectRole: pendingRole }, }); - expect(mockRepository.insertPendingProjectRole).toHaveBeenCalledWith(pendingRole); + expect(mockRepository.insertPendingProjectRole).toHaveBeenCalledWith( + pendingRole, + undefined, + ); }); it("handle DeletePendingProjectRoles changeset", async () => { @@ -94,7 +106,10 @@ describe("Project Handlers", () => { await handlers.DeletePendingProjectRoles(changeset); - expect(mockRepository.deleteManyPendingProjectRoles).toHaveBeenCalledWith([1, 2, 3]); + expect(mockRepository.deleteManyPendingProjectRoles).toHaveBeenCalledWith( + [1, 2, 3], + undefined, + ); }); it("handle InsertProjectRole changeset", async () => { @@ -111,7 +126,7 @@ describe("Project Handlers", () => { args: { projectRole }, }); - expect(mockRepository.insertProjectRole).toHaveBeenCalledWith(projectRole); + expect(mockRepository.insertProjectRole).toHaveBeenCalledWith(projectRole, undefined); }); it("handle DeleteAllProjectRolesByRole changeset", async () => { @@ -132,6 +147,8 @@ describe("Project Handlers", () => { changeset.args.projectRole.chainId, changeset.args.projectRole.projectId, changeset.args.projectRole.role, + undefined, + undefined, ); }); @@ -155,6 +172,7 @@ describe("Project Handlers", () => { changeset.args.projectRole.projectId, changeset.args.projectRole.role, changeset.args.projectRole.address, + undefined, ); }); }); diff --git a/packages/data-flow/test/data-loader/handlers/round.handlers.spec.ts b/packages/data-flow/test/data-loader/handlers/round.handlers.spec.ts index 6ddb625..c06a960 100644 --- a/packages/data-flow/test/data-loader/handlers/round.handlers.spec.ts +++ b/packages/data-flow/test/data-loader/handlers/round.handlers.spec.ts @@ -1,6 +1,10 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; -import { IRoundRepository, NewRound } from "@grants-stack-indexer/repository"; +import { + IRoundRepository, + NewRound, + TransactionConnection, +} from "@grants-stack-indexer/repository"; import { Address, ChainId } from "@grants-stack-indexer/shared"; import { createRoundHandlers } from "../../../src/data-loader/handlers/round.handlers.js"; @@ -16,6 +20,7 @@ describe("Round Handlers", () => { insertRoundRole: vi.fn(), deleteManyRoundRolesByRoleAndAddress: vi.fn(), } as unknown as IRoundRepository; + const mockTxConnection = { query: vi.fn() } as unknown as TransactionConnection; const handlers = createRoundHandlers(mockRepository); @@ -30,12 +35,15 @@ describe("Round Handlers", () => { matchAmount: 1000n, } as NewRound; - await handlers.InsertRound({ - type: "InsertRound" as const, - args: { round }, - }); + await handlers.InsertRound( + { + type: "InsertRound" as const, + args: { round }, + }, + mockTxConnection, + ); - expect(mockRepository.insertRound).toHaveBeenCalledWith(round); + expect(mockRepository.insertRound).toHaveBeenCalledWith(round, mockTxConnection); }); it("handle UpdateRound changeset", async () => { @@ -56,6 +64,7 @@ describe("Round Handlers", () => { expect(mockRepository.updateRound).toHaveBeenCalledWith( { id: "round-1", chainId: 1 as ChainId }, { matchAmount: 2000n, matchAmountInUsd: "2000" }, + undefined, ); }); @@ -77,6 +86,7 @@ describe("Round Handlers", () => { expect(mockRepository.updateRound).toHaveBeenCalledWith( { chainId: 1 as ChainId, strategyAddress: "0x123" as Address }, { matchAmount: 2000n, matchAmountInUsd: "2000" }, + undefined, ); }); @@ -97,6 +107,7 @@ describe("Round Handlers", () => { { chainId: 1 as ChainId, roundId: "round-1" }, 1000n, "1000", + undefined, ); }); @@ -115,6 +126,7 @@ describe("Round Handlers", () => { expect(mockRepository.incrementRoundTotalDistributed).toHaveBeenCalledWith( { chainId: 1 as ChainId, roundId: "round-1" }, 1000n, + undefined, ); }); @@ -135,6 +147,7 @@ describe("Round Handlers", () => { expect(mockRepository.insertPendingRoundRole).toHaveBeenCalledWith( changeset.args.pendingRoundRole, + undefined, ); }); @@ -148,7 +161,10 @@ describe("Round Handlers", () => { await handlers.DeletePendingRoundRoles(changeset); - expect(mockRepository.deleteManyPendingRoundRoles).toHaveBeenCalledWith([1, 2, 3]); + expect(mockRepository.deleteManyPendingRoundRoles).toHaveBeenCalledWith( + [1, 2, 3], + undefined, + ); }); it("handle InsertRoundRole changeset", async () => { @@ -167,7 +183,10 @@ describe("Round Handlers", () => { await handlers.InsertRoundRole(changeset); - expect(mockRepository.insertRoundRole).toHaveBeenCalledWith(changeset.args.roundRole); + expect(mockRepository.insertRoundRole).toHaveBeenCalledWith( + changeset.args.roundRole, + undefined, + ); }); it("handle DeleteAllRoundRolesByRoleAndAddress changeset", async () => { @@ -190,6 +209,7 @@ describe("Round Handlers", () => { changeset.args.roundRole.roundId, changeset.args.roundRole.role, changeset.args.roundRole.address, + undefined, ); }); }); diff --git a/packages/data-flow/test/unit/orchestrator.spec.ts b/packages/data-flow/test/unit/orchestrator.spec.ts index ec74912..057784b 100644 --- a/packages/data-flow/test/unit/orchestrator.spec.ts +++ b/packages/data-flow/test/unit/orchestrator.spec.ts @@ -11,6 +11,7 @@ import { IDonationRepository, IProjectRepository, IRoundRepository, + ITransactionManager, } from "@grants-stack-indexer/repository"; import { AlloEvent, @@ -92,6 +93,7 @@ describe("Orchestrator", { sequential: true }, () => { const dependencies: CoreDependencies = { evmProvider: mockEvmProvider, + transactionManager: {} as unknown as ITransactionManager, projectRepository: {} as unknown as IProjectRepository, roundRepository: {} as unknown as IRoundRepository, applicationRepository: {} as unknown as IApplicationRepository, @@ -151,13 +153,9 @@ describe("Orchestrator", { sequential: true }, () => { .mockResolvedValueOnce(mockEvents) .mockResolvedValue([]); eventsProcessorSpy.mockResolvedValue([]); - vi.spyOn(orchestrator["dataLoader"], "applyChanges").mockResolvedValue({ - numFailed: 0, - errors: [], - changesets: [], - numExecuted: 1, - numSuccessful: 1, - }); + vi.spyOn(orchestrator["dataLoader"], "applyChanges").mockResolvedValue( + await Promise.resolve(), + ); vi.spyOn(mockEventsRegistry, "saveLastProcessedEvent").mockImplementation(() => { return Promise.resolve(); }); @@ -246,13 +244,9 @@ describe("Orchestrator", { sequential: true }, () => { return Promise.resolve(); }); - vi.spyOn(orchestrator["dataLoader"], "applyChanges").mockResolvedValue({ - numFailed: 0, - errors: [], - changesets: ["InsertProject", "InsertRoundRole", "DeletePendingRoundRoles"], - numExecuted: 3, - numSuccessful: 3, - }); + vi.spyOn(orchestrator["dataLoader"], "applyChanges").mockResolvedValue( + await Promise.resolve(), + ); runPromise = orchestrator.run(abortController.signal); @@ -374,13 +368,9 @@ describe("Orchestrator", { sequential: true }, () => { vi.spyOn(mockEventsRegistry, "saveLastProcessedEvent").mockImplementation(() => { return Promise.resolve(); }); - vi.spyOn(orchestrator["dataLoader"], "applyChanges").mockResolvedValue({ - numFailed: 0, - errors: [], - changesets: ["InsertApplication"], - numExecuted: 1, - numSuccessful: 1, - }); + vi.spyOn(orchestrator["dataLoader"], "applyChanges").mockResolvedValue( + await Promise.resolve(), + ); runPromise = orchestrator.run(abortController.signal); @@ -485,13 +475,9 @@ describe("Orchestrator", { sequential: true }, () => { .mockResolvedValue([]); eventsProcessorSpy.mockResolvedValue([]); - vi.spyOn(orchestrator["dataLoader"], "applyChanges").mockResolvedValue({ - numFailed: 0, - errors: [], - changesets: [], - numExecuted: 1, - numSuccessful: 1, - }); + vi.spyOn(orchestrator["dataLoader"], "applyChanges").mockResolvedValue( + await Promise.resolve(), + ); vi.spyOn(mockEventsRegistry, "saveLastProcessedEvent").mockImplementation(() => { return Promise.resolve(); }); @@ -544,13 +530,9 @@ describe("Orchestrator", { sequential: true }, () => { return Promise.resolve(); }); - vi.spyOn(orchestrator["dataLoader"], "applyChanges").mockResolvedValue({ - numFailed: 0, - errors: [], - changesets: ["InsertPendingRoundRole"], - numExecuted: 1, - numSuccessful: 1, - }); + vi.spyOn(orchestrator["dataLoader"], "applyChanges").mockResolvedValue( + await Promise.resolve(), + ); runPromise = orchestrator.run(abortController.signal); @@ -579,13 +561,9 @@ describe("Orchestrator", { sequential: true }, () => { .mockResolvedValue([]); eventsProcessorSpy.mockRejectedValueOnce(error).mockResolvedValueOnce([]); - vi.spyOn(orchestrator["dataLoader"], "applyChanges").mockResolvedValue({ - numFailed: 0, - errors: [], - changesets: [], - numExecuted: 1, - numSuccessful: 1, - }); + vi.spyOn(orchestrator["dataLoader"], "applyChanges").mockResolvedValue( + await Promise.resolve(), + ); vi.spyOn(mockEventsRegistry, "saveLastProcessedEvent").mockImplementation(() => { return Promise.resolve(); }); @@ -674,41 +652,50 @@ describe("Orchestrator", { sequential: true }, () => { expect(mockEventsRegistry.saveLastProcessedEvent).not.toHaveBeenCalled(); }); - it.skip("logs DataLoader errors", async () => { - const mockEvent = createMockEvent("Allo", "PoolCreated", 1); - const mockChangesets: Changeset[] = [ - { type: "UpdateProject", args: { chainId, projectId: "1", project: {} } }, + it("logs DataLoader errors", async () => { + const mockEvent = createMockEvent("Registry", "ProfileCreated", 1, undefined); + const changesets = [ + { + type: "InsertPendingRoundRole", + args: { chainId, roundId: "1", roundRole: {} }, + } as unknown as Changeset, ]; - const consoleSpy = vi.spyOn(console, "error").mockImplementation(() => {}); - const dataLoaderSpy = vi.spyOn(orchestrator["dataLoader"], "applyChanges"); + const eventsProcessorSpy = vi.spyOn(orchestrator["eventsProcessor"], "processEvent"); + vi.spyOn(mockEventsRegistry, "getLastProcessedEvent").mockResolvedValue(undefined); vi.spyOn(mockIndexerClient, "getEventsAfterBlockNumberAndLogIndex") .mockResolvedValueOnce([mockEvent]) .mockResolvedValue([]); - vi.spyOn(orchestrator["eventsProcessor"], "processEvent").mockResolvedValue( - mockChangesets, - ); - dataLoaderSpy.mockResolvedValue({ - numFailed: 1, - errors: ["Failed to update project"], - changesets: ["UpdateProject"], - numExecuted: 1, - numSuccessful: 0, + + eventsProcessorSpy.mockResolvedValue(changesets); + + vi.spyOn(mockEventsRegistry, "saveLastProcessedEvent").mockImplementation(() => { + return Promise.resolve(); }); + const dataLoaderSpy = vi.spyOn(orchestrator["dataLoader"], "applyChanges"); + const error = new Error("Failed to apply changesets"); + dataLoaderSpy.mockRejectedValue(error); + runPromise = orchestrator.run(abortController.signal); await vi.waitFor(() => { - if (dataLoaderSpy.mock.calls.length < 1) throw new Error("Not yet called"); + if (eventsProcessorSpy.mock.calls.length < 1) throw new Error("Not yet called"); }); - expect(consoleSpy).toHaveBeenCalledWith( - expect.stringContaining("Failed to apply changesets"), - ); - expect(consoleSpy).toHaveBeenCalledWith(expect.stringContaining(stringify(mockEvent))); + runPromise = orchestrator.run(abortController.signal); + + await vi.waitFor(() => { + if (eventsProcessorSpy.mock.calls.length < 1) throw new Error("Not yet called"); + }); + + expect(logger.error).toHaveBeenCalledWith(error, { + event: mockEvent, + className: Orchestrator.name, + chainId, + }); expect(dataLoaderSpy).toHaveBeenCalledTimes(1); - expect(mockEventsRegistry.saveLastProcessedEvent).not.toHaveBeenCalled(); }); }); }); diff --git a/packages/data-flow/test/unit/retroactiveProcessor.spec.ts b/packages/data-flow/test/unit/retroactiveProcessor.spec.ts index cb52126..0dd931c 100644 --- a/packages/data-flow/test/unit/retroactiveProcessor.spec.ts +++ b/packages/data-flow/test/unit/retroactiveProcessor.spec.ts @@ -9,6 +9,7 @@ import { IProjectRepository, IRoundRepository, IStrategyProcessingCheckpointRepository, + ITransactionManager, Strategy, } from "@grants-stack-indexer/repository"; import { @@ -138,6 +139,7 @@ describe("RetroactiveProcessor", () => { applicationRepository: {} as IApplicationRepository, donationRepository: {} as IDonationRepository, applicationPayoutRepository: {} as IApplicationPayoutRepository, + transactionManager: {} as ITransactionManager, pricingProvider: { getTokenPrice: vi.fn(), }, @@ -216,13 +218,7 @@ describe("RetroactiveProcessor", () => { .mockResolvedValueOnce([mockEvent]) .mockResolvedValue([]); vi.spyOn(mockEventsProcessor, "processEvent").mockResolvedValue([]); - vi.spyOn(mockDataLoader, "applyChanges").mockResolvedValue({ - numFailed: 0, - errors: [], - changesets: [], - numExecuted: 1, - numSuccessful: 1, - }); + vi.spyOn(mockDataLoader, "applyChanges").mockResolvedValue(await Promise.resolve()); vi.spyOn(mockStrategyRegistry, "saveStrategyId").mockResolvedValue(); await processor.processRetroactiveStrategies(); @@ -271,13 +267,7 @@ describe("RetroactiveProcessor", () => { return []; }); vi.spyOn(mockEventsProcessor, "processEvent").mockResolvedValue([]); - vi.spyOn(mockDataLoader, "applyChanges").mockResolvedValue({ - numFailed: 0, - errors: [], - changesets: [], - numExecuted: 1, - numSuccessful: 1, - }); + vi.spyOn(mockDataLoader, "applyChanges").mockResolvedValue(await Promise.resolve()); vi.spyOn(mockStrategyRegistry, "saveStrategyId").mockResolvedValue(); await processor.processRetroactiveStrategies(); @@ -325,13 +315,7 @@ describe("RetroactiveProcessor", () => { .mockResolvedValueOnce([mockEvent]) .mockResolvedValue([]); vi.spyOn(mockEventsProcessor, "processEvent").mockResolvedValue([]); - vi.spyOn(mockDataLoader, "applyChanges").mockResolvedValue({ - numFailed: 0, - errors: [], - changesets: [], - numExecuted: 1, - numSuccessful: 1, - }); + vi.spyOn(mockDataLoader, "applyChanges").mockResolvedValue(await Promise.resolve()); vi.spyOn(mockStrategyRegistry, "saveStrategyId").mockResolvedValue(); await processor.processRetroactiveStrategies(); @@ -372,13 +356,7 @@ describe("RetroactiveProcessor", () => { vi.spyOn(processor["eventsFetcher"], "fetchEvents").mockResolvedValueOnce([mockEvent]); vi.spyOn(mockEventsProcessor, "processEvent").mockResolvedValue([]); - vi.spyOn(mockDataLoader, "applyChanges").mockResolvedValue({ - numFailed: 0, - errors: [], - changesets: [], - numExecuted: 1, - numSuccessful: 1, - }); + vi.spyOn(mockDataLoader, "applyChanges").mockResolvedValue(await Promise.resolve()); vi.spyOn(mockStrategyRegistry, "saveStrategyId").mockResolvedValue(); await processor.processRetroactiveStrategies(); @@ -417,13 +395,7 @@ describe("RetroactiveProcessor", () => { .mockResolvedValue([]); vi.spyOn(mockEventsProcessor, "processEvent").mockResolvedValue([]); - vi.spyOn(mockDataLoader, "applyChanges").mockResolvedValue({ - numFailed: 0, - errors: [], - changesets: [], - numExecuted: 1, - numSuccessful: 1, - }); + vi.spyOn(mockDataLoader, "applyChanges").mockResolvedValue(await Promise.resolve()); vi.spyOn(mockStrategyRegistry, "saveStrategyId").mockResolvedValue(); await processor.processRetroactiveStrategies(); @@ -464,13 +436,7 @@ describe("RetroactiveProcessor", () => { .mockRejectedValueOnce(new InvalidEvent(mockEvent1)) .mockResolvedValueOnce([]); - vi.spyOn(mockDataLoader, "applyChanges").mockResolvedValue({ - numFailed: 0, - errors: [], - changesets: [], - numExecuted: 1, - numSuccessful: 1, - }); + vi.spyOn(mockDataLoader, "applyChanges").mockResolvedValue(await Promise.resolve()); await processor.processRetroactiveStrategies(); From 6d5b44e3ee2078a627016f8739d1436064ba202a Mon Sep 17 00:00:00 2001 From: nigiri <168690269+0xnigir1@users.noreply.github.com> Date: Tue, 31 Dec 2024 16:52:20 -0300 Subject: [PATCH 3/4] fix: tx kysely typing and add more verbose log --- .../data-flow/src/data-loader/dataLoader.ts | 10 +++++++++- .../kysely/application.repository.ts | 7 ++++--- .../kysely/applicationPayout.repository.ts | 11 ++++++++--- .../kysely/donation.repository.ts | 8 ++++---- .../repositories/kysely/project.repository.ts | 15 ++++++++------- .../repositories/kysely/round.repository.ts | 19 ++++++++++--------- .../repositories/kysely/transactionManager.ts | 6 +++--- .../repository/src/types/transaction.types.ts | 5 ++--- 8 files changed, 48 insertions(+), 33 deletions(-) diff --git a/packages/data-flow/src/data-loader/dataLoader.ts b/packages/data-flow/src/data-loader/dataLoader.ts index d87b0cc..91557a2 100644 --- a/packages/data-flow/src/data-loader/dataLoader.ts +++ b/packages/data-flow/src/data-loader/dataLoader.ts @@ -63,7 +63,9 @@ export class DataLoader implements IDataLoader { } await this.transactionManager.runInTransaction(async (tx) => { - this.logger.debug("Starting transaction..."); + this.logger.debug(`Starting transaction on ${changesets.length} changesets...`, { + className: DataLoader.name, + }); for (const changeset of changesets) { try { //TODO: inside each handler, we should add zod validation that the args match the expected type @@ -71,11 +73,17 @@ export class DataLoader implements IDataLoader { } catch (error) { this.logger.debug( `Error applying changeset ${changeset.type}. Rolling back transaction with ${changesets.length} changesets`, + { + className: DataLoader.name, + }, ); throw error; } } }); + this.logger.debug(`Successfully applied ${changesets.length} changesets`, { + className: DataLoader.name, + }); } } diff --git a/packages/repository/src/repositories/kysely/application.repository.ts b/packages/repository/src/repositories/kysely/application.repository.ts index 4d1e202..563c93d 100644 --- a/packages/repository/src/repositories/kysely/application.repository.ts +++ b/packages/repository/src/repositories/kysely/application.repository.ts @@ -7,11 +7,12 @@ import { ApplicationNotFound, Database, IApplicationRepository, + KyselyTransaction, NewApplication, PartialApplication, } from "../../internal.js"; -export class KyselyApplicationRepository implements IApplicationRepository> { +export class KyselyApplicationRepository implements IApplicationRepository { constructor( private readonly db: Kysely, private readonly schemaName: string, @@ -96,7 +97,7 @@ export class KyselyApplicationRepository implements IApplicationRepository): Promise { + async insertApplication(application: NewApplication, tx?: KyselyTransaction): Promise { const _application = this.formatApplication(application); const queryBuilder = (tx || this.db).withSchema(this.schemaName); @@ -107,7 +108,7 @@ export class KyselyApplicationRepository implements IApplicationRepository, + tx?: KyselyTransaction, ): Promise { const _application = this.formatApplication(application); const queryBuilder = (tx || this.db).withSchema(this.schemaName); diff --git a/packages/repository/src/repositories/kysely/applicationPayout.repository.ts b/packages/repository/src/repositories/kysely/applicationPayout.repository.ts index a384e8a..2a5e177 100644 --- a/packages/repository/src/repositories/kysely/applicationPayout.repository.ts +++ b/packages/repository/src/repositories/kysely/applicationPayout.repository.ts @@ -1,9 +1,14 @@ import { Kysely } from "kysely"; -import { Database, IApplicationPayoutRepository, NewApplicationPayout } from "../../internal.js"; +import { + Database, + IApplicationPayoutRepository, + KyselyTransaction, + NewApplicationPayout, +} from "../../internal.js"; export class KyselyApplicationPayoutRepository - implements IApplicationPayoutRepository> + implements IApplicationPayoutRepository { constructor( private readonly db: Kysely, @@ -13,7 +18,7 @@ export class KyselyApplicationPayoutRepository /** @inheritdoc */ async insertApplicationPayout( applicationPayout: NewApplicationPayout, - tx?: Kysely, + tx?: KyselyTransaction, ): Promise { const queryBuilder = (tx || this.db).withSchema(this.schemaName); await queryBuilder.insertInto("applicationsPayouts").values(applicationPayout).execute(); diff --git a/packages/repository/src/repositories/kysely/donation.repository.ts b/packages/repository/src/repositories/kysely/donation.repository.ts index bf4a001..ba47ef8 100644 --- a/packages/repository/src/repositories/kysely/donation.repository.ts +++ b/packages/repository/src/repositories/kysely/donation.repository.ts @@ -1,15 +1,15 @@ import { Kysely } from "kysely"; -import { Database, IDonationRepository, NewDonation } from "../../internal.js"; +import { Database, IDonationRepository, KyselyTransaction, NewDonation } from "../../internal.js"; -export class KyselyDonationRepository implements IDonationRepository> { +export class KyselyDonationRepository implements IDonationRepository { constructor( private readonly db: Kysely, private readonly schemaName: string, ) {} /** @inheritdoc */ - async insertDonation(donation: NewDonation, tx?: Kysely): Promise { + async insertDonation(donation: NewDonation, tx?: KyselyTransaction): Promise { const queryBuilder = (tx || this.db).withSchema(this.schemaName); await queryBuilder @@ -22,7 +22,7 @@ export class KyselyDonationRepository implements IDonationRepository): Promise { + async insertManyDonations(donations: NewDonation[], tx?: KyselyTransaction): Promise { const queryBuilder = (tx || this.db).withSchema(this.schemaName); await queryBuilder diff --git a/packages/repository/src/repositories/kysely/project.repository.ts b/packages/repository/src/repositories/kysely/project.repository.ts index 58cfb46..0d4b069 100644 --- a/packages/repository/src/repositories/kysely/project.repository.ts +++ b/packages/repository/src/repositories/kysely/project.repository.ts @@ -5,6 +5,7 @@ import { Address, ChainId } from "@grants-stack-indexer/shared"; import { IProjectRepository } from "../../interfaces/projectRepository.interface.js"; import { Database, + KyselyTransaction, NewPendingProjectRole, NewProject, NewProjectRole, @@ -15,7 +16,7 @@ import { ProjectRoleNames, } from "../../internal.js"; -export class KyselyProjectRepository implements IProjectRepository> { +export class KyselyProjectRepository implements IProjectRepository { constructor( private readonly db: Kysely, private readonly schemaName: string, @@ -73,7 +74,7 @@ export class KyselyProjectRepository implements IProjectRepository): Promise { + async insertProject(project: NewProject, tx?: KyselyTransaction): Promise { const queryBuilder = (tx || this.db).withSchema(this.schemaName); await queryBuilder.insertInto("projects").values(project).execute(); } @@ -82,7 +83,7 @@ export class KyselyProjectRepository implements IProjectRepository, + tx?: KyselyTransaction, ): Promise { const queryBuilder = (tx || this.db).withSchema(this.schemaName); await queryBuilder @@ -96,7 +97,7 @@ export class KyselyProjectRepository implements IProjectRepository): Promise { + async insertProjectRole(projectRole: NewProjectRole, tx?: KyselyTransaction): Promise { const queryBuilder = (tx || this.db).withSchema(this.schemaName); await queryBuilder.insertInto("projectRoles").values(projectRole).execute(); } @@ -107,7 +108,7 @@ export class KyselyProjectRepository implements IProjectRepository, + tx?: KyselyTransaction, ): Promise { const queryBuilder = (tx || this.db).withSchema(this.schemaName); const query = queryBuilder @@ -151,14 +152,14 @@ export class KyselyProjectRepository implements IProjectRepository, + tx?: KyselyTransaction, ): Promise { const queryBuilder = (tx || this.db).withSchema(this.schemaName); await queryBuilder.insertInto("pendingProjectRoles").values(pendingProjectRole).execute(); } /* @inheritdoc */ - async deleteManyPendingProjectRoles(ids: number[], tx?: Kysely): Promise { + async deleteManyPendingProjectRoles(ids: number[], tx?: KyselyTransaction): Promise { const queryBuilder = (tx || this.db).withSchema(this.schemaName); await queryBuilder.deleteFrom("pendingProjectRoles").where("id", "in", ids).execute(); } diff --git a/packages/repository/src/repositories/kysely/round.repository.ts b/packages/repository/src/repositories/kysely/round.repository.ts index 356a840..a48727d 100644 --- a/packages/repository/src/repositories/kysely/round.repository.ts +++ b/packages/repository/src/repositories/kysely/round.repository.ts @@ -5,6 +5,7 @@ import { Address, ChainId, stringify } from "@grants-stack-indexer/shared"; import { Database, IRoundRepository, + KyselyTransaction, NewPendingRoundRole, NewRound, NewRoundRole, @@ -17,7 +18,7 @@ import { RoundRoleNames, } from "../../internal.js"; -export class KyselyRoundRepository implements IRoundRepository> { +export class KyselyRoundRepository implements IRoundRepository { constructor( private readonly db: Kysely, private readonly schemaName: string, @@ -114,7 +115,7 @@ export class KyselyRoundRepository implements IRoundRepository> } /* @inheritdoc */ - async insertRound(round: NewRound, tx?: Kysely): Promise { + async insertRound(round: NewRound, tx?: KyselyTransaction): Promise { const _round = this.formatRound(round); const queryBuilder = (tx || this.db).withSchema(this.schemaName); await queryBuilder.insertInto("rounds").values(_round).execute(); @@ -124,7 +125,7 @@ export class KyselyRoundRepository implements IRoundRepository> async updateRound( where: { id: string; chainId: ChainId } | { chainId: ChainId; strategyAddress: Address }, round: PartialRound, - tx?: Kysely, + tx?: KyselyTransaction, ): Promise { const _round = this.formatRound(round); const queryBuilder = (tx || this.db).withSchema(this.schemaName); @@ -145,7 +146,7 @@ export class KyselyRoundRepository implements IRoundRepository> where: { chainId: ChainId; roundId: string }, amount: bigint, amountInUsd: string, - tx?: Kysely, + tx?: KyselyTransaction, ): Promise { const queryBuilder = (tx || this.db).withSchema(this.schemaName); await queryBuilder @@ -163,7 +164,7 @@ export class KyselyRoundRepository implements IRoundRepository> async incrementRoundTotalDistributed( where: { chainId: ChainId; roundId: string }, amount: bigint, - tx?: Kysely, + tx?: KyselyTransaction, ): Promise { const queryBuilder = (tx || this.db).withSchema(this.schemaName); await queryBuilder @@ -184,7 +185,7 @@ export class KyselyRoundRepository implements IRoundRepository> } /* @inheritdoc */ - async insertRoundRole(roundRole: NewRoundRole, tx?: Kysely): Promise { + async insertRoundRole(roundRole: NewRoundRole, tx?: KyselyTransaction): Promise { const queryBuilder = (tx || this.db).withSchema(this.schemaName); await queryBuilder.insertInto("roundRoles").values(roundRole).execute(); } @@ -195,7 +196,7 @@ export class KyselyRoundRepository implements IRoundRepository> roundId: string, role: RoundRoleNames, address: Address, - tx?: Kysely, + tx?: KyselyTransaction, ): Promise { const queryBuilder = (tx || this.db).withSchema(this.schemaName); await queryBuilder @@ -226,14 +227,14 @@ export class KyselyRoundRepository implements IRoundRepository> /* @inheritdoc */ async insertPendingRoundRole( pendingRoundRole: NewPendingRoundRole, - tx?: Kysely, + tx?: KyselyTransaction, ): Promise { const queryBuilder = (tx || this.db).withSchema(this.schemaName); await queryBuilder.insertInto("pendingRoundRoles").values(pendingRoundRole).execute(); } /* @inheritdoc */ - async deleteManyPendingRoundRoles(ids: number[], tx?: Kysely): Promise { + async deleteManyPendingRoundRoles(ids: number[], tx?: KyselyTransaction): Promise { const queryBuilder = (tx || this.db).withSchema(this.schemaName); await queryBuilder.deleteFrom("pendingRoundRoles").where("id", "in", ids).execute(); } diff --git a/packages/repository/src/repositories/kysely/transactionManager.ts b/packages/repository/src/repositories/kysely/transactionManager.ts index bb4f619..6c4c810 100644 --- a/packages/repository/src/repositories/kysely/transactionManager.ts +++ b/packages/repository/src/repositories/kysely/transactionManager.ts @@ -1,12 +1,12 @@ import { Kysely } from "kysely"; -import { Database, ITransactionManager } from "../../internal.js"; +import { Database, ITransactionManager, KyselyTransaction } from "../../internal.js"; -export class KyselyTransactionManager implements ITransactionManager> { +export class KyselyTransactionManager implements ITransactionManager { constructor(private readonly db: Kysely) {} /** @inheritdoc */ - async runInTransaction(fn: (tx: Kysely) => Promise): Promise { + async runInTransaction(fn: (tx: KyselyTransaction) => Promise): Promise { return this.db.transaction().execute(fn); } } diff --git a/packages/repository/src/types/transaction.types.ts b/packages/repository/src/types/transaction.types.ts index 89d2347..fddf619 100644 --- a/packages/repository/src/types/transaction.types.ts +++ b/packages/repository/src/types/transaction.types.ts @@ -1,8 +1,7 @@ -// packages/repository/src/types/transaction.types.ts -import { Kysely } from "kysely"; +import { Transaction } from "kysely"; import { Database } from "../internal.js"; -export type KyselyTransaction = Kysely; +export type KyselyTransaction = Transaction; export type TransactionConnection = KyselyTransaction; From cff70d558d715246362f9f2cb41bda0d5e3558b5 Mon Sep 17 00:00:00 2001 From: nigiri <168690269+0xnigir1@users.noreply.github.com> Date: Wed, 1 Jan 2025 22:24:09 -0300 Subject: [PATCH 4/4] test: fix tests --- packages/data-flow/test/data-loader/dataLoader.spec.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/packages/data-flow/test/data-loader/dataLoader.spec.ts b/packages/data-flow/test/data-loader/dataLoader.spec.ts index f0f62e7..8c994de 100644 --- a/packages/data-flow/test/data-loader/dataLoader.spec.ts +++ b/packages/data-flow/test/data-loader/dataLoader.spec.ts @@ -127,8 +127,11 @@ describe("DataLoader", () => { await expect(dataLoader.applyChanges(changesets)).rejects.toThrow(error); - expect(logger.debug).toHaveBeenCalledWith( + expect(logger.debug).toHaveBeenLastCalledWith( `Error applying changeset InsertProject. Rolling back transaction with 2 changesets`, + { + className: DataLoader.name, + }, ); expect(mockRoundRepository.insertRound).not.toHaveBeenCalled(); });