Proposal: Buffer notifications during subscription initialization #183
Comments
I'm open to ideas about how we can prevent sync emission if it will make Observables more ergonomic. Thanks for taking the time to prototype this, and I’ll take a closer look when I get a chance. One concern I have is that I don’t want to create hazards when refactoring from Iterables to Observables. Consider the following code: const ys =
Array.
of("a","b","c").
flatMap(x =>
Array.
of(1,2,3).
map(y => [x,y])); Now let’s consider the straightforward refactor to Observable. const ys =
Observable.
of("a","b","c").
flatMap(x =>
Observable.
of(1,2,3).
map(y => [x,y])); I submit both programs above should produce the same output. Can this be assured if we inject concurrency into “of” and “from”? Furthermore, would we be comfortable making this scheduling strategy the default? I’m concerned that implicitly injecting concurrency into Observable could lead to notifications being processed in an unpredictable order. That is why one of the guiding principles of this proposal is currently to have the developer explicitly opt in to concurrency. |
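For reference, the Array half of this comparison can be run directly on any engine that supports Array.prototype.flatMap. A minimal check of the order it produces, which is the order the Observable refactor would be expected to match:

```javascript
// Array version of the example above, written with plain ASCII quotes.
const ys = Array.of("a", "b", "c").flatMap(x =>
  Array.of(1, 2, 3).map(y => [x, y]));

console.log(JSON.stringify(ys));
// [["a",1],["a",2],["a",3],["b",1],["b",2],["b",3],["c",1],["c",2],["c",3]]
```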
I agree, and I believe that for this example we can prove the ordering will be maintained.
The resulting jobs are:
There still might be some interesting ordering effects if we mix delayed notification observables like the proposed |
A refactoring from Array methods into Observable methods might run into ordering issues for an auto-flattening method, like the proposed If we assume both an let result = Array.of(1, 2, Array.of(3, 4), 5).flatten(Infinity);
console.log(result); // [1, 2, 3, 4, 5] to let result = [];
await Observable.of(1, 2, Observable.of(3, 4), 5).flatten(Infinity).forEach(x => result.push(x));
console.log(result); // [1, 2, 5, 3, 4] ?? might result in some surprises (depending on how The analogy between Observable and Array/Iterable can only be stretched so far, though, (especially since we've consistently moved away from iterable/observable duality) and this might be a case where we say that |
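The surprising order can be reproduced in a toy model. The sketch below is not the proposal's API: `deferredOf`, `flatten`, and the manual job queue (`enqueueJob`, `runJobs`) are hypothetical stand-ins. It assumes that `of` delivers all of its values in a single job and that flatten subscribes to inner sources as soon as they arrive (merge-style).

```javascript
// Hypothetical manual job queue standing in for the engine's job queue.
const jobs = [];
const enqueueJob = fn => jobs.push(fn);
const runJobs = () => { while (jobs.length > 0) jobs.shift()(); };

// Hypothetical `of`: delivers all of its values inside one job.
const deferredOf = (...items) => ({
  subscribe(next) {
    enqueueJob(() => items.forEach(item => next(item)));
  },
});

// Naive merge-style flatten: subscribe to inner sources immediately.
const flatten = source => ({
  subscribe(next) {
    source.subscribe(item => {
      if (item && typeof item.subscribe === "function") {
        flatten(item).subscribe(next); // inner values arrive in a later job
      } else {
        next(item);
      }
    });
  },
});

const result = [];
flatten(deferredOf(1, 2, deferredOf(3, 4), 5)).subscribe(x => result.push(x));
runJobs();
console.log(result); // [1, 2, 5, 3, 4]
```

In this model the inner source's job is enqueued while the outer job is still running, so 5 is delivered before 3 and 4. A flatten that buffers outer values while an inner source is active (concat-style) would restore the Array order.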
I'm not sure I agree, I think flatten does make sense for observables - and I'd expect The fact iterable/observable are not dual means that the signatures of the relevant interface aren't mirrors of each other - iterables and observables are still ordered which is an important property of them, and |
This divergence of behavior wouldn't occur if Observables would have the synchronous (recursive) scheduler by default, which is the case in RxJS: Rx.Observable.of(1, 2, Rx.Observable.of(3, 4), 5)
.map(x => x.subscribe ? x : Rx.Observable.of(x))
.mergeAll()
.subscribe(x => console.log(x));
Note 1: Note 2: |
@staltz mind explaining that part? I realize that this is the default behavior in RxJS, but to clarify: do you mean this would be the behavior even if we defer notifications until after the subscription function is done, as the issue suggests? |
@staltz I've corrected the example to use |
Hi @benjamingr, I meant the divergence EcmaObservable vs Array occurs if we defer notifications, but the divergence does not occur in RxJS. Actually, I just checked, and other schedulers in RxJS still preserve the expected order: Rx.Observable.of(1, 2, Rx.Observable.of(3, 4, Rx.Scheduler.asap), 5, Rx.Scheduler.asap)
.map(x => x.subscribe ? x : Rx.Observable.of(x, Rx.Scheduler.asap))
.mergeAll()
.subscribe(x => console.log(x));
So maybe there are hopes that |
Yeah I think flatten should maintain its behavior anyway :) |
I think this line of thinking is begging the question; that question being whether Observable is isomorphic with Array. It doesn't have to be, just like we've decided that observer doesn't have to be isomorphic with generator. In fact, we know that Observable isn't going to be isomorphic with Array, because some Array methods don't make sense for Observable (e.g. I think this just amounts to a good argument for not having a recursive, auto-flattening |
It can be conceptually like an array even if it doesn’t have the same API; an array is already some kind of conflation of a List and a Queue and a Stack, and if Observable only maps to one of those concepts, that’s still fine. |
Observable probably isn't any of those things. I think it's more like |
An isomorphism is effectively a "renaming" of entities that preserves behavior. For example, points in the x/y plane like (2,2) and linear polynomials like 6x+1 are isomorphic: you can convert 6x+1 to the point (6,1), add it to another point, say (1,1), to get (7,2), and then turn that back into the polynomial 7x+2, which is the same as adding 1x+1 to 6x+1 directly. It is a morphism (a structure-preserving map) that is invertible. In category theory an isomorphism is the same thing (a structure-preserving invertible map), and it effectively means the two objects are identical and just a "renaming" of one another. At no point were observables and arrays isomorphic (and if they were, under what mappings and operations?). What was claimed (and shown by Erik) is that observables can be created with an interface dual to that of iterables. In category theory, duality means a correspondence between categories that inverts the source and target of each mapping and the order in which the mappings compose. Erik shows this here, and it is effectively how observables were derived to begin with, as far as I know :) In either case, a dual interface isn't something we necessarily have to do in JS, and indeed the discussions of the past few years did not conclude in favor of such a duality. |
Well, if we want to make analogies, an observable would be like an iterable and a subscription would be like an iterator. |
@zenparsing do you have an example of code that would be complicated by allowing observables to emit while the subscribe function is executing?
The Also, FWIW: An alternative to the |
Sure. Consider the humble "filter": function filter(source, filterFn) {
return new Observable(observer => {
return source.subscribe(
val => {
try {
if (filterFn(val)) {
observer.next(val);
}
} catch (err) {
observer.error(err);
}
},
err => observer.error(err),
() => observer.complete()
);
});
} This implementation is actually incorrect with the current observable spec. Can you spot why? If function filterFn(v) {
console.log(v);
throw new Error('oops');
}
filter(Observable.of(1, 2, 3, 4, 5), filterFn).subscribe({ error() {} });
// Logs: 1, 2, 3, 4, 5 With the current proposal, all consumers (including combinators) need to be aware of this footgun. Currently, I'm not seeing the benefit from "early emission" which would justify such an ever-present footgun.
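To make the footgun concrete, here is a self-contained sketch. `MiniObservable` is a hypothetical, heavily simplified stand-in for the proposal's Observable (synchronous delivery, a `start(subscription)` hook, `closed` tracking, no cleanup functions); it is not the spec's algorithm. `naiveFilter` mirrors the implementation above, while `carefulFilter` shows one possible workaround that captures the upstream subscription in `start`.

```javascript
// Hypothetical minimal Observable: just enough to show the hazard.
class MiniObservable {
  constructor(subscriber) {
    this._subscriber = subscriber;
  }

  subscribe(observer) {
    let closed = false;
    const subscription = {
      get closed() { return closed; },
      unsubscribe() { closed = true; },
    };
    if (observer.start) observer.start(subscription);
    if (closed) return subscription;
    this._subscriber({
      get closed() { return closed; },
      next(value) { if (!closed && observer.next) observer.next(value); },
      error(err) {
        if (closed) return;
        closed = true;
        if (observer.error) observer.error(err);
      },
      complete() {
        if (closed) return;
        closed = true;
        if (observer.complete) observer.complete();
      },
    });
    return subscription;
  }

  static of(...items) {
    return new MiniObservable(observer => {
      for (const item of items) {
        if (observer.closed) return; // stop once the consumer has gone away
        observer.next(item);
      }
      observer.complete();
    });
  }
}

// Same shape as the filter above: the upstream subscription is only
// available after source.subscribe() returns, which is too late.
function naiveFilter(source, filterFn) {
  return new MiniObservable(observer => source.subscribe({
    next(value) {
      try {
        if (filterFn(value)) observer.next(value);
      } catch (err) {
        observer.error(err); // closes downstream only; upstream keeps emitting
      }
    },
    error(err) { observer.error(err); },
    complete() { observer.complete(); },
  }));
}

// One possible workaround: grab the upstream subscription in start().
function carefulFilter(source, filterFn) {
  return new MiniObservable(observer => {
    let upstream;
    source.subscribe({
      start(subscription) { upstream = subscription; },
      next(value) {
        try {
          if (filterFn(value)) observer.next(value);
        } catch (err) {
          observer.error(err);
          upstream.unsubscribe(); // stop the producer immediately
        }
      },
      error(err) { observer.error(err); },
      complete() { observer.complete(); },
    });
  });
}

// Records every value the throwing filter function is asked to inspect.
function run(filterImpl) {
  const seen = [];
  const throwingFilterFn = value => { seen.push(value); throw new Error("oops"); };
  filterImpl(MiniObservable.of(1, 2, 3, 4, 5), throwingFilterFn).subscribe({ error() {} });
  return seen;
}

console.log(run(naiveFilter));   // [1, 2, 3, 4, 5]
console.log(run(carefulFilter)); // [1]
```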
See the OP, it's all there. The idea is that all notifications sent "early" (i.e. before subscribe has returned) are queued and delivered in a job.
That might make the footgun a little easier to deal with, but would not eliminate it. Also, it would break the type signature consistency between "observer" and its |
@zenparsing I think the footgun is not necessarily due to early emission, but due to the fact that we can't naively execute subscribe logic after returning the subscription object, except through a rearchitecture, which is what RxJS does with its Subscriber architecture. I'm trying to point out that the footgun is not synchronous emission; it's the return-a-Subscription architecture. Another architecture, callbags, supports all the semantics that current RxJS does, but doesn't return subscription objects. Check this trick I use when converting iterators to callbag streams: https://github.com/staltz/callbag-from-iter/blob/master/index.js#L20 The problem with returning Subscription objects is that they are fundamentally meant for simple upstream communication to the producer, but we may only receive the Subscription after the producer has sent us all the data. I think a more versatile architecture removes returns altogether. That's the case in callbags: the producer is (a function) called by the consumer to do subscribe logic, then the consumer is (a function) called by the producer to do data delivery. There is a greet stage, where both sides get to acknowledge the existence of each other, and only after that does data delivery happen. Currently, in naively implemented Observables, the consumer may get data delivered to it before it has been fully greeted by the producer (represented by the Subscription object). More about this here https://github.com/callbag/callbag and here https://github.com/callbag/callbag/blob/master/getting-started.md I'm not insisting on just selling my idea; I just want to point out how sync emission isn't necessarily the root cause it seems to be. |
I agree with this view of the problem. It doesn't really make sense to have both a return-based handshake protocol and allow notifications before the subscription is returned (i.e. before the handshake has completed). We could get rid of the return portion of the handshake, for example by keeping Or we can disallow notifications before the subscription is returned, as suggested here. |
Keeping |
If we move away from a return-based protocol, then we'll have to explore other options for registering the cleanup routine other than returning it from the producer setup function. One option might be to require that the producer call start with an optional cleanup function: function interval(ms) {
return new Observable(observer => {
const interval = setInterval(() => observer.next(), ms);
observer.start(() => clearInterval(interval));
});
} And then for composition, function map(source, mapFn) {
return new Observable(observer => {
source.subscribe({
start(cancel) { observer.start(cancel) },
next(v) {
try { observer.next(mapFn(v)) }
catch (e) { observer.error(e) }
},
error(e) { observer.error(e) },
complete() { observer.complete() },
});
});
} A more complex example with this API: function combineLatest(...sources) {
if (sources.length === 0)
return Observable.of();
return new Observable(observer => {
let count = sources.length;
let values = new Map();
let cancels = [];
observer.start(() => cancels.forEach(cancel => cancel()));
sources.forEach((source, index) => {
if (observer.closed)
return;
Observable.from(source).subscribe({
start(cancel) {
cancels.push(cancel);
},
next(v) {
values.set(index, v);
if (values.size === sources.length)
observer.next(Array.from(values.values()));
},
error(e) {
observer.error(e);
},
complete() {
if (--count === 0)
observer.complete();
},
});
})
});
} With this design, the subscription object goes away entirely (which is kind of nice, since it was just a wrapper around |
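To experiment with this "no-return" protocol, something like the following could serve as a starting point. It is only a sketch: `NoReturnObservable` and `fromArray` are hypothetical names, and throwing when a notification precedes `start` is an assumption made here for simplicity, not settled behavior.

```javascript
// Hypothetical Observable where subscribe() returns nothing and the producer
// registers its cleanup function by calling observer.start(cleanup).
class NoReturnObservable {
  constructor(setup) {
    this._setup = setup;
  }

  subscribe(observer) {
    let started = false;
    let closed = false;
    let cleanup;

    // Handed to the consumer's start(); also used for terminal notifications.
    const cancel = () => {
      if (closed) return;
      closed = true;
      if (typeof cleanup === "function") cleanup();
    };

    const send = (type, value, terminal) => {
      // Assumption: a notification before start() is a contract violation.
      if (!started) throw new Error(`${type} called before start`);
      if (closed) return;
      if (terminal) cancel(); // close and clean up before delivering
      if (typeof observer[type] === "function") observer[type](value);
    };

    this._setup({
      get closed() { return closed; },
      start(cleanupFn) {
        if (started) throw new Error("start called more than once");
        started = true;
        cleanup = cleanupFn;
        if (typeof observer.start === "function") observer.start(cancel);
      },
      next: value => send("next", value, false),
      error: err => send("error", err, true),
      complete: () => send("complete", undefined, true),
    });
  }
}

// A synchronous producer: the handshake completes before any data is sent.
function fromArray(items) {
  return new NoReturnObservable(observer => {
    observer.start(); // nothing to clean up
    for (const item of items) {
      if (observer.closed) return;
      observer.next(item);
    }
    observer.complete();
  });
}

const received = [];
fromArray([1, 2, 3, 4]).subscribe({
  start(cancel) { this.cancel = cancel; },
  next(v) {
    received.push(v);
    if (v === 2) this.cancel(); // cancel in the middle of synchronous emission
  },
});
console.log(received); // [1, 2]
```

Because the consumer receives `cancel` before the first `next`, it can stop a synchronous producer partway through, which is the case that goes wrong when the subscription is only available as a return value.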
👍 to this idea and to the removal of Subscription object as a wrapper. |
What do you think should happen if the producer attempts to call "next", "error", or "complete" before calling "start"? We can't send an error to the consumer, since the handshake hasn't completed yet. I suppose we could either:
I'm leaning towards throwing the error, since the handshake didn't complete and there's nowhere else for the error to go. |
I think that problem should be treated like any other violations of the Observable contract (such as calling The updated Observable contract should be, as regex, |
By the way, with the removal of the subscription object, there still needs to be a way for the Observable constructor to get a reference to the |
Yes. I'm thinking that the wrapper observer's
Good point. Another option that I didn't think of before: the wrapper observer could just automatically call |
I've created a branch of zen-observable using this modified "no-return" protocol, and here are my thoughts so far:
I think the last point means that we would have to drop Thoughts? |
Thumbs up for I'm curious about Jafar's thoughts. |
Definitely interested in this idea. I continue to believe sync notification is the right approach, and I agree that combining push and pull is the biggest source of pain. It’s worth noting that this moves the shape of Observable closer to that of Java’s proposed reactive streams, which don’t return Subscription objects. The main concern I have is the impact on the ergonomics of subscribe. This issue could be mitigated with takeUntil. It also may not be a big deal if we expect people to primarily use forEach. Overall I think there is a strong argument that returning a subscription is an attractive nuisance. |
Just to be clear... a "job" in this case refers to a "microtask"? If so, I think it will severely limit this Observable's usefulness as a primitive. It's very easy to "opt-in" to async, it's impossible to opt out. It also means that this Observable wouldn't be usable for modelling anything that could be synchronous, like EventTarget. One of the arguments that seems to have come up a few times is that we don't want the |
@benlesh |
@benlesh What do you think of the alternative presented in #183 (comment)? With that API, we retain synchronous |
I think the 99% use case of that will be trying to get that function out of that start handler so it can be used elsewhere: class SomeFrameworkyComponent {
onMounted() {
const self = this;
someObservable.subscribe({
start(cancel) {
self.unsubscribe = cancel;
}
});
}
onWillUnmount() {
if (this.unsubscribe) this.unsubscribe();
}
} Overall, it's not very ergonomic for that use case (which, in my experience, is the most common use case). Also, we had agreed in the past (I'd have to find the issue), that having an object with an It's going to be very rare that someone wants to even use |
This isn't true. const handler = () => console.log('clicked');
console.log('start');
document.addEventListener('click', handler);
document.dispatchEvent(new Event('click'));
document.removeEventListener('click', handler);
console.log('end'); |
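The snippet above needs a browser `document`. A variant that runs elsewhere, assuming a runtime that exposes the standard `EventTarget` and `Event` globals (recent Node.js versions do), records the order explicitly:

```javascript
const order = [];
const target = new EventTarget();
const handler = () => order.push("clicked");

order.push("start");
target.addEventListener("click", handler);
target.dispatchEvent(new Event("click")); // listeners run before this call returns
target.removeEventListener("click", handler);
order.push("end");

console.log(order); // ["start", "clicked", "end"]
```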
What if class SomeFrameworkyComponent {
onMounted() {
this.unsubscribe = someObservable.subscribe({
start(cancel) {
// cancel === this.unsubscribe
}
});
}
onWillUnmount() {
if (this.unsubscribe) this.unsubscribe();
}
} So that That said, even the And yet another suggested usage: class SomeFrameworkyComponent {
onMounted() {
someObservable.subscribe(this);
}
start(cancel) {
this.cancel = cancel;
}
next(x) {
// the observer's 'next' is this method
}
onWillUnmount() {
if (this.cancel) this.cancel();
}
} |
There's no need to capture class SomeFrameworkyComponent {
onMounted() {
someObservable.subscribe({
start: (cancel) => this.unsubscribe = cancel,
});
}
onWillUnmount() {
if (this.unsubscribe) this.unsubscribe();
}
} |
Completely fair. It's still not as intuitive. Given this is the same with the current proposal:
That has been proposed a few times, and it was decided that having an explicit call named |
@benlesh Using the returned subscription object may be more intuitive, but unfortunately it's also subject to the footgun described in #183 (comment). In the general case (where an observable might emit early) the consumer might not have access to the subscription at the time when it needs to unsubscribe. |
@zenparsing An alternative: since this is just about unsubscribing, we can be OK with it being impossible to unsubscribe synchronously or in response to events emitted synchronously. Honestly, I don't think I've ever needed to unsubscribe synchronously. Unsubscribing being important typically implies there is expensive work to do or a resource being held onto, and emission during subscribe isn't really a problem for either. I only have my own limited experience though. |
@benjamingr it's not about explicit unsubscribes in a synchronous style, it's mostly about guaranteeing the observable contract is met, e.g. see the synchronous error example many comments above. |
@benjamingr We should be able to unsubscribe synchronously so that users can easily build their own combinators using the Observable constructor, without falling into the "the subscription object hasn't been returned yet" footgun. As @jhusain mentioned, class SomeFrameworkyComponent {
onMounted() {
this.abortController = new AbortController();
someObservable.takeUntil(this.abortController.signal).subscribe();
}
onWillUnmount() {
if (this.abortController)
this.abortController.abort();
}
} |
The above is actually a good reason to not have subscription objects at all, since they duplicate the functionality of cancel token-like things such as I'm wondering, since we're talking about changing the signature of |
See #179 for background discussion.
Motivation
The subscribe protocol is complicated by the fact that the producer is able to send notifications to the consumer before the call to subscribe has completed. We call this "early emission". In general, the programmer must account for "early emission" by providing a start callback and obtaining a reference to the subscription object before any notifications are sent. In addition, the consumer may also need to remember to check if the subscription is closed after the call to subscribe completes.
"Early emission" has the following disadvantages:
start method which is outside of the core Observable protocol.
The primary advantage of allowing "early emission" is that it enables the user to synchronously convert from in-memory data to an observable and then back to in-memory data. This technique is frequently used for unit testing. However, by using Observable.prototype.forEach and async/await, users can convert from observables to in-memory data without significantly altering their unit tests.
Goals
Proposal
1. Buffer notifications sent during subscription initialization
We can disallow "early emission", while providing interoperability with observable libraries that choose to allow it, by buffering all notifications that occur before the subscribe method has returned.
During the call to subscribe, if the producer sends a notification to the SubscriptionObserver, that notification is placed in a queue, and a job is scheduled to flush this queue. If additional notifications are sent to the SubscriptionObserver object before the queue is flushed, those notifications are also enqueued.
2. Send synchronous data in a job
We can modify Observable.of and Observable.from (when operating on iterables) to schedule a job to send data to the observer.
3. Remove support for the "start" method
Once we disallow "early emission", the start method is no longer required and can be removed from the API.
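The buffering in step 1 might look roughly like the sketch below. It is a simplified illustration, not the prototype's code: `subscribeBuffered` and its `enqueueJob` parameter are hypothetical names, `queueMicrotask` stands in for "schedule a job", and closing the subscription after `error` or `complete` is left out.

```javascript
// Hypothetical helper: runs `subscriber` and buffers anything it sends
// before it returns, flushing the buffer later in a job.
function subscribeBuffered(subscriber, observer, enqueueJob = queueMicrotask) {
  let initializing = true;
  let queue = null; // non-null while buffered notifications are pending

  const deliver = (type, value) => {
    if (typeof observer[type] === "function") observer[type](value);
  };

  const flush = () => {
    try {
      // Index loop: notifications enqueued during the flush are delivered too.
      for (let i = 0; i < queue.length; i++) {
        deliver(queue[i][0], queue[i][1]);
      }
    } finally {
      queue = null;
    }
  };

  const notify = (type, value) => {
    if (initializing || queue !== null) {
      // Early emission, or a queue that has not been flushed yet:
      // buffer so that ordering is preserved.
      if (queue === null) {
        queue = [];
        enqueueJob(flush);
      }
      queue.push([type, value]);
    } else {
      deliver(type, value);
    }
  };

  subscriber({
    next: value => notify("next", value),
    error: err => notify("error", err),
    complete: () => notify("complete", undefined),
  });
  initializing = false;
}

// Demo with a manual job queue so the ordering is easy to see.
const pendingJobs = [];
const log = [];
subscribeBuffered(
  observer => { observer.next(1); observer.next(2); observer.complete(); },
  { next: v => log.push(v), complete: () => log.push("done") },
  job => pendingJobs.push(job));
log.push("subscribe returned");
pendingJobs.forEach(job => job());
console.log(log); // ["subscribe returned", 1, 2, "done"]
```

With the default `enqueueJob`, the flush would run as a microtask after the current synchronous code finishes, so a consumer always has the result of subscribing in hand before the first notification arrives.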
Additional Considerations
Observable.from. When an "early emission" observable is consumed by Observable.from, all "early emissions" are temporarily stored in the notification queue.
Implementation Prototype