From cf9f6b55a0a862e43be0f5a03c6acca71d1442a0 Mon Sep 17 00:00:00 2001 From: divdavem Date: Wed, 4 Dec 2024 11:43:37 +0100 Subject: [PATCH] Rewriting tansu with a signal-first approach (#139) --- .../js-reactivity-benchmarks/dynamic.bench.ts | 3 +- benchmarks/jsonArrayReporter.ts | 4 +- eslint.config.mjs | 3 +- src/index.spec.ts | 526 ++++++-- src/index.ts | 1103 ++--------------- src/internal/batch.ts | 76 ++ src/internal/equal.ts | 12 + src/internal/exposeRawStores.ts | 48 + src/internal/store.ts | 48 + src/internal/storeComputed.ts | 127 ++ src/internal/storeComputedOrDerived.ts | 83 ++ src/internal/storeConst.ts | 40 + src/internal/storeDerived.ts | 140 +++ src/internal/storeSubscribable.ts | 41 + src/internal/storeTrackingUsage.ts | 90 ++ src/internal/storeWithOnUse.ts | 28 + src/internal/storeWritable.ts | 162 +++ src/internal/subscribeConsumer.ts | 64 + src/internal/unsubscribe.ts | 19 + src/internal/untrack.ts | 31 + src/types.ts | 262 ++++ 21 files changed, 1826 insertions(+), 1084 deletions(-) create mode 100644 src/internal/batch.ts create mode 100644 src/internal/equal.ts create mode 100644 src/internal/exposeRawStores.ts create mode 100644 src/internal/store.ts create mode 100644 src/internal/storeComputed.ts create mode 100644 src/internal/storeComputedOrDerived.ts create mode 100644 src/internal/storeConst.ts create mode 100644 src/internal/storeDerived.ts create mode 100644 src/internal/storeSubscribable.ts create mode 100644 src/internal/storeTrackingUsage.ts create mode 100644 src/internal/storeWithOnUse.ts create mode 100644 src/internal/storeWritable.ts create mode 100644 src/internal/subscribeConsumer.ts create mode 100644 src/internal/unsubscribe.ts create mode 100644 src/internal/untrack.ts create mode 100644 src/types.ts diff --git a/benchmarks/js-reactivity-benchmarks/dynamic.bench.ts b/benchmarks/js-reactivity-benchmarks/dynamic.bench.ts index 3bf866d..536f0f3 100644 --- a/benchmarks/js-reactivity-benchmarks/dynamic.bench.ts +++ b/benchmarks/js-reactivity-benchmarks/dynamic.bench.ts @@ -309,8 +309,7 @@ const perfTests = [ for (const config of perfTests) { const { graph, counter } = makeGraph(config); - // FIXME: remove .skip when tansu is faster - bench.skip( + bench( `dynamic ${config.name}`, () => { counter.count = 0; diff --git a/benchmarks/jsonArrayReporter.ts b/benchmarks/jsonArrayReporter.ts index 1957ce4..045901a 100644 --- a/benchmarks/jsonArrayReporter.ts +++ b/benchmarks/jsonArrayReporter.ts @@ -1,5 +1,5 @@ -import { RunnerTestFile } from 'vitest'; -import { Reporter } from 'vitest/reporters'; +import type { RunnerTestFile } from 'vitest'; +import type { Reporter } from 'vitest/reporters'; import { writeFile } from 'fs/promises'; class JsonArrayReporter implements Reporter { diff --git a/eslint.config.mjs b/eslint.config.mjs index 069dcad..8ba8055 100644 --- a/eslint.config.mjs +++ b/eslint.config.mjs @@ -29,7 +29,8 @@ export default [ '@typescript-eslint/no-empty-function': 0, '@typescript-eslint/no-non-null-assertion': 0, '@typescript-eslint/explicit-module-boundary-types': 2, - '@typescript-eslint/no-unused-vars': 2, + '@typescript-eslint/no-unused-vars': ['error', { argsIgnorePattern: '^_' }], + '@typescript-eslint/consistent-type-imports': 2, }, }, ]; diff --git a/src/index.spec.ts b/src/index.spec.ts index 833ec75..829a696 100644 --- a/src/index.spec.ts +++ b/src/index.spec.ts @@ -3,30 +3,45 @@ import { Component, Injectable, inject } from '@angular/core'; import { TestBed } from '@angular/core/testing'; import { BehaviorSubject, from } from 'rxjs'; import { writable as svelteWritable } from 'svelte/store'; -import { describe, expect, it, vi } from 'vitest'; -import { - DerivedStore, +import { afterEach, describe, expect, it, vi } from 'vitest'; +import type { + OnUseArgument, Readable, ReadableSignal, - Store, StoreInput, StoreOptions, StoresInput, StoresInputValues, SubscribableStore, SubscriberObject, - asWritable, +} from './index'; +import { + DerivedStore, + Store, asReadable, - equal, + asWritable, batch, computed, derived, + equal, get, readable, symbolObservable, untrack, writable, } from './index'; +import { rawStoreSymbol } from './internal/exposeRawStores'; +import { RawStoreFlags } from './internal/store'; +import { flushUnused } from './internal/storeTrackingUsage'; +import type { RawStoreWritable } from './internal/storeWritable'; + +const expectCorrectlyCleanedUp = (store: StoreInput) => { + const rawStore = (store as any)[rawStoreSymbol] as RawStoreWritable; + expect(rawStore.consumerLinks.length).toBe(0); + expect(rawStore.flags & RawStoreFlags.START_USE_CALLED).toBeFalsy(); +}; + +afterEach(flushUnused); const customSimpleWritable = ( value: T @@ -152,6 +167,93 @@ describe('stores', () => { expect(store2()).toBe(1); }); + it('should throw when trying to read a signal during the notification phase', () => { + const store = writable(0); + let success = 0; + const errors: any[] = []; + const unsubscribe = store.subscribe({ + pause() { + try { + store.get(); + success++; + } catch (error) { + errors.push(error); + } + }, + }); + store.set(1); + expect(success).toBe(0); + expect(errors.length).toBe(1); + expect(errors[0].message).toContain('during the notification phase'); + unsubscribe(); + }); + + it('should throw when trying to read an up-to-date computed signal during the notification phase', () => { + const w1 = writable(0); + const s1 = computed(() => w1()); + s1(); + const store = writable(0); + let success = 0; + const errors: any[] = []; + const unsubscribe = store.subscribe({ + pause() { + try { + s1.get(); + success++; + } catch (error) { + errors.push(error); + } + }, + }); + store.set(1); + expect(success).toBe(0); + expect(errors.length).toBe(1); + expect(errors[0].message).toContain('during the notification phase'); + unsubscribe(); + }); + + it('should throw when trying to subscribe to a signal during the notification phase', () => { + const store = writable(0); + let success = 0; + const errors: any[] = []; + const unsubscribe = store.subscribe({ + pause() { + try { + store.subscribe(() => {}); + success++; + } catch (error) { + errors.push(error); + } + }, + }); + store.set(1); + expect(success).toBe(0); + expect(errors.length).toBe(1); + expect(errors[0].message).toContain('during the notification phase'); + unsubscribe(); + }); + + it('should throw when trying to write a signal during notification phase', () => { + const store = writable(0); + let success = 0; + const errors: any[] = []; + const unsubscribe = store.subscribe({ + pause() { + try { + store.set(2); + success++; + } catch (error) { + errors.push(error); + } + }, + }); + store.set(1); + expect(success).toBe(0); + expect(errors.length).toBe(1); + expect(errors[0].message).toContain('during the notification phase'); + unsubscribe(); + }); + it('should work to use subscribe only and use it in a computed', () => { const store1 = writable(0); const store2 = asReadable({ subscribe: store1.subscribe }); @@ -444,34 +546,6 @@ describe('stores', () => { unsubscribe(); }); - it('should not call again listeners when only resuming subscribers', () => { - class BasicStore extends Store { - public override pauseSubscribers(): void { - super.pauseSubscribers(); - } - public override resumeSubscribers(): void { - super.resumeSubscribers(); - } - public override set(value: object): void { - super.set(value); - } - } - const initialValue = {}; - const newValue = {}; - const store = new BasicStore(initialValue); - const calls: object[] = []; - const unsubscribe = store.subscribe((v) => calls.push(v)); - expect(calls.length).toBe(1); - expect(calls[0]).toBe(initialValue); - store.pauseSubscribers(); - store.resumeSubscribers(); - expect(calls.length).toBe(1); - store.set(newValue); - expect(calls.length).toBe(2); - expect(calls[1]).toBe(newValue); - unsubscribe(); - }); - it('asReadable should be compatible with rxjs (BehaviorSubject)', () => { const behaviorSubject = new BehaviorSubject(0); const store = asReadable(behaviorSubject); @@ -512,39 +586,6 @@ describe('stores', () => { expect(values).toEqual([0, 1]); }); - it('asReadable should not wrap its output subscribe function into a new wrapper when called again (BehaviorSubject)', () => { - const input = new BehaviorSubject(0); - const readable1 = asReadable(input); - const readable2 = asReadable(readable1); - expect(readable1.subscribe).toBe(readable2.subscribe); - }); - - it('asReadable should not wrap its output subscribe function into a new wrapper when called again (InteropObservable)', () => { - const b = new BehaviorSubject(1); - const input = { [symbolObservable]: () => b }; - const readable1 = asReadable(input); - const readable2 = asReadable(readable1); - expect(readable1.subscribe).toBe(readable2.subscribe); - }); - - it('asReadable should not wrap the readable (const store) subscribe function into a new wrapper', () => { - const readable1 = readable(5); - const readable2 = asReadable(readable1); - expect(readable1.subscribe).toBe(readable2.subscribe); - }); - - it('asReadable should not wrap the readable (with onUse) subscribe function into a new wrapper', () => { - const readable1 = readable(5, { onUse() {} }); - const readable2 = asReadable(readable1); - expect(readable1.subscribe).toBe(readable2.subscribe); - }); - - it('asReadable should not wrap the writable subscribe function into a new wrapper', () => { - const readable1 = writable(5); - const readable2 = asReadable(readable1); - expect(readable1.subscribe).toBe(readable2.subscribe); - }); - it('asReadable should work nicely as a return value of a function whose type is explicitly defined', () => { interface Counter extends ReadableSignal { increment(): void; @@ -723,6 +764,44 @@ describe('stores', () => { expect(one()).toBe(1); }); + it('should work to call a constant store in a derived', () => { + const a = readable(0); + const b = derived(a, (a) => a + 1); + expect(b()).toEqual(1); + const values: number[] = []; + const unsubscribe = b.subscribe((v) => values.push(v)); + expect(values).toEqual([1]); + unsubscribe(); + }); + + it('should work to call a constant store in a computed', () => { + const a = readable(0); + const b = computed(() => a() + 1); + expect(b()).toEqual(1); + const values: number[] = []; + const unsubscribe = b.subscribe((v) => values.push(v)); + expect(values).toEqual([1]); + unsubscribe(); + }); + + it('should work to call the subscribe method of a constant store with a function', () => { + const one = readable(1); + const values: number[] = []; + one.subscribe((value) => values.push(value)); + expect(values).toEqual([1]); + }); + + it('should work to call the subscribe method of a constant store with an object with a next method', () => { + const one = readable(1); + const values: number[] = []; + one.subscribe({ + next(value) { + values.push(value); + }, + }); + expect(values).toEqual([1]); + }); + it('should work to subscribe without a listener', () => { let used = 0; const a = readable(0, () => { @@ -874,7 +953,7 @@ describe('stores', () => { ]); }); - it('should be able to use destructuring', () => { + it('should be able to use destructuring (constant store)', () => { const store = readable(0); const { subscribe } = store; @@ -884,6 +963,52 @@ describe('stores', () => { unsubscribe(); }); + + it('should be able to use destructuring (non-constant store)', () => { + const store = readable(0, (set) => { + set(1); + }); + const { subscribe } = store; + + const values: Array = []; + const unsubscribe = subscribe((v) => values.push(v)); + expect(values).toEqual([1]); + + unsubscribe(); + }); + + it('should have no scope in the readable onUse and update functions', () => { + const scopes: any[] = []; + const a = readable(0, function (this: any, set) { + scopes.push(this); + set.update(function (this: any, v) { + scopes.push(this); + return v + 1; + }); + return function (this: any) { + scopes.push(this); + }; + }); + expect(a()).toBe(1); + expect(scopes).toEqual([undefined, undefined, undefined]); + }); + + it('should have no scope in the equal function', () => { + const scopes: any[] = []; + let set: OnUseArgument; + const a = readable(0, { + onUse(s) { + set = s; + }, + equal(a, b) { + scopes.push(this); + return Object.is(a, b); + }, + }); + expect(a()).toBe(0); + set!(1); + expect(scopes).toEqual([undefined]); + }); }); describe('writable', () => { @@ -1126,6 +1251,43 @@ describe('stores', () => { expect((readonlyStore as any).update).toBeUndefined(); expect(readonlyStore[Symbol.observable || '@@observable']()).toBe(readonlyStore); }); + + it('should have no scope in the writable onUse and update functions', () => { + const scopes: any[] = []; + const a = writable(0, function (this: any, set) { + scopes.push(this); + set.update(function (this: any, v) { + scopes.push(this); + return v + 1; + }); + return function (this: any) { + scopes.push(this); + }; + }); + expect(a()).toBe(1); + expect(scopes).toEqual([undefined, undefined, undefined]); + }); + + it('should have no scope in the equal function', () => { + const scopes: any[] = []; + const a = writable(0, { + equal(a, b) { + scopes.push(this); + return Object.is(a, b); + }, + }); + a.set(1); + expect(scopes).toEqual([undefined]); + }); + + it('should allow reading the store in onUse', () => { + const onUseValues: number[] = []; + const store = writable(0, () => { + onUseValues.push(store()); + }); + expect(store()).toBe(0); + expect(onUseValues).toEqual([0]); + }); }); describe('asWritable', () => { @@ -2089,26 +2251,22 @@ describe('stores', () => { unsubscribe(); }); - it('should work with a derived function that subscribes to itself', () => { + it('should throw if a derived function subscribes to itself', () => { const store = writable(0); - let derivedCalls = 0; - let innerUnsubscribe: undefined | (() => void); - const innerSubscriptionCalls: any[] = []; const derivedStore = derived(store, (value) => { - derivedCalls++; - if (!innerUnsubscribe) { - // the first call of the listener should contain undefined as the value is not yet computed - innerUnsubscribe = derivedStore.subscribe((value) => innerSubscriptionCalls.push(value)); - } + // calling subscribe here should throw a "recursive computed" error + derivedStore.subscribe(() => {}); return value; }); - const calls: number[] = []; - const unsubscribe = derivedStore.subscribe((n: number) => calls.push(n)); - expect(derivedCalls).toBe(1); - expect(innerSubscriptionCalls).toEqual([undefined, 0]); - expect(calls).toEqual([0]); - unsubscribe(); - innerUnsubscribe!(); + expect(() => { + derivedStore.subscribe(() => {}); + }).toThrow('recursive computed'); + }); + + it('should throw when reading a derived that calls itself', () => { + const store = writable(0); + const c = derived(store, (): number => c()); + expect(c).toThrowError('recursive computed'); }); it('should work with a basic switchMap', () => { @@ -2137,7 +2295,8 @@ describe('stores', () => { const b = writable(2); const c = writable(0); const spy = vi.spyOn(a, 'subscribe'); - const d = switchMap(c, (c) => (c % 2 === 0 ? a : b)); + const aWithSpy = { subscribe: a.subscribe }; + const d = switchMap(c, (c) => (c % 2 === 0 ? aWithSpy : b)); const values: number[] = []; const unsubscribe = d.subscribe((value) => values.push(value)); expect(spy).toHaveBeenCalledTimes(1); @@ -2197,6 +2356,42 @@ describe('stores', () => { expect(a).toEqual([1, 2, 1, 5, 6]); unsubscribe(); }); + + it('should have no scope in the sync derived function', () => { + const scopes: any[] = []; + const a = writable(0); + const b = derived( + a, + function (this: any, a) { + scopes.push(this); + return a + 1; + }, + 0 + ); + expect(b()).toBe(1); + expect(scopes).toEqual([undefined]); + }); + + it('should have no scope in the async derived functions', () => { + const scopes: any[] = []; + const a = writable(0); + const b = derived( + a, + function (this: any, a, set) { + scopes.push(this); + set.update(function (this: any, v) { + scopes.push(this); + return v + a + 1; + }); + return function (this: any) { + scopes.push(this); + }; + }, + 0 + ); + expect(b()).toBe(1); + expect(scopes).toEqual([undefined, undefined, undefined]); + }); }); describe('batch', () => { @@ -2829,6 +3024,19 @@ describe('stores', () => { }); describe('computed', () => { + it('should work with a basic store class', () => { + class CounterStore extends Store { + increase() { + this.update((value) => value + 1); + } + } + const store = new CounterStore(0); + const doubleStore = computed(() => store.get() * 2); + expect(doubleStore()).toBe(0); + store.increase(); + expect(doubleStore()).toBe(2); + }); + it('should not call equal with undefined during the first computation', () => { const a = writable(1); const equal = vi.fn(Object.is); @@ -2899,6 +3107,29 @@ describe('stores', () => { expect(cHasListeners).toBe(false); }); + it('should not re-subscribe to stores that should no longer be used', () => { + const events: string[] = []; + const a = writable(true); + const b = writable(0, () => { + events.push('b used'); + return () => { + events.push('b unused'); + }; + }); + const c = writable(1, () => { + events.push('c used'); + return () => { + events.push('c unused'); + }; + }); + const d = computed(() => (a() ? b() : c())); + expect(d()).toBe(0); + events.push('changing a'); + a.set(false); + expect(d()).toBe(1); + expect(events).toEqual(['b used', 'b unused', 'changing a', 'c used', 'c unused']); + }); + it('should not recompute if an untracked store changed', () => { const a = writable(1); const b = writable(2); @@ -2990,6 +3221,11 @@ describe('stores', () => { expect(values).toEqual([]); }); + it('should throw when reading a computed that calls itself in untracked', () => { + const c = computed((): number => untrack(c)); + expect(c).toThrowError('recursive computed'); + }); + it('should throw when setting a value that triggers a recursive computed', () => { const recursive = writable(false); const myValue = computed((): number => (recursive() ? myValue() : 0)); @@ -3201,5 +3437,125 @@ describe('stores', () => { bUnsubscribe(); cUnsubscribe(); }); + + it('should prevent the diamond dependency problem', () => { + const a = writable(0); + const b = computed(() => `b${a()}`); + const c = computed(() => `c${a()}`); + const dFn = vi.fn(() => `${b()}${c()}`); + const d = computed(dFn); + + const values: string[] = []; + + const unsubscribe = d.subscribe((value) => { + values.push(value); + }); + expect(dFn).toHaveBeenCalledTimes(1); + expect(values).toEqual(['b0c0']); + a.set(1); + expect(dFn).toHaveBeenCalledTimes(2); + expect(values).toEqual(['b0c0', 'b1c1']); + unsubscribe(); + }); + + it('should prevent the asymmetric diamond dependency problem', () => { + const a = writable(0); + const b = computed(() => `b${a()}`); + const cFn = vi.fn(() => `${a()}${b()}`); + const c = computed(cFn); + + const values: string[] = []; + + const unsubscribe = c.subscribe((value) => { + values.push(value); + }); + expect(cFn).toHaveBeenCalledTimes(1); + expect(values).toEqual(['0b0']); + a.set(1); + expect(cFn).toHaveBeenCalledTimes(2); + expect(values).toEqual(['0b0', '1b1']); + unsubscribe(); + }); + + it('should call the function with no scope', () => { + let calls = 0; + let scope: any; + const c = computed(function (this: any) { + calls++; + // eslint-disable-next-line @typescript-eslint/no-this-alias + scope = this; + }); + c(); + expect(calls).toBe(1); + expect(scope).toBe(undefined); + }); + + it('should have no scope in the equal function', () => { + const scopes: any[] = []; + const a = writable(0); + const b = computed(() => a() + 1, { + equal(a, b) { + scopes.push(this); + return Object.is(a, b); + }, + }); + expect(b()).toBe(1); + a.set(1); + expect(b()).toBe(2); + expect(scopes).toEqual([undefined]); + }); + + it('should correctly register and clean-up consumers (several clean-up)', async () => { + const store = writable(0); + const doubleStore = computed(() => store() * 2); + expect(doubleStore()).toEqual(0); + await Promise.resolve(0); + expectCorrectlyCleanedUp(store); + expectCorrectlyCleanedUp(doubleStore); + const values: number[] = []; + const unsubscribe = doubleStore.subscribe((value) => values.push(value)); + expect(values).toEqual([0]); + await Promise.resolve(0); + store.set(1); + expect(values).toEqual([0, 2]); + unsubscribe(); + await Promise.resolve(0); + expectCorrectlyCleanedUp(store); + expectCorrectlyCleanedUp(doubleStore); + expect(doubleStore()).toEqual(2); + await Promise.resolve(0); + expectCorrectlyCleanedUp(store); + expectCorrectlyCleanedUp(doubleStore); + }); + + it('should correctly register and clean-up consumers (one clean-up at the end)', async () => { + const store = writable(0); + const doubleStore = computed(() => store() * 2); + expect(doubleStore()).toEqual(0); + const values: number[] = []; + const unsubscribe = doubleStore.subscribe((value) => values.push(value)); + expect(values).toEqual([0]); + await Promise.resolve(0); + store.set(1); + expect(values).toEqual([0, 2]); + unsubscribe(); + expect(doubleStore()).toEqual(2); + await Promise.resolve(0); + expectCorrectlyCleanedUp(store); + expectCorrectlyCleanedUp(doubleStore); + }); + + it('should correctly register and clean-up multiple levels of consumers', async () => { + const store = writable(0); + const doubleStore = computed(() => store() * 2); + const doubleDoubleStore = computed(() => doubleStore() * 2); + const doubleDoubleDoubleStore = computed(() => doubleDoubleStore() * 2); + expect(doubleDoubleDoubleStore()).toEqual(0); + await Promise.resolve(0); + expectCorrectlyCleanedUp(store); + expectCorrectlyCleanedUp(doubleStore); + expectCorrectlyCleanedUp(doubleDoubleStore); + expectCorrectlyCleanedUp(doubleDoubleDoubleStore); + }); }); }); diff --git a/src/index.ts b/src/index.ts index e76bbf8..6f2066e 100644 --- a/src/index.ts +++ b/src/index.ts @@ -5,236 +5,51 @@ * @packageDocumentation */ -declare global { - interface SymbolConstructor { - readonly observable: symbol; - } -} - -/** - * Symbol used in {@link InteropObservable} allowing any object to expose an observable. - */ -export const symbolObservable: typeof Symbol.observable = - (typeof Symbol === 'function' && Symbol.observable) || ('@@observable' as any); - -const oldSubscription = Symbol(); - -/** - * A callback invoked when a store value changes. It is called with the latest value of a given store. - */ -export type SubscriberFunction = ((value: T) => void) & - Partial, 'next'>>; - -/** - * A partial {@link https://github.com/tc39/proposal-observable#api | observer} notified when a store value changes. A store will call the {@link SubscriberObject.next | next} method every time the store's state is changing. - */ -export interface SubscriberObject { - /** - * A store will call this method every time the store's state is changing. - */ - next: SubscriberFunction; - /** - * Unused, only declared for compatibility with rxjs. - */ - error?: any; - /** - * Unused, only declared for compatibility with rxjs. - */ - complete?: any; - /** - * A store will call this method when it knows that the value will be changed. - * A call to this method will be followed by a call to {@link SubscriberObject.next | next} or to {@link SubscriberObject.resume | resume}. - */ - pause: () => void; - /** - * A store will call this method if {@link SubscriberObject.pause | pause} was called previously - * and the value finally did not need to change. - */ - resume: () => void; - /** - * @internal - * Value returned from a previous call to subscribe, and corresponding to a subscription to resume. - * This subscription must no longer be active. The new subscriber will not be called synchronously if - * the value did not change compared to the last value received in this old subscription. - */ - [oldSubscription]?: Unsubscriber; -} - -interface PrivateSubscriberObject extends Omit, typeof oldSubscription> { - _value: T; - _valueIndex: number; - _paused: boolean; -} - -/** - * Expresses interest in store value changes over time. It can be either: - * - a callback function: {@link SubscriberFunction}; - * - a partial observer: {@link SubscriberObject}. - */ -export type Subscriber = SubscriberFunction | Partial> | null | undefined; - -/** - * A function to unsubscribe from value change notifications. - */ -export type UnsubscribeFunction = () => void; - -/** - * An object with the `unsubscribe` method. - * Subscribable stores might choose to return such object instead of directly returning {@link UnsubscribeFunction} from a subscription call. - */ -export interface UnsubscribeObject { - /** - * A method that acts as the {@link UnsubscribeFunction}. - */ - unsubscribe: UnsubscribeFunction; -} - -export type Unsubscriber = UnsubscribeObject | UnsubscribeFunction; - -/** - * Represents a store accepting registrations (subscribers) and "pushing" notifications on each and every store value change. - */ -export interface SubscribableStore { - /** - * A method that makes it possible to register "interest" in store value changes over time. - * It is called each and every time the store's value changes. - * - * A registered subscriber is notified synchronously with the latest store value. - * - * @param subscriber - a subscriber in a form of a {@link SubscriberFunction} or a {@link SubscriberObject}. Returns a {@link Unsubscriber} (function or object with the `unsubscribe` method) that can be used to unregister and stop receiving notifications of store value changes. - * @returns The {@link UnsubscribeFunction} or {@link UnsubscribeObject} that can be used to unsubscribe (stop state change notifications). - */ - subscribe(subscriber: Subscriber): Unsubscriber; -} - -/** - * An interface for interoperability between observable implementations. It only has to expose the `[Symbol.observable]` method that is supposed to return a subscribable store. - */ -export interface InteropObservable { - [Symbol.observable]: () => SubscribableStore; -} - -/** - * Valid types that can be considered as a store. - */ -export type StoreInput = SubscribableStore | InteropObservable; - -/** - * This interface augments the base {@link SubscribableStore} interface by requiring the return value of the subscribe method to be both a function and an object with the `unsubscribe` method. - * - * For {@link https://rxjs.dev/api/index/interface/InteropObservable | interoperability with rxjs}, it also implements the `[Symbol.observable]` method. - */ -export interface Readable extends SubscribableStore, InteropObservable { - subscribe(subscriber: Subscriber): UnsubscribeFunction & UnsubscribeObject; - [Symbol.observable](): Readable; -} - -/** - * This interface augments the base {@link Readable} interface by adding the ability to call the store as a function to get its value. - */ -export interface ReadableSignal extends Readable { - /** - * Returns the value of the store. This is a shortcut for calling {@link get} with the store. - */ - (): T; -} - -/** - * A function that can be used to update store's value. This function is called with the current value and should return new store value. - */ -export type Updater = (value: T) => U; - -/** - * Builds on top of {@link Readable} and represents a store that can be manipulated from "outside": anyone with a reference to writable store can either update or completely replace state of a given store. - * - * @example - * - * ```typescript - * // reset counter's store value to 0 by using the {@link Writable.set} method - * counterStore.set(0); - * - * // increment counter's store value by using the {@link Writable.update} method - * counterStore.update(currentValue => currentValue + 1); - * ``` - */ -export interface Writable extends Readable { - /** - * Replaces store's state with the provided value. - * @param value - value to be used as the new state of a store. - */ - set(value: U): void; - - /** - * Updates store's state by using an {@link Updater} function. - * @param updater - a function that takes the current state as an argument and returns the new state. - */ - update(updater: Updater): void; -} - -/** - * Represents a store that implements both {@link ReadableSignal} and {@link Writable}. - * This is the type of objects returned by {@link writable}. - */ -export interface WritableSignal extends ReadableSignal, Writable {} - -const noop = () => {}; - -const noopUnsubscribe = () => {}; -noopUnsubscribe.unsubscribe = noopUnsubscribe; - -const bind = (object: T | null | undefined, fnName: keyof T) => { - const fn = object ? object[fnName] : null; - return typeof fn === 'function' ? fn.bind(object) : noop; -}; - -const toSubscriberObject = (subscriber: Subscriber): PrivateSubscriberObject => ({ - next: typeof subscriber === 'function' ? subscriber.bind(null) : bind(subscriber, 'next'), - pause: bind(subscriber, 'pause'), - resume: bind(subscriber, 'resume'), - _value: undefined as any, - _valueIndex: 0, - _paused: false, -}); - -const returnThis = function (this: T): T { - return this; -}; - -const normalizeUnsubscribe = ( - unsubscribe: Unsubscriber | void | null | undefined -): UnsubscribeFunction & UnsubscribeObject => { - if (!unsubscribe) { - return noopUnsubscribe; - } - if ((unsubscribe as any).unsubscribe === unsubscribe) { - return unsubscribe as any; - } - const res: any = - typeof unsubscribe === 'function' ? () => unsubscribe() : () => unsubscribe.unsubscribe(); - res.unsubscribe = res; - return res; -}; - -const normalizedSubscribe = new WeakSet['subscribe']>(); -const normalizeSubscribe = (store: SubscribableStore): Readable['subscribe'] => { - let res: Readable['subscribe'] = store.subscribe as any; - if (!normalizedSubscribe.has(res)) { - res = (...args: [Subscriber]) => normalizeUnsubscribe(store.subscribe(...args)); - normalizedSubscribe.add(res); - } - return res; -}; - -const getNormalizedSubscribe = (input: StoreInput) => { - const store = 'subscribe' in input ? input : input[symbolObservable](); - return normalizeSubscribe(store); -}; - -const getValue = (subscribe: Readable['subscribe']): T => { - let value: T; - subscribe((v) => (value = v))(); - return value!; -}; +import { equal } from './internal/equal'; +import { + exposeRawStore, + getRawStore, + rawStoreSymbol, + symbolObservable, +} from './internal/exposeRawStores'; +import { RawStoreComputed } from './internal/storeComputed'; +import { RawStoreConst } from './internal/storeConst'; +import { + createOnUseArg, + RawStoreAsyncDerived, + RawStoreDerivedStore, + RawStoreSyncDerived, +} from './internal/storeDerived'; +import { RawStoreWithOnUse } from './internal/storeWithOnUse'; +import { RawStoreWritable } from './internal/storeWritable'; +import { noop } from './internal/subscribeConsumer'; +import { untrack } from './internal/untrack'; +import type { + AsyncDeriveFn, + AsyncDeriveOptions, + OnUseFn, + Readable, + ReadableSignal, + StoreInput, + StoreOptions, + StoresInput, + StoresInputValues, + Subscriber, + SyncDeriveFn, + SyncDeriveOptions, + UnsubscribeFunction, + UnsubscribeObject, + Unsubscriber, + Updater, + Writable, + WritableSignal, +} from './types'; + +export { batch } from './internal/batch'; +export { equal } from './internal/equal'; +export { symbolObservable } from './internal/exposeRawStores'; +export { untrack } from './internal/untrack'; +export type * from './types'; /** * Returns a wrapper (for the given store) which only exposes the {@link ReadableSignal} interface. @@ -260,16 +75,11 @@ export function asReadable( store: StoreInput, extraProp?: U ): ReadableSignal & Omit> { - const subscribe = getNormalizedSubscribe(store); - const res = Object.assign(() => get(res), extraProp, { - subscribe, - [symbolObservable]: returnThis, - }); - return res; + return exposeRawStore(getRawStore(store), extraProp); } const defaultUpdate: any = function (this: Writable, updater: Updater) { - this.set(updater(untrack(() => get(this)))); + this.set(updater(untrack(() => this.get()))); }; /** @@ -322,87 +132,6 @@ export function asWritable( ) as any; } -const triggerUpdate = Symbol(); -const queueProcess = Symbol(); -let willProcessQueue = false; -const queue = new Set<{ [queueProcess](): void }>(); - -const MAX_STORE_PROCESSING_IN_QUEUE = 1000; -const checkIterations = (iterations: number) => { - if (iterations > MAX_STORE_PROCESSING_IN_QUEUE) { - throw new Error('reached maximum number of store changes in one shot'); - } -}; - -/** - * Batches multiple changes to stores while calling the provided function, - * preventing derived stores from updating until the function returns, - * to avoid unnecessary recomputations. - * - * @remarks - * - * If a store is updated multiple times in the provided function, existing - * subscribers of that store will only be called once when the provided - * function returns. - * - * Note that even though the computation of derived stores is delayed in most - * cases, some computations of derived stores will still occur inside - * the function provided to batch if a new subscriber is added to a store, because - * calling {@link SubscribableStore.subscribe | subscribe} always triggers a - * synchronous call of the subscriber and because tansu always provides up-to-date - * values when calling subscribers. Especially, calling {@link get} on a store will - * always return the correct up-to-date value and can trigger derived store - * intermediate computations, even inside batch. - * - * It is possible to have nested calls of batch, in which case only the first - * (outer) call has an effect, inner calls only call the provided function. - * - * @param fn - a function that can update stores. Its returned value is - * returned by the batch function. - * - * @example - * Using batch in the following example prevents logging the intermediate "Sherlock Lupin" value. - * - * ```typescript - * const firstName = writable('Arsène'); - * const lastName = writable('Lupin'); - * const fullName = derived([firstName, lastName], ([a, b]) => `${a} ${b}`); - * fullName.subscribe((name) => console.log(name)); // logs any change to fullName - * batch(() => { - * firstName.set('Sherlock'); - * lastName.set('Holmes'); - * }); - * ``` - */ -export const batch = (fn: () => T): T => { - const needsProcessQueue = !willProcessQueue; - if (needsProcessQueue) { - willProcessQueue = true; - } - try { - return fn(); - } finally { - if (needsProcessQueue) { - try { - const storePasses = new Map<{ [queueProcess](): void }, number>(); - for (const store of queue) { - const storeCount = storePasses.get(store) ?? 0; - checkIterations(storeCount); - storePasses.set(store, storeCount + 1); - queue.delete(store); - store[queueProcess](); - } - } finally { - queue.clear(); - willProcessQueue = false; - } - } - } -}; - -const defaultReactiveContext = (store: StoreInput) => getValue(getNormalizedSubscribe(store)); -let reactiveContext = defaultReactiveContext; - /** * A utility function to get the current value from a given store. * It works by subscribing to a store, capturing the value (synchronously) and unsubscribing just after. @@ -415,29 +144,7 @@ let reactiveContext = defaultReactiveContext; * console.log(get(myStore)); // logs 1 * ``` */ -export const get = (store: StoreInput): T => reactiveContext(store); - -const createEqualCache = (valueIndex: number): Record => ({ - [valueIndex]: true, // the subscriber already has the last value - [valueIndex - 1]: false, // the subscriber had the previous value, - // which is known to be different because equal is called in the set method - 0: false, // the subscriber never received any value -}); - -const skipEqualInSet = Symbol(); - -/** - * Default implementation of the equal function used by tansu when a store - * changes, to know if listeners need to be notified. - * Returns false if `a` is a function or an object, or if `a` and `b` - * are different according to `Object.is`. Otherwise, returns true. - * - * @param a - First value to compare. - * @param b - Second value to compare. - * @returns true if a and b are considered equal. - */ -export const equal = (a: T, b: T): boolean => - Object.is(a, b) && (!a || typeof a !== 'object') && typeof a !== 'function'; +export const get = (store: StoreInput): T => getRawStore(store).get(); /** * Base class that can be extended to easily create a custom {@link Readable} store. @@ -471,76 +178,26 @@ export const equal = (a: T, b: T): boolean => * ``` */ export abstract class Store implements Readable { - #subscribers = new Set>(); - #cleanupFn: null | UnsubscribeFunction = null; - #subscribersPaused = false; - #valueIndex = 1; - #value: T; - #equalCache = createEqualCache(1); - #oldSubscriptions = new WeakMap>(); - - private [skipEqualInSet] = false; - /** * * @param value - Initial value of the store */ constructor(value: T) { - this.#value = value; - } - - #start() { - this.#cleanupFn = normalizeUnsubscribe(this.onUse()); - } - - #stop() { - const cleanupFn = this.#cleanupFn; - if (cleanupFn) { - this.#cleanupFn = null; - cleanupFn(); - } - } - - private [queueProcess](): void { - const valueIndex = this.#valueIndex; - for (const subscriber of [...this.#subscribers]) { - if (this.#subscribersPaused || this.#valueIndex !== valueIndex) { - // the value of the store can change while notifying subscribers - // in that case, let's just stop notifying subscribers - // they will be called later through another queueProcess call - // with the correct final value and in the right order - return; - } - if (subscriber._valueIndex === 0) { - // ignore subscribers which were not yet called synchronously - continue; - } - this.#notifySubscriber(subscriber); + let rawStore; + if (value instanceof RawStoreWritable) { + rawStore = value; + } else { + const onUse = this.onUse; + rawStore = onUse + ? new RawStoreWithOnUse(value, onUse.bind(this)) + : new RawStoreWritable(value); + rawStore.equalFn = (a, b) => this.equal(a, b); } + this[rawStoreSymbol] = rawStore; } /** @internal */ - protected [triggerUpdate](): void {} - - #notifySubscriber(subscriber: PrivateSubscriberObject): void { - const equalCache = this.#equalCache; - const valueIndex = this.#valueIndex; - const value = this.#value; - let equal = equalCache[subscriber._valueIndex]; - if (equal == null) { - equal = !!this.equal(subscriber._value, value); - equalCache[subscriber._valueIndex] = equal; - } - subscriber._valueIndex = valueIndex; - if (!equal) { - subscriber._value = value; - subscriber._paused = false; - subscriber.next(value); - } else if (!this.#subscribersPaused && subscriber._paused) { - subscriber._paused = false; - subscriber.resume(); - } - } + [rawStoreSymbol]: RawStoreWritable; /** * Compares two values and returns true if they are equal. @@ -585,54 +242,6 @@ export abstract class Store implements Readable { return !equal(a, b); } - /** - * Puts the store in the paused state, which means it will soon update its value. - * - * @remarks - * - * The paused state prevents derived or computed stores (both direct and transitive) from recomputing their value - * using the current value of this store. - * - * There are two ways to put a store back into its normal state: calling {@link Store.set | set} to set a new - * value or calling {@link Store.resumeSubscribers | resumeSubscribers} to declare that finally the value does not need to be - * changed. - * - * Note that a store should not stay in the paused state for a long time, and most of the time - * it is not needed to call pauseSubscribers or resumeSubscribers manually. - * - */ - protected pauseSubscribers(): void { - if (!this.#subscribersPaused) { - this.#subscribersPaused = true; - queue.delete(this as any); - for (const subscriber of [...this.#subscribers]) { - if (subscriber._valueIndex === 0 || subscriber._paused) { - // ignore subscribers which were not yet called synchronously or are already paused - continue; - } - subscriber._paused = true; - subscriber.pause(); - } - } - } - - /** - * Puts the store back to the normal state without changing its value, if it was in the paused state - * (cf {@link Store.pauseSubscribers | pauseSubscribers}). - * - * @remarks - * - * Does nothing if the store was not in the paused state. - */ - protected resumeSubscribers(): void { - if (this.#subscribersPaused) { - this.#subscribersPaused = false; - batch(() => { - queue.add(this as any); - }); - } - } - /** * Replaces store's state with the provided value. * Equivalent of {@link Writable.set}, but internal to the store. @@ -640,18 +249,11 @@ export abstract class Store implements Readable { * @param value - value to be used as the new state of a store. */ protected set(value: T): void { - const skipEqual = this[skipEqualInSet]; - if (skipEqual || !this.equal(this.#value, value)) { - const valueIndex = this.#valueIndex + 1; - this.#valueIndex = valueIndex; - this.#value = value; - this.#equalCache = createEqualCache(valueIndex); - if (skipEqual) { - delete this.#equalCache[valueIndex - 1]; - } - this.pauseSubscribers(); - } - this.resumeSubscribers(); + this[rawStoreSymbol].set(value); + } + + get(): T { + return this[rawStoreSymbol].get(); } /** @@ -661,7 +263,7 @@ export abstract class Store implements Readable { * @param updater - a function that takes the current state as an argument and returns the new state. */ protected update(updater: Updater): void { - this.set(updater(this.#value)); + this[rawStoreSymbol].update(updater); } /** @@ -688,50 +290,14 @@ export abstract class Store implements Readable { * unsubscribe2(); // logs 'All subscribers are gone...' * ``` */ - protected onUse(): Unsubscriber | void {} + protected onUse?(): Unsubscriber | void; /** * Default Implementation of the {@link SubscribableStore.subscribe}, not meant to be overridden. * @param subscriber - see {@link SubscribableStore.subscribe} */ subscribe(subscriber: Subscriber): UnsubscribeFunction & UnsubscribeObject { - const subscriberObject = toSubscriberObject(subscriber); - const oldSubscriptionParam = subscriber?.[oldSubscription]; - if (oldSubscriptionParam) { - const oldSubscriberObject = this.#oldSubscriptions.get(oldSubscriptionParam); - if (oldSubscriberObject) { - subscriberObject._value = oldSubscriberObject._value; - subscriberObject._valueIndex = oldSubscriberObject._valueIndex; - } - } - this.#subscribers.add(subscriberObject); - batch(() => { - if (this.#subscribers.size == 1) { - this.#start(); - } else { - this[triggerUpdate](); - } - }); - this.#notifySubscriber(subscriberObject); - - const unsubscribe = () => { - const removed = this.#subscribers.delete(subscriberObject); - subscriberObject.next = noop; - subscriberObject.pause = noop; - subscriberObject.resume = noop; - if (removed) { - this.#oldSubscriptions.set(unsubscribe, subscriberObject); - if (this.#subscribers.size === 0) { - this.#stop(); - } - } - }; - (unsubscribe as any)[triggerUpdate] = () => { - this[triggerUpdate](); - this.#notifySubscriber(subscriberObject); - }; - unsubscribe.unsubscribe = unsubscribe; - return unsubscribe; + return this[rawStoreSymbol].subscribe(subscriber); } [symbolObservable](): this { @@ -739,124 +305,23 @@ export abstract class Store implements Readable { } } -export interface OnUseArgument { - (value: T): void; - set: (value: T) => void; - update: (updater: Updater) => void; -} - -/** - * Type of a function that is called when the number of subscribers changes from 0 to 1 - * (but not called when the number of subscribers changes from 1 to 2, ...). - * - * If it returns a function, that function will be called when the number of subscribers changes from 1 to 0. - */ -export type OnUseFn = (arg: OnUseArgument) => void | Unsubscriber; - -/** - * Store options that can be passed to {@link readable} or {@link writable}. - */ -export interface StoreOptions { - /** - * A function that is called when the number of subscribers changes from 0 to 1 - * (but not called when the number of subscribers changes from 1 to 2, ...). - * If it returns a function, that function will be called when the number of subscribers changes from 1 to 0. - */ - onUse?: OnUseFn; - - /** - * Custom function to compare two values, that should return true if they - * are equal. - * - * It is called when setting a new value to avoid doing anything - * (such as notifying subscribers) if the value did not change. - * - * @remarks - * The default logic (when this option is not present) is to return false - * if `a` is a function or an object, or if `a` and `b` are different - * according to `Object.is`. - * - * {@link StoreOptions.equal|equal} takes precedence over {@link StoreOptions.notEqual|notEqual} if both - * are defined. - * - * @param a - First value to compare. - * @param b - Second value to compare. - * @returns true if a and b are considered equal. - */ - equal?: (a: T, b: T) => boolean; - - /** - * Custom function to compare two values, that should return true if they - * are different. - * - * It is called when setting a new value to avoid doing anything - * (such as notifying subscribers) if the value did not change. - * - * @remarks - * The default logic (when this option is not present) is to return true - * if `a` is a function or an object, or if `a` and `b` are different - * according to `Object.is`. - * - * {@link StoreOptions.equal} takes precedence over {@link StoreOptions.notEqual|notEqual} if both - * are defined. - * - * @deprecated Use {@link StoreOptions.equal} instead - * @param a - First value to compare. - * @param b - Second value to compare. - * @returns true if a and b are considered different. - */ - notEqual?: (a: T, b: T) => boolean; -} +const createStoreWithOnUse = (initValue: T, onUse: OnUseFn) => { + const store: RawStoreWithOnUse = new RawStoreWithOnUse(initValue, () => onUse(setFn)); + const setFn = createOnUseArg(store); + return store; +}; -/** - * A convenience function to create an optimized constant store (i.e. which never changes - * its value). It does not keep track of its subscribers. - * @param value - value of the store, which will never change - */ -function constStore(value: T): ReadableSignal { - const subscribe = (subscriber: Subscriber) => { - if (!subscriber?.[oldSubscription]) { - toSubscriberObject(subscriber).next(value); +const applyStoreOptions = >( + store: S, + options?: Omit, 'onUse'> +): S => { + if (options) { + const { equal, notEqual } = options; + if (equal) { + store.equalFn = equal; + } else if (notEqual) { + store.equalFn = (a: T, b: T) => !notEqual(a, b); } - return noopUnsubscribe; - }; - normalizedSubscribe.add(subscribe); - return Object.assign(() => value, { subscribe, [symbolObservable]: returnThis }); -} - -class WritableStore extends Store implements Writable { - constructor(value: T) { - super(value); - } - - override set(value: T): void { - super.set(value); - } - - override update(updater: Updater) { - super.update(updater); - } -} - -const applyStoreOptions = >(store: S, options: StoreOptions): S => { - const { onUse, equal, notEqual } = options; - if (onUse) { - (store as any).onUse = function (this: Store) { - const setFn = (v: T) => this.set(v); - setFn.set = setFn; - setFn.update = (updater: Updater) => this.update(updater); - return onUse(setFn); - }; - } - if (equal) { - (store as any).equal = function (this: Store, a: T, b: T) { - return equal(a, b); - }; - } - if (notEqual) { - (store as any).notEqual = function (this: Store, a: T, b: T) { - return notEqual(a, b); - }; } return store; }; @@ -883,18 +348,16 @@ const applyStoreOptions = >(store: S, options: StoreOption * }); * ``` */ -export function readable( - value: T, - options: StoreOptions | OnUseFn = {} -): ReadableSignal { +export function readable(value: T, options?: StoreOptions | OnUseFn): ReadableSignal { if (typeof options === 'function') { options = { onUse: options }; } - if (!options.onUse) { - // special optimized case - return constStore(value); - } - return asReadable(applyStoreOptions(new WritableStore(value), options)); + const onUse = options?.onUse; + return exposeRawStore( + onUse + ? applyStoreOptions(createStoreWithOnUse(value, onUse), options) + : new RawStoreConst(value) + ); } /** @@ -915,52 +378,21 @@ export function readable( * x.set(0); // reset back to the default value * ``` */ -export function writable( - value: T, - options: StoreOptions | OnUseFn = {} -): WritableSignal { +export function writable(value: T, options?: StoreOptions | OnUseFn): WritableSignal { if (typeof options === 'function') { options = { onUse: options }; } - const store = applyStoreOptions(new WritableStore(value), options); - return asReadable, 'set' | 'update'>>(store, { - set: store.set.bind(store), - update: store.update.bind(store), - }); + const onUse = options?.onUse; + const store = applyStoreOptions( + onUse ? createStoreWithOnUse(value, onUse) : new RawStoreWritable(value), + options + ); + const res = exposeRawStore(store) as any; + res.set = store.set.bind(store); + res.update = store.update.bind(store); + return res; } -/** - * Either a single {@link StoreInput} or a read-only array of at least one {@link StoreInput}. - */ -export type StoresInput = StoreInput | readonly [StoreInput, ...StoreInput[]]; - -/** - * Extracts the types of the values of the stores from a type extending {@link StoresInput}. - * - * @remarks - * - * If the type given as a parameter is a single {@link StoreInput}, the type of the value - * of that {@link StoreInput} is returned. - * - * If the type given as a parameter is one of an array of {@link StoreInput}, the returned type - * is the type of an array containing the value of each store in the same order. - */ -export type StoresInputValues = - S extends StoreInput - ? T - : { [K in keyof S]: S[K] extends StoreInput ? T : never }; - -export type SyncDeriveFn = (values: StoresInputValues) => T; -export interface SyncDeriveOptions extends Omit, 'onUse'> { - derive: SyncDeriveFn; -} -export type AsyncDeriveFn = ( - values: StoresInputValues, - set: OnUseArgument -) => Unsubscriber | void; -export interface AsyncDeriveOptions extends Omit, 'onUse'> { - derive: AsyncDeriveFn; -} type DeriveFn = SyncDeriveFn | AsyncDeriveFn; interface DeriveOptions extends Omit, 'onUse'> { derive: DeriveFn; @@ -969,109 +401,14 @@ function isSyncDeriveFn(fn: DeriveFn): fn is SyncDeriveFn { return fn.length <= 1; } -const callFn = (fn: () => void) => fn(); - export abstract class DerivedStore extends Store { - readonly #isArray: boolean; - readonly #storesSubscribeFn: Readable['subscribe'][]; - #pending = 0; - constructor(stores: S, initialValue: T) { - super(initialValue); - const isArray = Array.isArray(stores); - this.#isArray = isArray; - this.#storesSubscribeFn = (isArray ? [...stores] : [stores]).map(getNormalizedSubscribe); - } - - protected override resumeSubscribers(): void { - if (!this.#pending) { - // only resume subscribers if we know that the values of the stores with which - // the derived function was called were the correct ones - super.resumeSubscribers(); - } - } - - protected override onUse(): Unsubscriber | void { - let initDone = false; - let changed = 0; - - const isArray = this.#isArray; - const storesSubscribeFn = this.#storesSubscribeFn; - const dependantValues = new Array(storesSubscribeFn.length); - - let cleanupFn: null | UnsubscribeFunction = null; - - const callCleanup = () => { - const fn = cleanupFn; - if (fn) { - cleanupFn = null; - fn(); - } - }; - - const callDerive = (setInitDone = false) => { - if (setInitDone) { - initDone = true; - } - if (initDone && !this.#pending) { - if (changed) { - changed = 0; - callCleanup(); - cleanupFn = normalizeUnsubscribe( - this.derive(isArray ? dependantValues : dependantValues[0]) - ); - } - this.resumeSubscribers(); - } - }; - - const unsubscribers = storesSubscribeFn.map((subscribe, idx) => { - const subscriber = (v: any) => { - dependantValues[idx] = v; - changed |= 1 << idx; - this.#pending &= ~(1 << idx); - callDerive(); - }; - subscriber.next = subscriber; - subscriber.pause = () => { - this.#pending |= 1 << idx; - this.pauseSubscribers(); - }; - subscriber.resume = () => { - this.#pending &= ~(1 << idx); - callDerive(); - }; - return subscribe(subscriber); - }); - - const triggerSubscriberPendingUpdate = (unsubscriber: any, idx: number) => { - if (this.#pending & (1 << idx)) { - unsubscriber[triggerUpdate]?.(); - } - }; - this[triggerUpdate] = () => { - let iterations = 0; - while (this.#pending) { - checkIterations(++iterations); - initDone = false; - unsubscribers.forEach(triggerSubscriberPendingUpdate); - if (this.#pending) { - // safety check: if pending is not 0 after calling triggerUpdate, - // it will never be and this is an endless loop - break; - } - callDerive(true); - } - }; - callDerive(true); - this[triggerUpdate](); - return () => { - this[triggerUpdate] = noop; - callCleanup(); - unsubscribers.forEach(callFn); - }; + const rawStore = new RawStoreDerivedStore(stores, initialValue, (values) => + this.derive(values) + ); + rawStore.equalFn = (a, b) => this.equal(a, b); + super(rawStore as any); } - protected abstract derive(values: StoresInputValues): Unsubscriber | void; } @@ -1138,224 +475,12 @@ export function derived( options = { derive: options }; } const { derive, ...opts } = options; - const Derived = isSyncDeriveFn(derive) - ? class extends DerivedStore { - constructor(stores: S, initialValue: T) { - super(stores, initialValue); - this[skipEqualInSet] = true; // skip call to equal in set until the first value is set - } - protected override derive(values: StoresInputValues) { - this.set(derive(values)); - this[skipEqualInSet] = false; - } - } - : class extends DerivedStore { - protected override derive(values: StoresInputValues) { - const setFn = (v: T) => this.set(v); - setFn.set = setFn; - setFn.update = (updater: Updater) => this.update(updater); - return derive(values, setFn); - } - }; - return asReadable( - applyStoreOptions(new Derived(stores, initialValue as any), { - ...opts, - onUse: undefined /* setting onUse is not allowed from derived */, - }) + const Derived = isSyncDeriveFn(derive) ? RawStoreSyncDerived : RawStoreAsyncDerived; + return exposeRawStore( + applyStoreOptions(new Derived(stores as any, initialValue as any, derive as any), opts) ); } -/** - * Stops the tracking of dependencies made by {@link computed} and calls the provided function. - * After the function returns, the tracking of dependencies continues as before. - * - * @param fn - function to be called - * @returns the value returned by the given function - */ -export const untrack = (fn: () => T): T => { - const previousReactiveContext = reactiveContext; - try { - reactiveContext = defaultReactiveContext; - return fn(); - } finally { - reactiveContext = previousReactiveContext; - } -}; - -interface ComputedStoreSubscription { - versionIndex: number; - resubscribe: () => void; - unsubscribe: UnsubscribeFunction; - pending: boolean; - usedValueIndex: number; - valueIndex: number; - value: T; -} - -const callUnsubscribe = ({ unsubscribe }: ComputedStoreSubscription) => unsubscribe(); -const callResubscribe = ({ resubscribe }: ComputedStoreSubscription) => resubscribe(); - -abstract class ComputedStore extends Store { - #computing = false; - #skipCallCompute = false; - #versionIndex = 0; - #subscriptions = new Map, ComputedStoreSubscription>(); - - #reactiveContext = (storeInput: StoreInput): U => - untrack(() => this.#getSubscriptionValue(storeInput)); - - constructor() { - super(undefined as T); - this[skipEqualInSet] = true; // skip call to equal in set until the first value is set - } - - #createSubscription(subscribe: Readable['subscribe']) { - const res: ComputedStoreSubscription = { - versionIndex: this.#versionIndex, - unsubscribe: noop, - resubscribe: noop, - pending: false, - usedValueIndex: 0, - value: undefined as T, - valueIndex: 0, - }; - const subscriber: SubscriberFunction & Partial> = (value: T) => { - res.value = value; - res.valueIndex++; - res.pending = false; - if (!this.#skipCallCompute && !this.#isPending()) { - batch(() => this.#callCompute()); - } - }; - subscriber.next = subscriber; - subscriber.pause = () => { - res.pending = true; - this.pauseSubscribers(); - }; - subscriber.resume = () => { - res.pending = false; - if (!this.#skipCallCompute && !this.#isPending()) { - batch(() => this.#callCompute()); - } - }; - res.resubscribe = () => { - res.unsubscribe = subscribe(subscriber); - subscriber[oldSubscription] = res.unsubscribe; - }; - res.resubscribe(); - return res; - } - - #getSubscriptionValue(storeInput: StoreInput) { - let res = this.#subscriptions.get(storeInput); - if (res) { - res.versionIndex = this.#versionIndex; - (res.unsubscribe as any)[triggerUpdate]?.(); - } else { - res = this.#createSubscription(getNormalizedSubscribe(storeInput)); - this.#subscriptions.set(storeInput, res); - } - res.usedValueIndex = res.valueIndex; - return res.value; - } - - #callCompute(resubscribe = false) { - this.#computing = true; - this.#skipCallCompute = true; - try { - if (this.#versionIndex > 0) { - if (resubscribe) { - this.#subscriptions.forEach(callResubscribe); - } - if (!this.#hasChange()) { - this.resumeSubscribers(); - return; - } - } - this.#versionIndex++; - const versionIndex = this.#versionIndex; - const previousReactiveContext = reactiveContext; - let value: T; - try { - reactiveContext = this.#reactiveContext; - value = this.compute(); - } finally { - reactiveContext = previousReactiveContext; - } - this.set(value); - this[skipEqualInSet] = false; - for (const [store, info] of this.#subscriptions) { - if (info.versionIndex !== versionIndex) { - this.#subscriptions.delete(store); - info.unsubscribe(); - } - } - } finally { - this.#skipCallCompute = false; - this.#computing = false; - } - } - - #isPending() { - for (const [, { pending }] of this.#subscriptions) { - if (pending) { - return true; - } - } - return false; - } - - #hasChange() { - for (const [, { valueIndex, usedValueIndex }] of this.#subscriptions) { - if (valueIndex != usedValueIndex) { - return true; - } - } - return false; - } - - protected override resumeSubscribers(): void { - if (!this.#isPending()) { - super.resumeSubscribers(); - } - } - - /** @internal */ - protected override [triggerUpdate](): void { - if (this.#computing) { - throw new Error('recursive computed'); - } - let iterations = 0; - while (this.#isPending()) { - checkIterations(++iterations); - this.#skipCallCompute = true; - try { - for (const [, { pending, unsubscribe }] of this.#subscriptions) { - if (pending) { - (unsubscribe as any)[triggerUpdate]?.(); - } - } - } finally { - this.#skipCallCompute = false; - } - if (this.#isPending()) { - // safety check: if it is still pending after calling triggerUpdate, - // it will always be and this is an endless loop - break; - } - this.#callCompute(); - } - } - - protected abstract compute(): T; - - protected override onUse(): Unsubscriber { - this.#callCompute(true); - this[triggerUpdate](); - return () => this.#subscriptions.forEach(callUnsubscribe); - } -} - /** * Creates a store whose value is computed by the provided function. * @@ -1380,17 +505,7 @@ abstract class ComputedStore extends Store { */ export function computed( fn: () => T, - options: Omit, 'onUse'> = {} + options?: Omit, 'onUse'> ): ReadableSignal { - const Computed = class extends ComputedStore { - protected override compute(): T { - return fn(); - } - }; - return asReadable( - applyStoreOptions(new Computed(), { - ...options, - onUse: undefined /* setting onUse is not allowed from computed */, - }) - ); + return exposeRawStore(applyStoreOptions(new RawStoreComputed(fn), options)); } diff --git a/src/internal/batch.ts b/src/internal/batch.ts new file mode 100644 index 0000000..8469a39 --- /dev/null +++ b/src/internal/batch.ts @@ -0,0 +1,76 @@ +import type { SubscribeConsumer } from './subscribeConsumer'; + +export const subscribersQueue: SubscribeConsumer[] = []; +let willProcessQueue = false; + +/** + * Batches multiple changes to stores while calling the provided function, + * preventing derived stores from updating until the function returns, + * to avoid unnecessary recomputations. + * + * @remarks + * + * If a store is updated multiple times in the provided function, existing + * subscribers of that store will only be called once when the provided + * function returns. + * + * Note that even though the computation of derived stores is delayed in most + * cases, some computations of derived stores will still occur inside + * the function provided to batch if a new subscriber is added to a store, because + * calling {@link SubscribableStore.subscribe | subscribe} always triggers a + * synchronous call of the subscriber and because tansu always provides up-to-date + * values when calling subscribers. Especially, calling {@link get} on a store will + * always return the correct up-to-date value and can trigger derived store + * intermediate computations, even inside batch. + * + * It is possible to have nested calls of batch, in which case only the first + * (outer) call has an effect, inner calls only call the provided function. + * + * @param fn - a function that can update stores. Its returned value is + * returned by the batch function. + * + * @example + * Using batch in the following example prevents logging the intermediate "Sherlock Lupin" value. + * + * ```typescript + * const firstName = writable('Arsène'); + * const lastName = writable('Lupin'); + * const fullName = derived([firstName, lastName], ([a, b]) => `${a} ${b}`); + * fullName.subscribe((name) => console.log(name)); // logs any change to fullName + * batch(() => { + * firstName.set('Sherlock'); + * lastName.set('Holmes'); + * }); + * ``` + */ +export const batch = (fn: () => T): T => { + const needsProcessQueue = !willProcessQueue; + willProcessQueue = true; + let success = true; + let res; + let error; + try { + res = fn(); + } finally { + if (needsProcessQueue) { + while (subscribersQueue.length > 0) { + const consumer = subscribersQueue.shift()!; + try { + consumer.notify(); + } catch (e) { + // an error in one consumer should not impact others + if (success) { + // will throw the first error + success = false; + error = e; + } + } + } + willProcessQueue = false; + } + } + if (success) { + return res; + } + throw error; +}; diff --git a/src/internal/equal.ts b/src/internal/equal.ts new file mode 100644 index 0000000..9a63fc3 --- /dev/null +++ b/src/internal/equal.ts @@ -0,0 +1,12 @@ +/** + * Default implementation of the equal function used by tansu when a store + * changes, to know if listeners need to be notified. + * Returns false if `a` is a function or an object, or if `a` and `b` + * are different according to `Object.is`. Otherwise, returns true. + * + * @param a - First value to compare. + * @param b - Second value to compare. + * @returns true if a and b are considered equal. + */ +export const equal = (a: T, b: T): boolean => + Object.is(a, b) && (!a || typeof a !== 'object') && typeof a !== 'function'; diff --git a/src/internal/exposeRawStores.ts b/src/internal/exposeRawStores.ts new file mode 100644 index 0000000..4c96854 --- /dev/null +++ b/src/internal/exposeRawStores.ts @@ -0,0 +1,48 @@ +import type { Readable, ReadableSignal, StoreInput } from '../types'; +import type { RawStore } from './store'; +import { RawSubscribableWrapper } from './storeSubscribable'; + +/** + * Symbol used in {@link InteropObservable} allowing any object to expose an observable. + */ +export const symbolObservable: typeof Symbol.observable = + (typeof Symbol === 'function' && Symbol.observable) || ('@@observable' as any); + +const returnThis = function (this: T): T { + return this; +}; + +export const rawStoreSymbol = Symbol(); +const rawStoreMap = new WeakMap, RawStore>(); + +export const getRawStore = (storeInput: StoreInput): RawStore => { + const rawStore = (storeInput as any)[rawStoreSymbol]; + if (rawStore) { + return rawStore; + } + let res = rawStoreMap.get(storeInput); + if (!res) { + let subscribable = storeInput; + if (!('subscribe' in subscribable)) { + subscribable = subscribable[symbolObservable](); + } + res = new RawSubscribableWrapper(subscribable); + rawStoreMap.set(storeInput, res); + } + return res; +}; + +export const exposeRawStore = ( + rawStore: RawStore, + extraProp?: U +): ReadableSignal & Omit> => { + const get = rawStore.get.bind(rawStore) as any; + if (extraProp) { + Object.assign(get, extraProp); + } + get.get = get; + get.subscribe = rawStore.subscribe.bind(rawStore); + get[symbolObservable] = returnThis; + get[rawStoreSymbol] = rawStore; + return get; +}; diff --git a/src/internal/store.ts b/src/internal/store.ts new file mode 100644 index 0000000..1d65ddc --- /dev/null +++ b/src/internal/store.ts @@ -0,0 +1,48 @@ +import type { SignalStore, SubscribableStore } from '../types'; + +export interface Consumer { + markDirty(): void; +} + +export const enum RawStoreFlags { + NONE = 0, + // the following flags are used in RawStoreTrackingUsage and derived classes + HAS_VISIBLE_ONUSE = 1, + START_USE_CALLED = 1 << 1, + FLUSH_PLANNED = 1 << 2, + // the following flags are used in RawStoreComputedOrDerived and derived classes + COMPUTING = 1 << 3, + DIRTY = 1 << 4, +} + +export interface BaseLink { + producer: RawStore>; + skipMarkDirty?: boolean; +} + +export interface RawStore = BaseLink> + extends SignalStore, + SubscribableStore { + readonly flags: RawStoreFlags; + newLink(consumer: Consumer): Link; + registerConsumer(link: Link): Link; + unregisterConsumer(link: Link): void; + updateValue(): void; + isLinkUpToDate(link: Link): boolean; + updateLink(link: Link): T; +} + +export const updateLinkProducerValue = (link: BaseLink): void => { + try { + link.skipMarkDirty = true; + link.producer.updateValue(); + // Ignoring coverage for the following lines because, unless there is a bug in tansu (which would have to be fixed!) + // there should be no way to trigger this error. + /* v8 ignore next 3 */ + if (link.producer.flags & RawStoreFlags.DIRTY) { + throw new Error('assert failed: store still dirty after updating it'); + } + } finally { + link.skipMarkDirty = false; + } +}; diff --git a/src/internal/storeComputed.ts b/src/internal/storeComputed.ts new file mode 100644 index 0000000..1bc94f9 --- /dev/null +++ b/src/internal/storeComputed.ts @@ -0,0 +1,127 @@ +import type { BaseLink, Consumer, RawStore } from './store'; +import { RawStoreFlags, updateLinkProducerValue } from './store'; +import { + COMPUTED_ERRORED, + COMPUTED_UNSET, + RawStoreComputedOrDerived, +} from './storeComputedOrDerived'; +import { epoch, notificationPhase } from './storeWritable'; +import { activeConsumer, setActiveConsumer, type ActiveConsumer } from './untrack'; + +export class RawStoreComputed + extends RawStoreComputedOrDerived + implements Consumer, ActiveConsumer +{ + private producerIndex = 0; + private producerLinks: BaseLink[] = []; + private epoch = -1; + + constructor(private readonly computeFn: () => T) { + super(COMPUTED_UNSET); + } + + override increaseEpoch(): void { + // do nothing + } + + override updateValue(): void { + const flags = this.flags; + if (flags & RawStoreFlags.START_USE_CALLED && this.epoch === epoch) { + return; + } + super.updateValue(); + this.epoch = epoch; + } + + override get(): T { + if ( + !activeConsumer && + !notificationPhase && + this.epoch === epoch && + (!(this.flags & RawStoreFlags.HAS_VISIBLE_ONUSE) || + this.flags & RawStoreFlags.START_USE_CALLED) + ) { + return this.readValue(); + } + return super.get(); + } + + addProducer>(producer: RawStore): U { + const producerLinks = this.producerLinks; + const producerIndex = this.producerIndex; + let link = producerLinks[producerIndex] as L | undefined; + if (link?.producer !== producer) { + if (link) { + producerLinks.push(link); // push the existing link at the end (to be removed later) + } + link = producer.registerConsumer(producer.newLink(this)); + } + producerLinks[producerIndex] = link; + this.producerIndex = producerIndex + 1; + updateLinkProducerValue(link); + if (producer.flags & RawStoreFlags.HAS_VISIBLE_ONUSE) { + this.flags |= RawStoreFlags.HAS_VISIBLE_ONUSE; + } + return producer.updateLink(link); + } + + override startUse(): void { + const producerLinks = this.producerLinks; + for (let i = 0, l = producerLinks.length; i < l; i++) { + const link = producerLinks[i]; + link.producer.registerConsumer(link); + } + this.flags |= RawStoreFlags.DIRTY; + } + + override endUse(): void { + const producerLinks = this.producerLinks; + for (let i = 0, l = producerLinks.length; i < l; i++) { + const link = producerLinks[i]; + link.producer.unregisterConsumer(link); + } + } + + override areProducersUpToDate(): boolean { + if (this.value === COMPUTED_UNSET) { + return false; + } + const producerLinks = this.producerLinks; + for (let i = 0, l = producerLinks.length; i < l; i++) { + const link = producerLinks[i]; + const producer = link.producer; + updateLinkProducerValue(link); + if (!producer.isLinkUpToDate(link)) { + return false; + } + } + return true; + } + + override recompute(): void { + let value: T; + const prevActiveConsumer = setActiveConsumer(this); + try { + this.producerIndex = 0; + this.flags &= ~RawStoreFlags.HAS_VISIBLE_ONUSE; + const computeFn = this.computeFn; + value = computeFn(); + this.error = null; + } catch (error) { + value = COMPUTED_ERRORED; + this.error = error; + } finally { + setActiveConsumer(prevActiveConsumer); + } + // Remove unused producers: + const producerLinks = this.producerLinks; + const producerIndex = this.producerIndex; + if (producerIndex < producerLinks.length) { + for (let i = 0, l = producerLinks.length - producerIndex; i < l; i++) { + const link = producerLinks.pop()!; + link.producer.unregisterConsumer(link); + } + } + this.set(value); + } +} diff --git a/src/internal/storeComputedOrDerived.ts b/src/internal/storeComputedOrDerived.ts new file mode 100644 index 0000000..a01bf57 --- /dev/null +++ b/src/internal/storeComputedOrDerived.ts @@ -0,0 +1,83 @@ +import type { Consumer } from './store'; +import { RawStoreFlags } from './store'; +import { RawStoreTrackingUsage } from './storeTrackingUsage'; +import { setActiveConsumer } from './untrack'; + +const MAX_CHANGE_RECOMPUTES = 1000; + +export const COMPUTED_UNSET: any = Symbol('UNSET'); +export const COMPUTED_ERRORED: any = Symbol('ERRORED'); +export const isComputedSpecialValue = (value: unknown): boolean => + value === COMPUTED_UNSET || value === COMPUTED_ERRORED; + +export abstract class RawStoreComputedOrDerived + extends RawStoreTrackingUsage + implements Consumer +{ + override flags = RawStoreFlags.DIRTY; + error: any; + + override equal(a: T, b: T): boolean { + if (isComputedSpecialValue(a) || isComputedSpecialValue(b)) { + return false; + } + return super.equal(a, b); + } + + markDirty(): void { + if (!(this.flags & RawStoreFlags.DIRTY)) { + this.flags |= RawStoreFlags.DIRTY; + this.markConsumersDirty(); + } + } + + override readValue(): T { + const value = this.value; + if (value === COMPUTED_ERRORED) { + throw this.error; + } + // Ignoring coverage for the following lines because, unless there is a bug in tansu (which would have to be fixed!) + // there should be no way to trigger this error. + /* v8 ignore next 3 */ + if (value === COMPUTED_UNSET) { + throw new Error('assert failed: computed value is not set'); + } + return value; + } + + override updateValue(): void { + if (this.flags & RawStoreFlags.COMPUTING) { + throw new Error('recursive computed'); + } + super.updateValue(); + if (!(this.flags & RawStoreFlags.DIRTY)) { + return; + } + this.flags |= RawStoreFlags.COMPUTING; + const prevActiveConsumer = setActiveConsumer(null); + try { + let iterations = 0; + do { + do { + iterations++; + this.flags &= ~RawStoreFlags.DIRTY; + if (this.areProducersUpToDate()) { + return; + } + } while (this.flags & RawStoreFlags.DIRTY && iterations < MAX_CHANGE_RECOMPUTES); + this.recompute(); + } while (this.flags & RawStoreFlags.DIRTY && iterations < MAX_CHANGE_RECOMPUTES); + if (this.flags & RawStoreFlags.DIRTY) { + this.flags &= ~RawStoreFlags.DIRTY; + this.error = new Error('reached maximum number of store changes in one shot'); + this.set(COMPUTED_ERRORED); + } + } finally { + setActiveConsumer(prevActiveConsumer); + this.flags &= ~RawStoreFlags.COMPUTING; + } + } + + abstract areProducersUpToDate(): boolean; + abstract recompute(): void; +} diff --git a/src/internal/storeConst.ts b/src/internal/storeConst.ts new file mode 100644 index 0000000..a1148db --- /dev/null +++ b/src/internal/storeConst.ts @@ -0,0 +1,40 @@ +import type { Subscriber, Unsubscriber } from '../types'; +import type { BaseLink, Consumer, RawStore } from './store'; +import { RawStoreFlags } from './store'; +import { checkNotInNotificationPhase } from './storeWritable'; +import { noopUnsubscribe } from './unsubscribe'; + +export class RawStoreConst implements RawStore> { + readonly flags = RawStoreFlags.NONE; + constructor(private readonly value: T) {} + + newLink(_consumer: Consumer): BaseLink { + return { + producer: this, + }; + } + registerConsumer(link: BaseLink): BaseLink { + return link; + } + unregisterConsumer(_link: BaseLink): void {} + updateValue(): void {} + isLinkUpToDate(_link: BaseLink): boolean { + return true; + } + updateLink(_link: BaseLink): T { + return this.value; + } + get(): T { + checkNotInNotificationPhase(); + return this.value; + } + subscribe(subscriber: Subscriber): Unsubscriber { + checkNotInNotificationPhase(); + if (typeof subscriber === 'function') { + subscriber(this.value); + } else { + subscriber?.next?.(this.value); + } + return noopUnsubscribe; + } +} diff --git a/src/internal/storeDerived.ts b/src/internal/storeDerived.ts new file mode 100644 index 0000000..18e414a --- /dev/null +++ b/src/internal/storeDerived.ts @@ -0,0 +1,140 @@ +import type { + AsyncDeriveFn, + OnUseArgument, + StoreInput, + StoresInput, + StoresInputValues, + SyncDeriveFn, + UnsubscribeFunction, + Unsubscriber, +} from '../types'; +import { getRawStore } from './exposeRawStores'; +import type { BaseLink, Consumer, RawStore } from './store'; +import { RawStoreFlags, updateLinkProducerValue } from './store'; +import { + COMPUTED_ERRORED, + COMPUTED_UNSET, + RawStoreComputedOrDerived, +} from './storeComputedOrDerived'; +import type { RawStoreWritable } from './storeWritable'; +import { normalizeUnsubscribe } from './unsubscribe'; + +abstract class RawStoreDerived + extends RawStoreComputedOrDerived + implements Consumer +{ + private readonly arrayMode: boolean; + private readonly producers: RawStore[]; + private producerLinks: BaseLink[] | null = null; + private cleanUpFn: UnsubscribeFunction | null = null; + override flags = RawStoreFlags.HAS_VISIBLE_ONUSE | RawStoreFlags.DIRTY; + + constructor(producers: S, initialValue: T) { + super(initialValue); + const arrayMode = Array.isArray(producers); + this.arrayMode = arrayMode; + this.producers = ( + arrayMode ? (producers as StoreInput[]) : [producers as StoreInput] + ).map(getRawStore); + } + + callCleanUpFn(): void { + const cleanUpFn = this.cleanUpFn; + if (cleanUpFn) { + this.cleanUpFn = null; + cleanUpFn(); + } + } + + override startUse(): void { + this.producerLinks = this.producers.map((producer) => + producer.registerConsumer(producer.newLink(this)) + ); + this.flags |= RawStoreFlags.DIRTY; + } + + override endUse(): void { + this.callCleanUpFn(); + const producerLinks = this.producerLinks; + this.producerLinks = null; + if (producerLinks) { + for (let i = 0, l = producerLinks.length; i < l; i++) { + const link = producerLinks[i]; + link.producer.unregisterConsumer(link); + } + } + } + + override areProducersUpToDate(): boolean { + const producerLinks = this.producerLinks!; + let alreadyUpToDate = this.value !== COMPUTED_UNSET; + for (let i = 0, l = producerLinks.length; i < l; i++) { + const link = producerLinks[i]; + const producer = link.producer; + updateLinkProducerValue(link); + if (!producer.isLinkUpToDate(link)) { + alreadyUpToDate = false; + } + } + return alreadyUpToDate; + } + + override recompute(): void { + try { + this.callCleanUpFn(); + const values = this.producerLinks!.map((link) => link.producer.updateLink(link)); + this.cleanUpFn = normalizeUnsubscribe(this.derive(this.arrayMode ? values : values[0])); + } catch (error) { + this.error = error; + this.set(COMPUTED_ERRORED); + } + } + + protected abstract derive(values: S): void; +} + +export class RawStoreDerivedStore extends RawStoreDerived { + constructor( + stores: S, + initialValue: T, + protected readonly derive: (values: StoresInputValues) => void + ) { + super(stores, initialValue); + } +} + +export class RawStoreSyncDerived extends RawStoreDerived { + constructor( + stores: S, + _initialValue: T, + private readonly deriveFn: SyncDeriveFn + ) { + super(stores, COMPUTED_UNSET); + } + protected override derive(values: StoresInputValues): void { + const deriveFn = this.deriveFn; + this.set(deriveFn(values)); + } +} + +export const createOnUseArg = (store: RawStoreWritable): OnUseArgument => { + const setFn = store.set.bind(store) as any; + setFn.set = setFn; + setFn.update = store.update.bind(store); + return setFn; +}; + +export class RawStoreAsyncDerived extends RawStoreDerived { + private readonly setFn = createOnUseArg(this); + constructor( + stores: S, + initialValue: T, + private readonly deriveFn: AsyncDeriveFn + ) { + super(stores, initialValue); + } + protected override derive(values: StoresInputValues): Unsubscriber | void { + const deriveFn = this.deriveFn; + return deriveFn(values, this.setFn); + } +} diff --git a/src/internal/storeSubscribable.ts b/src/internal/storeSubscribable.ts new file mode 100644 index 0000000..67a8534 --- /dev/null +++ b/src/internal/storeSubscribable.ts @@ -0,0 +1,41 @@ +import type { + SubscribableStore, + SubscriberFunction, + SubscriberObject, + UnsubscribeFunction, +} from '../types'; +import { RawStoreFlags } from './store'; +import { RawStoreTrackingUsage } from './storeTrackingUsage'; +import { normalizeUnsubscribe } from './unsubscribe'; + +export class RawSubscribableWrapper extends RawStoreTrackingUsage { + private readonly subscriber: Pick, 'next'> & SubscriberFunction = + this.createSubscriber(); + private unsubscribe: UnsubscribeFunction | null = null; + override flags = RawStoreFlags.HAS_VISIBLE_ONUSE; + + constructor(private readonly subscribable: SubscribableStore) { + super(undefined as any); + } + + private createSubscriber() { + const subscriber = (value: T) => this.set(value); + subscriber.next = subscriber; + subscriber.pause = () => { + this.markConsumersDirty(); + }; + return subscriber; + } + + override startUse(): void { + this.unsubscribe = normalizeUnsubscribe(this.subscribable.subscribe(this.subscriber)); + } + + override endUse(): void { + const unsubscribe = this.unsubscribe; + if (unsubscribe) { + this.unsubscribe = null; + unsubscribe(); + } + } +} diff --git a/src/internal/storeTrackingUsage.ts b/src/internal/storeTrackingUsage.ts new file mode 100644 index 0000000..6495145 --- /dev/null +++ b/src/internal/storeTrackingUsage.ts @@ -0,0 +1,90 @@ +import { RawStoreFlags } from './store'; +import { checkNotInNotificationPhase, RawStoreWritable } from './storeWritable'; +import { activeConsumer, untrack } from './untrack'; + +let flushUnusedQueue: RawStoreTrackingUsage[] | null = null; +let inFlushUnused = false; + +export const flushUnused = (): void => { + // Ignoring coverage for the following lines because, unless there is a bug in tansu (which would have to be fixed!) + // there should be no way to trigger this error. + /* v8 ignore next 3 */ + if (inFlushUnused) { + throw new Error('assert failed: recursive flushUnused call'); + } + inFlushUnused = true; + try { + const queue = flushUnusedQueue; + if (queue) { + flushUnusedQueue = null; + for (let i = 0, l = queue.length; i < l; i++) { + const producer = queue[i]; + producer.flags &= ~RawStoreFlags.FLUSH_PLANNED; + producer.checkUnused(); + } + } + } finally { + inFlushUnused = false; + } +}; + +export abstract class RawStoreTrackingUsage extends RawStoreWritable { + private extraUsages = 0; + abstract startUse(): void; + abstract endUse(): void; + + override updateValue(): void { + const flags = this.flags; + if (!(flags & RawStoreFlags.START_USE_CALLED)) { + // Ignoring coverage for the following lines because, unless there is a bug in tansu (which would have to be fixed!) + // there should be no way to trigger this error. + /* v8 ignore next 3 */ + if (!this.extraUsages && !this.consumerLinks.length) { + throw new Error('assert failed: untracked producer usage'); + } + this.flags |= RawStoreFlags.START_USE_CALLED; + untrack(() => this.startUse()); + } + } + + override checkUnused(): void { + const flags = this.flags; + if (flags & RawStoreFlags.START_USE_CALLED && !this.extraUsages && !this.consumerLinks.length) { + if (inFlushUnused || flags & RawStoreFlags.HAS_VISIBLE_ONUSE) { + this.flags &= ~RawStoreFlags.START_USE_CALLED; + untrack(() => this.endUse()); + } else if (!(flags & RawStoreFlags.FLUSH_PLANNED)) { + this.flags |= RawStoreFlags.FLUSH_PLANNED; + if (!flushUnusedQueue) { + flushUnusedQueue = []; + queueMicrotask(flushUnused); + } + flushUnusedQueue.push(this); + } + } + } + + override get(): T { + checkNotInNotificationPhase(); + if (activeConsumer) { + return activeConsumer.addProducer(this); + } else { + this.extraUsages++; + try { + this.updateValue(); + // Ignoring coverage for the following lines because, unless there is a bug in tansu (which would have to be fixed!) + // there should be no way to trigger this error. + /* v8 ignore next 3 */ + if (this.flags & RawStoreFlags.DIRTY) { + throw new Error('assert failed: store still dirty after updating it'); + } + return this.readValue(); + } finally { + const extraUsages = --this.extraUsages; + if (extraUsages === 0) { + this.checkUnused(); + } + } + } + } +} diff --git a/src/internal/storeWithOnUse.ts b/src/internal/storeWithOnUse.ts new file mode 100644 index 0000000..81f63d4 --- /dev/null +++ b/src/internal/storeWithOnUse.ts @@ -0,0 +1,28 @@ +import type { UnsubscribeFunction, Unsubscriber } from '../types'; +import { RawStoreFlags } from './store'; +import { RawStoreTrackingUsage } from './storeTrackingUsage'; +import { normalizeUnsubscribe } from './unsubscribe'; + +export class RawStoreWithOnUse extends RawStoreTrackingUsage { + private cleanUpFn: UnsubscribeFunction | null = null; + override flags = RawStoreFlags.HAS_VISIBLE_ONUSE; + + constructor( + value: T, + private readonly onUseFn: () => Unsubscriber | void + ) { + super(value); + } + + override startUse(): void { + this.cleanUpFn = normalizeUnsubscribe(this.onUseFn()); + } + + override endUse(): void { + const cleanUpFn = this.cleanUpFn; + if (cleanUpFn) { + this.cleanUpFn = null; + cleanUpFn(); + } + } +} diff --git a/src/internal/storeWritable.ts b/src/internal/storeWritable.ts new file mode 100644 index 0000000..9b0644c --- /dev/null +++ b/src/internal/storeWritable.ts @@ -0,0 +1,162 @@ +import type { Subscriber, UnsubscribeFunction, UnsubscribeObject, Updater } from '../types'; +import { batch } from './batch'; +import { equal } from './equal'; +import type { Consumer, RawStore } from './store'; +import { RawStoreFlags } from './store'; +import { SubscribeConsumer } from './subscribeConsumer'; +import { activeConsumer } from './untrack'; + +export let notificationPhase = false; + +export const checkNotInNotificationPhase = (): void => { + if (notificationPhase) { + throw new Error('Reading or writing a signal is forbidden during the notification phase.'); + } +}; + +export let epoch = 0; + +export interface ProducerConsumerLink { + value: T; + version: number; + producer: RawStore>; + indexInProducer: number; + consumer: Consumer; + skipMarkDirty: boolean; +} + +export class RawStoreWritable implements RawStore> { + constructor(protected value: T) {} + flags = RawStoreFlags.NONE; + private version = 0; + equalFn = equal; + private equalCache: Record | null = null; + consumerLinks: ProducerConsumerLink[] = []; + + newLink(consumer: Consumer): ProducerConsumerLink { + return { + version: -1, + value: undefined as any, + producer: this, + indexInProducer: 0, + consumer, + skipMarkDirty: false, + }; + } + + isLinkUpToDate(link: ProducerConsumerLink): boolean { + if (link.version === this.version) { + return true; + } + if (link.version === this.version - 1 || link.version < 0) { + return false; + } + let equalCache = this.equalCache; + if (!equalCache) { + equalCache = {}; + this.equalCache = equalCache; + } + let res = equalCache[link.version]; + if (res === undefined) { + res = this.equal(link.value, this.value); + equalCache[link.version] = res; + } + return res; + } + + updateLink(link: ProducerConsumerLink): T { + link.value = this.value; + link.version = this.version; + return this.readValue(); + } + + registerConsumer(link: ProducerConsumerLink): ProducerConsumerLink { + const consumerLinks = this.consumerLinks; + const indexInProducer = consumerLinks.length; + link.indexInProducer = indexInProducer; + consumerLinks[indexInProducer] = link; + return link; + } + + unregisterConsumer(link: ProducerConsumerLink): void { + const consumerLinks = this.consumerLinks; + const index = link.indexInProducer; + // Ignoring coverage for the following lines because, unless there is a bug in tansu (which would have to be fixed!) + // there should be no way to trigger this error. + /* v8 ignore next 3 */ + if (consumerLinks[index] !== link) { + throw new Error('assert failed: invalid indexInProducer'); + } + // swap with the last item to avoid shifting the array + const lastConsumerLink = consumerLinks.pop()!; + const isLast = link === lastConsumerLink; + if (!isLast) { + consumerLinks[index] = lastConsumerLink; + lastConsumerLink.indexInProducer = index; + } else if (index === 0) { + this.checkUnused(); + } + } + + protected checkUnused(): void {} + updateValue(): void {} + + protected equal(a: T, b: T): boolean { + const equalFn = this.equalFn; + return equalFn(a, b); + } + + protected increaseEpoch(): void { + epoch++; + this.markConsumersDirty(); + } + + set(newValue: T): void { + checkNotInNotificationPhase(); + const same = this.equal(this.value, newValue); + if (!same) { + batch(() => { + this.value = newValue; + this.version++; + this.equalCache = null; + this.increaseEpoch(); + }); + } + } + + update(updater: Updater): void { + this.set(updater(this.value)); + } + + protected markConsumersDirty(): void { + const prevNotificationPhase = notificationPhase; + notificationPhase = true; + try { + const consumerLinks = this.consumerLinks; + for (let i = 0, l = consumerLinks.length; i < l; i++) { + const link = consumerLinks[i]; + if (link.skipMarkDirty) continue; + link.consumer.markDirty(); + } + } finally { + notificationPhase = prevNotificationPhase; + } + } + + get(): T { + checkNotInNotificationPhase(); + return activeConsumer ? activeConsumer.addProducer(this) : this.readValue(); + } + + readValue(): T { + return this.value; + } + + subscribe(subscriber: Subscriber): UnsubscribeFunction & UnsubscribeObject { + checkNotInNotificationPhase(); + const subscription = new SubscribeConsumer(this, subscriber); + const unsubscriber = () => subscription.unsubscribe(); + unsubscriber.unsubscribe = unsubscriber; + return unsubscriber; + } +} diff --git a/src/internal/subscribeConsumer.ts b/src/internal/subscribeConsumer.ts new file mode 100644 index 0000000..78c49a5 --- /dev/null +++ b/src/internal/subscribeConsumer.ts @@ -0,0 +1,64 @@ +import type { Subscriber, SubscriberObject } from '../types'; +import { subscribersQueue } from './batch'; +import { updateLinkProducerValue, type BaseLink, type Consumer, type RawStore } from './store'; + +export const noop = (): void => {}; + +const bind = (object: T | null | undefined, fnName: keyof T) => { + const fn = object ? object[fnName] : null; + return typeof fn === 'function' ? fn.bind(object) : noop; +}; + +const noopSubscriber: SubscriberObject = { + next: noop, + pause: noop, + resume: noop, +}; + +const toSubscriberObject = (subscriber: Subscriber): SubscriberObject => ({ + next: typeof subscriber === 'function' ? subscriber.bind(null) : bind(subscriber, 'next'), + pause: bind(subscriber, 'pause'), + resume: bind(subscriber, 'resume'), +}); + +export class SubscribeConsumer> implements Consumer { + private readonly link: Link; + private subscriber: SubscriberObject; + dirtyCount = 1; + constructor(producer: RawStore, subscriber: Subscriber) { + this.subscriber = toSubscriberObject(subscriber); + this.link = producer.registerConsumer(producer.newLink(this)); + this.notify(true); + } + + unsubscribe(): void { + if (this.subscriber !== noopSubscriber) { + this.subscriber = noopSubscriber; + this.link.producer.unregisterConsumer(this.link); + } + } + + markDirty(): void { + this.dirtyCount++; + subscribersQueue.push(this); + if (this.dirtyCount === 1) { + this.subscriber.pause(); + } + } + + notify(first = false): void { + this.dirtyCount--; + if (this.dirtyCount === 0 && this.subscriber !== noopSubscriber) { + const link = this.link; + const producer = link.producer; + updateLinkProducerValue(link); + if (producer.isLinkUpToDate(link) && !first) { + this.subscriber.resume(); + } else { + // note that the following line can throw + const value = producer.updateLink(link); + this.subscriber.next(value); + } + } + } +} diff --git a/src/internal/unsubscribe.ts b/src/internal/unsubscribe.ts new file mode 100644 index 0000000..0fd2733 --- /dev/null +++ b/src/internal/unsubscribe.ts @@ -0,0 +1,19 @@ +import type { UnsubscribeFunction, UnsubscribeObject, Unsubscriber } from '../types'; + +export const noopUnsubscribe = (): void => {}; +noopUnsubscribe.unsubscribe = noopUnsubscribe; + +export const normalizeUnsubscribe = ( + unsubscribe: Unsubscriber | void | null | undefined +): UnsubscribeFunction & UnsubscribeObject => { + if (!unsubscribe) { + return noopUnsubscribe; + } + if ((unsubscribe as any).unsubscribe === unsubscribe) { + return unsubscribe as any; + } + const res: any = + typeof unsubscribe === 'function' ? () => unsubscribe() : () => unsubscribe.unsubscribe(); + res.unsubscribe = res; + return res; +}; diff --git a/src/internal/untrack.ts b/src/internal/untrack.ts new file mode 100644 index 0000000..be236a0 --- /dev/null +++ b/src/internal/untrack.ts @@ -0,0 +1,31 @@ +import type { BaseLink, RawStore } from './store'; + +export interface ActiveConsumer { + addProducer: >(store: RawStore) => T; +} + +export let activeConsumer: ActiveConsumer | null = null; + +export const setActiveConsumer = (consumer: ActiveConsumer | null): ActiveConsumer | null => { + const prevConsumer = activeConsumer; + activeConsumer = consumer; + return prevConsumer; +}; + +/** + * Stops the tracking of dependencies made by {@link computed} and calls the provided function. + * After the function returns, the tracking of dependencies continues as before. + * + * @param fn - function to be called + * @returns the value returned by the given function + */ +export const untrack = (fn: () => T): T => { + let output: T; + const prevActiveConsumer = setActiveConsumer(null); + try { + output = fn(); + } finally { + setActiveConsumer(prevActiveConsumer); + } + return output; +}; diff --git a/src/types.ts b/src/types.ts new file mode 100644 index 0000000..110cdd5 --- /dev/null +++ b/src/types.ts @@ -0,0 +1,262 @@ +declare global { + interface SymbolConstructor { + readonly observable: symbol; + } +} + +/** + * A callback invoked when a store value changes. It is called with the latest value of a given store. + */ +export type SubscriberFunction = ((value: T) => void) & + Partial, 'next'>>; + +/** + * A partial {@link https://github.com/tc39/proposal-observable#api | observer} notified when a store value changes. A store will call the {@link SubscriberObject.next | next} method every time the store's state is changing. + */ +export interface SubscriberObject { + /** + * A store will call this method every time the store's state is changing. + */ + next: SubscriberFunction; + /** + * Unused, only declared for compatibility with rxjs. + */ + error?: any; + /** + * Unused, only declared for compatibility with rxjs. + */ + complete?: any; + /** + * A store will call this method when it knows that the value will be changed. + * A call to this method will be followed by a call to {@link SubscriberObject.next | next} or to {@link SubscriberObject.resume | resume}. + */ + pause: () => void; + /** + * A store will call this method if {@link SubscriberObject.pause | pause} was called previously + * and the value finally did not need to change. + */ + resume: () => void; +} + +/** + * Expresses interest in store value changes over time. It can be either: + * - a callback function: {@link SubscriberFunction}; + * - a partial observer: {@link SubscriberObject}. + */ +export type Subscriber = SubscriberFunction | Partial> | null | undefined; + +/** + * A function to unsubscribe from value change notifications. + */ +export type UnsubscribeFunction = () => void; + +/** + * An object with the `unsubscribe` method. + * Subscribable stores might choose to return such object instead of directly returning {@link UnsubscribeFunction} from a subscription call. + */ +export interface UnsubscribeObject { + /** + * A method that acts as the {@link UnsubscribeFunction}. + */ + unsubscribe: UnsubscribeFunction; +} + +export type Unsubscriber = UnsubscribeObject | UnsubscribeFunction; + +/** + * Represents a store accepting registrations (subscribers) and "pushing" notifications on each and every store value change. + */ +export interface SubscribableStore { + /** + * A method that makes it possible to register "interest" in store value changes over time. + * It is called each and every time the store's value changes. + * + * A registered subscriber is notified synchronously with the latest store value. + * + * @param subscriber - a subscriber in a form of a {@link SubscriberFunction} or a {@link SubscriberObject}. Returns a {@link Unsubscriber} (function or object with the `unsubscribe` method) that can be used to unregister and stop receiving notifications of store value changes. + * @returns The {@link UnsubscribeFunction} or {@link UnsubscribeObject} that can be used to unsubscribe (stop state change notifications). + */ + subscribe(subscriber: Subscriber): Unsubscriber; +} + +/** + * An interface for interoperability between observable implementations. It only has to expose the `[Symbol.observable]` method that is supposed to return a subscribable store. + */ +export interface InteropObservable { + [Symbol.observable]: () => SubscribableStore; +} + +/** + * Valid types that can be considered as a store. + */ +export type StoreInput = SubscribableStore | InteropObservable; + +/** + * Represents a store that can return its value with a get method. + */ +export interface SignalStore { + /** + * Returns the value of the store. + */ + get(): T; +} + +/** + * This interface augments the base {@link SubscribableStore} interface by requiring the return value of the subscribe method to be both a function and an object with the `unsubscribe` method. + * + * For {@link https://rxjs.dev/api/index/interface/InteropObservable | interoperability with rxjs}, it also implements the `[Symbol.observable]` method. + */ +export interface Readable extends SubscribableStore, InteropObservable, SignalStore { + subscribe(subscriber: Subscriber): UnsubscribeFunction & UnsubscribeObject; + [Symbol.observable](): Readable; +} + +/** + * This interface augments the base {@link Readable} interface by adding the ability to call the store as a function to get its value. + */ +export interface ReadableSignal extends Readable { + /** + * Returns the value of the store. + */ + (): T; +} + +/** + * A function that can be used to update store's value. This function is called with the current value and should return new store value. + */ +export type Updater = (value: T) => U; + +/** + * Builds on top of {@link Readable} and represents a store that can be manipulated from "outside": anyone with a reference to writable store can either update or completely replace state of a given store. + * + * @example + * + * ```typescript + * // reset counter's store value to 0 by using the {@link Writable.set} method + * counterStore.set(0); + * + * // increment counter's store value by using the {@link Writable.update} method + * counterStore.update(currentValue => currentValue + 1); + * ``` + */ +export interface Writable extends Readable { + /** + * Replaces store's state with the provided value. + * @param value - value to be used as the new state of a store. + */ + set(value: U): void; + + /** + * Updates store's state by using an {@link Updater} function. + * @param updater - a function that takes the current state as an argument and returns the new state. + */ + update(updater: Updater): void; +} + +/** + * Represents a store that implements both {@link ReadableSignal} and {@link Writable}. + * This is the type of objects returned by {@link writable}. + */ +export interface WritableSignal extends ReadableSignal, Writable {} + +export interface OnUseArgument { + (value: T): void; + set: (value: T) => void; + update: (updater: Updater) => void; +} + +/** + * Type of a function that is called when the number of subscribers changes from 0 to 1 + * (but not called when the number of subscribers changes from 1 to 2, ...). + * + * If it returns a function, that function will be called when the number of subscribers changes from 1 to 0. + */ +export type OnUseFn = (arg: OnUseArgument) => void | Unsubscriber; + +/** + * Store options that can be passed to {@link readable} or {@link writable}. + */ +export interface StoreOptions { + /** + * A function that is called when the number of subscribers changes from 0 to 1 + * (but not called when the number of subscribers changes from 1 to 2, ...). + * If it returns a function, that function will be called when the number of subscribers changes from 1 to 0. + */ + onUse?: OnUseFn; + + /** + * Custom function to compare two values, that should return true if they + * are equal. + * + * It is called when setting a new value to avoid doing anything + * (such as notifying subscribers) if the value did not change. + * + * @remarks + * The default logic (when this option is not present) is to return false + * if `a` is a function or an object, or if `a` and `b` are different + * according to `Object.is`. + * + * {@link StoreOptions.equal|equal} takes precedence over {@link StoreOptions.notEqual|notEqual} if both + * are defined. + * + * @param a - First value to compare. + * @param b - Second value to compare. + * @returns true if a and b are considered equal. + */ + equal?: (a: T, b: T) => boolean; + + /** + * Custom function to compare two values, that should return true if they + * are different. + * + * It is called when setting a new value to avoid doing anything + * (such as notifying subscribers) if the value did not change. + * + * @remarks + * The default logic (when this option is not present) is to return true + * if `a` is a function or an object, or if `a` and `b` are different + * according to `Object.is`. + * + * {@link StoreOptions.equal} takes precedence over {@link StoreOptions.notEqual|notEqual} if both + * are defined. + * + * @deprecated Use {@link StoreOptions.equal} instead + * @param a - First value to compare. + * @param b - Second value to compare. + * @returns true if a and b are considered different. + */ + notEqual?: (a: T, b: T) => boolean; +} + +/** + * Either a single {@link StoreInput} or a read-only array of at least one {@link StoreInput}. + */ +export type StoresInput = StoreInput | readonly [StoreInput, ...StoreInput[]]; + +/** + * Extracts the types of the values of the stores from a type extending {@link StoresInput}. + * + * @remarks + * + * If the type given as a parameter is a single {@link StoreInput}, the type of the value + * of that {@link StoreInput} is returned. + * + * If the type given as a parameter is one of an array of {@link StoreInput}, the returned type + * is the type of an array containing the value of each store in the same order. + */ +export type StoresInputValues = + S extends StoreInput + ? T + : { [K in keyof S]: S[K] extends StoreInput ? T : never }; + +export type SyncDeriveFn = (values: StoresInputValues) => T; +export interface SyncDeriveOptions extends Omit, 'onUse'> { + derive: SyncDeriveFn; +} +export type AsyncDeriveFn = ( + values: StoresInputValues, + set: OnUseArgument +) => Unsubscriber | void; +export interface AsyncDeriveOptions extends Omit, 'onUse'> { + derive: AsyncDeriveFn; +}