diff --git a/.vscode/settings.json b/.vscode/settings.json index f1825c1..a13e05a 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -5,6 +5,7 @@ ], "cSpell.words": [ "Assistable", + "binaryheap", "bodyclose", "cmds", "coverpkg", @@ -15,6 +16,7 @@ "dogsled", "dotenv", "dupl", + "emirpasic", "errcheck", "exportloopref", "extendio", @@ -46,6 +48,7 @@ "nakedret", "nolint", "nolintlint", + "notif", "onecontext", "onsi", "outdir", diff --git a/go.mod b/go.mod index 010e605..794e6e3 100644 --- a/go.mod +++ b/go.mod @@ -22,6 +22,7 @@ require ( ) require ( + github.com/emirpasic/gods v1.18.1 github.com/go-logr/logr v1.4.1 // indirect github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect github.com/google/go-cmp v0.6.0 // indirect diff --git a/go.sum b/go.sum index 1a1ea7f..fe321bc 100644 --- a/go.sum +++ b/go.sum @@ -6,6 +6,8 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc= +github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ= github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= diff --git a/rx/assert.go b/rx/assert.go index c7fdc5a..a6653f1 100644 --- a/rx/assert.go +++ b/rx/assert.go @@ -127,6 +127,11 @@ loop: break loop } + // TODO: needs to accommodate item.N, ie the numeric aux value + // and also should be modified to support all the other + // new ways of interpreting an item (Ch, Tick, Tv), possibly + // with new assertions, ie: HasCh, HasTick, HasTv. + // if item.IsError() { errs = append(errs, item.E) } else { @@ -226,6 +231,14 @@ func HasItem[T any](i T) RxAssert[T] { }) } +// HasItemsNoOrder checks that an observable produces the corresponding items regardless of the order. +func HasItemsNoOrder[T any](items ...T) RxAssert[T] { + return newAssertion(func(a *rxAssert[T]) { + a.checkHasItemsNoOrder = true + a.itemsNoOrder = items + }) +} + // IsNotEmpty checks that the observable produces some items. func IsNotEmpty[T any]() RxAssert[T] { return newAssertion(func(a *rxAssert[T]) { @@ -259,3 +272,15 @@ func HasNoError[T any]() RxAssert[T] { ra.checkHasNotRaisedError = true }) } + +// CustomPredicate checks a custom predicate. +func CustomPredicate[T any](predicate AssertPredicate[T]) RxAssert[T] { + return newAssertion(func(a *rxAssert[T]) { + if !a.checkHasCustomPredicate { + a.checkHasCustomPredicate = true + a.customPredicates = make([]AssertPredicate[T], 0) + } + + a.customPredicates = append(a.customPredicates, predicate) + }) +} diff --git a/rx/duration.go b/rx/duration.go new file mode 100644 index 0000000..7ab365c --- /dev/null +++ b/rx/duration.go @@ -0,0 +1,110 @@ +package rx + +import ( + "context" + "time" +) + +// Infinite represents an infinite wait time +var Infinite int64 = -1 + +// Duration represents a duration +type Duration interface { + duration() time.Duration +} + +type duration struct { + d time.Duration +} + +func (d *duration) duration() time.Duration { + return d.d +} + +// WithDuration is a duration option +func WithDuration(d time.Duration) Duration { + return &duration{ + d: d, + } +} + +type causalityDuration struct { + fs []execution +} + +type execution struct { + f func() + isTick bool +} + +func timeCausality[T any](elems ...any) (context.Context, Observable[T], Duration) { + ch := make(chan Item[T], 1) + fs := make([]execution, len(elems)+1) + ctx, cancel := context.WithCancel(context.Background()) + + for i, elem := range elems { + i := i + elem := elem + + if el, ok := elem.(Item[T]); ok && el.IsTick() { + fs[i] = execution{ + f: func() {}, + isTick: true, + } + } else { + switch elem := elem.(type) { + case Item[T]: + fs[i] = execution{ + f: func() { + ch <- elem + }, + } + + case error: + fs[i] = execution{ + f: func() { + ch <- Error[T](elem) + }, + } + + case T: + fs[i] = execution{ + f: func() { + ch <- Of(elem) + }, + } + } + } + } + + fs[len(elems)] = execution{ + f: func() { + cancel() + }, + isTick: false, + } + + return ctx, FromChannel(ch), &causalityDuration{fs: fs} +} + +func (d *causalityDuration) duration() time.Duration { + pop := d.fs[0] + pop.f() + + d.fs = d.fs[1:] + + if pop.isTick { + return time.Nanosecond + } + + return time.Minute +} + +// type mockDuration struct { +// mock.Mock +// } + +// func (m *mockDuration) duration() time.Duration { +// args := m.Called() +// return args.Get(0).(time.Duration) +// } diff --git a/rx/errors.go b/rx/errors.go new file mode 100644 index 0000000..0dfcf0e --- /dev/null +++ b/rx/errors.go @@ -0,0 +1,19 @@ +package rx + +// IllegalInputError is triggered when the observable receives an illegal input. +type IllegalInputError struct { + error string +} + +func (e IllegalInputError) Error() string { + return "illegal input: " + e.error +} + +// IndexOutOfBoundError is triggered when the observable cannot access to the specified index. +type IndexOutOfBoundError struct { + error string +} + +func (e IndexOutOfBoundError) Error() string { + return "index out of bound: " + e.error +} diff --git a/rx/factory.go b/rx/factory.go index 6206d30..d0e934f 100644 --- a/rx/factory.go +++ b/rx/factory.go @@ -1,6 +1,14 @@ package rx -import "github.com/samber/lo" +import ( + "context" + "math" + "sync" + "sync/atomic" + "time" + + "github.com/samber/lo" +) // Amb takes several Observables, emit all of the items from only the first of these Observables // to emit an item or notification. @@ -10,6 +18,132 @@ func Amb[T any](observables []Observable[T], opts ...Option[T]) Observable[T] { panic("Amb: NOT-IMPL") } +// CombineLatest combines the latest item emitted by each Observable via a specified function +// and emit items based on the results of this function. +func CombineLatest[T any](f FuncN[T], observables []Observable[T], opts ...Option[T]) Observable[T] { + option := parseOptions(opts...) + ctx := option.buildContext(emptyContext) + next := option.buildChannel() + + go func() { + var counter uint32 + + size := uint32(len(observables)) + s := make([]T, size) + mutex := sync.Mutex{} + errCh := make(chan struct{}) + wg := sync.WaitGroup{} + wg.Add(int(size)) + + handler := func(ctx context.Context, it Iterable[T], i int) { + defer wg.Done() + + observe := it.Observe(opts...) + + for { + select { + case <-ctx.Done(): + return + case item, ok := <-observe: + if !ok { + return + } + + if item.IsError() { + next <- item + errCh <- struct{}{} + + return + } + + if isZero(s[i]) { // s[i] == nil + atomic.AddUint32(&counter, 1) + } + + mutex.Lock() + s[i] = item.V + + if atomic.LoadUint32(&counter) == size { + next <- Of(f(s...)) + } + mutex.Unlock() + } + } + } + + cancelCtx, cancel := context.WithCancel(ctx) + + for i, o := range observables { + go handler(cancelCtx, o, i) + } + + go func() { + for range errCh { + cancel() + } + }() + + wg.Wait() + close(next) + close(errCh) + }() + + return &ObservableImpl[T]{ + iterable: newChannelIterable(next), + } +} + +// Concat emits the emissions from two or more Observables without interleaving them. +func Concat[T any](observables []Observable[T], opts ...Option[T]) Observable[T] { + option := parseOptions(opts...) + ctx := option.buildContext(emptyContext) + next := option.buildChannel() + + go func() { + defer close(next) + + for _, obs := range observables { + observe := obs.Observe(opts...) + loop: + for { + select { + case <-ctx.Done(): + return + case item, ok := <-observe: + if !ok { + break loop + } + if item.IsError() { + next <- item + return + } + next <- item + } + } + } + }() + + return &ObservableImpl[T]{ + iterable: newChannelIterable(next), + } +} + +// Create creates an Observable from scratch by calling observer methods programmatically. +func Create[T any](f []Producer[T], opts ...Option[T]) Observable[T] { + return &ObservableImpl[T]{ + iterable: newCreateIterable(f, opts...), + } +} + +// Defer does not create the Observable until the observer subscribes, +// and creates a fresh Observable for each observer. This creates a cold +// observable. +func Defer[T any](f []Producer[T], opts ...Option[T]) Observable[T] { + return &ObservableImpl[T]{ + iterable: newDeferIterable(f, opts...), + } +} + // Empty creates an Observable with no item and terminate immediately. func Empty[T any]() Observable[T] { next := make(chan Item[T]) @@ -31,6 +165,47 @@ func FromChannel[T any](next <-chan Item[T], opts ...Option[T]) Observable[T] { } } +// FromEventSource creates a hot observable from a channel. +func FromEventSource[T any](next <-chan Item[T], opts ...Option[T]) Observable[T] { + option := parseOptions(opts...) + + return &ObservableImpl[T]{ + iterable: newEventSourceIterable(option.buildContext(emptyContext), + next, option.getBackPressureStrategy(), + ), + } +} + +// Interval creates an Observable emitting incremental integers infinitely between +// each given time interval. +func Interval[T any](interval Duration, opts ...Option[T]) Observable[T] { + option := parseOptions(opts...) + next := option.buildChannel() + ctx := option.buildContext(emptyContext) + + go func() { + i := 0 + + for { + select { + case <-time.After(interval.duration()): + if !Tv[T](i).SendContext(ctx, next) { + return + } + + i++ + case <-ctx.Done(): + close(next) + return + } + } + }() + + return &ObservableImpl[T]{ + iterable: newEventSourceIterable(ctx, next, option.getBackPressureStrategy()), + } +} + // Just creates an Observable with the provided items. func Just[T any](values ...T) func(opts ...Option[T]) Observable[T] { return func(opts ...Option[T]) Observable[T] { @@ -71,6 +246,53 @@ func JustError[T any](err error) func(opts ...Option[T]) Single[T] { } } +// Merge combines multiple Observables into one by merging their emissions +func Merge[T any](observables []Observable[T], opts ...Option[T]) Observable[T] { + option := parseOptions(opts...) + ctx := option.buildContext(emptyContext) + next := option.buildChannel() + wg := sync.WaitGroup{} + wg.Add(len(observables)) + + f := func(o Observable[T]) { + defer wg.Done() + + observe := o.Observe(opts...) + + for { + select { + case <-ctx.Done(): + return + case item, ok := <-observe: + if !ok { + return + } + + if item.IsError() { + next <- item + + return + } + next <- item + } + } + } + + for _, o := range observables { + go f(o) + } + + go func() { + wg.Wait() + + close(next) + }() + + return &ObservableImpl[T]{ + iterable: newChannelIterable(next), + } +} + // Never creates an Observable that emits no items and does not terminate. func Never[T any]() Observable[T] { next := make(chan Item[T]) @@ -79,3 +301,79 @@ func Never[T any]() Observable[T] { iterable: newChannelIterable(next), } } + +// Range creates an Observable that emits count sequential integers beginning +// at start. +func Range[T any](start, count int, opts ...Option[T]) Observable[T] { + if count < 0 { + return Thrown[T](IllegalInputError{ + error: "count must be positive", // TODO(i18n) + }) + } + + if start+count-1 > math.MaxInt32 { + return Thrown[T](IllegalInputError{ + error: "max value is bigger than math.MaxInt32", + }) + } + + return &ObservableImpl[T]{ + iterable: newRangeIterable(start, count, opts...), + } +} + +// Start creates an Observable from one or more directive-like Supplier +// and emits the result of each operation asynchronously on a new Observable. +func Start[T any](fs []Supplier[T], opts ...Option[T]) Observable[T] { + option := parseOptions(opts...) + next := option.buildChannel() + ctx := option.buildContext(emptyContext) + + go func() { + defer close(next) + + for _, f := range fs { + select { + case <-ctx.Done(): + return + case next <- f(ctx): + } + } + }() + + return &ObservableImpl[T]{ + iterable: newChannelIterable(next), + } +} + +// Thrown creates an Observable that emits no items and terminates with an error. +func Thrown[T any](err error) Observable[T] { + next := make(chan Item[T], 1) + next <- Error[T](err) + close(next) + + return &ObservableImpl[T]{ + iterable: newChannelIterable(next), + } +} + +// Timer returns an Observable that completes after a specified delay. +func Timer[T any](d Duration, opts ...Option[T]) Observable[T] { + option := parseOptions(opts...) + next := make(chan Item[T], 1) + ctx := option.buildContext(emptyContext) + + go func() { + defer close(next) + select { + case <-ctx.Done(): + return + case <-time.After(d.duration()): + return + } + }() + + return &ObservableImpl[T]{ + iterable: newChannelIterable(next), + } +} diff --git a/rx/factory_test.go b/rx/factory_test.go new file mode 100644 index 0000000..55757f6 --- /dev/null +++ b/rx/factory_test.go @@ -0,0 +1,716 @@ +package rx_test + +import ( + "context" + "errors" + "time" + + "github.com/fortytw2/leaktest" + . "github.com/onsi/ginkgo/v2" //nolint:revive // ginkgo ok + . "github.com/onsi/gomega" //nolint:revive // gomega ok + "github.com/samber/lo" + + "github.com/snivilised/lorax/rx" +) + +// connect should go into factory-connectable_test +func collect[T any](ctx context.Context, ch <-chan rx.Item[T]) ([]any, error) { + s := make([]any, 0) + + for { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case item, ok := <-ch: + if !ok { + return s, nil + } + + if item.IsError() { + s = append(s, item.E) + } else { + s = append(s, item.V) + } + } + } +} + +var _ = Describe("Factory", func() { + // TODO: Amb1, Amb2 + + Context("CombineLatest", func() { + When("Multiple observables", func() { + It("🧪 should: combine", func() { + defer leaktest.Check(GinkgoT())() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + obs := rx.CombineLatest(func(values ...int) int { + return lo.Sum(values) + }, lo.Map([]rx.Observable[int]{ + testObservable[int](ctx, 1, 2), + testObservable[int](ctx, 10, 11), + }, func(it rx.Observable[int], _ int) rx.Observable[int] { + return it + })) + + rx.Assert(context.Background(), obs, rx.IsNotEmpty[int]()) + }) + }) + + When("Empty", func() { + It("🧪 should: be able to detect empty observable", func() { + defer leaktest.Check(GinkgoT())() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + obs := rx.CombineLatest(func(values ...int) int { + return lo.Sum(values) + }, lo.Map([]rx.Observable[int]{ + testObservable[int](ctx, 1, 2), + rx.Empty[int](), + }, func(it rx.Observable[int], _ int) rx.Observable[int] { + return it + })) + + rx.Assert(context.Background(), obs, rx.IsEmpty[int]()) + }) + }) + + When("Contains error", func() { + It("🧪 should: be able to detect error", func() { + defer leaktest.Check(GinkgoT())() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + obs := rx.CombineLatest(func(values ...int) int { + return lo.Sum(values) + }, lo.Map([]rx.Observable[int]{ + testObservable[int](ctx, 1, 2), + testObservable[int](ctx, errFoo), + }, func(it rx.Observable[int], _ int) rx.Observable[int] { + return it + })) + + rx.Assert(context.Background(), obs, rx.IsEmpty[int](), rx.HasError[int](errFoo)) + }) + }) + }) + + Context("Concat", func() { + When("Single observable", func() { + It("🧪 should: create derived single observable", func() { + defer leaktest.Check(GinkgoT())() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + obs := rx.Concat([]rx.Observable[int]{ + testObservable[int](ctx, 1, 2, 3), + }) + rx.Assert(context.Background(), obs, rx.HasItems([]int{1, 2, 3})) + }) + }) + + When("Two observables", func() { + It("🧪 should: create derived compound single observable", func() { + defer leaktest.Check(GinkgoT())() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + obs := rx.Concat([]rx.Observable[int]{ + testObservable[int](ctx, 1, 2, 3), + testObservable[int](ctx, 4, 5, 6), + }) + rx.Assert(context.Background(), obs, rx.HasItems([]int{1, 2, 3, 4, 5, 6})) + }) + }) + + When("More than two observables", func() { + It("🧪 should: create derived compound single observable", func() { + defer leaktest.Check(GinkgoT())() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + obs := rx.Concat([]rx.Observable[int]{ + testObservable[int](ctx, 1, 2, 3), + testObservable[int](ctx, 4, 5, 6), + testObservable[int](ctx, 7, 8, 9), + }) + rx.Assert(context.Background(), obs, rx.HasItems([]int{1, 2, 3, 4, 5, 6, 7, 8, 9})) + }) + }) + + When("Multiple empty observables", func() { + It("🧪 should: create derived compound single observable", func() { + defer leaktest.Check(GinkgoT())() + + obs := rx.Concat([]rx.Observable[int]{ + rx.Empty[int](), + rx.Empty[int](), + rx.Empty[int](), + }) + rx.Assert(context.Background(), obs, rx.IsEmpty[int]()) + }) + }) + + When("One empty observable", func() { + It("🧪 should: create derived compound single observable", func() { + defer leaktest.Check(GinkgoT())() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + obs := rx.Concat([]rx.Observable[int]{ + rx.Empty[int](), + testObservable[int](ctx, 1, 2, 3), + }) + rx.Assert(context.Background(), obs, rx.HasItems([]int{1, 2, 3})) + + obs = rx.Concat([]rx.Observable[int]{ + testObservable[int](ctx, 1, 2, 3), + rx.Empty[int](), + }) + rx.Assert(context.Background(), obs, rx.HasItems([]int{1, 2, 3})) + }) + }) + }) + + Context("Create", func() { + When("provided with a Producer", func() { + It("🧪 should: create observable", func() { + defer leaktest.Check(GinkgoT())() + + obs := rx.Create([]rx.Producer[int]{func(_ context.Context, next chan<- rx.Item[int]) { + next <- rx.Of(1) + next <- rx.Of(2) + next <- rx.Of(3) + }}) + rx.Assert(context.Background(), obs, rx.HasItems([]int{1, 2, 3}), rx.HasNoError[int]()) + }) + }) + + When("Provided with a Producer", func() { + It("🧪 should: create observable (single dup?)", func() { + defer leaktest.Check(GinkgoT())() + + obs := rx.Create([]rx.Producer[int]{func(_ context.Context, next chan<- rx.Item[int]) { + next <- rx.Of(1) + next <- rx.Of(2) + next <- rx.Of(3) + }}) + rx.Assert(context.Background(), obs, rx.HasItems([]int{1, 2, 3}), rx.HasNoError[int]()) + rx.Assert(context.Background(), obs, rx.IsEmpty[int](), rx.HasNoError[int]()) + }) + }) + + When("context cancelled", func() { + It("🧪 should: create observable", func() { + defer leaktest.Check(GinkgoT())() + + closed1 := make(chan struct{}) + ctx, cancel := context.WithCancel(context.Background()) + + _ = rx.Create([]rx.Producer[int]{ + func(_ context.Context, _ chan<- rx.Item[int]) { + cancel() + }, + func(ctx context.Context, _ chan<- rx.Item[int]) { + <-ctx.Done() + closed1 <- struct{}{} + }, + }, rx.WithContext[int](ctx)).Run() + + select { + case <-time.Tick(time.Second): + Fail("producer not closed") + + case <-closed1: + } + }) + }) + }) + + Context("Defer", func() { + When("single", func() { + It("🧪 should: create deferred observer", func() { + defer leaktest.Check(GinkgoT())() + + obs := rx.Defer([]rx.Producer[int]{ + func(_ context.Context, next chan<- rx.Item[int]) { + next <- rx.Of(1) + next <- rx.Of(2) + next <- rx.Of(3) + }}) + rx.Assert(context.Background(), obs, rx.HasItems([]int{1, 2, 3}), rx.HasNoError[int]()) + }) + }) + + When("multiple", func() { + It("should: create deferred observer", func() { + defer leaktest.Check(GinkgoT())() + + obs := rx.Defer([]rx.Producer[int]{ + func(_ context.Context, next chan<- rx.Item[int]) { + next <- rx.Of(1) + next <- rx.Of(2) + }, + func(_ context.Context, next chan<- rx.Item[int]) { + next <- rx.Of(10) + next <- rx.Of(20) + }, + }) + + rx.Assert(context.Background(), obs, rx.HasItemsNoOrder(1, 2, 10, 20), rx.HasNoError[int]()) + }) + }) + + When("context cancelled", func() { + It("🧪 should: create deferred observable", func() { + defer leaktest.Check(GinkgoT())() + + closed1 := make(chan struct{}) + ctx, cancel := context.WithCancel(context.Background()) + + _ = rx.Defer([]rx.Producer[int]{ + func(_ context.Context, _ chan<- rx.Item[int]) { + cancel() + }, + func(ctx context.Context, _ chan<- rx.Item[int]) { + <-ctx.Done() + closed1 <- struct{}{} + }, + }, rx.WithContext[int](ctx)).Run() + + select { + case <-time.Tick(time.Second): + Fail("producer not closed") + + case <-closed1: + } + }) + }) + + When("Provided with a Producer", func() { + It("🧪 should: create deferred observable (single dup?)", func() { + defer leaktest.Check(GinkgoT())() + + obs := rx.Defer([]rx.Producer[int]{func(_ context.Context, next chan<- rx.Item[int]) { + next <- rx.Of(1) + next <- rx.Of(2) + next <- rx.Of(3) + }}) + rx.Assert(context.Background(), obs, rx.HasItems([]int{1, 2, 3}), rx.HasNoError[int]()) + rx.Assert(context.Background(), obs, rx.HasItems([]int{1, 2, 3}), rx.HasNoError[int]()) + }) + }) + + When("ComposeDup", func() { + It("🧪 should: create deferred observable (composed dup?)", func() { + defer leaktest.Check(GinkgoT())() + + obs := rx.Defer([]rx.Producer[int]{func(_ context.Context, next chan<- rx.Item[int]) { + next <- rx.Of(1) + next <- rx.Of(2) + next <- rx.Of(3) + }}).Map(func(_ context.Context, i int) (_ int, _ error) { + return i + 1, nil + }).Map(func(_ context.Context, i int) (_ int, _ error) { + return i + 1, nil + }) + rx.Assert(context.Background(), obs, rx.HasItems([]int{3, 4, 5}), rx.HasNoError[int]()) + rx.Assert(context.Background(), obs, rx.HasItems([]int{3, 4, 5}), rx.HasNoError[int]()) + }) + }) + + When("ComposeDup with eager observation", func() { + It("🧪 should: create deferred observable (composed dup?)", func() { + defer leaktest.Check(GinkgoT())() + + obs := rx.Defer([]rx.Producer[int]{func(_ context.Context, next chan<- rx.Item[int]) { + next <- rx.Of(1) + next <- rx.Of(2) + next <- rx.Of(3) + }}).Map(func(_ context.Context, i int) (_ int, _ error) { + return i + 1, nil + }, rx.WithObservationStrategy[int](rx.Eager)).Map(func(_ context.Context, i int) (_ int, _ error) { + return i + 1, nil + }) + rx.Assert(context.Background(), obs, rx.HasItems([]int{3, 4, 5}), rx.HasNoError[int]()) + // In the case of an eager observation, we already consumed the items produced by Defer + // So if we create another subscription, it will be empty + rx.Assert(context.Background(), obs, rx.IsEmpty[int](), rx.HasNoError[int]()) + }) + }) + + When("Error", func() { + It("🧪 should: be detectable in observable", func() { + defer leaktest.Check(GinkgoT())() + + obs := rx.Defer([]rx.Producer[int]{func(_ context.Context, next chan<- rx.Item[int]) { + next <- rx.Of(1) + next <- rx.Of(2) + next <- rx.Error[int](errFoo) + }}) + rx.Assert(context.Background(), obs, rx.HasItems([]int{1, 2}), rx.HasError[int](errFoo)) + }) + }) + }) + + Context("Empty", func() { + It("🧪 should: contain no elements", func() { + defer leaktest.Check(GinkgoT())() + + obs := rx.Empty[int]() + rx.Assert(context.Background(), obs, rx.IsEmpty[int]()) + }) + }) + + Context("FromChannel", func() { + It("🧪 should: create observable from channel", func() { + defer leaktest.Check(GinkgoT())() + + ch := make(chan rx.Item[int]) + go func() { + ch <- rx.Of(1) + ch <- rx.Of(2) + ch <- rx.Of(3) + close(ch) + }() + + obs := rx.FromChannel(ch) + rx.Assert(context.Background(), obs, rx.HasItems([]int{1, 2, 3}), rx.HasNoError[int]()) + }) + + When("SimpleCapacity", func() { + It("🧪 should: ???", func() { + defer leaktest.Check(GinkgoT())() + + ch := rx.FromChannel(make(chan rx.Item[int], 10)).Observe() + Expect(cap(ch)).To(Equal(10)) + }) + }) + + When("ComposedCapacity", func() { + XIt("🧪 should: ???", func() { + defer leaktest.Check(GinkgoT())() + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + obs1 := rx.FromChannel(make(chan rx.Item[int], 10)). + Map(func(_ context.Context, _ int) (int, error) { + return 1, nil + }, rx.WithContext[int](ctx), rx.WithBufferedChannel[int](11)) + + Expect(cap(obs1.Observe())).To(Equal(11)) + + obs2 := obs1.Map(func(_ context.Context, _ int) (int, error) { + return 1, nil + }, rx.WithContext[int](ctx), rx.WithBufferedChannel[int](12)) + + // FAILED => Observe returns 0 + Expect(cap(obs2.Observe())).To(Equal(12)) + }) + }) + }) + + Context("FromEventSource", func() { + When("Observation after all sent", func() { + It("🧪 should: not see any items", func() { + defer leaktest.Check(GinkgoT())() + + const max = 10 + next := make(chan rx.Item[int], max) + obs := rx.FromEventSource(next, rx.WithBackPressureStrategy[int](rx.Drop)) + + go func() { + for i := 0; i < max; i++ { + next <- rx.Of(i) + } + close(next) + }() + time.Sleep(50 * time.Millisecond) + + rx.Assert(context.Background(), obs, rx.CustomPredicate(func(items []int) error { + if len(items) != 0 { + return errors.New("items should be nil") + } + + return nil + })) + }) + }) + + When("Drop", func() { + It("🧪 should: ???", func() { + defer leaktest.Check(GinkgoT())() + + const max = 100000 + next := make(chan rx.Item[int], max) + obs := rx.FromEventSource(next, rx.WithBackPressureStrategy[int](rx.Drop)) + + go func() { + for i := 0; i < max; i++ { + next <- rx.Of(i) + } + close(next) + }() + + rx.Assert(context.Background(), obs, rx.CustomPredicate(func(items []int) error { + if len(items) == max { + return errors.New("some items should be dropped") + } + if len(items) == 0 { + return errors.New("no items") + } + + return nil + })) + }) + }) + }) + + Context("JustItem", func() { + When("given: a value", func() { + It("🧪 should: return a single item observable containing value", func() { + defer leaktest.Check(GinkgoT())() + + single := rx.JustItem(1) + rx.Assert(context.Background(), single, rx.HasItem(1), rx.HasNoError[int]()) + rx.Assert(context.Background(), single, rx.HasItem(1), rx.HasNoError[int]()) + }) + }) + }) + + Context("Just", func() { + When("given: a value", func() { + It("🧪 should: return a single item observable containing value", func() { + defer leaktest.Check(GinkgoT())() + + obs := rx.Just(1, 2, 3)() + rx.Assert(context.Background(), obs, rx.HasItems([]int{1, 2, 3}), rx.HasNoError[int]()) + rx.Assert(context.Background(), obs, rx.HasItems([]int{1, 2, 3}), rx.HasNoError[int]()) + }) + }) + + When("given: custom structure", func() { + It("🧪 should: ", func() { + defer leaktest.Check(GinkgoT())() + + type customer struct { + id int + } + + obs := rx.Just([]customer{{id: 1}, {id: 2}, {id: 3}}...)() + rx.Assert(context.Background(), obs, + rx.HasItems([]customer{{id: 1}, {id: 2}, {id: 3}}), + rx.HasNoError[customer](), + ) + + rx.Assert(context.Background(), obs, + rx.HasItems([]customer{{id: 1}, {id: 2}, {id: 3}}), + rx.HasNoError[customer](), + ) + }) + }) + + When("given: channel", func() { + XIt("🧪 should: ???", func() { + defer leaktest.Check(GinkgoT())() + + ch := make(chan int, 1) + go func() { + ch <- 1 + ch <- 2 + ch <- 3 + close(ch) + }() + obs := rx.Just(ch)() + _ = obs + + // TODO(fix): o := rx.Ch[int](1) + // rx.Assert(context.Background(), obs, rx.HasItems[int]([]int{1, 2, 3})) + }) + }) + + When("given: simple capacity", func() { + It("🧪 should: ???", func() { + defer leaktest.Check(GinkgoT())() + + ch := rx.Just(1)(rx.WithBufferedChannel[int](5)).Observe() + Expect(cap(ch)).To(Equal(5)) + }) + }) + + When("given: composed capacity", func() { + XIt("🧪 should: ???", func() { + defer leaktest.Check(GinkgoT())() + + obs1 := rx.Just(1)().Map(func(_ context.Context, _ int) (int, error) { + return 1, nil + }, rx.WithBufferedChannel[int](11)) + // FAILED => Observe returns 0 + Expect(cap(obs1.Observe())).To(Equal(11)) + + obs2 := obs1.Map(func(_ context.Context, _ int) (int, error) { + return 1, nil + }, rx.WithBufferedChannel[int](12)) + Expect(cap(obs2.Observe())).To(Equal(12)) + }) + }) + }) + + Context("Merge", func() { + When("given, multiple observers", func() { + It("🧪 should: combine into a single observer", func() { + defer leaktest.Check(GinkgoT())() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + obs := rx.Merge([]rx.Observable[int]{ + testObservable[int](ctx, 1, 2), + testObservable[int](ctx, 3, 4), + }) + rx.Assert(context.Background(), obs, rx.HasItemsNoOrder(1, 2, 3, 4)) + }) + }) + + When("given, multiple observers and contains error", func() { + It("🧪 should: able to detect error in combined observable", func() { + defer leaktest.Check(GinkgoT())() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + obs := rx.Merge([]rx.Observable[int]{ + testObservable[int](ctx, 1, 2), + testObservable[int](ctx, 3, errFoo), + }) + + // The content is not deterministic, hence we just test if we have some items + rx.Assert(context.Background(), obs, rx.IsNotEmpty[int](), rx.HasError[int](errFoo)) + }) + }) + }) + + Context("Range", func() { + When("positive count", func() { + XIt("🧪 should: create observable", func() { + defer leaktest.Check(GinkgoT())() + + /* + this code inside rx.Assert: + if item.IsError() { + errs = append(errs, item.E) + } else { + got = append(got, item.V) + } + needs to accommodate item.N, ie the numeric aux value + and also should be modified to support all the other + new ways of interpreting an item (Ch, Tick, Tv) + */ + + const ( + start = 5 + count = 3 + ) + + obs := rx.Range[int](start, count) + rx.Assert(context.Background(), obs, rx.HasItems([]int{5, 6, 7})) + // Test whether the observable is reproducible + rx.Assert(context.Background(), obs, rx.HasItems([]int{5, 6, 7})) + }) + }) + + When("negative count", func() { + It("🧪 should: contain detectable error", func() { + defer leaktest.Check(GinkgoT())() + + obs := rx.Range[int](1, -5) + rx.Assert(context.Background(), obs, rx.HasAnError[int]()) + }) + }) + + When("maximum exceeded", func() { + It("🧪 should: contain detectable error", func() { + defer leaktest.Check(GinkgoT())() + + const ( + start = 1 << 31 + count = 1 + ) + + obs := rx.Range[int](start, count) + rx.Assert(context.Background(), obs, rx.HasAnError[int]()) + }) + }) + }) + + Context("Start", func() { + When("using Supplier", func() { + It("🧪 should: ???", func() { + defer leaktest.Check(GinkgoT())() + + obs := rx.Start([]rx.Supplier[int]{func(_ context.Context) rx.Item[int] { + return rx.Of(1) + }, func(_ context.Context) rx.Item[int] { + return rx.Of(2) + }}) + rx.Assert(context.Background(), obs, rx.HasItemsNoOrder(1, 2)) + }) + }) + }) + + Context("Thrown", func() { + When("foo", func() { + It("🧪 should: ", func() { + defer leaktest.Check(GinkgoT())() + + obs := rx.Thrown[int](errFoo) + rx.Assert(context.Background(), obs, rx.HasError[int](errFoo)) + }) + }) + }) + + Context("Timer", func() { + When("foo", func() { + It("🧪 should: ???", func() { + defer leaktest.Check(GinkgoT())() + + obs := rx.Timer[int](rx.WithDuration(time.Nanosecond)) + select { + case <-time.Tick(time.Second): + Fail("observable not closed") + case <-obs.Observe(): + } + }) + }) + + When("Empty", func() { + It("🧪 should: ???", func() { + defer leaktest.Check(GinkgoT())() + + ctx, cancel := context.WithCancel(context.Background()) + obs := rx.Timer(rx.WithDuration(time.Hour), rx.WithContext[int](ctx)) + + go func() { + time.Sleep(50 * time.Millisecond) + cancel() + }() + + select { + case <-time.Tick(time.Second): + Fail("observable not closed") + case <-obs.Observe(): + } + }) + }) + }) +}) diff --git a/rx/item.go b/rx/item.go index 76478db..cc60438 100644 --- a/rx/item.go +++ b/rx/item.go @@ -12,7 +12,13 @@ type ( Item[T any] struct { V T E error - C chan<- Item[T] + // + C chan<- Item[T] + tick bool + tickV bool + numeric bool + TV int + N int } // TimestampItem attach a timestamp to an item. @@ -47,6 +53,27 @@ func Ch[T any](ch any) Item[T] { panic("invalid ch type") } +// Tick creates a type safe tick instance +func Tick[T any]() Item[T] { + return Item[T]{tick: true} +} + +// Tv creates a type safe tick value instance +func Tv[T any](tv int) Item[T] { + return Item[T]{ + TV: tv, + tickV: true, + } +} + +// Num creates a type safe tick value instance +func Num[T any](n int) Item[T] { + return Item[T]{ + N: n, + numeric: true, + } +} + // Error creates an item from an error. func Error[T any](err error) Item[T] { return Item[T]{E: err} @@ -128,6 +155,21 @@ func (i Item[T]) IsError() bool { return i.E != nil } +// IsTick checks if an item is a tick instance. +func (i Item[T]) IsTick() bool { + return i.tick +} + +// IsTickValue checks if an item is a tick instance. +func (i Item[T]) IsTickValue() bool { + return i.tickV +} + +// IsTickValue checks if an item is a tick instance. +func (i Item[T]) IsNumeric() bool { + return i.numeric +} + // SendBlocking sends an item and blocks until it is sent. func (i Item[T]) SendBlocking(ch chan<- Item[T]) { ch <- i diff --git a/rx/iterable-create.go b/rx/iterable-create.go new file mode 100644 index 0000000..b0f5089 --- /dev/null +++ b/rx/iterable-create.go @@ -0,0 +1,93 @@ +package rx + +import ( + "context" + "sync" +) + +type createIterable[T any] struct { + next <-chan Item[T] + opts []Option[T] + subscribers []chan Item[T] + mutex sync.RWMutex + producerAlreadyCreated bool +} + +func newCreateIterable[T any](fs []Producer[T], opts ...Option[T]) Iterable[T] { + option := parseOptions(opts...) + next := option.buildChannel() + ctx := option.buildContext(emptyContext) + + go func() { + defer close(next) + + for _, f := range fs { + f(ctx, next) + } + }() + + return &createIterable[T]{ + opts: opts, + next: next, + } +} + +func (i *createIterable[T]) Observe(opts ...Option[T]) <-chan Item[T] { + mergedOptions := make([]Option[T], 0, len(opts)) + copy(mergedOptions, opts) + mergedOptions = append(mergedOptions, opts...) + option := parseOptions(mergedOptions...) + + if !option.isConnectable() { + return i.next + } + + if option.isConnectOperation() { + i.connect(option.buildContext(emptyContext)) + return nil + } + + ch := option.buildChannel() + + i.mutex.Lock() + i.subscribers = append(i.subscribers, ch) + i.mutex.Unlock() + + return ch +} + +func (i *createIterable[T]) connect(ctx context.Context) { + i.mutex.Lock() + if !i.producerAlreadyCreated { + go i.produce(ctx) + i.producerAlreadyCreated = true + } + i.mutex.Unlock() +} + +func (i *createIterable[T]) produce(ctx context.Context) { + defer func() { + i.mutex.RLock() + for _, subscriber := range i.subscribers { + close(subscriber) + } + i.mutex.RUnlock() + }() + + for { + select { + case <-ctx.Done(): + return + case item, ok := <-i.next: + if !ok { + return + } + + i.mutex.RLock() + for _, subscriber := range i.subscribers { + subscriber <- item + } + i.mutex.RUnlock() + } + } +} diff --git a/rx/iterable-defer.go b/rx/iterable-defer.go new file mode 100644 index 0000000..716e017 --- /dev/null +++ b/rx/iterable-defer.go @@ -0,0 +1,29 @@ +package rx + +type deferIterable[T any] struct { + fs []Producer[T] + opts []Option[T] +} + +func newDeferIterable[T any](f []Producer[T], opts ...Option[T]) Iterable[T] { + return &deferIterable[T]{ + fs: f, + opts: opts, + } +} + +func (i *deferIterable[T]) Observe(opts ...Option[T]) <-chan Item[T] { + option := parseOptions(append(i.opts, opts...)...) + next := option.buildChannel() + ctx := option.buildContext(emptyContext) + + go func() { + defer close(next) + + for _, f := range i.fs { + f(ctx, next) + } + }() + + return next +} diff --git a/rx/iterable-event-source.go b/rx/iterable-event-source.go new file mode 100644 index 0000000..49ce411 --- /dev/null +++ b/rx/iterable-event-source.go @@ -0,0 +1,97 @@ +package rx + +import ( + "context" + "sync" +) + +type eventSourceIterable[T any] struct { + sync.RWMutex + observers []chan Item[T] + disposed bool + opts []Option[T] +} + +func newEventSourceIterable[T any](ctx context.Context, + next <-chan Item[T], strategy BackPressureStrategy, opts ...Option[T], +) Iterable[T] { + it := &eventSourceIterable[T]{ + observers: make([]chan Item[T], 0), + opts: opts, + } + + go func() { + defer func() { + it.closeAllObservers() + }() + + deliver := func(item Item[T]) (done bool) { + it.RLock() + defer it.RUnlock() + + switch strategy { + default: + fallthrough + case Block: + for _, observer := range it.observers { + if !item.SendContext(ctx, observer) { + return true + } + } + case Drop: + for _, observer := range it.observers { + select { + default: + case <-ctx.Done(): + return true + case observer <- item: + } + } + } + + return + } + + for { + select { + case <-ctx.Done(): + return + case item, ok := <-next: + if !ok { + return + } + + if done := deliver(item); done { + return + } + } + } + }() + + return it +} + +func (i *eventSourceIterable[T]) closeAllObservers() { + i.Lock() + for _, observer := range i.observers { + close(observer) + } + + i.disposed = true + i.Unlock() +} + +func (i *eventSourceIterable[T]) Observe(opts ...Option[T]) <-chan Item[T] { + option := parseOptions(append(i.opts, opts...)...) + next := option.buildChannel() + + i.Lock() + if i.disposed { + close(next) + } else { + i.observers = append(i.observers, next) + } + i.Unlock() + + return next +} diff --git a/rx/iterable-range.go b/rx/iterable-range.go new file mode 100644 index 0000000..7f37389 --- /dev/null +++ b/rx/iterable-range.go @@ -0,0 +1,33 @@ +package rx + +type rangeIterable[T any] struct { + start, count int + opts []Option[T] +} + +func newRangeIterable[T any](start, count int, opts ...Option[T]) Iterable[T] { + return &rangeIterable[T]{ + start: start, + count: count, + opts: opts, + } +} + +func (i *rangeIterable[T]) Observe(opts ...Option[T]) <-chan Item[T] { + option := parseOptions(append(i.opts, opts...)...) + ctx := option.buildContext(emptyContext) + next := option.buildChannel() + + go func() { + for idx := i.start; idx <= i.start+i.count-1; idx++ { + select { + case <-ctx.Done(): + return + case next <- Num[T](idx): + } + } + close(next) + }() + + return next +} diff --git a/rx/observable-operator.go b/rx/observable-operator.go index 3b0ffea..6880dac 100644 --- a/rx/observable-operator.go +++ b/rx/observable-operator.go @@ -9,6 +9,32 @@ func (o *ObservableImpl[T]) Observe(opts ...Option[T]) <-chan Item[T] { return o.iterable.Observe(opts...) } +// Run creates an Observer without consuming the emitted items. +func (o *ObservableImpl[T]) Run(opts ...Option[T]) Disposed { + dispose := make(chan struct{}) + option := parseOptions(opts...) + ctx := option.buildContext(o.parent) + + go func() { + defer close(dispose) + + observe := o.Observe(opts...) + + for { + select { + case <-ctx.Done(): + return + case _, ok := <-observe: + if !ok { + return + } + } + } + }() + + return dispose +} + // Max determines and emits the maximum-valued item emitted by an Observable according to a comparator. func (o *ObservableImpl[T]) Max(comparator Comparator[T], opts ...Option[T], @@ -26,7 +52,7 @@ func (o *ObservableImpl[T]) Max(comparator Comparator[T], }, forceSeq, bypassGather, opts...) } -func isLimitDefined[T any](limit T) bool { +func isZero[T any](limit T) bool { val := reflect.ValueOf(limit).Interface() zero := reflect.Zero(reflect.TypeOf(limit)).Interface() @@ -51,7 +77,7 @@ func (op *maxOperator[T]) next(_ context.Context, // op.max = item.V // } // } - if !isLimitDefined(op.max) || (op.comparator(op.max, item.V) < 0) { + if !isZero(op.max) || (op.comparator(op.max, item.V) < 0) { op.max = item.V } } @@ -91,6 +117,59 @@ func (o *ObservableImpl[T]) Min(comparator Comparator[T], opts ...Option[T]) Opt }, forceSeq, bypassGather, opts...) } +// Map transforms the items emitted by an Observable by applying a function to each item. +func (o *ObservableImpl[T]) Map(apply Func[T], opts ...Option[T]) Observable[T] { + const ( + forceSeq = false + bypassGather = true + ) + + return observable(o.parent, o, func() operator[T] { + return &mapOperator[T]{ + apply: apply, + } + }, forceSeq, bypassGather, opts...) +} + +type mapOperator[T any] struct { + apply Func[T] +} + +func (op *mapOperator[T]) next(ctx context.Context, + item Item[T], dst chan<- Item[T], operatorOptions operatorOptions[T], +) { + res, err := op.apply(ctx, item.V) + + if err != nil { + Error[T](err).SendContext(ctx, dst) + operatorOptions.stop() + + return + } + + Of(res).SendContext(ctx, dst) +} + +func (op *mapOperator[T]) err(ctx context.Context, + item Item[T], dst chan<- Item[T], operatorOptions operatorOptions[T], +) { + defaultErrorFuncOperator(ctx, item, dst, operatorOptions) +} + +func (op *mapOperator[T]) end(_ context.Context, _ chan<- Item[T]) { +} + +func (op *mapOperator[T]) gatherNext(ctx context.Context, + item Item[T], dst chan<- Item[T], _ operatorOptions[T], +) { + // switch item.V.(type) { + // case *mapOperator: + // return + // } + // TODO: check above switch not required + item.SendContext(ctx, dst) +} + type minOperator[T any] struct { comparator Comparator[T] empty bool @@ -110,7 +189,7 @@ func (op *minOperator[T]) next(_ context.Context, // op.min = item.V // } // } - if !isLimitDefined(op.min) || (op.comparator(op.min, item.V) > 0) { + if !isZero(op.min) || (op.comparator(op.min, item.V) > 0) { op.min = item.V } } diff --git a/rx/observable.go b/rx/observable.go index 4948807..97b6612 100644 --- a/rx/observable.go +++ b/rx/observable.go @@ -3,7 +3,9 @@ package rx import ( "context" "sync" + "sync/atomic" + "github.com/emirpasic/gods/trees/binaryheap" "golang.org/x/exp/constraints" ) @@ -23,7 +25,10 @@ type Observable[T any] interface { Iterable[T] Max(comparator Comparator[T], opts ...Option[T]) OptionalSingle[T] + Map(apply Func[T], opts ...Option[T]) Observable[T] Min(comparator Comparator[T], opts ...Option[T]) OptionalSingle[T] + + Run(opts ...Option[T]) Disposed } // ObservableImpl implements Observable. @@ -46,6 +51,99 @@ type operator[T any] interface { gatherNext(ctx context.Context, item Item[T], dst chan<- Item[T], options operatorOptions[T]) } +func observable[T any](parent context.Context, + iterable Iterable[T], operatorFactory func() operator[T], forceSeq, bypassGather bool, opts ...Option[T], +) Observable[T] { + option := parseOptions(opts...) + parallel, _ := option.getPool() + + if option.isEagerObservation() { + next := option.buildChannel() + ctx := option.buildContext(parent) + + if forceSeq || !parallel { + runSequential(ctx, next, iterable, operatorFactory, option, opts...) + } else { + runParallel(ctx, next, iterable.Observe(opts...), operatorFactory, bypassGather, option, opts...) + } + + return &ObservableImpl[T]{iterable: newChannelIterable(next)} + } + + if forceSeq || !parallel { + return &ObservableImpl[T]{ + iterable: newFactoryIterable(func(propagatedOptions ...Option[T]) <-chan Item[T] { + mergedOptions := make([]Option[T], 0, len(opts)+len(propagatedOptions)) + copy(mergedOptions, opts) + mergedOptions = append(mergedOptions, propagatedOptions...) + + option := parseOptions(mergedOptions...) //nolint:govet // shadow is deliberate + next := option.buildChannel() + ctx := option.buildContext(parent) + + runSequential(ctx, next, iterable, operatorFactory, option, mergedOptions...) + + return next + }), + } + } + + if serialized, f := option.isSerialized(); serialized { + firstItemIDCh := make(chan Item[T], 1) + fromCh := make(chan Item[T], 1) + obs := &ObservableImpl[T]{ + iterable: newFactoryIterable(func(propagatedOptions ...Option[T]) <-chan Item[T] { + mergedOptions := make([]Option[T], 0, len(opts)+len(propagatedOptions)) + copy(mergedOptions, opts) + mergedOptions = append(mergedOptions, propagatedOptions...) + + option := parseOptions(mergedOptions...) + next := option.buildChannel() + ctx := option.buildContext(parent) + observe := iterable.Observe(opts...) + + go func() { + select { + case <-ctx.Done(): + return + case firstItemID := <-firstItemIDCh: + if firstItemID.IsError() { + firstItemID.SendContext(ctx, fromCh) + return + } + Of(firstItemID.V).SendContext(ctx, fromCh) + // TODO: check int def here: Of(firstItemID.V.(int)).SendContext(ctx, fromCh) + runParallel(ctx, next, observe, operatorFactory, bypassGather, option, mergedOptions...) + } + }() + runFirstItem(ctx, f, firstItemIDCh, observe, next, operatorFactory, option, mergedOptions...) + + return next + }), + } + + return obs.serialize(parent, fromCh, f) + } + + return &ObservableImpl[T]{ + iterable: newFactoryIterable(func(propagatedOptions ...Option[T]) <-chan Item[T] { + mergedOptions := make([]Option[T], 0, len(opts)+len(propagatedOptions)) + copy(mergedOptions, opts) + mergedOptions = append(mergedOptions, propagatedOptions...) + + option := parseOptions(mergedOptions...) + next := option.buildChannel() + ctx := option.buildContext(parent) + + runParallel(ctx, next, iterable.Observe(mergedOptions...), + operatorFactory, bypassGather, option, mergedOptions..., + ) + + return next + }), + } +} + func single[T any](parent context.Context, iterable Iterable[T], operatorFactory func() operator[T], forceSeq, bypassGather bool, opts ...Option[T], ) Single[T] { @@ -66,7 +164,9 @@ func single[T any](parent context.Context, return &SingleImpl[T]{ iterable: newFactoryIterable(func(propagatedOptions ...Option[T]) <-chan Item[T] { - mergedOptions := append(opts, propagatedOptions...) //nolint:gocritic // foo + mergedOptions := make([]Option[T], 0, len(opts)+len(propagatedOptions)) + copy(mergedOptions, opts) + mergedOptions = append(mergedOptions, propagatedOptions...) option = parseOptions(mergedOptions...) @@ -108,7 +208,10 @@ func optionalSingle[T any](parent context.Context, return &OptionalSingleImpl[T]{ parent: ctx, iterable: newFactoryIterable(func(propagatedOptions ...Option[T]) <-chan Item[T] { - mergedOptions := append(opts, propagatedOptions...) //nolint:gocritic // foo + mergedOptions := make([]Option[T], 0, len(opts)+len(propagatedOptions)) + copy(mergedOptions, opts) + mergedOptions = append(mergedOptions, propagatedOptions...) + option = parseOptions(mergedOptions...) next := option.buildChannel() @@ -242,18 +345,6 @@ func runParallel[T any](ctx context.Context, case item, ok := <-observe: if !ok { if !bypassGather { - // TODO: - // cannot use gather (variable of type chan Item[T]) as chan<- Item[operator[T]] - // value in argument to Of(op).SendContext - // - // op = operator[T] / Item[operator[T]] - // gather = chan Item[T] - // can we send T down the channel, then apply the operator on the other - // end of the channel? - // - // or can we define another method on Item, such as SendOp ==> this looks - // like a better option. - // SendOpContext Of(op).SendOpContext(ctx, gather) } @@ -275,3 +366,134 @@ func runParallel[T any](ctx context.Context, close(gather) }() } + +func runFirstItem[T any](ctx context.Context, + f func(T) int, // TODO(check, return type int): func(T) int + notif chan Item[T], observe <-chan Item[T], next chan Item[T], + operatorFactory func() operator[T], option Option[T], opts ...Option[T], +) { + go func() { + op := operatorFactory() + stopped := false + operator := operatorOptions[T]{ + stop: func() { + if option.getErrorStrategy() == StopOnError { + stopped = true + } + }, + resetIterable: func(newIterable Iterable[T]) { + observe = newIterable.Observe(opts...) + }, + } + + loop: + for !stopped { + select { + case <-ctx.Done(): + break loop + case i, ok := <-observe: + if !ok { + break loop + } + + if i.IsError() { + op.err(ctx, i, next, operator) + i.SendContext(ctx, notif) + } else { + op.next(ctx, i, next, operator) + Num[T](f(i.V)).SendContext(ctx, notif) + // TODO(check this correct): Of[T](f(i.V)).SendContext(ctx, notif) + } + } + } + op.end(ctx, next) + }() +} + +func (o *ObservableImpl[T]) serialize(parent context.Context, + fromCh chan Item[T], identifier func(T) int, opts ...Option[T], +) Observable[T] { + option := parseOptions(opts...) + next := option.buildChannel() + + ctx := option.buildContext(parent) + minHeap := binaryheap.NewWith(func(a, b interface{}) int { + return a.(int) - b.(int) + }) + items := make(map[int]interface{}) // TODO(check interface{} is correct, T?) + + var ( + from int + counter int64 + ) + + src := o.Observe(opts...) + + go func() { + select { + case <-ctx.Done(): + close(next) + + return + case item := <-fromCh: + if item.IsError() { + item.SendContext(ctx, next) + close(next) + + return + } + + from = item.N + counter = int64(from) + + go func() { + defer close(next) + + for { + select { + case <-ctx.Done(): + return + case item, ok := <-src: + if !ok { + return + } + + if item.IsError() { + next <- item + + return + } + + id := identifier(item.V) + minHeap.Push(id) + + items[id] = item.V + + for !minHeap.Empty() { + v, _ := minHeap.Peek() + id, _ := v.(int) + + if atomic.LoadInt64(&counter) == int64(id) { + if itemValue, contains := items[id]; contains { + minHeap.Pop() + delete(items, id) + Num[T](itemValue.(int)).SendContext(ctx, next) // TODO(check me) + + counter++ + + continue + } + } + + break + } + } + } + }() + } + }() + + return &ObservableImpl[T]{ + iterable: newChannelIterable(next), + } +} diff --git a/rx/util_test.go b/rx/util_test.go index 72912d5..1bd41d5 100644 --- a/rx/util_test.go +++ b/rx/util_test.go @@ -1,10 +1,46 @@ package rx_test import ( + "context" "errors" + + "github.com/samber/lo" + "github.com/snivilised/lorax/rx" ) var ( errFoo = errors.New("foo") errBar = errors.New("bar") ) + +func channelValue[T any](ctx context.Context, items ...any) chan rx.Item[T] { + next := make(chan rx.Item[T]) + go func() { + for _, item := range items { + switch item := item.(type) { + case rx.Item[T]: + item.SendContext(ctx, next) + + case error: + rx.Error[T](item).SendContext(ctx, next) + + case T: + rx.Of(item).SendContext(ctx, next) + } + } + + close(next) + }() + + return next +} + +func convertAllItemsToAny[T any](items []T) []any { + return lo.Map(items, func(it T, _ int) any { + return it + }) +} + +func testObservable[T any](ctx context.Context, items ...any) rx.Observable[T] { + return rx.FromChannel(channelValue[T](ctx, convertAllItemsToAny(items)...)) +}