Skip to content

Commit

Permalink
pkg/events: Add support for a global CloudAMQP rabbit setup (#80)
Browse files Browse the repository at this point in the history
* pkg/event: Set server name in TLS config

* docker: Install CA certificates

* Revert "pkg/event: Set server name in TLS config"

This reverts commit f651a11.

* Dockerfile: a little more strict on alpine

* Revert "Revert "pkg/event: Set server name in TLS config""

This reverts commit d5b6959.

* pkg/event: Remove port from server certificate check

* pkg/event: Remove port fallback logic

wtf

* reducers: Allow changing stream state exchange

* cmd/analyzer: Make the default stream state exchange mist

* pkg/event: Set tls stream name on healthcheck as well

* health: Stop checking connection on healthcheck

not scaling with cloud rabbits

* reducers: Make stream state non optional
  • Loading branch information
victorges authored Nov 14, 2022
1 parent 1ac553b commit 147c919
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 23 deletions.
4 changes: 3 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ COPY . .

RUN make "version=$version"

FROM alpine:latest
FROM alpine:3.16

RUN apk add --no-cache ca-certificates

WORKDIR /app

Expand Down
10 changes: 6 additions & 4 deletions cmd/analyzer/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@ type cliFlags struct {
rabbitmqUri string
amqpUri string

golivepeerExchange string
shardPrefixesFlag string
shardPrefixes []string
golivepeerExchange string
shardPrefixesFlag string
shardPrefixes []string
streamStateExchange string

serverOpts api.ServerOptions
streamingOpts health.StreamingOptions
Expand All @@ -53,6 +54,7 @@ func parseFlags(version string) cliFlags {

fs.StringVar(&cli.golivepeerExchange, "golivepeer-exchange", "lp_golivepeer_metadata", "Name of RabbitMQ exchange to bind the stream to on creation")
fs.StringVar(&cli.shardPrefixesFlag, "shard-prefixes", "", "Comma-separated list of prefixes of manifest IDs to process events from")
fs.StringVar(&cli.streamStateExchange, "stream-state-exchange", "lp_mist_api_connector", "Name of RabbitMQ exchange where to receive stream state events")

// Server options
fs.StringVar(&cli.serverOpts.Host, "host", "localhost", "Hostname to bind to")
Expand Down Expand Up @@ -131,7 +133,7 @@ func Run(build BuildFlags) {
}
defer consumer.Close()

reducer := reducers.Default(cli.golivepeerExchange, cli.shardPrefixes)
reducer := reducers.Default(cli.golivepeerExchange, cli.shardPrefixes, cli.streamStateExchange)
healthcore := health.NewCore(health.CoreOptions{
Streaming: cli.streamingOpts,
StartTimeOffset: reducers.DefaultStarTimeOffset(),
Expand Down
5 changes: 0 additions & 5 deletions health/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,6 @@ func NewCore(opts CoreOptions, consumer event.StreamConsumer, reducer Reducer) *
}

func (c *Core) IsHealthy() bool {
err := c.consumer.CheckConnection()
if err != nil {
glog.Warningf("Health core is unhealthy. reason=consumerErr consumerErr=%q", err)
return false
}
if tol := c.opts.Streaming.EventFlowSilenceTolerance; tol > 0 && time.Since(c.lastEventTs) > tol {
glog.Warningf("Health core is unhealthy. reason=noEvents lastEventTs=%s, tolerance=%s", c.lastEventTs, tol)
return false
Expand Down
4 changes: 2 additions & 2 deletions health/reducers/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ var (
maxStatsWindow = statsWindows[len(statsWindows)-1]
)

func Default(golpExchange string, shardPrefixes []string) health.Reducer {
func Default(golpExchange string, shardPrefixes []string, streamStateExchange string) health.Reducer {
return Pipeline{
StreamStateReducer{},
StreamStateReducer{streamStateExchange},
TranscodeReducer{golpExchange, shardPrefixes},
MultistreamReducer{},
MediaServerMetrics{},
Expand Down
7 changes: 4 additions & 3 deletions health/reducers/stream_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
)

const (
globalExchange = "lp_global_replication"
streamStateBindingKey = "stream.state.#"

ConditionActive data.ConditionType = "Active"
Expand All @@ -20,10 +19,12 @@ type ActiveConditionExtraData struct {
Region string `json:"region"`
}

type StreamStateReducer struct{}
type StreamStateReducer struct {
exchange string
}

func (t StreamStateReducer) Bindings() []event.BindingArgs {
return []event.BindingArgs{{Exchange: globalExchange, Key: streamStateBindingKey}}
return []event.BindingArgs{{Exchange: t.exchange, Key: streamStateBindingKey}}
}

func (t StreamStateReducer) Conditions() []data.ConditionType {
Expand Down
19 changes: 11 additions & 8 deletions pkg/event/stream_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package event

import (
"context"
"crypto/tls"
"errors"
"fmt"
"net/url"
Expand Down Expand Up @@ -36,9 +37,7 @@ func NewStreamConsumer(streamUriStr, amqpUriStr string) (StreamConsumer, error)
return nil, err
}
glog.Infof("Connecting to RabbitMQ. streamUri=%q, amqpUri=%q", streamUri.Redacted(), amqpUri.Redacted())
opts := stream.NewEnvironmentOptions().
SetMaxConsumersPerClient(5).
SetUri(streamUri.String())
opts := baseEnvOpts(streamUri).SetMaxConsumersPerClient(5)
env, err := stream.NewEnvironment(opts)
if err != nil {
return nil, err
Expand All @@ -53,8 +52,7 @@ func (c *strmConsumer) Close() error {

func (c *strmConsumer) CheckConnection() error {
// create separate env for test to avoid infinite connect retry from lib
env, err := stream.NewEnvironment(
stream.NewEnvironmentOptions().SetUri(c.streamUri.String()))
env, err := stream.NewEnvironment(baseEnvOpts(c.streamUri))
if err != nil {
return err
}
Expand All @@ -66,6 +64,14 @@ func (c *strmConsumer) CheckConnection() error {
return env.Close()
}

func baseEnvOpts(uri *url.URL) *stream.EnvironmentOptions {
return stream.NewEnvironmentOptions().
SetUri(uri.String()).
SetTLSConfig(&tls.Config{
ServerName: uri.Hostname(),
})
}

func (c *strmConsumer) ConsumeChan(ctx context.Context, opts ConsumeOptions) (<-chan StreamMessage, error) {
err := c.ensureStream(opts.Stream, opts.StreamOptions)
if err != nil {
Expand Down Expand Up @@ -267,9 +273,6 @@ func coalesceUri(value *url.URL, fallback url.URL) *url.URL {
if result.Scheme == "" {
result.Scheme = fallback.Scheme
}
if result.Port() == "" && fallback.Port() != "" {
result.Host += ":" + fallback.Port()
}
if u := result.User; u == nil || u.String() == "" {
result.User = fallback.User
if u := result.User; u == nil || u.String() == "" {
Expand Down

0 comments on commit 147c919

Please sign in to comment.