Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add streaming support #5

Merged
merged 1 commit into from
Mar 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 24 additions & 29 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -1,40 +1,35 @@
on:
push:
branches: [ main ]
branches: [main]
pull_request:
name: Test
jobs:
test:
strategy:
matrix:
go-version: [1.18.x]
go-version: [1.21.x, 1.22.x]
platform: [ubuntu-latest, macos-latest, windows-latest]
runs-on: ${{ matrix.platform }}
steps:
- name: Install Go
uses: actions/setup-go@v3
with:
go-version: ${{ matrix.go-version }}
- name: Install staticcheck
run: go install honnef.co/go/tools/cmd/staticcheck@latest
shell: bash
- name: Install golint
run: go install golang.org/x/lint/golint@latest
shell: bash
- name: Update PATH
run: echo "$(go env GOPATH)/bin" >> $GITHUB_PATH
shell: bash
- name: Checkout code
uses: actions/checkout@v1
- name: Fmt
if: matrix.platform != 'windows-latest' # :(
run: "diff <(gofmt -d .) <(printf '')"
shell: bash
- name: Vet
run: go vet ./...
- name: Staticcheck
run: staticcheck ./...
- name: Lint
run: golint ./...
- name: Test
run: go test -race ./...
- name: Install Go
uses: actions/setup-go@v5
with:
go-version: ${{ matrix.go-version }}
- name: Install staticcheck
run: go install honnef.co/go/tools/cmd/staticcheck@latest
shell: bash
- name: Update PATH
run: echo "$(go env GOPATH)/bin" >> $GITHUB_PATH
shell: bash
- name: Checkout code
uses: actions/checkout@v4
- name: Fmt
if: matrix.platform != 'windows-latest' # :(
run: "diff <(gofmt -d .) <(printf '')"
shell: bash
- name: Vet
run: go vet ./...
- name: Staticcheck
run: staticcheck ./...
- name: Test
run: go test -race ./...
132 changes: 98 additions & 34 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,55 @@ This library implements a simple, custom [RPC protocol](https://en.wikipedia.org
A strongly typed client may look like this:

```go
// Define the request, message and receipt types for the RPC call.
client, err := execrpc.StartClient(
execrpc.ClientOptions[model.ExampleRequest, model.ExampleResponse]{
execrpc.ClientOptions[model.ExampleRequest, model.ExampleMessage, model.ExampleReceipt]{
ClientRawOptions: execrpc.ClientRawOptions{
Version: 1,
Cmd: "go",
Dir: "./examples/servers/typed"
Dir: "./examples/servers/typed",
Args: []string{"run", "."},
Env: env,
Timeout: 30 * time.Second,
},
Codec: codecs.JSONCodec[model.ExampleRequest, model.ExampleResponse]{},
Codec: codec,
},
)

result, _ := client.Execute(model.ExampleRequest{Text: "world"})
if err != nil {
logg.Fatal(err)
}


// Consume standalone messages (e.g. log messages) in its own goroutine.
go func() {
for msg := range client.MessagesRaw() {
fmt.Println("got message", string(msg.Body))
}
}()

// Execute the request.
result := client.Execute(model.ExampleRequest{Text: "world"})

fmt.Println(result.Hello)
// Check for errors.
if err; result.Err(); err != nil {
logg.Fatal(err)
}

// Consume the messages.
for m := range result.Messages() {
fmt.Println(m)
}

//...
// Wait for the receipt.
receipt := result.Receipt()

// Check again for errors.
if err; result.Err(); err != nil {
logg.Fatal(err)
}

client.Close()
fmt.Println(receipt.Text)

```

Expand All @@ -35,41 +65,75 @@ And the server side of the above:

```go
func main() {
server, _ := execrpc.NewServer(
execrpc.ServerOptions[model.ExampleRequest, model.ExampleResponse]{
Call: func(d execrpc.Dispatcher, req model.ExampleRequest) model.ExampleResponse {
return model.ExampleResponse{
Hello: "Hello " + req.Text + "!",
}
getHasher := func() hash.Hash {
return fnv.New64a()
}

server, err := execrpc.NewServer(
execrpc.ServerOptions[model.ExampleRequest, model.ExampleMessage, model.ExampleReceipt]{
// Optional function to get a hasher for the ETag.
GetHasher: getHasher,

// Allows you to delay message delivery, and drop
// them after reading the receipt (e.g. the ETag matches the ETag seen by client).
DelayDelivery: false,

// Handle the incoming call.
Handle: func(c *execrpc.Call[model.ExampleRequest, model.ExampleMessage, model.ExampleReceipt]) {
// Raw messages are passed directly to the client,
// typically used for log messages.
c.SendRaw(
execrpc.Message{
Header: execrpc.Header{
Version: 32,
Status: 150,
},
Body: []byte("a log message"),
},
)

// Enqueue one or more messages.
c.Enqueue(
model.ExampleMessage{
Hello: "Hello 1!",
},
model.ExampleMessage{
Hello: "Hello 2!",
},
)

c.Enqueue(
model.ExampleMessage{
Hello: "Hello 3!",
},
)

// Wait for the framework generated receipt.
receipt := <-c.Receipt()

// ETag provided by the framework.
// A hash of all message bodies.
fmt.Println("Receipt:", receipt.ETag)

// Modify if needed.
receipt.Size = uint32(123)

// Close the message stream.
c.Close(false, receipt)
},
},
)
if err != nil {
log.Fatal(err)
}

// Start the server. This will block.
if err := server.Start(); err != nil {
// ... handle error
log.Fatal(err)
}
_ = server.Wait()
}
```

Of the included codecs, JSON seems to win by a small margin (but only tested with small requests/responses):

```bsh
name time/op
Client/JSON-10 4.89µs ± 0%
Client/TOML-10 5.51µs ± 0%
Client/Gob-10 17.0µs ± 0%

name alloc/op
Client/JSON-10 922B ± 0%
Client/TOML-10 1.67kB ± 0%
Client/Gob-10 9.22kB ± 0%

name allocs/op
Client/JSON-10 19.0 ± 0%
Client/TOML-10 28.0 ± 0%
Client/Gob-10 227 ± 0%
```

## Status Codes

The status codes in the header between 1 and 99 are reserved for the system. This will typically be used to catch decoding/encoding errors on the server.
Loading
Loading