Skip to content

Commit

Permalink
refactor(core): implement FrameConn interface using quic-go (#642)
Browse files Browse the repository at this point in the history
# Description

Implement new two frame-related interfaces below using quic-go.

```go
// Listener accepts FrameConns.
type Listener interface {
	// Accept accepts FrameConns.
	Accept(context.Context) (FrameConn, error)
	// Close closes listener,
	// If listener be closed, all FrameConn accepted will be unavailable.
	Close() error
}

// FrameConn is a connection that transmits data in frame format.
type FrameConn interface {
	// Context returns FrameConn.Context.
	// The Context can be used to manage the lifecycle of connection and
	// retrieve error using `context.Cause(conn.Context())` after calling `CloseWithError()`.
	Context() context.Context
	// WriteFrame writes a frame to connection.
	WriteFrame(frame.Frame) error
	// ReadFrame returns a channel from which frames can be received.
	ReadFrame() (frame.Frame, error)
	// RemoteAddr returns the remote address of connection.
	RemoteAddr() net.Addr
	// LocalAddr returns the local address of connection.
	LocalAddr() net.Addr
	// CloseWithError closes the connection with an error message.
	// It will be unavailable if the connection is closed. the error message should be written to the conn.Context().
	CloseWithError(string) error
}

```
  • Loading branch information
woorui authored Nov 14, 2023
1 parent 94370ab commit a628936
Show file tree
Hide file tree
Showing 12 changed files with 539 additions and 562 deletions.
91 changes: 31 additions & 60 deletions core/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,14 @@ import (
"context"
"errors"
"fmt"
"io"
"reflect"
"runtime"
"time"

"github.com/quic-go/quic-go"
"github.com/yomorun/yomo/core/frame"
"github.com/yomorun/yomo/pkg/frame-codec/y3codec"
"github.com/yomorun/yomo/pkg/id"
yquic "github.com/yomorun/yomo/pkg/listener/quic"
oteltrace "go.opentelemetry.io/otel/trace"
"golang.org/x/exp/slog"
)
Expand Down Expand Up @@ -69,35 +68,10 @@ func NewClient(appName, zipperAddr string, clientType ClientType, opts ...Client
}
}

type connectResult struct {
conn quic.Connection
fs *FrameStream
err error
}

func newConnectResult(conn quic.Connection, fs *FrameStream, err error) *connectResult {
return &connectResult{
conn: conn,
fs: fs,
err: err,
}
}

func (c *Client) connect(ctx context.Context) *connectResult {
conn, err := quic.DialAddr(ctx, c.zipperAddr, c.opts.tlsConfig, c.opts.quicConfig)
if err != nil {
return newConnectResult(conn, nil, err)
}

stream, err := conn.OpenStream()
func (c *Client) connect(ctx context.Context, addr string) (frame.Conn, error) {
conn, err := yquic.DialAddr(ctx, addr, y3codec.Codec(), y3codec.PacketReadWriter(), c.opts.tlsConfig, c.opts.quicConfig)
if err != nil {
return newConnectResult(conn, nil, err)
}

fs := NewFrameStream(stream, y3codec.Codec(), y3codec.PacketReadWriter())

if credential := c.opts.credential; credential != nil {
c.Logger.Info("use credential", "credential_name", credential.Name())
return conn, err
}

hf := &frame.HandshakeFrame{
Expand All @@ -109,79 +83,76 @@ func (c *Client) connect(ctx context.Context) *connectResult {
AuthPayload: c.opts.credential.Payload(),
}

if err := fs.WriteFrame(hf); err != nil {
return newConnectResult(conn, nil, err)
if err := conn.WriteFrame(hf); err != nil {
return conn, err
}

received, err := fs.ReadFrame()
received, err := conn.ReadFrame()
if err != nil {
return newConnectResult(conn, nil, err)
return nil, err
}

switch received.Type() {
case frame.TypeRejectedFrame:
se := ErrAuthenticateFailed{received.(*frame.RejectedFrame).Message}
return newConnectResult(conn, fs, se)
return conn, ErrAuthenticateFailed{received.(*frame.RejectedFrame).Message}
case frame.TypeHandshakeAckFrame:
return newConnectResult(conn, fs, nil)
return conn, nil
default:
se := ErrAuthenticateFailed{
return conn, ErrAuthenticateFailed{
fmt.Sprintf("authentication failed: read unexcepted frame, frame read: %s", received.Type().String()),
}
return newConnectResult(conn, fs, se)
}
}

func (c *Client) runBackground(ctx context.Context, conn quic.Connection, fs *FrameStream) {
func (c *Client) runBackground(ctx context.Context, conn frame.Conn) {
reconnection := make(chan struct{})

go c.handleReadFrames(fs, reconnection)
go c.handleReadFrames(conn, reconnection)

var err error
for {
select {
case <-c.ctx.Done():
fs.Close()
conn.CloseWithError("yomo: client closed")
return
case <-ctx.Done():
fs.Close()
conn.CloseWithError("yomo: parent context canceled")
return
case f := <-c.writeFrameChan:
if err := fs.WriteFrame(f); err != nil {
if err := conn.WriteFrame(f); err != nil {
c.handleFrameError(err, reconnection)
}
case <-reconnection:
reconnect:
cr := c.connect(ctx)
if err := cr.err; err != nil {
conn, err = c.connect(ctx, c.zipperAddr)
if err != nil {
if errors.As(err, new(ErrAuthenticateFailed)) {
return
}
c.Logger.Error("reconnect to zipper error", "err", cr.err)
c.Logger.Error("reconnect to zipper error", "err", err)
time.Sleep(time.Second)
goto reconnect
}
fs = cr.fs
go c.handleReadFrames(fs, reconnection)
go c.handleReadFrames(conn, reconnection)
}
}
}

// Connect connect client to server.
func (c *Client) Connect(ctx context.Context) error {
connect:
result := c.connect(ctx)
if result.err != nil {
fconn, err := c.connect(ctx, c.zipperAddr)
if err != nil {
if c.opts.connectUntilSucceed {
c.Logger.Error("failed to connect to zipper, trying to reconnect", "err", result.err)
c.Logger.Error("failed to connect to zipper, trying to reconnect", "err", err)
time.Sleep(time.Second)
goto connect
}
c.Logger.Error("can not connect to zipper", "err", result.err)
return result.err
c.Logger.Error("can not connect to zipper", "err", err)
return err
}
c.Logger.Info("connected to zipper")

go c.runBackground(ctx, result.conn, result.fs)
go c.runBackground(ctx, fconn)

return nil
}
Expand Down Expand Up @@ -238,8 +209,8 @@ func (c *Client) handleFrameError(err error, reconnection chan<- struct{}) {
c.errorfn(err)

// exit client program if stream has be closed.
if err == io.EOF {
c.ctxCancel(fmt.Errorf("%s: remote shutdown", c.clientType.String()))
if se := new(yquic.ErrConnClosed); errors.As(err, &se) {
c.ctxCancel(fmt.Errorf("%s: shutdown with error=%s", c.clientType.String(), se.Error()))
return
}

Expand All @@ -256,9 +227,9 @@ func (c *Client) Wait() {
<-c.ctx.Done()
}

func (c *Client) handleReadFrames(fs *FrameStream, reconnection chan struct{}) {
func (c *Client) handleReadFrames(fconn frame.Conn, reconnection chan struct{}) {
for {
f, err := fs.ReadFrame()
f, err := fconn.ReadFrame()
if err != nil {
c.handleFrameError(err, reconnection)
return
Expand Down
45 changes: 6 additions & 39 deletions core/connection.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package core

import (
"context"

"github.com/quic-go/quic-go"
"github.com/yomorun/yomo/core/frame"
"github.com/yomorun/yomo/core/metadata"
"golang.org/x/exp/slog"
Expand Down Expand Up @@ -31,42 +28,28 @@ type Connection struct {
clientType ClientType
metadata metadata.M
observeDataTags []uint32
conn quic.Connection
fs *FrameStream
fconn frame.Conn
Logger *slog.Logger
}

func newConnection(
name string, id string, clientType ClientType, md metadata.M, tags []uint32,
conn quic.Connection, fs *FrameStream, logger *slog.Logger) *Connection {
fconn frame.Conn, logger *slog.Logger,
) *Connection {

logger = logger.With("conn_id", id, "conn_name", name)
if conn != nil {
logger.Info("new client connected", "remote_addr", conn.RemoteAddr().String(), "client_type", clientType.String())
}

return &Connection{
name: name,
id: id,
clientType: clientType,
metadata: md,
observeDataTags: tags,
conn: conn,
fs: fs,
fconn: fconn,
Logger: logger,
}
}

// Close closes the connection.
func (c *Connection) Close() error {
return c.fs.Close()
}

// Context returns the context of the connection.
func (c *Connection) Context() context.Context {
return c.fs.Context()
}

// ID returns the connection ID.
func (c *Connection) ID() string {
return c.id
Expand All @@ -87,26 +70,10 @@ func (c *Connection) ObserveDataTags() []uint32 {
return c.observeDataTags
}

// ReadFrame reads a frame from the connection.
func (c *Connection) ReadFrame() (frame.Frame, error) {
return c.fs.ReadFrame()
}

// ClientType returns the client type of the connection.
func (c *Connection) ClientType() ClientType {
return c.clientType
}

// WriteFrame writes a frame to the connection.
func (c *Connection) WriteFrame(f frame.Frame) error {
return c.fs.WriteFrame(f)
func (c *Connection) FrameConn() frame.Conn {
return c.fconn
}

// CloseWithError closes the connection with error.
func (c *Connection) CloseWithError(errString string) error {
return c.conn.CloseWithError(YomoCloseErrorCode, errString)
}

// YomoCloseErrorCode is the error code for close quic Connection for yomo.
// If the Connection implemented by quic is closed, the quic ApplicationErrorCode is always 0x13.
const YomoCloseErrorCode = quic.ApplicationErrorCode(0x13)
Loading

1 comment on commit a628936

@vercel
Copy link

@vercel vercel bot commented on a628936 Nov 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

yomo – ./

yomo-git-master-yomorun.vercel.app
yomo.vercel.app
yomo.run
yomo-yomorun.vercel.app
www.yomo.run

Please sign in to comment.