-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgopipe_deprecate.go
58 lines (54 loc) · 1.81 KB
/
gopipe_deprecate.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package gopipe
/*
AttachTap adds a Tap to the pipeline.

Every item read from the pipeline's current tail is forwarded to tapOut and to
a newly created tail channel, which replaces p.tail so that later stages attach
behind the tap. When the previous tail is closed, both tapOut and the new tail
are closed as well.

Items are handed over through unbuffered channels, so a tapOut that is not
being drained will stall the pipeline once any buffer tapOut has is full.
*/
func (p *Pipeline) AttachTap(tapOut chan interface{}) {
	p.debug("Attaching tap to pipeline")

	// Pin both ends of this stage in locals. The goroutine below must keep
	// using the channels that belonged to this tap even if a later stage
	// replaces p.tail again; reading the field from inside the goroutine would
	// send to (and close) whatever channel happens to be the tail at that time.
	upstream := p.tail
	downstream := make(chan interface{})
	p.tail = downstream

	tap := make(chan interface{})
	go func() {
		// Read from the old tail and send on the local tap chan as well as the
		// new pipeline tail.
		for item := range upstream {
			tap <- item
			downstream <- item
		}
		p.debug("Pipeline tap source closed. Closing rest of the pipeline")
		close(tap)
		close(downstream)
	}()
	go func() {
		// Consume from the local tap and queue items at the external tap.
		for item := range tap {
			tapOut <- item
		}
		close(tapOut)
	}()
}
/*
AttachSinkFanOut redirects outgoing items to the appropriate channel based on the routing function provided.

For every item read from the pipeline tail, routingFunc is called to obtain a routing key and the item is sent on
chanfan[key]. The item is sent on the unrouted channel instead when routingFunc returns an error, when the key is
empty, or when chanfan holds no (non-nil) channel for the key.

The function does not return anything; the caller supplies the unrouted channel. Clients of the library should handle
the unrouted chan properly - if nothing is listening on that chan, operations will block if an unroutable item is put
on the channel (or until its buffer is full). A nil unrouted channel blocks forever on the first unroutable item.

When the pipeline tail is closed, every distinct non-nil channel in chanfan and the unrouted channel are closed
exactly once.
*/
func (p *Pipeline) AttachSinkFanOut(chanfan map[string]chan interface{}, unrouted chan interface{}, routingFunc func(interface{}) (string, error)) {
	// Read the tail before starting the goroutine so the sink always drains
	// the channel that was the tail at attach time, without racing against
	// later writes to p.tail.
	tail := p.tail
	go func() {
		for item := range tail {
			dest := unrouted
			if key, err := routingFunc(item); err == nil && key != "" {
				// A nil channel registered under the key would block forever
				// on send, so treat it the same as a missing route.
				if routeChan, ok := chanfan[key]; ok && routeChan != nil {
					dest = routeChan
				}
			}
			dest <- item
		}

		p.debug("Shutting down Pipeline ChanFans")
		// Track what has been closed: the same channel may be registered under
		// several keys (or also be the unrouted channel), and closing a channel
		// twice panics.
		closed := make(map[chan interface{}]struct{}, len(chanfan)+1)
		for key, fanoutChan := range chanfan {
			if fanoutChan == nil {
				// Closing a nil channel panics; there is nothing to close.
				continue
			}
			if _, done := closed[fanoutChan]; done {
				continue
			}
			closed[fanoutChan] = struct{}{}
			// Close all outgoing channels
			p.debug("Closing channel for routing key:", key)
			close(fanoutChan)
		}
		if unrouted != nil {
			if _, done := closed[unrouted]; !done {
				close(unrouted)
			}
		}
	}()
}