Skip to content
This repository has been archived by the owner on Feb 12, 2022. It is now read-only.

Commit

Permalink
v0.1.0 release [BREAKING CHANGES]
Browse files Browse the repository at this point in the history
  • Loading branch information
rajveermalviya committed May 28, 2020
1 parent d73d07e commit 6336ad3
Show file tree
Hide file tree
Showing 8 changed files with 1,169 additions and 577 deletions.
159 changes: 99 additions & 60 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,120 +44,161 @@ For documentation check [godoc](https://godoc.org/github.com/unifrost/unifrost).

unifrost uses Server-Sent-Events, because of this it doesn't require to run a
standalone server, unlike websockets it can be embedded in your api server.
unifrost's streamer has a ServeHTTP method i.e it implements http.Handler interface
unifrost's stream handler has a ServeHTTP method i.e it implements http.Handler interface
so that it can be used directly or can be wrapped with middlewares like
Authentication easily.

```go
// Using streamer directly
streamer, err := unifrost.NewStreamer(
// Golang (psuedo-code)

// Using stream handler directly
streamHandler, err := unifrost.NewStreamHandler(
ctx,
&memdriver.Client{},
unifrost.ClientTTL(2*time.Second),
unifrost.ConsumerTTL(2*time.Second),
)
log.Fatal("HTTP server error: ", http.ListenAndServe("localhost:3000", streamer))
log.Fatal("HTTP server error: ", http.ListenAndServe("localhost:3000", streamHandler))
```

```go
// Using streamer by wrapping it in auth middleware
streamer, err := unifrost.NewStreamer(
// Golang (psuedo-code)

// Using stream handler by wrapping it in auth middleware
streamHandler, err := unifrost.NewStreamHandler(
ctx,
&memdriver.Client{},
unifrost.ClientTTL(2*time.Second),
unifrost.ConsumerTTL(2*time.Second),
)

mux := http.NewServeMux()
mux.HandleFunc("/events", func (w http.ResponseWriter, r *http.Request) {
err := Auth(w,r)
err := Auth(r)
if err != nil {
return
http.Error(w, "unauthorized", http.StatusUnauthorized)
return
}
streamer.ServeHTTP(w,r)

streamHandler.ServeHTTP(w,r)
})
log.Fatal("HTTP server error: ", http.ListenAndServe("localhost:3000", mux))
```

When client connects to the server it will send a message that will contain
two things the configuration and an array of all the topics to which the client
has subscribed.
# Message Protocol

Every message sent by the server is encoded in plaintext in JSON,
it contains topic and the payload.

Every message will be an array of length 2, first index will be the topic string,
second index will be the payload in string type.

When consumer connects to the server, server sends a preflight message that contains
the initial server configuration and list of topics the consumer has already been subscribed.

1. Configuration: it contains the client-id and client-ttl set by the
streamer config
2. Subscriptions associated with the specified client id.
1. Configuration: it contains the consumer_id and consumer_ttl set by the
stream handler config
2. Subscriptions associated with the specified consumer id.

Example first message:

```json
{
"config": {
"client_id": "9ba6f4e1-8f80-4e61-944e-e3f409ae514f",
"client_ttl_millis": 60000
},
"subscriptions": ["topic5", "topic1", "topic3", "topic4"]
}
[
"/unifrost/info",

"{\"config\":{\"consumer_id\":\"unique-id\",\"consumer_ttl_millis\":2000},\"subscriptions\":[]}"
]
```

Example error messaage:
Example error message:

```json
{
"error": {
"topic": "topic10",
"code": "subscription-failure",
"message": "Cannot recieve message from subscription, closing subscription"
}
}
[
"/unifrost/error",

"{\"error\":{\"code\":\"subscription-failure\",\"message\":\"Cannot receive message from subscription, closing subscription\",\"topic\":\"topic3\"}}"
]
```

All the messages are streamed over single channel, i.e using EventSource JS API
`new EventSource().onmessage` or `new EventSource().addEventListener('message', (e) =>{})`
methods will listen to them.

All the info events are streamed over message channel i.e using the EventSource JS API,
`onmessage` or `addEventListener('message', () => {})` method will listen to them.
All the subscription events have event name same as their topic name, so to listen to
topic events you need to add an event-listener on the EventSource object.

# Example

Client example:

```js
const sse = new EventSource('/events?id=9ba6f4e1-8f80-4e61-944e-e3f409ae514f');
```ts
// Typescript (psuedo-code)
const consumerID = 'unique-id';

const sse = new EventSource(`/events?id=${consumerID}`);
// for info events like first-message and errors
sse.addEventListener('message', e => {
console.log(e);
});
sse.addEventListener('message', (e) => {
const message = JSON.parse(e.data);

// for subscription events
sse.addEventListener('topic10', e => {
console.log(e);
const topic = message[0] as String;
const payload = message[1] as String;

// Payload is the exact message from Pub Sub broker, probably JSON.
// Decode payload
const data = JSON.parse(payload);
});
```

**Note:** _The only way to listen to subscription events is by adding an eventlistener to that specific topic. `onmessage` method will only listen to info messages._
New consumer is registered explicitly using the `streamHandler.NewConsumer()`
with an auto generated id.
To register a consumer with custom id use `streamHandler.NewCustomConsumer(id)`

New client is created explicitly using the `streamer.NewClient()` for
client with auto generated id or `streamer.NewCustomClient()` for client
with specified id.
This makes it easy to integrate authentication with `unifrost.StreamHandler`.
One possible auth workflow can be, create a new unifrost consumer after login
and return the consumer id to the client to store it in the local storage of the
browser. Further using the consumer id to connect to the stream handler.

This makes it easy to integrate authentication to the streamer, just create
a new client when user connects to your application and return the unifrost
streamer `client_id` (custom or autogenerated) with your API auth workflow.
If you don't care about authentication, you can also generate a new client
automatically everytime a new client connects without the `id` parameter
If you don't care about authentication, you can also generate a new consumer
automatically everytime a new consumer connects without the `id` parameter
use the following middleware with the streamer.
And handle registering the `id` to your backend from your client.

```go
// Golang (psuedo-code)

mux.HandleFunc("/events", func(w http.ResponseWriter, r *http.Request) {
// Auto generate new clientID, when new client connects. (Not recommended)
// Auto generate new consumer_id, when new consumer connects.
q := r.URL.Query()
if q.Get("id") == "" {
client, _ := streamer.NewClient(ctx)
q.Set("id", client.ID)
consumer, _ := streamHandler.NewConsumer(ctx)
q.Set("id", consumer.ID)
r.URL.RawQuery = q.Encode()
}

streamer.ServeHTTP(w, r)
})
```

When a client gets disconnected it has a time window to connect to the server
again with the state unchanged. If client ttl is not specified in the
When a consumer gets disconnected it has a time window to connect to the server
again with the state unchanged. If consumer ttl is not specified in the
streamer config then default ttl is set to one.

# Managing subscriptions

`unifrost.StreamHandler` provides simple API for subscribing and unsubscribing to topics.

```go
func (s *StreamHandler) Subscribe(ctx context.Context, consumerID string, topic string) error


func (s *StreamHandler) Unsubscribe(ctx context.Context, consumerID string, topic string) error
```

These methods can be used to add or remove subscriptions for a consumer.

If you want to give subscription control to the client look at
[the implementation](examples/nats_example) in the example.

To know more, check out the [example](examples/nats_example)

## Why Server Sent Events (SSE) ?
Expand All @@ -168,10 +209,10 @@ One reason SSEs have been kept in the shadow is because later APIs like
WebSockets provide a richer protocol to perform bi-directional, full-duplex
communication. However, in some scenarios data doesn't need to be sent from the
client. You simply need updates from some server action. A few examples would
be friends' status updates, stock tickers, news feeds, or other automated data
push mechanisms (e.g. updating a client-side Web SQL Database or IndexedDB
object store). If you'll need to send data to a server, Fetch API is always a
friend.
be status updates, tweet likes, tweet retweets, tickers, news feeds, or other
automated data push mechanisms (e.g. updating a client-side Web SQL Database or
IndexedDB object store). If you'll need to send data to a server, Fetch API is
always a friend.

SSEs are sent over traditional HTTP. That means they do not require a special
protocol or server implementation to get working. WebSockets on the other hand,
Expand All @@ -193,9 +234,7 @@ Slack Workspace for questions and discussions.
## Future Goals:

- Standalone server that can be configured by yaml, while also staying modular.
- Making it horizontally scalabe using [raft](https://raft.github.io/) consensus algorithm.
- Creating a website for documentation & overview, and some examples.
- Become a [CNCF](https://cncf.io) project (...maybe).

## Users

Expand Down
99 changes: 0 additions & 99 deletions client.go

This file was deleted.

6 changes: 3 additions & 3 deletions examples/nats_example/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ Connect to the event streamer using the [EventSource](https://developer.mozilla.
in the browser or by using good old curl

```sh
curl 'localhost:3000/events?id=custom_client'
curl 'localhost:3000/events?id=unique_id'
```

And in another terminal update the subscriptions of the client
And in another terminal update the subscriptions of the consumer

```sh
curl -d '{"client_id": "custom_client", "add": ["topic1", "topic2"], "remove": []}' -XPOST 'localhost:3000/update_subscriptions'
curl -d '{"consumer_id": "unique_id", "add": ["topic1", "topic2"], "remove": []}' -XPOST 'localhost:3000/update_subscriptions'
```
10 changes: 10 additions & 0 deletions examples/nats_example/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
module github.com/unifrost/examples/nats_example

go 1.14

require (
github.com/nats-io/nats.go v1.10.0
github.com/unifrost/unifrost v0.1.0
gocloud.dev v0.19.0
gocloud.dev/pubsub/natspubsub v0.19.0
)
Loading

0 comments on commit 6336ad3

Please sign in to comment.