Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(core): implement FrameConn interface using quic-go #642

Merged
merged 20 commits into from
Nov 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 31 additions & 60 deletions core/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,14 @@
"context"
"errors"
"fmt"
"io"
"reflect"
"runtime"
"time"

"github.com/quic-go/quic-go"
"github.com/yomorun/yomo/core/frame"
"github.com/yomorun/yomo/pkg/frame-codec/y3codec"
"github.com/yomorun/yomo/pkg/id"
yquic "github.com/yomorun/yomo/pkg/listener/quic"
oteltrace "go.opentelemetry.io/otel/trace"
"golang.org/x/exp/slog"
)
Expand Down Expand Up @@ -69,35 +68,10 @@
}
}

type connectResult struct {
conn quic.Connection
fs *FrameStream
err error
}

func newConnectResult(conn quic.Connection, fs *FrameStream, err error) *connectResult {
return &connectResult{
conn: conn,
fs: fs,
err: err,
}
}

func (c *Client) connect(ctx context.Context) *connectResult {
conn, err := quic.DialAddr(ctx, c.zipperAddr, c.opts.tlsConfig, c.opts.quicConfig)
if err != nil {
return newConnectResult(conn, nil, err)
}

stream, err := conn.OpenStream()
func (c *Client) connect(ctx context.Context, addr string) (frame.Conn, error) {
conn, err := yquic.DialAddr(ctx, addr, y3codec.Codec(), y3codec.PacketReadWriter(), c.opts.tlsConfig, c.opts.quicConfig)
if err != nil {
return newConnectResult(conn, nil, err)
}

fs := NewFrameStream(stream, y3codec.Codec(), y3codec.PacketReadWriter())

if credential := c.opts.credential; credential != nil {
c.Logger.Info("use credential", "credential_name", credential.Name())
return conn, err
}

hf := &frame.HandshakeFrame{
Expand All @@ -109,79 +83,76 @@
AuthPayload: c.opts.credential.Payload(),
}

if err := fs.WriteFrame(hf); err != nil {
return newConnectResult(conn, nil, err)
if err := conn.WriteFrame(hf); err != nil {
return conn, err

Check warning on line 87 in core/client.go

View check run for this annotation

Codecov / codecov/patch

core/client.go#L87

Added line #L87 was not covered by tests
}

received, err := fs.ReadFrame()
received, err := conn.ReadFrame()
if err != nil {
return newConnectResult(conn, nil, err)
return nil, err

Check warning on line 92 in core/client.go

View check run for this annotation

Codecov / codecov/patch

core/client.go#L92

Added line #L92 was not covered by tests
}

switch received.Type() {
case frame.TypeRejectedFrame:
se := ErrAuthenticateFailed{received.(*frame.RejectedFrame).Message}
return newConnectResult(conn, fs, se)
return conn, ErrAuthenticateFailed{received.(*frame.RejectedFrame).Message}
case frame.TypeHandshakeAckFrame:
return newConnectResult(conn, fs, nil)
return conn, nil
default:
se := ErrAuthenticateFailed{
return conn, ErrAuthenticateFailed{

Check warning on line 100 in core/client.go

View check run for this annotation

Codecov / codecov/patch

core/client.go#L100

Added line #L100 was not covered by tests
fmt.Sprintf("authentication failed: read unexcepted frame, frame read: %s", received.Type().String()),
}
return newConnectResult(conn, fs, se)
}
}

func (c *Client) runBackground(ctx context.Context, conn quic.Connection, fs *FrameStream) {
func (c *Client) runBackground(ctx context.Context, conn frame.Conn) {
reconnection := make(chan struct{})

go c.handleReadFrames(fs, reconnection)
go c.handleReadFrames(conn, reconnection)

var err error
for {
select {
case <-c.ctx.Done():
fs.Close()
conn.CloseWithError("yomo: client closed")
return
case <-ctx.Done():
fs.Close()
conn.CloseWithError("yomo: parent context canceled")

Check warning on line 118 in core/client.go

View check run for this annotation

Codecov / codecov/patch

core/client.go#L118

Added line #L118 was not covered by tests
return
case f := <-c.writeFrameChan:
if err := fs.WriteFrame(f); err != nil {
if err := conn.WriteFrame(f); err != nil {
c.handleFrameError(err, reconnection)
}
case <-reconnection:
reconnect:
cr := c.connect(ctx)
if err := cr.err; err != nil {
conn, err = c.connect(ctx, c.zipperAddr)
if err != nil {

Check warning on line 127 in core/client.go

View check run for this annotation

Codecov / codecov/patch

core/client.go#L126-L127

Added lines #L126 - L127 were not covered by tests
if errors.As(err, new(ErrAuthenticateFailed)) {
return
}
c.Logger.Error("reconnect to zipper error", "err", cr.err)
c.Logger.Error("reconnect to zipper error", "err", err)

Check warning on line 131 in core/client.go

View check run for this annotation

Codecov / codecov/patch

core/client.go#L131

Added line #L131 was not covered by tests
time.Sleep(time.Second)
goto reconnect
}
fs = cr.fs
go c.handleReadFrames(fs, reconnection)
go c.handleReadFrames(conn, reconnection)

Check warning on line 135 in core/client.go

View check run for this annotation

Codecov / codecov/patch

core/client.go#L135

Added line #L135 was not covered by tests
}
}
}

// Connect connect client to server.
func (c *Client) Connect(ctx context.Context) error {
connect:
result := c.connect(ctx)
if result.err != nil {
fconn, err := c.connect(ctx, c.zipperAddr)
if err != nil {
if c.opts.connectUntilSucceed {
c.Logger.Error("failed to connect to zipper, trying to reconnect", "err", result.err)
c.Logger.Error("failed to connect to zipper, trying to reconnect", "err", err)

Check warning on line 146 in core/client.go

View check run for this annotation

Codecov / codecov/patch

core/client.go#L146

Added line #L146 was not covered by tests
time.Sleep(time.Second)
goto connect
}
c.Logger.Error("can not connect to zipper", "err", result.err)
return result.err
c.Logger.Error("can not connect to zipper", "err", err)
return err
}
c.Logger.Info("connected to zipper")

go c.runBackground(ctx, result.conn, result.fs)
go c.runBackground(ctx, fconn)

return nil
}
Expand Down Expand Up @@ -238,8 +209,8 @@
c.errorfn(err)

// exit client program if stream has be closed.
if err == io.EOF {
c.ctxCancel(fmt.Errorf("%s: remote shutdown", c.clientType.String()))
if se := new(yquic.ErrConnClosed); errors.As(err, &se) {
venjiang marked this conversation as resolved.
Show resolved Hide resolved
c.ctxCancel(fmt.Errorf("%s: shutdown with error=%s", c.clientType.String(), se.Error()))
return
}

Expand All @@ -256,9 +227,9 @@
<-c.ctx.Done()
}

func (c *Client) handleReadFrames(fs *FrameStream, reconnection chan struct{}) {
func (c *Client) handleReadFrames(fconn frame.Conn, reconnection chan struct{}) {
for {
f, err := fs.ReadFrame()
f, err := fconn.ReadFrame()
if err != nil {
c.handleFrameError(err, reconnection)
return
Expand Down
45 changes: 6 additions & 39 deletions core/connection.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package core

import (
"context"

"github.com/quic-go/quic-go"
"github.com/yomorun/yomo/core/frame"
"github.com/yomorun/yomo/core/metadata"
"golang.org/x/exp/slog"
Expand Down Expand Up @@ -31,42 +28,28 @@ type Connection struct {
clientType ClientType
metadata metadata.M
observeDataTags []uint32
conn quic.Connection
fs *FrameStream
fconn frame.Conn
Logger *slog.Logger
}

func newConnection(
name string, id string, clientType ClientType, md metadata.M, tags []uint32,
conn quic.Connection, fs *FrameStream, logger *slog.Logger) *Connection {
fconn frame.Conn, logger *slog.Logger,
) *Connection {

logger = logger.With("conn_id", id, "conn_name", name)
if conn != nil {
logger.Info("new client connected", "remote_addr", conn.RemoteAddr().String(), "client_type", clientType.String())
}

return &Connection{
name: name,
id: id,
clientType: clientType,
metadata: md,
observeDataTags: tags,
conn: conn,
fs: fs,
fconn: fconn,
Logger: logger,
}
}

// Close closes the connection.
func (c *Connection) Close() error {
return c.fs.Close()
}

// Context returns the context of the connection.
func (c *Connection) Context() context.Context {
return c.fs.Context()
}

// ID returns the connection ID.
func (c *Connection) ID() string {
return c.id
Expand All @@ -87,26 +70,10 @@ func (c *Connection) ObserveDataTags() []uint32 {
return c.observeDataTags
}

// ReadFrame reads a frame from the connection.
func (c *Connection) ReadFrame() (frame.Frame, error) {
return c.fs.ReadFrame()
}

// ClientType returns the client type of the connection.
func (c *Connection) ClientType() ClientType {
return c.clientType
}

// WriteFrame writes a frame to the connection.
func (c *Connection) WriteFrame(f frame.Frame) error {
return c.fs.WriteFrame(f)
func (c *Connection) FrameConn() frame.Conn {
return c.fconn
}

// CloseWithError closes the connection with error.
func (c *Connection) CloseWithError(errString string) error {
return c.conn.CloseWithError(YomoCloseErrorCode, errString)
}

// YomoCloseErrorCode is the error code for close quic Connection for yomo.
// If the Connection implemented by quic is closed, the quic ApplicationErrorCode is always 0x13.
const YomoCloseErrorCode = quic.ApplicationErrorCode(0x13)
Loading