diff --git a/README.md b/README.md index 00d748e..d6ab716 100644 --- a/README.md +++ b/README.md @@ -15,17 +15,21 @@ with our highly available, performant and low latency edge proxies network neare ## Features -- [x] ProtoBuf/gRPC service API definitions -- [x] Client side service tooling in Go -- [x] Server side service tooling in Go -- [ ] Minekube Connect plugin support for: - - [ ] [Gate](https://github.com/minekube/gate) - - [ ] Spigot/PaperMC - - [ ] Velocity - - [ ] BungeeCord +- [x] ProtoBuf typed +- [x] Streaming transport protocols + - [x] WebSocket support + - equally or more efficient than gRPC + - better web proxy support: cloudflared, nginx, ... + - [ ] gRPC support (improved developer experience) + - No immediate support planned, [see](internal/grpc) +- [x] Minekube Connect plugin support for: + - [x] [Gate](https://github.com/minekube/gate) + - [x] [Spigot/PaperMC](https://github.com/minekube/connect-java) + - [x] [Velocity](https://github.com/minekube/connect-java) + - [x] [BungeeCord](https://github.com/minekube/connect-java) - [ ] Sponge - [ ] Minestom -- [ ] Client side service tooling in Java -- [x] Provide test tunnel service implementation in Go -- [ ] Provide test tunnel service implementation in Java -- [ ] Easy documentation website +- [x] Client side service tooling in Go +- [x] Server side service tooling in Go +- [x] Client- & service-side tests implementation in Go +- [ ] Awesome documentation website diff --git a/aliases.go b/aliases.go index 7b6a071..2a0fed3 100644 --- a/aliases.go +++ b/aliases.go @@ -1,8 +1,9 @@ package connect import ( - pb "go.minekube.com/connect/internal/api/minekube/connect/v1alpha1" "google.golang.org/genproto/googleapis/rpc/status" + + pb "go.minekube.com/connect/internal/api/minekube/connect/v1alpha1" ) // Type alias to better support updating versions. @@ -21,44 +22,6 @@ type ( GameProfile = pb.GameProfile GameProfileProperty = pb.GameProfileProperty - WatchServiceClient = pb.WatchServiceClient - WatchServiceServer = pb.WatchServiceServer - WatchService_WatchClient = pb.WatchService_WatchClient - WatchService_WatchServer = pb.WatchService_WatchServer - WatchRequest = pb.WatchRequest - WatchResponse = pb.WatchResponse - - TunnelServiceClient = pb.TunnelServiceClient - TunnelServiceServer = pb.TunnelServiceServer - TunnelService_TunnelClient = pb.TunnelService_TunnelClient - TunnelService_TunnelServer = pb.TunnelService_TunnelServer - TunnelRequest = pb.TunnelRequest - TunnelResponse = pb.TunnelResponse - - UnimplementedWatchServiceServer struct { - pb.UnimplementedWatchServiceServer - } - UnimplementedTunnelServiceServer struct { - pb.UnimplementedTunnelServiceServer - } -) - -// Alias to better support updating versions. -// See the referenced type for documentation. -// -// Other go files should only ever use the provided -// alias type and never import a specific proto version. -var ( - NewWatchServiceClient = pb.NewWatchServiceClient - NewTunnelServiceClient = pb.NewTunnelServiceClient - - RegisterWatchServiceServer = pb.RegisterWatchServiceServer - RegisterTunnelServiceServer = pb.RegisterTunnelServiceServer -) - -// Well-known metadata keys -const ( - MDPrefix = "connect-" // The prefix of metadata keys. - MDSession = MDPrefix + "session" // Metadata key specifying the session id when calling the TunnelService. - MDEndpoint = MDPrefix + "endpoint" // Metadata key specifying the endpoint when calling the WatchService. + WatchRequest = pb.WatchRequest + WatchResponse = pb.WatchResponse ) diff --git a/api/buf.lock b/api/buf.lock index 0c07bad..3cc07ce 100644 --- a/api/buf.lock +++ b/api/buf.lock @@ -5,6 +5,6 @@ deps: owner: googleapis repository: googleapis branch: main - commit: 0b64ae0918a6421b8deeffb20bd7c58a - digest: b1-rcRLiZYvmis9EDBO1iRVlWakV5U988o2oNLyldwzo6Q= - create_time: 2022-02-24T15:04:40.484238Z + commit: 6f475a54b1614e6cafd96fd376b1738d + digest: b1-a7t276NpOOZlwSgobbKDlRuKe1lyAcZHdiangmLShGg= + create_time: 2022-03-02T15:12:37.274824Z diff --git a/connect.go b/connect.go new file mode 100644 index 0000000..dffcd22 --- /dev/null +++ b/connect.go @@ -0,0 +1,106 @@ +package connect + +import ( + "context" + "net" + + "go.minekube.com/connect/internal/ctxkey" +) + +// Well-known headers / metadata keys +const ( + MDPrefix = "connect-" // The prefix of Connect metadata keys. + MDSession = MDPrefix + "session" // Metadata key specifying the session id for a Tunnel. + MDEndpoint = MDPrefix + "endpoint" // Metadata key specifying the watching Endpoint. +) + +// Tunnel represents an outbound only tunnel initiated by +// an Endpoint for a specific SessionProposal. +type Tunnel net.Conn + +// Tunneler creates a Tunnel. +type Tunneler interface { + Tunnel(context.Context) (Tunnel, error) +} + +// Watcher registers the calling endpoint and watches for sessions +// proposed by the WatchService. To stop watching cancel the context. +// If ReceiveProposal returns a non-nil non-EOF error Watch returns it. +type Watcher interface { + Watch(context.Context, ReceiveProposal) error +} + +// ReceiveProposal is called when Watcher receives a SessionProposal. +type ReceiveProposal func(proposal SessionProposal) error + +// SessionProposal specifies an incoming session proposal. +// Use the Session to create the connection tunnel or reject the session with an optional reason. +type SessionProposal interface { + Session() *Session // The session proposed to connect to the Endpoint. + Reject(context.Context, *RejectionReason) error // Rejects the session proposal with an optional reason. +} + +// Endpoint is an endpoint that listens for +// sessions to either reject them or establish +// a tunnel for receiving the connection. +type Endpoint interface { + Watcher + Tunneler +} + +// TunnelListener is a network listener for tunnel connections. +type TunnelListener interface { + AcceptTunnel(context.Context, Tunnel) error +} + +// EndpointListener is a network listener for endpoint watches. +type EndpointListener interface { + AcceptEndpoint(context.Context, EndpointWatch) error +} + +// EndpointWatch is a watching Endpoint. +type EndpointWatch interface { + // Propose proposes a session to the Endpoint. + // The Endpoint either rejects the proposal or initiates + // a Tunnel to receive the session connection. + Propose(context.Context, *Session) error + Rejections() <-chan *SessionRejection // Rejections receives rejected session proposals from the Endpoint. +} + +// Listener listens for watching endpoints and tunnel connections from endpoints. +type Listener interface { + EndpointListener + TunnelListener +} + +// TunnelOptions are options for Tunneler and TunnelListener. +// Use WithTunnelOptions to propagate TunnelOptions in context. +type TunnelOptions struct { + // LocalAddr fakes the local address of the Tunnel when specified. + // + // If this TunnelOptions destine to Tunneler + // it is recommended to use the Endpoint address/name. + // + // If this TunnelOptions destine to TunnelListener + // it is recommended to use the underlying network listener address. + LocalAddr net.Addr + // RemoteAddr fakes the remote address of the Tunnel when specified. + // + // If this TunnelOptions destine to Tunneler + // it is recommended to use the underlying connection remote address. + // + // If this TunnelOptions destine to TunnelListener + // it is recommended to use the Endpoint address/name. + RemoteAddr net.Addr // It is recommended to use the player address. +} + +// WithTunnelOptions stores TunnelOptions in a context. +func WithTunnelOptions(ctx context.Context, opts TunnelOptions) context.Context { + return context.WithValue(ctx, ctxkey.TunnelOptions{}, opts) +} + +// Addr is an address in the "connect" network. +type Addr string + +func (a Addr) String() string { return string(a) } +func (a Addr) Network() string { return "connect" } diff --git a/go.mod b/go.mod index d4007b1..de01bcd 100644 --- a/go.mod +++ b/go.mod @@ -7,14 +7,25 @@ require ( google.golang.org/genproto v0.0.0-20220126215142-9970aeb2e350 google.golang.org/grpc v1.44.0 google.golang.org/protobuf v1.27.1 + nhooyr.io/websocket v1.8.7 ) require ( - github.com/davecgh/go-spew v1.1.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/gobwas/ws v1.1.0 // indirect github.com/golang/protobuf v1.5.2 // indirect + github.com/google/go-cmp v0.5.7 // indirect + github.com/gorilla/websocket v1.5.0 // indirect + github.com/klauspost/compress v1.14.4 // indirect + github.com/kr/pretty v0.1.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd // indirect - golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27 // indirect golang.org/x/text v0.3.7 // indirect + gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect + gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect ) + +require ( + golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd // indirect + golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27 // indirect +) diff --git a/go.sum b/go.sum index 6f6e62c..2db9122 100644 --- a/go.sum +++ b/go.sum @@ -14,8 +14,9 @@ github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWH github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= -github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= @@ -24,11 +25,32 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.m github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= +github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE= +github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI= +github.com/gin-gonic/gin v1.6.3 h1:ahKqKTFpO5KTPHxWZjEdPScmYaGtLo8Y4DMHoEsnp14= +github.com/gin-gonic/gin v1.6.3/go.mod h1:75u5sXoLsGZoRN5Sgbi1eraJ4GU3++wFwWzhwvtwp4M= +github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= +github.com/go-playground/locales v0.13.0 h1:HyWk6mgj5qFqCT5fjGBuRArbVDfE4hi8+e8ceBS/t7Q= +github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTMfnGwq/HNwmWNS8= +github.com/go-playground/universal-translator v0.17.0 h1:icxd5fm+REJzpZx7ZfpaD876Lmtgy7VtROAbHHXk8no= +github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA= +github.com/go-playground/validator/v10 v10.2.0 h1:KgJ0snyC2R9VXYN2rneOtQcw5aHQB1Vv0sFl1UcHBOY= +github.com/go-playground/validator/v10 v10.2.0/go.mod h1:uOYAAleCW8F/7oMFd6aG0GOhaH6EGOAJShg8Id5JGkI= +github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo= +github.com/gobwas/httphead v0.1.0 h1:exrUm0f4YX0L7EBwZHuCF4GDp8aJfVeBrlLQrs6NqWU= +github.com/gobwas/httphead v0.1.0/go.mod h1:O/RXo79gxV8G+RqlR/otEwx4Q36zl9rqC5u12GKvMCM= +github.com/gobwas/pool v0.2.0/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw= +github.com/gobwas/pool v0.2.1 h1:xfeeEhW7pwmX8nuLVlqbzVc7udMDrwetjEv+TZIz1og= +github.com/gobwas/pool v0.2.1/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw= +github.com/gobwas/ws v1.0.2/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/EM= +github.com/gobwas/ws v1.1.0 h1:7RFti/xnNkMJnrK7D1yQ/iCIB5OrrY/54/H930kIbHA= +github.com/gobwas/ws v1.1.0/go.mod h1:nzvNcVha5eUziGrbxFCo6qFIojQHjJV5cLYIbezhfL0= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= +github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk= github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= @@ -45,19 +67,48 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.7 h1:81/ik6ipDQS2aGcBfIN5dHDB36BwrStyeAQquSYCV4o= +github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= +github.com/json-iterator/go v1.1.9 h1:9yzud/Ht36ygwatGx56VwCZtlI/2AD15T1X2sjSuGns= +github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= +github.com/klauspost/compress v1.10.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= +github.com/klauspost/compress v1.14.4 h1:eijASRJcobkVtSt81Olfh7JX43osYLwy5krOJo6YEu4= +github.com/klauspost/compress v1.14.4/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/leodido/go-urn v1.2.0 h1:hpXL4XnriNwQ/ABnpepYM/1vCLWNDfUNts8dX3xTG6Y= +github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII= +github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY= +github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 h1:Esafd1046DLDQ0W1YjYsBW+p8U2u7vzgW2SQVmlNazg= +github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo= +github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw= +github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs= +github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= @@ -84,8 +135,10 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201207223542-d4d67f95c62d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -95,10 +148,12 @@ golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= @@ -137,11 +192,17 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0 google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ= google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +nhooyr.io/websocket v1.8.7 h1:usjR2uOr/zjjkVMy0lW+PPohFok7PCow5sDjLgX4P4g= +nhooyr.io/websocket v1.8.7/go.mod h1:B70DZP8IakI65RVQ51MsWP/8jndNma26DVA/nFSCgW0= diff --git a/internal/ctxkey/keys.go b/internal/ctxkey/keys.go new file mode 100644 index 0000000..8fb5122 --- /dev/null +++ b/internal/ctxkey/keys.go @@ -0,0 +1,3 @@ +package ctxkey + +type TunnelOptions struct{} diff --git a/internal/ctxutil/util.go b/internal/ctxutil/util.go new file mode 100644 index 0000000..e1a84dd --- /dev/null +++ b/internal/ctxutil/util.go @@ -0,0 +1,30 @@ +package ctxutil + +import ( + "context" + + "google.golang.org/grpc/peer" + + "go.minekube.com/connect" + "go.minekube.com/connect/internal/ctxkey" +) + +func tunnelOptions(ctx context.Context) connect.TunnelOptions { + opts, _ := ctx.Value(ctxkey.TunnelOptions{}).(connect.TunnelOptions) + return opts +} + +func TunnelOptionsOrDefault(ctx context.Context) connect.TunnelOptions { + opts := tunnelOptions(ctx) + if opts.LocalAddr == nil { + opts.LocalAddr = connect.Addr("unknown") + } + if opts.RemoteAddr == nil { + if p, ok := peer.FromContext(ctx); ok { + opts.RemoteAddr = p.Addr + } else { + opts.RemoteAddr = connect.Addr("unknown") + } + } + return opts +} diff --git a/internal/grpc/README.md b/internal/grpc/README.md new file mode 100644 index 0000000..736e82b --- /dev/null +++ b/internal/grpc/README.md @@ -0,0 +1,9 @@ +# grpc + +This package was already tested to work, but we moved over to maintaining WebSocket +firstly because of wider support by web proxies. + +These are the steps to introduce the grpc package back: +- don't move this `grpc` package to root +- instead, create a new `grpc` package and delete this one +- the new `grpc` package should take package `ws` as best practice reference diff --git a/internal/grpc/aliases.go b/internal/grpc/aliases.go new file mode 100644 index 0000000..6e0797a --- /dev/null +++ b/internal/grpc/aliases.go @@ -0,0 +1,65 @@ +package grpc + +import ( + "google.golang.org/genproto/googleapis/rpc/status" + + pb "go.minekube.com/connect/internal/api/minekube/connect/v1alpha1" +) + +// Type alias to better support updating versions. +// See the referenced type for documentation. +// +// Other go files should only ever use the provided +// alias type and never import a specific proto version. +type ( + Session = pb.Session + Authentication = pb.Authentication + + SessionRejection = pb.SessionRejection + RejectionReason = status.Status // The reason why a session proposal is rejected. + + Player = pb.Player + GameProfile = pb.GameProfile + GameProfileProperty = pb.GameProfileProperty + + WatchServiceClient = pb.WatchServiceClient + WatchServiceServer = pb.WatchServiceServer + WatchService_WatchClient = pb.WatchService_WatchClient + WatchService_WatchServer = pb.WatchService_WatchServer + WatchRequest = pb.WatchRequest + WatchResponse = pb.WatchResponse + + TunnelServiceClient = pb.TunnelServiceClient + TunnelServiceServer = pb.TunnelServiceServer + TunnelService_TunnelClient = pb.TunnelService_TunnelClient + TunnelService_TunnelServer = pb.TunnelService_TunnelServer + TunnelRequest = pb.TunnelRequest + TunnelResponse = pb.TunnelResponse + + UnimplementedWatchServiceServer struct { + pb.UnimplementedWatchServiceServer + } + UnimplementedTunnelServiceServer struct { + pb.UnimplementedTunnelServiceServer + } +) + +// Alias to better support updating versions. +// See the referenced type for documentation. +// +// Other go files should only ever use the provided +// alias type and never import a specific proto version. +var ( + NewWatchServiceClient = pb.NewWatchServiceClient + NewTunnelServiceClient = pb.NewTunnelServiceClient + + RegisterWatchServiceServer = pb.RegisterWatchServiceServer + RegisterTunnelServiceServer = pb.RegisterTunnelServiceServer +) + +// Well-known metadata keys +const ( + MDPrefix = "connect-" // The prefix of metadata keys. + MDSession = MDPrefix + "session" // Metadata key specifying the session id when calling the TunnelService. + MDEndpoint = MDPrefix + "endpoint" // Metadata key specifying the endpoint when calling the WatchService. +) diff --git a/deadline.go b/internal/grpc/deadline.go similarity index 96% rename from deadline.go rename to internal/grpc/deadline.go index f07558c..8cacec4 100644 --- a/deadline.go +++ b/internal/grpc/deadline.go @@ -1,4 +1,4 @@ -package connect +package grpc import ( "context" diff --git a/tunnel_client.go b/internal/grpc/tunnel_client.go similarity index 86% rename from tunnel_client.go rename to internal/grpc/tunnel_client.go index 97aaac9..28f3c56 100644 --- a/tunnel_client.go +++ b/internal/grpc/tunnel_client.go @@ -1,8 +1,9 @@ -package connect +package grpc import ( "context" "errors" + "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -10,7 +11,7 @@ import ( // TunnelOptions are the options for a call to Tunnel. type TunnelOptions struct { - TunnelCli TunnelServiceClient // The TunnelServiceClient to use for tunneling. + TunnelClient TunnelServiceClient // The TunnelServiceClient to use for tunneling. // The local address as reflected by TunnelConn.LocalAddr(). LocalAddr string // It is recommended to use the endpoint name. // The remote address as reflected by TunnelConn.RemoteAddr(). @@ -27,8 +28,8 @@ func Tunnel( callOpts ...grpc.CallOption, ) (TunnelConn, error) { // Validate options - if tunnelOpts.TunnelCli == nil { - return nil, errors.New("missing TunnelCli in TunnelOptions") + if tunnelOpts.TunnelClient == nil { + return nil, errors.New("missing TunnelClient in TunnelOptions") } if tunnelOpts.LocalAddr == "" { return nil, errors.New("missing LocalAddr in TunnelOptions") @@ -38,7 +39,7 @@ func Tunnel( } // Make tunnel connection tunnelCtx, tunnelCancel := context.WithCancel(ctx) - tunnelStream, err := tunnelOpts.TunnelCli.Tunnel(tunnelCtx, callOpts...) + tunnelStream, err := tunnelOpts.TunnelClient.Tunnel(tunnelCtx, callOpts...) if err != nil { tunnelCancel() return nil, status.Errorf(statusErr(err).Code(), "%s: %v", "could not create tunnel", err) diff --git a/tunnel_conn.go b/internal/grpc/tunnel_conn.go similarity index 99% rename from tunnel_conn.go rename to internal/grpc/tunnel_conn.go index 2fc8a32..b3923a3 100644 --- a/tunnel_conn.go +++ b/internal/grpc/tunnel_conn.go @@ -1,4 +1,4 @@ -package connect +package grpc import ( "context" diff --git a/tunnel_reader.go b/internal/grpc/tunnel_reader.go similarity index 99% rename from tunnel_reader.go rename to internal/grpc/tunnel_reader.go index 1573081..44fd5b4 100644 --- a/tunnel_reader.go +++ b/internal/grpc/tunnel_reader.go @@ -1,4 +1,4 @@ -package connect +package grpc import ( "bytes" diff --git a/tunnel_reader_test.go b/internal/grpc/tunnel_reader_test.go similarity index 99% rename from tunnel_reader_test.go rename to internal/grpc/tunnel_reader_test.go index 2f786cf..79e286f 100644 --- a/tunnel_reader_test.go +++ b/internal/grpc/tunnel_reader_test.go @@ -1,4 +1,4 @@ -package connect +package grpc import ( "context" diff --git a/tunnel_service.go b/internal/grpc/tunnel_service.go similarity index 99% rename from tunnel_service.go rename to internal/grpc/tunnel_service.go index 9d02c26..df113c5 100644 --- a/tunnel_service.go +++ b/internal/grpc/tunnel_service.go @@ -1,4 +1,4 @@ -package connect +package grpc import ( "context" diff --git a/tunnel_writer.go b/internal/grpc/tunnel_writer.go similarity index 99% rename from tunnel_writer.go rename to internal/grpc/tunnel_writer.go index 682aaab..a2fd14b 100644 --- a/tunnel_writer.go +++ b/internal/grpc/tunnel_writer.go @@ -1,4 +1,4 @@ -package connect +package grpc import ( "context" diff --git a/tunnel_writer_test.go b/internal/grpc/tunnel_writer_test.go similarity index 98% rename from tunnel_writer_test.go rename to internal/grpc/tunnel_writer_test.go index 29ba7bd..7e64976 100644 --- a/tunnel_writer_test.go +++ b/internal/grpc/tunnel_writer_test.go @@ -1,4 +1,4 @@ -package connect +package grpc import ( "context" diff --git a/watch_client.go b/internal/grpc/watch_client.go similarity index 89% rename from watch_client.go rename to internal/grpc/watch_client.go index dc41ca3..33c0b93 100644 --- a/watch_client.go +++ b/internal/grpc/watch_client.go @@ -1,10 +1,11 @@ -package connect +package grpc import ( "context" "errors" - "google.golang.org/grpc" "io" + + "google.golang.org/grpc" ) // SessionProposal specifies an incoming session proposal. @@ -16,7 +17,7 @@ type SessionProposal interface { // WatchOptions are the options for a call to Watch. type WatchOptions struct { - Cli WatchServiceClient // The WatchServiceClient to use for watching. + Client WatchServiceClient // The WatchServiceClient to use for watching. Callback func(proposal SessionProposal) error // The callback that is called when receiving session proposals. } @@ -30,8 +31,8 @@ func Watch( opts ...grpc.CallOption, ) error { // Validate options - if watchOpts.Cli == nil { - return errors.New("missing Cli in WatchOptions") + if watchOpts.Client == nil { + return errors.New("missing Client in WatchOptions") } if watchOpts.Callback == nil { return errors.New("missing Callback in WatchOptions") @@ -39,7 +40,7 @@ func Watch( // Start watch watchCtx, cancel := context.WithCancel(ctx) defer cancel() - stream, err := watchOpts.Cli.Watch(watchCtx, opts...) + stream, err := watchOpts.Client.Watch(watchCtx, opts...) if err != nil { return err } diff --git a/watch_service.go b/internal/grpc/watch_service.go similarity index 96% rename from watch_service.go rename to internal/grpc/watch_service.go index a169f30..8e8184a 100644 --- a/watch_service.go +++ b/internal/grpc/watch_service.go @@ -1,11 +1,11 @@ -package connect +package grpc import ( "context" "errors" - "google.golang.org/grpc" - "io" "net" + + "google.golang.org/grpc" ) // Watcher represents a watching endpoint. @@ -93,14 +93,10 @@ func (w *watcher) startRecvRejections() { for { req, err := w.stream.Recv() if err != nil { - if errors.Is(err, io.EOF) { - return // stream closed, done - } return // drop error } if req.GetSessionRejection() == nil { - // Unexpected - continue + continue // Unexpected } select { case w.rejections <- req.GetSessionRejection(): diff --git a/watch_service_test.go b/internal/grpc/watch_service_test.go similarity index 98% rename from watch_service_test.go rename to internal/grpc/watch_service_test.go index 08bd9b1..a8c9cd5 100644 --- a/watch_service_test.go +++ b/internal/grpc/watch_service_test.go @@ -1,17 +1,18 @@ -package connect +package grpc import ( "context" "errors" + "net" + "testing" + "time" + "github.com/stretchr/testify/require" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" - "net" - "testing" - "time" ) func TestWatchService_Serve(t *testing.T) { @@ -50,7 +51,7 @@ func TestWatchService_Serve(t *testing.T) { var proposals int ctx = metadata.AppendToOutgoingContext(ctx, MDEndpoint, "foo") err = Watch(ctx, WatchOptions{ - Cli: watchCli, + Client: watchCli, Callback: func(proposal SessionProposal) error { proposals++ require.Equal(t, "abc", proposal.Session().GetId()) @@ -111,7 +112,7 @@ func TestSessionProposal_Reject(t *testing.T) { watchCli := NewWatchServiceClient(cliConn) var proposals int err = Watch(ctx, WatchOptions{ - Cli: watchCli, + Client: watchCli, Callback: func(proposal SessionProposal) error { proposals++ return proposal.Reject(nil) // reject all diff --git a/internal/netutil/util.go b/internal/netutil/util.go new file mode 100644 index 0000000..83fe031 --- /dev/null +++ b/internal/netutil/util.go @@ -0,0 +1,24 @@ +package netutil + +import ( + "fmt" + "net" +) + +type Conn struct { + net.Conn + CloseFn func() error + VLocalAddr, VRemoteAddr net.Addr +} + +func (c *Conn) Close() error { return c.CloseFn() } +func (c *Conn) LocalAddr() net.Addr { return c.VLocalAddr } +func (c *Conn) RemoteAddr() net.Addr { return c.VRemoteAddr } +func (c *Conn) String() string { + if s, ok := c.Conn.(fmt.Stringer); ok { + return s.String() + } + return fmt.Sprintf("Tunnel(remote=%s via %s, local=%s via %s)", + c.RemoteAddr().String(), c.RemoteAddr().Network(), + c.LocalAddr().String(), c.LocalAddr().Network()) +} diff --git a/internal/testutil/suite.go b/internal/testutil/suite.go new file mode 100644 index 0000000..92e29fc --- /dev/null +++ b/internal/testutil/suite.go @@ -0,0 +1,178 @@ +package testutil + +import ( + "context" + "fmt" + "io" + "sync/atomic" + "time" + + "github.com/stretchr/testify/suite" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + "go.minekube.com/connect" +) + +type Suite struct { + suite.Suite + + Endpoint connect.Endpoint + StartWatchServer func(ctx context.Context, ln connect.EndpointListener) error + StartTunnelServer func(ctx context.Context, ln connect.TunnelListener) error +} + +func (suite *Suite) SetupTest() { + suite.NotNil(suite.Endpoint) + suite.NotNil(suite.StartWatchServer) +} + +func (suite *Suite) TestWatchReject() { + proposal := &connect.Session{Id: "abc"} + rejection := status.New(codes.Aborted, "don't want this").Proto() + + var expSeq = []string{ + "s: got watcher", + "c: got proposal " + proposal.GetId(), + "c: rejection sent", + "s: got rejection " + rejection.String(), + } + var seq sequence + + ctx, stop := context.WithTimeout(context.TODO(), time.Second*3) + + ln := acceptEndpoint(func(ctx context.Context, watch connect.EndpointWatch) error { + seq.Add("s: got watcher") + suite.NoError(watch.Propose(ctx, proposal)) + for rej := range watch.Rejections() { + seq.Add("s: got rejection " + rej.GetReason().String()) + break + } + time.Sleep(time.Millisecond * 100) // let "c: rejection sent" + + return nil + }) + + go func() { suite.NoError(suite.StartWatchServer(ctx, ln)) }() + + time.Sleep(time.Millisecond * 100) // Wait for server + go func() { + err := suite.Endpoint.Watch(ctx, func(proposal connect.SessionProposal) error { + seq.Add("c: got proposal " + proposal.Session().GetId()) + suite.NoError(proposal.Reject(ctx, rejection)) + seq.Add("c: rejection sent") + return nil + }) + suite.NotNil(err) + suite.Contains(err.Error(), "closed serverside") + stop() + }() + + <-ctx.Done() + suite.Assert().ErrorIs(ctx.Err(), context.Canceled) + suite.Assert().Equal(expSeq, seq.Get()) +} + +func (suite *Suite) TestTunnel() { + ctx, stop := context.WithTimeout(context.TODO(), time.Second*3) + + toClientMsg := []byte("hello client") + toServerMsg := []byte("hello server") + + var expSeq = []string{ + "c: tunnel opened Tunnel(remote=unknown via connect, local=unknown via connect)", + "s: got tunnel Tunnel(remote=unknown via connect, local=unknown via connect)", + "s: read", + "c: read", + "c: no read", + } + var seq sequence + + ln := acceptTunnel(func(ctx context.Context, tunnel connect.Tunnel) error { + time.Sleep(time.Millisecond * 100) // let "c: tunnel opened" + seq.Add("s: got tunnel " + fmt.Sprint(tunnel)) + + // client -> server + b := make([]byte, 100) + n, err := tunnel.Read(b) + suite.NoError(err) + suite.Equal(len(toServerMsg), n) + suite.Equal(toServerMsg, b[:n]) + seq.Add("s: read") + + // client <- server + n, err = tunnel.Write(toClientMsg) + suite.NoError(err) + suite.Equal(len(toClientMsg), n) + + // Close server side + suite.NoError(tunnel.Close()) + return nil + }) + + go func() { suite.NoError(suite.StartTunnelServer(ctx, ln)) }() + + time.Sleep(time.Millisecond * 100) // Wait for server + go func() { + tunnel, err := suite.Endpoint.Tunnel(ctx) + suite.NoError(err) + seq.Add("c: tunnel opened " + fmt.Sprint(tunnel)) + + // client -> server + n, err := tunnel.Write(toServerMsg) + suite.NoError(err) + suite.Equal(len(toServerMsg), n) + + // client <- server + b := make([]byte, 100) + n, err = tunnel.Read(b) + suite.NoError(err) + suite.Equal(len(toClientMsg), n) + suite.Equal(toClientMsg, b[:n]) + seq.Add("c: read") + + // Should be closed server side by now + b = make([]byte, 100) + for i := 0; i < 5; i++ { + n, err = tunnel.Read(b) + suite.Empty(n) + if err == nil { + continue // retry + } + } + suite.ErrorIs(err, io.EOF) + seq.Add("c: no read") + + _ = tunnel.Close() + stop() + }() + + <-ctx.Done() + suite.Assert().ErrorIs(ctx.Err(), context.Canceled) + suite.Assert().Equal(expSeq, seq.Get()) +} + +type sequence struct { + v atomic.Value +} + +func (s *sequence) Add(str string) { + s.v.Store(append(s.Get(), str)) +} + +func (s *sequence) Get() []string { + str, _ := s.v.Load().([]string) + return str +} + +type acceptEndpoint func(ctx context.Context, watch connect.EndpointWatch) error + +func (fn acceptEndpoint) AcceptEndpoint(ctx context.Context, watch connect.EndpointWatch) error { + return fn(ctx, watch) +} + +type acceptTunnel func(ctx context.Context, tunnel connect.Tunnel) error + +func (fn acceptTunnel) AcceptTunnel(ctx context.Context, tunnel connect.Tunnel) error { + return fn(ctx, tunnel) +} diff --git a/internal/util/util.go b/internal/util/util.go new file mode 100644 index 0000000..420e920 --- /dev/null +++ b/internal/util/util.go @@ -0,0 +1,63 @@ +package util + +import ( + "context" + "errors" + + "go.minekube.com/connect" +) + +type SessionProposal struct { + Proposal *connect.Session + RejectFn func(ctx context.Context, r *connect.RejectionReason) error +} + +func (p *SessionProposal) String() string { + return p.Proposal.String() +} + +func (p *SessionProposal) Session() *connect.Session { + return p.Proposal +} + +func (p *SessionProposal) Reject(ctx context.Context, r *connect.RejectionReason) error { + return p.RejectFn(ctx, r) +} + +type EndpointWatch struct { + ProposeFn func(ctx context.Context, session *connect.Session) error + RejectionsChan chan *connect.SessionRejection + + Receive func() (*connect.WatchRequest, error) +} + +func (w *EndpointWatch) Propose(ctx context.Context, session *connect.Session) error { + if session == nil { + return errors.New("session must not be nil") + } + if session.GetId() == "" { + return errors.New("missing session id in proposal") + } + return w.ProposeFn(ctx, session) +} +func (w *EndpointWatch) Rejections() <-chan *connect.SessionRejection { + return w.RejectionsChan +} + +func (w *EndpointWatch) StartReceiveRejections(ctx context.Context) { + defer close(w.RejectionsChan) + for { + req, err := w.Receive() + if err != nil { + return // drop error + } + if req.GetSessionRejection() == nil { + continue // Unexpected + } + select { + case w.RejectionsChan <- req.GetSessionRejection(): + case <-ctx.Done(): + return // stream closed, done + } + } +} diff --git a/ws/client.go b/ws/client.go new file mode 100644 index 0000000..32f4727 --- /dev/null +++ b/ws/client.go @@ -0,0 +1,130 @@ +package ws + +import ( + "context" + "errors" + "fmt" + "io" + "net/http" + + "google.golang.org/grpc/metadata" + "nhooyr.io/websocket" + "nhooyr.io/websocket/wspb" + + "go.minekube.com/connect" + "go.minekube.com/connect/internal/ctxutil" + "go.minekube.com/connect/internal/netutil" + "go.minekube.com/connect/internal/util" +) + +// ClientOptions for Watch and Tunnel, implementing connect.Tunneler. +type ClientOptions struct { + URL string // The URL of the WebSocket server + DialContext context.Context // Optional WebSocket dial context + DialOptions websocket.DialOptions // Optional WebSocket dial options + Handshake HandshakeResponse // Optionally run after successful WebSocket handshake +} + +// HandshakeResponse is called after receiving the +// WebSocket handshake response from the server. +type HandshakeResponse func(ctx context.Context, res *http.Response) (context.Context, error) + +// Tunnel implements connect.Tunneler and creates a connection over a WebSocket. +func (o ClientOptions) Tunnel(ctx context.Context) (connect.Tunnel, error) { + ctx, ws, err := o.dial(ctx) + if err != nil { + return nil, err + } + + // Extract additional options + opts := ctxutil.TunnelOptionsOrDefault(ctx) + + // Return connection + ctx, cancel := context.WithCancel(ctx) + wsConn := websocket.NetConn(ctx, ws, websocket.MessageBinary) + return &netutil.Conn{ + Conn: wsConn, + CloseFn: func() error { cancel(); return wsConn.Close() }, + VLocalAddr: opts.LocalAddr, + VRemoteAddr: opts.RemoteAddr, + }, nil +} + +// Watch implements connect.Watcher and watches for session proposals. +func (o ClientOptions) Watch(ctx context.Context, propose connect.ReceiveProposal) error { + ctx, ws, err := o.dial(ctx) + if err != nil { + return err + } + // Watch for session proposals + for { + res := new(connect.WatchResponse) + err = wspb.Read(ctx, ws, res) + if err != nil { + break + } + if res.GetSession() == nil { + continue + } + id := res.GetSession().GetId() + rejectFn := func(ctx context.Context, r *connect.RejectionReason) error { + return wspb.Write(ctx, ws, &connect.WatchRequest{ + SessionRejection: &connect.SessionRejection{ + Id: id, + Reason: r, + }, + }) + } + proposal := &util.SessionProposal{ + Proposal: res.GetSession(), + RejectFn: rejectFn, + } + if err = propose(proposal); err != nil { + break + } + } + _ = ws.Close(websocket.StatusNormalClosure, err.Error()) + if errors.Is(err, io.EOF) { + err = nil + } + return err +} + +func (o *ClientOptions) dial(ctx context.Context) (context.Context, *websocket.Conn, error) { + if o.URL == "" { + return nil, nil, errors.New("missing websocket url") + } + + // Add metadata to websocket handshake request header + md, _ := metadata.FromOutgoingContext(ctx) + if o.DialOptions.HTTPHeader == nil { + o.DialOptions.HTTPHeader = http.Header(md) + } else { + header := metadata.Join(metadata.MD(o.DialOptions.HTTPHeader), md) + o.DialOptions.HTTPHeader = http.Header(header) + } + + // Dial service + if o.DialContext == nil { + o.DialContext = ctx + } + ws, res, err := websocket.Dial(o.DialContext, o.URL, &o.DialOptions) + if err != nil { + return nil, nil, fmt.Errorf("error handshaking with websocket server: %w", err) + } + + // Callback for handshake response + if o.Handshake != nil { + ctx, err = o.Handshake(ctx, res) + if err != nil { + _ = ws.Close(websocket.StatusNormalClosure, fmt.Sprintf( + "handshake response rejected: %v", err)) + return nil, nil, err + } + } + + return ctx, ws, nil +} + +var _ connect.Tunneler = (*ClientOptions)(nil) +var _ connect.Watcher = (*ClientOptions)(nil) diff --git a/ws/server.go b/ws/server.go new file mode 100644 index 0000000..f86625c --- /dev/null +++ b/ws/server.go @@ -0,0 +1,152 @@ +package ws + +import ( + "context" + "fmt" + "net/http" + "time" + + "google.golang.org/grpc/metadata" + "nhooyr.io/websocket" + "nhooyr.io/websocket/wspb" + + "go.minekube.com/connect" + "go.minekube.com/connect/internal/ctxutil" + "go.minekube.com/connect/internal/netutil" + "go.minekube.com/connect/internal/util" +) + +// ServerOptions for TunnelHandler and EndpointHandler. +type ServerOptions struct { + AcceptOptions websocket.AcceptOptions // Optional WebSocket accept options +} + +// TunnelHandler returns a new http.Handler for accepting WebSocket requests for tunneling. +func (o ServerOptions) TunnelHandler(ln connect.TunnelListener) http.Handler { + fn := func(ctx context.Context, w http.ResponseWriter, r *http.Request) error { + // Accept WebSocket + ws, err := websocket.Accept(w, r, &o.AcceptOptions) + if err != nil { + return err + } + + // Extract additional options + opts := ctxutil.TunnelOptionsOrDefault(ctx) + + // Create tunnel from WebSocket + ctx, cancel := context.WithCancel(ctx) + wsConn := websocket.NetConn(ctx, ws, websocket.MessageBinary) + conn := &netutil.Conn{ + Conn: wsConn, + CloseFn: func() error { cancel(); return wsConn.Close() }, + VLocalAddr: opts.LocalAddr, + VRemoteAddr: opts.RemoteAddr, + } + defer conn.Close() + + // Add WebSocket handshake request header to ctx metadata + md, _ := metadata.FromIncomingContext(ctx) + md = metadata.Join(md, metadata.MD(r.Header)) + ctx = metadata.NewIncomingContext(ctx, md) + + // Add http request to ctx + ctx = withRequest(ctx, r) + + // Accept tunnel + if err = ln.AcceptTunnel(ctx, conn); err != nil { + // Specify meaningful close error + _ = ws.Close(websocket.StatusProtocolError, fmt.Sprintf( + "did not accept tunnel: %v", err)) + return err + } + + // Block handler until tunnel closure + <-ctx.Done() + _ = ws.Close(websocket.StatusNormalClosure, "closed serverside") + return nil + } + + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Dropping this error as http.Error(...) would be already called + // at this point by our WebSocket library. + _ = fn(r.Context(), w, r) + }) +} + +// EndpointHandler returns a new http.Handler for accepting WebSocket requests for watching endpoints. +func (o ServerOptions) EndpointHandler(ln connect.EndpointListener) http.Handler { + fn := func(ctx context.Context, w http.ResponseWriter, r *http.Request) error { + // Accept WebSocket + ws, err := websocket.Accept(w, r, &o.AcceptOptions) + if err != nil { + return err + } + + // Prepare endpoint watch + ctx, cancel := context.WithCancel(ctx) + defer cancel() + ew := &util.EndpointWatch{ + ProposeFn: func(ctx context.Context, session *connect.Session) error { + return wspb.Write(ctx, ws, &connect.WatchResponse{Session: session}) + }, + Receive: func() (*connect.WatchRequest, error) { + req := new(connect.WatchRequest) + return req, wspb.Read(ctx, ws, req) + }, + RejectionsChan: make(chan *connect.SessionRejection), + } + + // Receive session rejections from endpoint + go func() { ew.StartReceiveRejections(ctx); cancel() }() + go pingLoop(ctx, pingInterval, ws) + + // Add http request to ctx + ctx = withRequest(ctx, r) + + // Start blocking watch callback + if err = ln.AcceptEndpoint(ctx, ew); err != nil { + // Specify meaningful close error + _ = ws.Close(websocket.StatusProtocolError, fmt.Sprintf( + "did not accept endpoint: %v", err)) + return err + } + + _ = ws.Close(websocket.StatusNormalClosure, "closed serverside") + return nil + } + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Dropping this error as http.Error(...) would be already called + // at this point by our WebSocket library. + _ = fn(r.Context(), w, r) + }) +} + +// RequestFromContext returns the accepted WebSocket request from the context. +func RequestFromContext(ctx context.Context) *http.Request { + r, _ := ctx.Value(httpRequestContextKey{}).(*http.Request) + return r +} + +const pingInterval = time.Second * 50 + +// Send periodic pings to keep the WebSocket active since some web proxies +// timeout the connection after 60-100 seconds. +// https://community.cloudflare.com/t/cloudflare-websocket-timeout/5865/2 +func pingLoop(ctx context.Context, d time.Duration, ws *websocket.Conn) { + t := time.NewTicker(d) + defer t.Stop() + for { + select { + case <-t.C: + _ = ws.Ping(ctx) + case <-ctx.Done(): + return + } + } +} + +type httpRequestContextKey struct{} + +func withRequest(ctx context.Context, r *http.Request) context.Context { + return context.WithValue(ctx, httpRequestContextKey{}, r) +} diff --git a/ws/suite_test.go b/ws/suite_test.go new file mode 100644 index 0000000..dbf4435 --- /dev/null +++ b/ws/suite_test.go @@ -0,0 +1,33 @@ +package ws + +import ( + "context" + "net/http" + "testing" + + "github.com/stretchr/testify/suite" + + "go.minekube.com/connect" + "go.minekube.com/connect/internal/testutil" +) + +func TestSuite(t *testing.T) { + suite.Run(t, &testutil.Suite{ + Endpoint: ClientOptions{ + URL: "ws://:8080", + }, + StartWatchServer: func(ctx context.Context, ln connect.EndpointListener) error { + return startServer(ctx, ":8080", ServerOptions{}.EndpointHandler(ln)) + }, + StartTunnelServer: func(ctx context.Context, ln connect.TunnelListener) error { + return startServer(ctx, ":8080", ServerOptions{}.TunnelHandler(ln)) + }, + }) +} + +func startServer(ctx context.Context, addr string, handler http.Handler) error { + svr := &http.Server{Addr: addr, Handler: handler} + go func() { _ = svr.ListenAndServe() }() + <-ctx.Done() + return svr.Close() +}