-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathsubject.go
94 lines (81 loc) · 1.67 KB
/
subject.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package rxgo
import (
"sync"
)
// Subject is a value that is both an Observable and an Observer: it can be
// subscribed to, and it can itself be fed notifications through the Observer
// methods.
type Subject interface {
	Observable
	Observer
}
// subject is the basic Subject implementation. It keeps a registry of
// subscribed observers and forwards every notification it receives to each of
// them.
type subject struct {
	// mtx guards index and observers.
	mtx sync.Mutex
	// index is the registry key that will be given to the next subscriber.
	index int
	// observers holds the currently subscribed observers, keyed by the index
	// they were assigned when they subscribed.
	observers map[int]Observer
}
// Next forwards v to every observer that is currently subscribed.
//
// The subject's mutex is held for the whole fan-out, so notifications are
// delivered one at a time and the observer set cannot change while they are
// being delivered.
func (s *subject) Next(v Value) {
	s.mtx.Lock()
	defer s.mtx.Unlock()

	for id := range s.observers {
		s.observers[id].Next(v)
	}
}
// Err forwards the error e to every observer that is currently subscribed.
//
// The subject's mutex is held while the observers are notified. Observers are
// not removed from the registry by this call.
func (s *subject) Err(e error) {
	s.mtx.Lock()
	defer s.mtx.Unlock()

	for id := range s.observers {
		s.observers[id].Err(e)
	}
}
// Complete signals completion to every observer that is currently subscribed.
//
// The subject's mutex is held while the observers are notified. Observers are
// not removed from the registry by this call.
func (s *subject) Complete() {
	s.mtx.Lock()
	defer s.mtx.Unlock()

	for id := range s.observers {
		s.observers[id].Complete()
	}
}
// Pipe combines the given operators with the package-level Pipe function and
// applies the result to this subject, returning the resulting Observable.
func (s *subject) Pipe(fns ...OperatorFunc) Observable {
	apply := Pipe(fns...)
	return apply(s)
}
// NewSubject returns a Subject with an empty observer registry, ready to be
// subscribed to and to receive notifications.
func NewSubject() Subject {
	s := new(subject)
	s.observers = map[int]Observer{}
	return s
}
// Subscribe registers obs so that it receives every later notification sent
// to the subject. The returned Subscription removes obs from the registry when
// it is unsubscribed.
func (s *subject) Subscribe(obs Observer) Subscription {
	// Reserve a unique key for this observer and store it under the lock.
	s.mtx.Lock()
	id := s.index
	s.index = id + 1
	s.observers[id] = obs
	s.mtx.Unlock()

	// The teardown only needs the key; deleting a missing key is a no-op, so
	// running it more than once is harmless.
	remove := func() {
		s.mtx.Lock()
		defer s.mtx.Unlock()
		delete(s.observers, id)
	}
	return NewSubscription(remove)
}
// replaySubject is a Subject that remembers the most recent values passed to
// Next so that they can be delivered again to observers that subscribe later.
type replaySubject struct {
	// mtx guards buffer.
	mtx sync.Mutex
	// size is the maximum number of values retained for replay; zero disables
	// buffering.
	size int
	// buffer holds the retained values, oldest first. It only ever contains
	// values that were actually emitted and never grows beyond size.
	buffer []Value
	// Subject is the underlying subject that fans notifications out to the
	// observers that are currently subscribed.
	Subject
}

// NewReplaySubject returns a Subject that retains up to buffer of the most
// recently emitted values for replay to new subscribers.
//
// The buffer starts out empty, so an observer that subscribes before anything
// has been emitted is not sent placeholder zero values. A buffer of zero or
// less yields a subject that retains nothing.
func NewReplaySubject(buffer int) Subject {
	if buffer < 0 {
		buffer = 0
	}
	return &replaySubject{
		size:    buffer,
		buffer:  make([]Value, 0, buffer),
		Subject: NewSubject(),
	}
}

// Next records v in the replay buffer, evicting the oldest retained value once
// the buffer is full, and then forwards v to the underlying subject.
func (r *replaySubject) Next(v Value) {
	r.mtx.Lock()
	defer r.mtx.Unlock()

	if r.size > 0 {
		if len(r.buffer) == r.size {
			// Full: shift the contents left in place to drop the oldest value
			// without reallocating the backing array.
			n := copy(r.buffer, r.buffer[1:])
			r.buffer = r.buffer[:n]
		}
		r.buffer = append(r.buffer, v)
	}
	r.Subject.Next(v)
}
// Subscribe sends the values currently held in the replay buffer to obs and
// then forwards every later notification from the underlying subject to it.
//
// The subscription is built with Create; its teardown unsubscribes from the
// underlying subject.
func (r *replaySubject) Subscribe(obs Observer) Subscription {
	return Create(
		func(v ValueChan, e ErrChan, c CompleteChan) TeardownFunc {
			// Hold the lock across both the replay and the registration on the
			// underlying subject. Next takes the same lock, so no value can be
			// emitted in between and missed by this subscriber. The lock order
			// (replay lock, then the inner subject's lock) matches Next.
			r.mtx.Lock()
			defer r.mtx.Unlock()

			for _, val := range r.buffer {
				v.Next(val)
			}
			return r.Subject.Subscribe(
				OnNext(v.Next).OnErr(e.Error).OnComplete(c.Complete),
			).Unsubscribe
		},
	).Subscribe(obs)
}