Skip to content

Commit

Permalink
compat: two-step send
Browse files Browse the repository at this point in the history
  • Loading branch information
Mark committed Jul 8, 2024
1 parent b34058e commit 3e4ed1d
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 12 deletions.
24 changes: 23 additions & 1 deletion src/chan.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ describe('Chan', () => {
expect(readResult).toEqual(pushResult);

expect(ch.stat).toEqual({
readers: { peakLength: 0 },
readers: { peakLength: 1 },
// because we have only 1 writer and it is quicker than the reader
writers: { peakLength: 1 },
data: { peakLength: 5 },
Expand Down Expand Up @@ -183,4 +183,26 @@ describe('Chan', () => {
await ch.close();
await expect(ch.send(1)).rejects.toThrow(ClosedChanError);
});

describe('recv', () => {
it('receives data', async () => {
const ch = new Chan<number>();

// Schedule a send in the future.
(async () => {
await setTimeout(0);
ch.send(1);
})();

// Wait for the send to complete.
const result = await ch.recv();
expect(result).toEqual([1, true]);

await ch.close();

// Attempt to receive from a closed channel.
const result2 = await ch.recv();
expect(result2).toEqual([undefined, false]);
});
});
});
35 changes: 24 additions & 11 deletions src/chan.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,35 @@ export class Chan<T> implements SendOnlyChan<T>, ReceiveOnlyChan<T> {

constructor(private bufferSize = Infinity) {}

async send(value: T) {
protected async _readySend(): Promise<void> {
if (this.#isClosed) {
throw new ClosedChanError();
}

if (this.#queue.length < this.bufferSize) {
if (this.#qReaders.length > 0) {
this.#qReaders.continue({ value, done: false });
} else {
this.#queue.push(value);
this.#countStats();
}
return;
} else {
return this.#qWriters.block();
}
}

protected _sendSync(value: T): boolean {
if (this.#isClosed) {
return false;
}

if (this.#qReaders.length > 0) {
this.#qReaders.continue({ value, done: false });
} else {
// Block the writer until we have room in the queue.
await this.#qWriters.block();
this.send(value);
this.#queue.push(value);
this.#countStats();
}
return true;
}

async send(value: T) {
await this._readySend();
if (!this._sendSync(value)) {
throw new ClosedChanError();
}
}

Expand Down
18 changes: 18 additions & 0 deletions src/compat-chan.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { Chan } from './chan';

/**
* Compatibility wrapper for Chan, where send operation is available as
* a combination of the asynchronous `readySend` and synchronous `sendImmediately`
*
* Used entirely as a replacement for the Fifo
* in @sweepbright/iter-helpers
*/
export class CompatChan<T> extends Chan<T> {
public readySend(): Promise<void> {
return this._readySend();
}

public sendSync(value: T): boolean {
return this._sendSync(value);
}
}
1 change: 1 addition & 0 deletions src/main.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export * from './chan.js';
export * from './resolver-queue.js';
export * from './compat-chan.js';

0 comments on commit 3e4ed1d

Please sign in to comment.