Skip to content

Commit

Permalink
ref(rx): rationalise item aux fields (#142)
Browse files Browse the repository at this point in the history
  • Loading branch information
plastikfan committed Apr 5, 2024
1 parent c56df52 commit 80bed8a
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 31 deletions.
30 changes: 30 additions & 0 deletions enums/item.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package enums

type ItemDiscriminator uint32

const (
// ItemDiscNative enum value that represents the native type T.
//
ItemDiscNative ItemDiscriminator = 0

// ItemDiscError enum value that represents an error
//
ItemDiscError ItemDiscriminator = 1 << (iota - 1)

// ItemDiscTick enum value that represents a Tick value.
//
ItemDiscTick

// ItemDiscTick enum value that represents a TickValue value.
//
ItemDiscTickValue

// ItemDiscNumeric enum value that represents a general numeric value
// typically used by range operations that require a number.
//
ItemDiscNumeric

// ItemDiscChan enum value that represents a channel of T
//
ItemDiscChan
)
4 changes: 2 additions & 2 deletions rx/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
)

// Amb takes several Observables, emit all of the items from only the first of these Observables
// to emit an item or notification.
// to emit an item or notification. (What the hell is an Amb, WTF)
func Amb[T any](observables []Observable[T], opts ...Option[T]) Observable[T] {
option := parseOptions(opts...)
ctx := option.buildContext(emptyContext)
Expand Down Expand Up @@ -242,7 +242,7 @@ func Interval[T any](interval Duration, opts ...Option[T]) Observable[T] {
for {
select {
case <-time.After(interval.duration()):
if !Tv[T](i).SendContext(ctx, next) {
if !TV[T](i).SendContext(ctx, next) {
return
}

Expand Down
69 changes: 40 additions & 29 deletions rx/item.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,20 @@ import (
"context"
"reflect"
"time"

"github.com/snivilised/lorax/enums"
)

type (
// Item is a wrapper having either a value, error or channel.
//
Item[T any] struct {
V T
E error
Disc enums.ItemDiscriminator
V T
E error
//
C chan<- Item[T]
tick bool
tickV bool
numeric bool
TV int
N int
C chan<- Item[T]
N int
}

// TimestampItem attach a timestamp to an item.
Expand All @@ -41,45 +40,56 @@ const (

// Of creates an item from a value.
func Of[T any](v T) Item[T] {
return Item[T]{V: v}
return Item[T]{
V: v,
Disc: enums.ItemDiscNative,
}
}

// Ch creates an item from a channel
func Ch[T any](ch any) Item[T] {
if c, ok := ch.(chan<- Item[T]); ok {
return Item[T]{C: c}
return Item[T]{
C: c,
Disc: enums.ItemDiscChan,
}
}

panic("temp: invalid ch type")
}

// Error creates an item from an error.
func Error[T any](err error) Item[T] {
return Item[T]{
E: err,
Disc: enums.ItemDiscError,
}
}

// Tick creates a type safe tick instance
func Tick[T any]() Item[T] {
return Item[T]{tick: true}
return Item[T]{
Disc: enums.ItemDiscTick,
}
}

// Tv creates a type safe tick value instance
func Tv[T any](tv int) Item[T] {
// TV creates a type safe tick value instance
func TV[T any](tv int) Item[T] {
return Item[T]{
TV: tv,
tickV: true,
N: tv,
Disc: enums.ItemDiscTickValue,
}
}

// Num creates a type safe tick value instance
func Num[T any](n int) Item[T] {
return Item[T]{
N: n,
numeric: true,
N: n,
Disc: enums.ItemDiscNumeric,
}
}

// Error creates an item from an error.
func Error[T any](err error) Item[T] {
return Item[T]{E: err}
}

// SendItems is an utility function that send a list of items and indicate a
// SendItems is a utility function that sends a list of items and indicates a
// strategy on whether to close the channel once the function completes.
func SendItems[T any](ctx context.Context,
ch chan<- Item[T], strategy CloseChannelStrategy, items ...any,
Expand All @@ -91,11 +101,12 @@ func SendItems[T any](ctx context.Context,
send(ctx, ch, items...)
}

// can we revert items to be Item[T]?
func send[T any](ctx context.Context, ch chan<- Item[T], items ...any) {
for _, current := range items {
switch item := current.(type) {
default:
rt := reflect.TypeOf(item)
rt := reflect.TypeOf(item) // !! WE DONT'T NEED REFLECTION

switch rt.Kind() { //nolint:exhaustive // foo
default:
Expand Down Expand Up @@ -147,27 +158,27 @@ func send[T any](ctx context.Context, ch chan<- Item[T], items ...any) {

// IsCh checks if an item is an error.
func (i Item[T]) IsCh() bool {
return i.C != nil
return (i.Disc & enums.ItemDiscChan) > 0
}

// IsError checks if an item is an error.
func (i Item[T]) IsError() bool {
return i.E != nil
return (i.Disc & enums.ItemDiscError) > 0
}

// IsTick checks if an item is a tick instance.
func (i Item[T]) IsTick() bool {
return i.tick
return (i.Disc & enums.ItemDiscTick) > 0
}

// IsTickValue checks if an item is a tick instance.
func (i Item[T]) IsTickValue() bool {
return i.tickV
return (i.Disc & enums.ItemDiscTickValue) > 0
}

// IsTickValue checks if an item is a tick instance.
func (i Item[T]) IsNumeric() bool {
return i.numeric
return (i.Disc & enums.ItemDiscNumeric) > 0
}

// SendBlocking sends an item and blocks until it is sent.
Expand Down
4 changes: 4 additions & 0 deletions rx/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ func convertAllItemsToAny[T any](values []T) []any {
}

func testObservable[T any](ctx context.Context, items ...any) rx.Observable[T] {
// items is a collection of any because we need the ability to send a stream
// of events that may include errors; 1, 2, err, 4, ..., without enforcing
// that the client should manufacture Item[T]s; Of(1), Of(2), Error(err), Of(4).
//
return rx.FromChannel(channelValue[T](ctx, convertAllItemsToAny(items)...))
}

Expand Down

0 comments on commit 80bed8a

Please sign in to comment.