Skip to content

Commit

Permalink
ref(rx): resolve send items (#132)
Browse files Browse the repository at this point in the history
  • Loading branch information
plastikfan committed Mar 27, 2024
1 parent f008dc2 commit 64113cc
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 17 deletions.
78 changes: 70 additions & 8 deletions rx/item.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,17 @@ package rx

import (
"context"
"reflect"
"time"
)

type (
// Item is a wrapper having either a value or an error.
// Item is a wrapper having either a value, error or channel.
//
Item[T any] struct {
V T
E error
C chan<- Item[T]
}

// TimestampItem attach a timestamp to an item.
Expand All @@ -36,31 +38,91 @@ func Of[T any](v T) Item[T] {
return Item[T]{V: v}
}

// Ch creates an item from a channel
func Ch[T any](ch any) Item[T] {
if c, ok := ch.(chan<- Item[T]); ok {
return Item[T]{C: c}
}

panic("invalid ch type")
}

// Error creates an item from an error.
func Error[T any](err error) Item[T] {
return Item[T]{E: err}
}

// SendItems is an utility function that send a list of items and indicate a
// strategy on whether to close the channel once the function completes.
// This method has been derived from the original SendItems.
// (does not support channels or slice)
func SendItems[T any](ctx context.Context,
ch chan<- Item[T], strategy CloseChannelStrategy, items ...Item[T],
ch chan<- Item[T], strategy CloseChannelStrategy, items ...any,
) {
if strategy == CloseChannel {
defer close(ch)
}

sendItems(ctx, ch, items...)
send(ctx, ch, items...)
}

func sendItems[T any](ctx context.Context, ch chan<- Item[T], items ...Item[T]) {
for _, item := range items {
item.SendContext(ctx, ch)
func send[T any](ctx context.Context, ch chan<- Item[T], items ...any) {
for _, current := range items {
switch item := current.(type) {
default:
rt := reflect.TypeOf(item)

switch rt.Kind() { //nolint:exhaustive // foo
default:
switch v := item.(type) {
case error:
Error[T](v).SendContext(ctx, ch)

case Item[T]:
v.SendContext(ctx, ch)

case T:
Of(v).SendContext(ctx, ch)
}

case reflect.Chan:
inCh := reflect.ValueOf(current)

for {
v, ok := inCh.Recv()

if !ok {
return
}

vItem := v.Interface()

switch item := vItem.(type) {
default:
Ch[T](item).SendContext(ctx, ch)

case error:
Error[T](item).SendContext(ctx, ch)
}
}

case reflect.Slice:
s := reflect.ValueOf(current)

for i := 0; i < s.Len(); i++ {
send(ctx, ch, s.Index(i).Interface())
}
}

case error:
Error[T](item).SendContext(ctx, ch)
}
}
}

// IsCh checks if an item is an error.
func (i Item[T]) IsCh() bool {
return i.C != nil
}

// IsError checks if an item is an error.
func (i Item[T]) IsError() bool {
return i.E != nil
Expand Down
14 changes: 6 additions & 8 deletions rx/item_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@ var _ = Describe("Item", Ordered, func() {
ch := make(chan rx.Item[int], 3)

rx.SendItems(context.Background(), ch, rx.CloseChannel,
rx.Of(1),
rx.Of(2),
rx.Of(3),
1, 2, 3,
)

rx.Assert(context.Background(),
Expand All @@ -38,9 +36,9 @@ var _ = Describe("Item", Ordered, func() {
ch := make(chan rx.Item[int], 3)

rx.SendItems(context.Background(), ch, rx.CloseChannel,
rx.Of(1),
1,
rx.Error[int](errFoo),
rx.Of(3),
3,
)

rx.Assert(context.Background(),
Expand All @@ -57,9 +55,9 @@ var _ = Describe("Item", Ordered, func() {
ch := make(chan rx.Item[int], 3)

rx.SendItems(context.Background(), ch, rx.CloseChannel,
rx.Of(1),
1,
rx.Error[int](errFoo),
rx.Of(3),
3,
)

rx.Assert(context.Background(),
Expand All @@ -77,8 +75,8 @@ var _ = Describe("Item", Ordered, func() {

ch := make(chan rx.Item[int], 1)
defer close(ch)
rx.Of[int](5).SendBlocking(ch)

rx.Of(5).SendBlocking(ch)
Expect((<-ch).V).To(Equal(5))
})
})
Expand Down
12 changes: 11 additions & 1 deletion rx/iterable-just.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package rx

import "github.com/samber/lo"

type justIterable[T any] struct {
items []T
opts []Option[T]
Expand All @@ -23,7 +25,15 @@ func (i *justIterable[T]) Observe(opts ...Option[T]) <-chan Item[T] {
items = append(items, Of(item))
}

go SendItems(option.buildContext(emptyContext), next, CloseChannel, items...)
lo.Map(items, func(it Item[T], _ int) any {
return it
})

go SendItems(option.buildContext(emptyContext), next, CloseChannel,
lo.Map(items, func(it Item[T], _ int) any {
return it
})...,
)

return next
}

0 comments on commit 64113cc

Please sign in to comment.