Skip to content

Commit

Permalink
feat(Connection): add an observer api on transaction (#160)
Browse files Browse the repository at this point in the history
* feat(Connection): add an observer api on transaction

* `'commit'` event if transaction committed and is not aborted
* `'abort'` event if error happen during transaction
* `'abort'` event if `session.abortTransaction()`

* refactor: code flow in transaction
  * Do not need anymore hacks to get rif of TS and eslint warnings.
  * Better error management

```
> void Promise.resolve('resolve').then((v) => {console.log(v); throw new Error('throw from onfulfilled')}, e => console.error('catch in onrejected', e)).catch(e => console.error('catch in .catch', e))
undefined
> resolve
catch in .catch Error: throw from onfulfilled
    at REPL22:1:63
    at process.processTicksAndRejections (node:internal/process/task_queues:95:5)

> void Promise.reject(new Error('promise error')).then(() => {/* noop */}, e => {console.error('catch in onrejected and rethrow', e); throw e}).catch(e => console.error('catch in .catch', e))
undefined
> catch in onrejected and rethrow Error: promise error
    at REPL23:1:16
catch in .catch Error: promise error
    at REPL23:1:16
```

Refs: zakodium/profid#1699
  • Loading branch information
tpoisseau authored Jun 17, 2024
1 parent 7d4d94b commit 721fe35
Show file tree
Hide file tree
Showing 7 changed files with 249 additions and 11 deletions.
2 changes: 2 additions & 0 deletions .eslintrc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,5 @@ extends:
rules:
no-await-in-loop: off
'@typescript-eslint/use-unknown-in-catch-callback-variable': off
# issues with EventTarget typing (seems not in node types, and should not use dom types)
unicorn/prefer-event-target: off
8 changes: 7 additions & 1 deletion adonis-typings/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ declare module '@ioc:Zakodium/Mongodb/Database' {
Document,
} from 'mongodb';

import { TransactionEventEmitter } from '@ioc:Zakodium/Mongodb/Database/Transaction';

/**
* Shape of the configuration in `config/mongodb.ts`.
*/
Expand Down Expand Up @@ -169,7 +171,11 @@ declare module '@ioc:Zakodium/Mongodb/Database' {
collectionName: string,
): Promise<Collection<TSchema>>;
transaction<TResult>(
handler: (client: ClientSession, db: Db) => Promise<TResult>,
handler: (
client: ClientSession,
db: Db,
tx: TransactionEventEmitter,
) => Promise<TResult>,
options?: TransactionOptions,
): Promise<TResult>;
}
Expand Down
42 changes: 42 additions & 0 deletions adonis-typings/transaction.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
declare module '@ioc:Zakodium/Mongodb/Database/Transaction' {
import { EventEmitter } from 'node:events';

import { ClientSession, Db } from 'mongodb';

export interface TransactionEvents {
/**
* The transaction commits successfully.
*
* @example
* Consider you have a collection of items storing metadata of file is filesystem.
* Consider when you delete an item from this collection, you must delete associated file.
*
* ```ts
* const item = await connection.transaction((session, db, tx) => {
* const item = await db.collection('test').findOneAndDelete({ _id }, { session });
*
* tx.on('commit', () => {
* Drive.delete(deletedItem.file.path);
* });
*
* // some other logic that could fail
* // or await session.abortTransaction()
* // commit will not emit in this case
*
* return item;
* })
* ```
*/
commit: [session: ClientSession, db: Db];

/**
* The transaction aborted (optional error).
* Two cases of abortion are possible:
* - if from `session.abortTransaction()`, no error
* - if from a throw, error is set
*/
abort: [session: ClientSession, db: Db, error?: Error];
}

export class TransactionEventEmitter extends EventEmitter<TransactionEvents> {}
}
13 changes: 13 additions & 0 deletions providers/MongodbProvider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {

import { getMongodbModelAuthProvider } from '../src/Auth/MongodbModelAuthProvider';
import { Database } from '../src/Database/Database';
import { TransactionEventEmitter } from '../src/Database/TransactionEventEmitter';
import createMigration from '../src/Migration';
import { BaseModel, BaseAutoIncrementModel } from '../src/Model/Model';
import { field, computed } from '../src/Odm/decorators';
Expand Down Expand Up @@ -42,6 +43,17 @@ export default class MongodbProvider {
});
}

private registerTransactionEvent(): void {
this.app.container.singleton(
'Zakodium/Mongodb/Database/Transaction',
() => {
return {
TransactionEventEmitter,
};
},
);
}

private registerMigration(): void {
this.app.container.singleton('Zakodium/Mongodb/Migration', () => {
return createMigration(
Expand All @@ -52,6 +64,7 @@ export default class MongodbProvider {

public register(): void {
this.registerOdm();
this.registerTransactionEvent();
this.registerDatabase();
this.registerMigration();
}
Expand Down
56 changes: 46 additions & 10 deletions src/Database/Connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import type {
ConnectionContract,
} from '@ioc:Zakodium/Mongodb/Database';

import { TransactionEventEmitter } from './TransactionEventEmitter';

enum ConnectionStatus {
CONNECTED = 'CONNECTED',
DISCONNECTED = 'DISCONNECTED',
Expand Down Expand Up @@ -45,7 +47,7 @@ export declare interface Connection {
): this;
}

// eslint-disable-next-line unicorn/prefer-event-target, @typescript-eslint/no-unsafe-declaration-merging
// eslint-disable-next-line @typescript-eslint/no-unsafe-declaration-merging
export class Connection extends EventEmitter implements ConnectionContract {
public readonly client: MongoClient;
public readonly name: string;
Expand Down Expand Up @@ -131,17 +133,51 @@ export class Connection extends EventEmitter implements ConnectionContract {
}

public async transaction<TResult>(
handler: (session: ClientSession, db: Db) => Promise<TResult>,
handler: (
session: ClientSession,
db: Db,
transactionEventEmitter: TransactionEventEmitter,
) => Promise<TResult>,
options?: TransactionOptions,
): Promise<TResult> {
const db = await this._ensureDb();
let result: TResult;
await this.client.withSession(async (session) => {
return session.withTransaction(async (session) => {
result = await handler(session, db);
}, options);
});
// @ts-expect-error The `await` ensures `result` has a value.
return result;

let session: ClientSession;
const emitter = new TransactionEventEmitter();

return this.client
.withSession((_session) =>
_session.withTransaction(async (_session) => {
session = _session;
return handler(session, db, emitter);
}, options),
)
.then(
(result) => {
// https://github.com/mongodb/node-mongodb-native/blob/v6.7.0/src/transactions.ts#L147
// https://github.com/mongodb/node-mongodb-native/blob/v6.7.0/src/transactions.ts#L54
// session.transaction.isCommitted is not a sufficient indicator,
// because it's true if transaction commits or aborts.
const isCommitted = session.transaction.isCommitted;
const isAborted =
// https://github.com/mongodb/node-mongodb-native/blob/v6.7.0/src/transactions.ts#L11
Reflect.get(session.transaction, 'state') === 'TRANSACTION_ABORTED';

emitter.emit(
isCommitted && isAborted ? 'abort' : 'commit',
session,
db,
);

return result;
// If an error occurs in this scope,
// it will not be caught by this then's error handler, but by the caller's catch.
// This is what we want, as an error in this scope should not trigger the abort event.
},
(error) => {
emitter.emit('abort', session, db, error);
throw error;
},
);
}
}
10 changes: 10 additions & 0 deletions src/Database/TransactionEventEmitter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { EventEmitter } from 'node:events';

import { ClientSession, Db } from 'mongodb';

export interface TransactionEvents {
commit: [session: ClientSession, db: Db];
abort: [session: ClientSession, db: Db, error?: Error];
}

export class TransactionEventEmitter extends EventEmitter<TransactionEvents> {}
129 changes: 129 additions & 0 deletions src/__tests__/Connection.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import * as crypto from 'node:crypto';
import { setTimeout as sleep } from 'node:timers/promises';

import { getConnection, getLogger } from '../../test-utils/TestUtils';
Expand Down Expand Up @@ -33,3 +34,131 @@ test('get database', async () => {
const db = await connection.database();
expect(db).toBeDefined();
});

describe('transactions', () => {
const id = crypto.randomUUID();
beforeEach(async () => {
const Test = await connection.collection('test');
await Test.deleteMany({});
await Test.insertOne({ id });
});

test('commit event', async () => {
const txCommitController = promiseWithResolvers<number | null>();

const [txResult, count] = await Promise.all([
connection.transaction(async (session, db, tx) => {
await db.collection('test').findOneAndDelete({ id }, { session });

tx.on('commit', (session, db) => {
expect(session.transaction.isCommitted).toBe(true);

let count: number | null = null;
db.collection('test')
.countDocuments({})
.then((_count) => {
count = _count;
})
// eslint-disable-next-line no-console
.catch(console.error)
.finally(() => txCommitController.resolve(count));
});

return true;
}),
txCommitController.promise,
]);

expect(txResult).toBe(true);
expect(count).toBe(0);
});

test('abort manual event', async () => {
const txAbortController = promiseWithResolvers<number | null>();

const [txResult, count] = await Promise.all([
connection.transaction(async (session, db, tx) => {
await db.collection('test').deleteOne({ id }, { session });
await session.abortTransaction();

tx.on('abort', (session, db) => {
expect(Reflect.get(session.transaction, 'state')).toBe(
'TRANSACTION_ABORTED',
);

let count: number | null = null;
db.collection('test')
.countDocuments({})
.then((_count) => {
count = _count;
})
// eslint-disable-next-line no-console
.catch(console.error)
.finally(() => txAbortController.resolve(count));
});

return 'aborted';
}),
txAbortController.promise,
]);

expect(txResult).toBe('aborted');
expect(count).toBe(1);
});

test('abort error event', async () => {
const txAbortController = promiseWithResolvers<number | null>();
const error = new Error('Unexpected error');

const [txResult, count] = await Promise.allSettled([
connection.transaction(async (session, db, tx) => {
await db.collection('test').deleteOne({ id }, { session });

tx.on('abort', (session, db, err) => {
expect(Reflect.get(session.transaction, 'state')).toBe(
'TRANSACTION_ABORTED',
);
expect(err).toBe(error);

let count: number | null = null;
db.collection('test')
.countDocuments({})
.then((_count) => {
count = _count;
})
// eslint-disable-next-line no-console
.catch(console.error)
.finally(() => txAbortController.resolve(count));
});

throw error;
}),
txAbortController.promise,
]);

expect(txResult.status).toBe('rejected');
expect(count.status === 'fulfilled' && count.value).toBe(1);
});
});

/**
* @see https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise/withResolvers#browser_compatibility
* TODO: use ES api when this project target Node.js >=v22
*/
function promiseWithResolvers<R>() {
let resolve: (value: R | PromiseLike<R>) => void;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
let reject: (reason: any) => void;
const promise = new Promise<R>((_resolve, _reject) => {
resolve = _resolve;
reject = _reject;
});

return {
// @ts-expect-error The Promise executor is synchronous
resolve,
// @ts-expect-error The Promise executor is synchronous
reject,
promise,
};
}

0 comments on commit 721fe35

Please sign in to comment.