Skip to content

Commit

Permalink
fix(core): fix One quic connection corresponds to one rxstream (#109)
Browse files Browse the repository at this point in the history
Co-authored-by: jjwygjj <[email protected]>
  • Loading branch information
JJW and jjwygjj authored Jan 21, 2021
1 parent 3857121 commit ec61a87
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 40 deletions.
2 changes: 1 addition & 1 deletion internal/cmd/dev.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (s quicDevHandler) Listen() error {
}

func (s quicDevHandler) Read(st quic.Stream) error {
stream := dispatcher.Dispatcher(s.serverlessHandle, rx.FromReader(st))
stream := dispatcher.Dispatcher(s.serverlessHandle, rx.FromReaderWithY3(st))

go func() {
for customer := range stream.Observe() {
Expand Down
2 changes: 1 addition & 1 deletion internal/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (s quicServerHandler) Listen() error {
}

func (s quicServerHandler) Read(st quic.Stream) error {
stream := dispatcher.Dispatcher(s.serverlessHandle, rx.FromReader(st))
stream := dispatcher.Dispatcher(s.serverlessHandle, rx.FromReaderWithY3(st))

y3codec := y3.NewCodec(0x10)

Expand Down
24 changes: 6 additions & 18 deletions internal/cmd/wf/dev.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,14 @@ type quicDevHandler struct {

func (s *quicDevHandler) Listen() error {
err := mocker.EmitMockDataFromCloud(s.serverAddr)
return err
}

func (s *quicDevHandler) Read(st quic.Stream) error {

flows, sinks := workflow.Build(s.serverlessConfig)

stream := dispatcher.DispatcherWithFunc(flows, s.mergeChan)
stream := dispatcher.DispatcherWithFunc(flows, st)

go func() {
for customer := range stream.Observe() {
Expand All @@ -90,23 +95,6 @@ func (s *quicDevHandler) Listen() error {
}
}
}()
return err
}

func (s *quicDevHandler) Read(st quic.Stream) error {

go func() {
for {
buf := make([]byte, 3*1024)
n, err := st.Read(buf)
if err != nil {
break
} else {
value := buf[:n]
s.mergeChan <- value
}
}
}()

return nil
}
24 changes: 6 additions & 18 deletions internal/cmd/wf/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,14 @@ type quicHandler struct {
}

func (s *quicHandler) Listen() error {

return nil
}

func (s *quicHandler) Read(st quic.Stream) error {
flows, sinks := workflow.Build(s.serverlessConfig)

stream := dispatcher.DispatcherWithFunc(flows, s.mergeChan)
stream := dispatcher.DispatcherWithFunc(flows, st)

go func() {
for customer := range stream.Observe() {
Expand All @@ -85,23 +90,6 @@ func (s *quicHandler) Listen() error {
}
}
}()
return nil
}

func (s *quicHandler) Read(st quic.Stream) error {
go func() {
for {
buf := make([]byte, 3*1024)
n, err := st.Read(buf)

if err != nil {
break
} else {
value := buf[:n]
s.mergeChan <- value
}
}
}()

return nil
}
4 changes: 2 additions & 2 deletions internal/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ func AutoDispatcher(appPath string, rxstream rx.RxStream) (rx.RxStream, error) {
return Dispatcher(handler, rxstream), nil
}

func DispatcherWithFunc(flows []func() (io.ReadWriter, func()), reader chan []byte) rx.RxStream {
stream := rx.FromChannel(reader)
func DispatcherWithFunc(flows []func() (io.ReadWriter, func()), reader io.Reader) rx.RxStream {
stream := rx.FromReader(reader)

for _, flow := range flows {
stream = stream.MergeReadWriterWithFunc(flow)
Expand Down
22 changes: 22 additions & 0 deletions pkg/rx/rxstream_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,28 @@ func FromChannel(channel chan []byte) RxStream {
}

func FromReader(reader io.Reader) RxStream {
next := make(chan rxgo.Item)

go func() {
defer close(next)

for {
buf := make([]byte, 3*1024)
n, err := reader.Read(buf)

if err != nil {
break
} else {
value := buf[:n]
next <- Of(value)
}
}
}()

return ConvertObservable(rxgo.FromChannel(next))
}

func FromReaderWithY3(reader io.Reader) RxStream {
source := y3.FromStream(reader)
return ConvertObservableWithY3(source)
}
Expand Down

0 comments on commit ec61a87

Please sign in to comment.