Skip to content
This repository has been archived by the owner on Dec 28, 2024. It is now read-only.

Commit

Permalink
feat: add HTTPBroadcastHandler to embed
Browse files Browse the repository at this point in the history
  • Loading branch information
palkan committed Mar 6, 2024
1 parent af6f559 commit 330f481
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 11 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ test-conformance-http: tmp/anycable-go-test
\
ANYCABLE_BROADCAST_ADAPTER=http ANYCABLE_HTTP_BROADCAST_SECRET=any_secret \
ANYCABLE_HTTP_RPC_SECRET=rpc_secret ANYCABLE_HTTP_RPC_MOUNT_PATH=/_anycable \
ANYCABLE_HTTP_BROADCAST_URL=http://localhost:8080/_anycable \
ANYCABLE_HTTP_BROADCAST_URL=http://localhost:8080/_broadcast \
bundle exec anyt -c "tmp/anycable-go-test --headers=cookie,x-api-token --rpc_host=http://localhost:9292/_anycable" --target-url="ws://localhost:8080/cable" --require=etc/anyt/broadcast_tests/*.rb

test-conformance-nats: tmp/anycable-go-test
Expand All @@ -174,7 +174,7 @@ test-conformance-broker-nats: tmp/anycable-go-test
test-conformance-embedded: tmp/anycable-embedded-test
\
ANYCABLE_BROADCAST_ADAPTER=http ANYCABLE_HTTP_BROADCAST_SECRET=any_secret \
ANYCABLE_HTTP_BROADCAST_URL=http://localhost:8080/_anycable \
ANYCABLE_HTTP_BROADCAST_URL=http://localhost:8080/broadcast \
ANYCABLE_HTTP_RPC_SECRET=rpc_secret ANYCABLE_HTTP_RPC_MOUNT_PATH=/_anycable \
ANYCABLE_RPC_HOST=http://localhost:9292/_anycable \
ANYCABLE_HEADERS=cookie,x-api-token \
Expand Down
27 changes: 19 additions & 8 deletions broadcast/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,9 @@ func (HTTPBroadcaster) IsFanout() bool {
return false
}

// Start creates an HTTP server or attaches a handler to the existing one
func (s *HTTPBroadcaster) Start(done chan (error)) error {
server, err := server.ForPort(strconv.Itoa(s.port))

if err != nil {
return err
}

// Prepare configures the broadcaster to make it ready to accept requests
// (i.e., calculates the authentication token, etc.)
func (s *HTTPBroadcaster) Prepare() error {
authHeader := ""

if s.conf.Secret == "" && s.conf.SecretBase != "" {
Expand All @@ -97,6 +92,22 @@ func (s *HTTPBroadcaster) Start(done chan (error)) error {

s.authHeader = authHeader

return nil
}

// Start creates an HTTP server or attaches a handler to the existing one
func (s *HTTPBroadcaster) Start(done chan (error)) error {
server, err := server.ForPort(strconv.Itoa(s.port))

if err != nil {
return err
}

err = s.Prepare()
if err != nil {
return err
}

s.server = server
s.server.SetupHandler(s.path, http.HandlerFunc(s.Handler))

Expand Down
14 changes: 14 additions & 0 deletions cli/embed.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"net/http"

"github.com/anycable/anycable-go/broadcast"
"github.com/anycable/anycable-go/node"
"github.com/anycable/anycable-go/utils"
"github.com/anycable/anycable-go/version"
Expand Down Expand Up @@ -40,6 +41,19 @@ func (e *Embedded) SSEHandler(ctx context.Context) (http.Handler, error) {
return sseHandler, nil
}

// HTTPBroadcastHandler returns an HTTP handler to process broadcasting requests
func (e *Embedded) HTTPBroadcastHandler() (http.Handler, error) {
broadcaster := broadcast.NewHTTPBroadcaster(e.n, &e.r.config.HTTPBroadcast, e.r.log)

err := broadcaster.Prepare()

if err != nil {
return nil, err
}

return http.HandlerFunc(broadcaster.Handler), nil
}

// Shutdown stops the AnyCable node gracefully.
func (e *Embedded) Shutdown(ctx context.Context) error {
for _, shutdownable := range e.r.shutdownables {
Expand Down
9 changes: 8 additions & 1 deletion cmd/embedded-cable/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ func main() {
cli.WithDefaultRPCController(),
cli.WithDefaultBroker(),
cli.WithDefaultSubscriber(),
cli.WithDefaultBroadcaster(),
cli.WithTelemetry(),
cli.WithLogger(logger),
}
Expand Down Expand Up @@ -56,8 +55,16 @@ func main() {
os.Exit(1)
}

broadcastHandler, err := anycable.HTTPBroadcastHandler()

if err != nil {
fmt.Printf("%+v\n", err)
os.Exit(1)
}

http.Handle("/cable", wsHandler)
http.Handle("/sse", seeHandler)
http.Handle("/broadcast", broadcastHandler)

go http.ListenAndServe(":8080", nil) // nolint:errcheck,gosec

Expand Down

0 comments on commit 330f481

Please sign in to comment.