Transport layer for go-kit
See examples in tests files at the links:
StreamEndpoint, Endpoint, TestService
transport/wirenet.go
// server side
func MakeWirenetHandlers(wire wirenet.Wire, endpoints Set) {
wire.Stream("uploadFile", wirenettransport.NewStreamServer(
endpoints.UploadFileEndpoint,
uploadFileServerSideCodec,
[]wirenettransport.StreamServerOption{}...,
).Handle)
}
transport/wirenet.go
// client side
func MakeWirenetClient(wire wirenet.Wire) Service {
return &Set{
UploadFileEndpoint: wirenettransport.NewStreamClient(
wire,
"uploadFile",
uploadFileClientSideCodec,
[]wirenettransport.StreamClientOption{}...,
).Endpoint(),
}
}
transport/wirenet_codec.go
func uploadFileServerSideCodec(_ context.Context, s wirenet.Stream) (interface{}, error) {
defer s.Close()
w := s.Writer()
r := s.Reader()
// read fileInfo
var req UploadFileRequest
if err := json.NewDecoder(r).Decode(&req); err != nil {
return nil, err
}
r.Close()
// read data
fp := filepath.Join(os.TempDir(), req.Name)
file, err := os.Create(fp)
if err != nil {
return nil, err
}
defer file.Close()
_, err = s.WriteTo(file)
resp := &UploadFileResponse{
Err: err,
}
// write data
if err := json.NewEncoder(w).Encode(resp); err != nil {
return nil, err
}
return resp, nil
}
transport/wirenet_codec.go
func uploadFileClientSideCodec(_ context.Context, request interface{}, s wirenet.Stream) (interface{}, error) {
req := request.(UploadFileRequest)
file, err := os.Open(req.Filepath)
if err != nil {
return nil, err
}
defer file.Close()
defer s.Close()
w := s.Writer()
r := s.Reader()
// write fileInfo
if err := json.NewEncoder(w).Encode(&req); err != nil {
return nil, err
}
w.Close()
// write data
if _, err = s.ReadFrom(file); err != nil {
return nil, err
}
// read data
var resp UploadFileResponse
if err := json.NewDecoder(r).Decode(&resp); err != nil {
return err, nil
}
r.Close()
return resp, nil
}
cmd/client.go
client := MakeWirenetClient(wire)
...
sess := sessionHub.findSessionByUser("123")
...
// Sets the current wirenet session id
ctxWithSess := wirenettransport.InjectSessionID(sess.ID(), ctx)
err = client.UploadFile(ctxWithSess, "./test/testdata/data.db", 1024, "data.db")
cmd/server.go
svc := NewService()
endpoints := NewEndpointSet(svc)
MakeWirenetHandlers(wire, endpoints)
transport/wirenet.go
// server side
func MakeWirenetHandlers(wire wirenet.Wire, endpoints Set) {
options := make([]wirenettransport.ServerOption, 0)
wire.Stream("updateBalance", wirenettransport.NewServer(
endpoints.UpdateBalanceEndpoint,
decodeWirenetUpdateBalanceRequest,
encodeWirenetUpdateBalanceResponse,
options...,
).Handle)
}
transport/wirenet.go
// client side
func MakeWirenetClient(wire wirenet.Wire) Service {
options := make([]wirenettransport.ClientOption, 0)
return &Set{
UpdateBalanceEndpoint: wirenettransport.NewClient(
wire,
"updateBalance",
encodeWirenetUpdateBalanceRequest,
decodeWirenetUpdateBalanceResponse,
options...,
).Endpoint(),
}
}
transport/wirenet_encode_decode.go
func decodeWirenetUpdateBalanceRequest(_ context.Context, r io.ReadCloser) (request interface{}, err error) {
defer r.Close()
var req UpdateBalanceRequest
err = json.NewDecoder(r).Decode(&req)
return req, err
}
func encodeWirenetUpdateBalanceRequest(_ context.Context, request interface{}, w io.WriteCloser) error {
defer w.Close()
return json.NewEncoder(w).Encode(&request)
}
func decodeWirenetUpdateBalanceResponse(_ context.Context, r io.ReadCloser) (response interface{}, err error) {
defer r.Close()
var resp UpdateBalanceResponse
err = json.NewDecoder(r).Decode(&resp)
return resp, err
}
func encodeWirenetUpdateBalanceResponse(_ context.Context, response interface{}, w io.WriteCloser) error {
defer w.Close()
return json.NewEncoder(w).Encode(response)
}
cmd/server.go
// go-kit
svc := NewService()
endpoints := NewEndpointSet(svc)
MakeWirenetHandlers(wire, endpoints)
cmd/client.go
client := MakeWirenetClient(wire)
...
sess := sessionHub.findSessionByUser("123")
...
// Sets the current wirenet session id
ctxWithSess := wirenettransport.InjectSessionID(sess.ID(), ctx)
sum, err := client.UpdateBalance(ctxWithSess, 1, 4)