Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: stream #649

Closed
wants to merge 14 commits into from
5 changes: 5 additions & 0 deletions cli/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,16 @@ var serveCmd = &cobra.Command{
listenAddr := fmt.Sprintf("%s:%d", conf.Host, conf.Port)

options := []yomo.ZipperOption{yomo.WithZipperTracerProvider(tp)}
// auth
if _, ok := conf.Auth["type"]; ok {
if tokenString, ok := conf.Auth["token"]; ok {
options = append(options, yomo.WithAuth("token", tokenString))
}
}
// stream
if conf.Stream != nil {
options = append(options, yomo.WithZipperStreamChunkSize(conf.Stream.ChunkSize))
}

zipper, err := yomo.NewZipper(conf.Name, conf.Downstreams, options...)
if err != nil {
Expand Down
173 changes: 170 additions & 3 deletions core/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"io"
"reflect"
"runtime"
"sync"
"sync/atomic"
"time"

"github.com/quic-go/quic-go"
Expand All @@ -18,6 +20,11 @@ import (
"golang.org/x/exp/slog"
)

const (
// DefaultReconnectInterval is the default interval of reconnecting to zipper.
DefaultReconnectInterval = 1 * time.Second
)

// Client is the abstraction of a YoMo-Client. a YoMo-Client can be
// Source, Upstream Zipper or StreamFunction.
type Client struct {
Expand All @@ -36,6 +43,13 @@ type Client struct {
ctxCancel context.CancelCauseFunc

writeFrameChan chan frame.Frame

// conn quic.Connection
conn atomic.Pointer[quic.Connection]
// fs frame stream
fs atomic.Pointer[FrameStream]
// data streams
dataStreams sync.Map
}

// NewClient creates a new YoMo-Client.
Expand Down Expand Up @@ -155,10 +169,11 @@ func (c *Client) runBackground(ctx context.Context, addr string, conn quic.Conne
return
}
c.logger.Error("reconnect to zipper error", "err", cr.err)
time.Sleep(time.Second)
time.Sleep(DefaultReconnectInterval)
goto reconnect
}
fs = cr.fs
c.setConnection(&cr.conn)
go c.handleReadFrames(fs, reconnection)
}
}
Expand All @@ -183,8 +198,12 @@ connect:
c.logger.Error("can not connect to zipper", "err", result.err)
return result.err
}
c.logger = c.logger.With("local_addr", result.conn.LocalAddr().String())
c.logger.Info("connected to zipper")

c.setConnection(&result.conn)
c.setFrameStream(result.fs)

go c.runBackground(ctx, addr, result.conn, result.fs)

return nil
Expand Down Expand Up @@ -293,8 +312,11 @@ func (c *Client) handleFrame(f frame.Frame) {
c.processor(ff)
case *frame.BackflowFrame:
c.receiver(ff)
case *frame.StreamFrame:
// TODO: handle stream frame
c.logger.Debug("receive stream frame", "stream_id", ff.StreamID, "conn_id", ff.ClientID, "tag", ff.Tag)
default:
c.logger.Warn("received unexpected frame", "frame_type", f.Type().String())
c.logger.Error("received unexpected frame", "frame_type", f.Type().String())
}
}

Expand Down Expand Up @@ -356,4 +378,149 @@ type ErrAuthenticateFailed struct {
}

// Error returns a string that represents the ErrAuthenticateFailed error for the implementation of the error interface.
func (e ErrAuthenticateFailed) Error() string { return e.ReasonFromServer }
func (e ErrAuthenticateFailed) Error() string {
return e.ReasonFromServer
}

// readStream read stream from client.
func (c *Client) readStream() error {
STREAM:
select {
case <-c.ctx.Done():
return c.ctx.Err()
default:
qconn := c.Connection()
if qconn == nil {
err := errors.New("quic connection is nil")
c.logger.Error(err.Error())
return err
}
dataStream, err := qconn.AcceptStream(c.ctx)
if err != nil {
c.logger.Error("client accept stream error", "err", err)
return err
}
// close data stream
defer dataStream.Close()
c.logger.Debug("client accept stream success", "stream_id", dataStream.StreamID())
// read stream frame
fs := NewFrameStream(dataStream, y3codec.Codec(), y3codec.PacketReadWriter())
f, err := fs.ReadFrame()
if err != nil {
c.logger.Warn("failed to read data stream", "err", err)
return err
}
c.logger.Debug("client read stream frame success", "stream_id", dataStream.StreamID())
switch f.Type() {
case frame.TypeStreamFrame:
streamFrame := f.(*frame.StreamFrame)
// lookfor data stream
reader, ok := c.dataStreams.Load(streamFrame.ID)
if !ok {
c.logger.Debug(
"data stream is not found",
"stream_id", dataStream.StreamID(),
"datastream_id", streamFrame.ID,
"stream_chunk_size", streamFrame.ChunkSize,
// "datastream_id", dataStreamID,
// "received_id", streamFrame.ID,
"client_id", streamFrame.ClientID,
"tag", streamFrame.Tag,
)
goto STREAM
}
// clean data stream
defer c.dataStreams.Delete(streamFrame.ID)
// if found, pipe stream
c.logger.Debug(
"client pipe stream is ready",
"remote_addr", qconn.RemoteAddr().String(),
"datastream_id", streamFrame.ID,
"stream_id", dataStream.StreamID(),
"stream_chunk_size", streamFrame.ChunkSize,
"client_id", streamFrame.ClientID,
"tag", streamFrame.Tag,
)
// pipe stream
stream := reader.(io.Reader)
// TEST: read source stream
// buf, err := io.ReadAll(stream)
// if err != nil {
// c.logger.Error("!!!pipe stream error!!!", "err", err)
// return
// }
// bufString := string(buf)
// l := len(bufString)
// if l > 1000 {
// bufString = bufString[l-1000:]
// }
// c.logger.Info("!!!pipe stream success!!!",
// "remote_addr", qconn.RemoteAddr().String(),
// "datastream_id", streamFrame.ID,
// "stream_id", dataStream.StreamID(),
// "client_id", streamFrame.ClientID,
// "tag", streamFrame.Tag,
// "buf", bufString,
// "len", len(buf),
// )
buf := make([]byte, streamFrame.ChunkSize)
n, err := io.CopyBuffer(dataStream, stream, buf)
if err != nil {
c.logger.Error("client pipe stream error", "err", err)
return err
}
c.logger.Info("client pipe stream success",
"remote_addr", qconn.RemoteAddr().String(),
"id", streamFrame.ID,
"stream_id", dataStream.StreamID(),
"stream_chunk_size", streamFrame.ChunkSize,
"client_id", streamFrame.ClientID,
"tag", streamFrame.Tag,
"n", n,
)
default:
c.logger.Error("!!!unexpected frame!!!", "unexpected_frame_type", f.Type().String())
return errors.New("unexpected frame")
}
}
return nil
}

// PipeStream pipe a stream to server.
func (c *Client) PipeStream(dataStreamID string, stream io.Reader) error {
c.logger.Debug(fmt.Sprintf("client pipe stream[%s] -- start", dataStreamID))
c.dataStreams.Store(dataStreamID, stream)
// process all data streams
err := c.readStream()
c.logger.Debug(fmt.Sprintf("client pipe stream[%s] -- end", dataStreamID))
return err
}

// Connection returns the connection of client.
func (c *Client) Connection() quic.Connection {
conn := c.conn.Load()
if conn != nil {
return *conn
}
return nil
}

// setConnection set the connection of client.
func (c *Client) setConnection(conn *quic.Connection) {
c.conn.Store(conn)
}

// FrameStream returns the FrameStream of client.
func (c *Client) FrameStream() *FrameStream {
return c.fs.Load()
}

// setFrameStream set the FrameStream of client.
func (c *Client) setFrameStream(fs *FrameStream) {
c.fs.Store(fs)
}

// DataStreams returns the data streams of client.
func (c *Client) DataStreams() *sync.Map {
return &c.dataStreams
}
9 changes: 8 additions & 1 deletion core/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type Connection interface {
frame.ReadWriteCloser
// CloseWithError closes the connection with an error string.
CloseWithError(string) error
// QuicConnection returns raw quic connection.
QuicConnection() quic.Connection
}

type connection struct {
Expand All @@ -44,7 +46,8 @@ type connection struct {

func newConnection(
name string, id string, clientType ClientType, md metadata.M, tags []uint32,
conn quic.Connection, fs *FrameStream) *connection {
conn quic.Connection, fs *FrameStream,
) *connection {
return &connection{
name: name,
id: id,
Expand Down Expand Up @@ -96,6 +99,10 @@ func (c *connection) CloseWithError(errString string) error {
return c.conn.CloseWithError(YomoCloseErrorCode, errString)
}

func (c *connection) QuicConnection() quic.Connection {
return c.conn
}

// YomoCloseErrorCode is the error code for close quic Connection for yomo.
// If the Connection implemented by quic is closed, the quic ApplicationErrorCode is always 0x13.
const YomoCloseErrorCode = quic.ApplicationErrorCode(0x13)
64 changes: 46 additions & 18 deletions core/frame/frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
// 4. BackflowFrame
// 5. RejectedFrame
// 6. GoawayFrame
// 7. StreamFrame
//
// Read frame comments to understand the role of the frame.
type Frame interface {
Expand Down Expand Up @@ -98,22 +99,47 @@ type GoawayFrame struct {
// Type returns the type of GoawayFrame.
func (f *GoawayFrame) Type() Type { return TypeGoawayFrame }

// StreamFrame is used to transmit data across DataStream.
type StreamFrame struct {
ID string
ClientID string
StreamID int64
ChunkSize int64
Tag Tag
}

// Type returns the type of StreamFrame.
func (f *StreamFrame) Type() Type { return TypeStreamFrame }

// RequestStreamFrame is used to request a stream.
type RequestStreamFrame struct {
ClientID string
Tag Tag
}

// Type returns the type of RequestStreamFrame.
func (f *RequestStreamFrame) Type() Type { return TypeRequestStreamFrame }

const (
TypeDataFrame Type = 0x3F // TypeDataFrame is the type of DataFrame.
TypeHandshakeFrame Type = 0x31 // TypeHandshakeFrame is the type of HandshakeFrame.
TypeHandshakeAckFrame Type = 0x29 // TypeHandshakeAckFrame is the type of HandshakeAckFrame.
TypeRejectedFrame Type = 0x39 // TypeRejectedFrame is the type of RejectedFrame.
TypeBackflowFrame Type = 0x2D // TypeBackflowFrame is the type of BackflowFrame.
TypeGoawayFrame Type = 0x2E // TypeGoawayFrame is the type of GoawayFrame.
TypeDataFrame Type = 0x3F // TypeDataFrame is the type of DataFrame.
TypeHandshakeFrame Type = 0x31 // TypeHandshakeFrame is the type of HandshakeFrame.
TypeHandshakeAckFrame Type = 0x29 // TypeHandshakeAckFrame is the type of HandshakeAckFrame.
TypeRejectedFrame Type = 0x39 // TypeRejectedFrame is the type of RejectedFrame.
TypeBackflowFrame Type = 0x2D // TypeBackflowFrame is the type of BackflowFrame.
TypeGoawayFrame Type = 0x2E // TypeGoawayFrame is the type of GoawayFrame.
TypeStreamFrame Type = 0x2F // TypeStreamFrame is the type of StreamFrame.
TypeRequestStreamFrame Type = 0x30 // TypeRequestStreamFrame is the type of RequestStreamFrame.
)

var frameTypeStringMap = map[Type]string{
TypeDataFrame: "DataFrame",
TypeHandshakeFrame: "HandshakeFrame",
TypeHandshakeAckFrame: "HandshakeAckFrame",
TypeRejectedFrame: "RejectedFrame",
TypeBackflowFrame: "BackflowFrame",
TypeGoawayFrame: "GoawayFrame",
TypeDataFrame: "DataFrame",
TypeHandshakeFrame: "HandshakeFrame",
TypeHandshakeAckFrame: "HandshakeAckFrame",
TypeRejectedFrame: "RejectedFrame",
TypeBackflowFrame: "BackflowFrame",
TypeGoawayFrame: "GoawayFrame",
TypeStreamFrame: "StreamFrame",
TypeRequestStreamFrame: "RequestStreamFrame",
}

// String returns a human-readable string which represents the frame type.
Expand All @@ -127,12 +153,14 @@ func (f Type) String() string {
}

var frameTypeNewFuncMap = map[Type]func() Frame{
TypeDataFrame: func() Frame { return new(DataFrame) },
TypeHandshakeFrame: func() Frame { return new(HandshakeFrame) },
TypeHandshakeAckFrame: func() Frame { return new(HandshakeAckFrame) },
TypeRejectedFrame: func() Frame { return new(RejectedFrame) },
TypeBackflowFrame: func() Frame { return new(BackflowFrame) },
TypeGoawayFrame: func() Frame { return new(GoawayFrame) },
TypeDataFrame: func() Frame { return new(DataFrame) },
TypeHandshakeFrame: func() Frame { return new(HandshakeFrame) },
TypeHandshakeAckFrame: func() Frame { return new(HandshakeAckFrame) },
TypeRejectedFrame: func() Frame { return new(RejectedFrame) },
TypeBackflowFrame: func() Frame { return new(BackflowFrame) },
TypeGoawayFrame: func() Frame { return new(GoawayFrame) },
TypeStreamFrame: func() Frame { return new(StreamFrame) },
TypeRequestStreamFrame: func() Frame { return new(RequestStreamFrame) },
}

// NewFrame creates a new frame from Type.
Expand Down
12 changes: 12 additions & 0 deletions core/frame_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ func (fs *FrameStream) Context() context.Context {
return fs.underlying.Context()
}

// ReadStream reads the underlying stream.
func (fs *FrameStream) ReadStream() (quic.Stream, error) {
return fs.underlying, nil
}

// ReadFrame reads next frame from underlying stream.
func (fs *FrameStream) ReadFrame() (frame.Frame, error) {
select {
Expand Down Expand Up @@ -87,3 +92,10 @@ func (fs *FrameStream) Close() error {

return fs.underlying.Close()
}

// Codec returns the codec of the FrameStream.
func (fs *FrameStream) Codec() frame.Codec {
fs.mu.Lock()
defer fs.mu.Unlock()
return fs.codec
}
Loading
Loading