Skip to content

Commit

Permalink
refactor(core): 改为单流 (#13)
Browse files Browse the repository at this point in the history
Co-authored-by: jjwygjj <[email protected]>
  • Loading branch information
JJW and jjwygjj authored Jul 8, 2020
1 parent 890b83c commit 466835a
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 221 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ module github.com/yomorun/yomo
go 1.14

require (
github.com/10cella/yomo-txtkv-codec v1.0.5
github.com/10cella/yomo-txtkv-codec v1.0.6
github.com/lucas-clemente/quic-go v0.17.1
)
)
70 changes: 0 additions & 70 deletions internal/framework/object.go

This file was deleted.

33 changes: 5 additions & 28 deletions internal/framework/server.go
Original file line number Diff line number Diff line change
@@ -1,35 +1,12 @@
package framework

import (
"io"

txtkv "github.com/10cella/yomo-txtkv-codec"
"github.com/yomorun/yomo/pkg/plugin"
"github.com/yomorun/yomo/pkg/util"
)

type YomoFrameworkStream struct {
Writer YomoFrameworkStreamWriter
Reader YomoFrameworkStreamReader
}

type YomoFrameworkStreamWriter struct {
Name string
io.Writer
}

type YomoFrameworkStreamReader struct {
Name string
io.Reader
}

func (w YomoFrameworkStreamWriter) Write(b []byte) (int, error) {
_, err := w.Writer.Write(b)
return len(b), err
}

func (r YomoFrameworkStreamReader) Read(b []byte) (int, error) {
return r.Reader.Read(b)
}

func NewServer(endpoint string, writer io.Writer, reader io.Reader) {
util.QuicServer(endpoint, writer, reader)
func NewServer(endpoint string, p plugin.YomoObjectPlugin) {
codec := txtkv.NewCodec(p.Observed())
util.QuicServer(endpoint, p, codec)
}
59 changes: 0 additions & 59 deletions internal/framework/stream.go

This file was deleted.

36 changes: 33 additions & 3 deletions pkg/util/quic.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,39 @@ import (

"github.com/lucas-clemente/quic-go"
quicGo "github.com/lucas-clemente/quic-go"
"github.com/yomorun/yomo/pkg/plugin"
txtkv "github.com/10cella/yomo-txtkv-codec"
)

type YomoFrameworkStreamWriter struct {
Name string
Codec *txtkv.Codec
Plugin plugin.YomoObjectPlugin
io.Writer
}

func (w YomoFrameworkStreamWriter) Write(b []byte) (int, error) {
var err error = nil
var value interface{}
var result interface{}

w.Codec.Decoder(b)

for {
value, err = w.Codec.Read()
if err != nil {
break
}

if len(value.(string)) > 0 {
result, err = w.Plugin.Handle(value)
w.Codec.Write(w.Writer, result.(string))
break
}
}
return len(b), err
}

func QuicClient(endpoint string) (quicGo.Stream, error) {
tlsConf := &tls.Config{
InsecureSkipVerify: true, // nolint
Expand All @@ -42,7 +73,7 @@ func QuicClient(endpoint string) (quicGo.Stream, error) {
return stream, nil
}

func QuicServer(endpoint string, w io.Writer, r io.Reader) {
func QuicServer(endpoint string, plugin plugin.YomoObjectPlugin, codec *txtkv.Codec) {
listener, err := quicGo.ListenAddr(endpoint, GenerateTLSConfig(endpoint), nil)
if err != nil {
panic(err)
Expand All @@ -58,8 +89,7 @@ func QuicServer(endpoint string, w io.Writer, r io.Reader) {
panic(err)
}

go io.Copy(w, stream) // nolint
go io.Copy(stream, r) // nolint
go io.Copy(YomoFrameworkStreamWriter{plugin.Name(), codec, plugin, stream}, stream) // nolint
}

}
Expand Down
62 changes: 3 additions & 59 deletions pkg/yomo/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"os"
"time"

txtkv "github.com/10cella/yomo-txtkv-codec"

"github.com/yomorun/yomo/pkg/plugin"
"github.com/yomorun/yomo/pkg/util"

Expand All @@ -21,53 +19,17 @@ func Run(plugin plugin.YomoObjectPlugin, endpoint string) {
log.SetPrefix(fmt.Sprintf("[%s:%v]", plugin.Name(), os.Getpid()))
log.Printf("plugin service start... [%s]", endpoint)

// binding plugin
pluginStream := framework.NewObjectPlugin(plugin)

// decoding
deStream1 := txtkv.NewObjectDecoder(plugin.Observed())

//过滤
deStream2 := txtkv.NewFilterDecoder(plugin.Observed())

// encoding
enStream := txtkv.NewObjectEncoder(plugin.Observed())

deStream := io.MultiWriter(deStream1.Writer, deStream2.Writer)

go func() { io.CopyN(pluginStream.Writer, deStream1.Reader, 1024) }() // nolint
go func() { io.CopyN(enStream.Writer, pluginStream.Reader, 1024) }() // nolint
go func() { io.CopyN(enStream.Writer, deStream2.Reader, 1024) }() // nolint

// activation service
framework.NewServer(endpoint, deStream, enStream.Reader)
framework.NewServer(endpoint, plugin)
}

// RunStream run a server for YomoStreamPlugin
func RunStream(plugin plugin.YomoStreamPlugin, endpoint string) {
log.SetPrefix(fmt.Sprintf("[%s:%v]", plugin.Name(), os.Getpid()))
log.Printf("plugin service start... [%s]", endpoint)

// binding plugin
pluginStream := framework.NewStreamPlugin(plugin)

// decoding
deStream1 := txtkv.NewStreamDecoder(plugin.Observed())

//过滤
deStream2 := txtkv.NewFilterDecoder(plugin.Observed())

// encoding
enStream := txtkv.NewStreamEncoder(plugin.Observed())

deStream := io.MultiWriter(deStream1.Writer, deStream2.Writer)

// activation service
framework.NewServer(endpoint, deStream, enStream.Reader)

go func() { io.CopyN(pluginStream.Writer, deStream1.Reader, 1024) }() // nolint
go func() { io.CopyN(enStream.Writer, pluginStream.Reader, 1024) }() // nolint
go func() { io.CopyN(enStream.Writer, deStream2.Reader, 1024) }() // nolint
panic("not impl")
}

// RunDev makes test plugin connect to a demo YoMo server
Expand All @@ -77,26 +39,8 @@ func RunDev(plugin plugin.YomoObjectPlugin, endpoint string) {
log.SetPrefix(fmt.Sprintf("[%s:%v]", plugin.Name(), os.Getpid()))
log.Printf("plugin service start... [%s]", endpoint)

// binding plugin
pluginStream := framework.NewObjectPlugin(plugin)

// decoding
deStream1 := txtkv.NewObjectDecoder(plugin.Observed())

//过滤
deStream2 := txtkv.NewFilterDecoder(plugin.Observed())

// encoding
enStream := txtkv.NewObjectEncoder(plugin.Observed())

deStream := io.MultiWriter(deStream1.Writer, deStream2.Writer)

go func() { io.CopyN(pluginStream.Writer, deStream1.Reader, 1024) }() // nolint
go func() { io.CopyN(enStream.Writer, pluginStream.Reader, 1024) }() // nolint
go func() { io.CopyN(enStream.Writer, deStream2.Reader, 1024) }() // nolint

// activation service
framework.NewServer(endpoint, deStream, enStream.Reader)
framework.NewServer(endpoint, plugin)
}()

yomoEchoClient, err := util.QuicClient("echo.cella.fun:11521")
Expand Down

0 comments on commit 466835a

Please sign in to comment.