Skip to content

Commit

Permalink
Jetstream (#45)
Browse files Browse the repository at this point in the history
* add a new rusi.pubsub component and a NATS Jetstream implementation

---------

Co-authored-by: Radu Popovici <[email protected]>
  • Loading branch information
lghinet and oncicaradupopovici authored Oct 30, 2023
1 parent b00ee3b commit f1b1a82
Show file tree
Hide file tree
Showing 15 changed files with 736 additions and 1,071 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: 1.17
go-version: 1.21

- name: Build
run: make build-linux
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: 1.17
go-version: 1.21

- name: Set release version
run: echo "RELEASE_VERSION=${GITHUB_REF#refs/*/v}" >> $GITHUB_ENV
Expand Down
4 changes: 4 additions & 0 deletions cmd/rusid/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"rusi/pkg/custom-resource/components/middleware"
"rusi/pkg/custom-resource/components/pubsub"
"rusi/pkg/messaging"
"rusi/pkg/messaging/jetstream"
natsstreaming "rusi/pkg/messaging/nats"
"rusi/pkg/runtime"
)
Expand All @@ -17,6 +18,9 @@ func RegisterComponentFactories() (result []runtime.Option) {
pubsub.New("natsstreaming", func() messaging.PubSub {
return natsstreaming.NewNATSStreamingPubSub()
}),
pubsub.New("jetstream", func() messaging.PubSub {
return jetstream.NewJetStreamPubSub()
}),
),
runtime.WithPubsubMiddleware(
middleware.New("uppercase", func(properties map[string]string) messaging.Middleware {
Expand Down
5 changes: 3 additions & 2 deletions cmd/rusid/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"flag"
"github.com/prometheus/client_golang/prometheus/promhttp"
"k8s.io/klog/v2"
"net/http"
"syscall"
Expand Down Expand Up @@ -108,8 +109,8 @@ func startDiagnosticsServer(ctx context.Context, wg *sync.WaitGroup, appId strin
router.Handle("/healthz", healthcheck.HandlerFunc(options...))

if enableMetrics {
exporter := metrics.SetupPrometheusMetrics(appId)
router.HandleFunc("/metrics", exporter.ServeHTTP)
_ = metrics.SetupPrometheusMetrics(appId)
router.Handle("/metrics", promhttp.Handler())
} else {
metrics.SetNoopMeterProvider()
}
Expand Down
20 changes: 20 additions & 0 deletions examples/components/comp-jetstream.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
apiVersion: rusi.io/v1alpha1
kind: Component
metadata:
name: jetstream-pubsub
spec:
type: pubsub.jetstream
version: v1
metadata:
- name: natsURL
value: "nats://kube-worker1:31552"
- name: connectWait
value: 10s
- name: ackWaitTime
value: 50s
- name: maxInFlight
value: '1'
- name: commandsStream
value: "commands_stream"
- name: eventsStream
value: "events_stream"
12 changes: 6 additions & 6 deletions examples/components/config-node-pipeline.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ metadata:
spec:
metric:
enabled: true
# tracing:
# samplingRate: '1'
# jaeger:
# useAgent: false
# collectorEndpointAddress: 'http://kube-worker1.totalsoft.local:31034/api/traces'
tracing:
samplingRate: '1'
jaeger:
useAgent: false
collectorEndpointAddress: 'linux-ts1858:4317'
# subscriberPipeline:
# handlers:
# - name: pubsub-uppercase
# type: middleware.pubsub.uppercase
pubSub:
name: natsstreaming-pubsub
name: jetstream-pubsub
112 changes: 62 additions & 50 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,77 +1,89 @@
module rusi

go 1.17
go 1.21

//https://github.com/golang/go/wiki/Modules#how-to-upgrade-and-downgrade-dependencies

require (
github.com/google/uuid v1.3.0
github.com/google/uuid v1.3.1
github.com/json-iterator/go v1.1.12
github.com/kelseyhightower/envconfig v1.4.0
github.com/nats-io/nats.go v1.16.0
github.com/nats-io/stan.go v0.10.3
github.com/nats-io/nats.go v1.31.0
github.com/nats-io/stan.go v0.10.4
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.7.0
go.opentelemetry.io/contrib/propagators/jaeger v1.4.0
go.opentelemetry.io/otel v1.4.0
go.opentelemetry.io/otel/exporters/jaeger v1.4.0
go.opentelemetry.io/otel/exporters/prometheus v0.27.0
go.opentelemetry.io/otel/metric v0.27.0
go.opentelemetry.io/otel/sdk v1.4.0
go.opentelemetry.io/otel/sdk/metric v0.27.0
go.opentelemetry.io/otel/trace v1.4.0
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
google.golang.org/grpc v1.44.0
google.golang.org/protobuf v1.27.1
github.com/prometheus/client_golang v1.17.0
github.com/stretchr/testify v1.8.4
go.opentelemetry.io/contrib/propagators/jaeger v1.20.0
go.opentelemetry.io/otel v1.19.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.19.0
go.opentelemetry.io/otel/exporters/prometheus v0.42.0
go.opentelemetry.io/otel/metric v1.19.0
go.opentelemetry.io/otel/sdk v1.19.0
go.opentelemetry.io/otel/sdk/metric v1.19.0
go.opentelemetry.io/otel/trace v1.19.0
golang.org/x/sync v0.4.0
google.golang.org/grpc v1.59.0
google.golang.org/protobuf v1.31.0
gopkg.in/yaml.v2 v2.4.0
k8s.io/api v0.23.3
k8s.io/apiextensions-apiserver v0.23.3
k8s.io/apimachinery v0.23.3
k8s.io/client-go v0.23.3
k8s.io/klog/v2 v2.40.1
k8s.io/utils v0.0.0-20220210201930-3a6ce19ff2f9
k8s.io/api v0.28.3
k8s.io/apiextensions-apiserver v0.28.3
k8s.io/apimachinery v0.28.3
k8s.io/client-go v0.28.3
k8s.io/klog/v2 v2.100.1
k8s.io/utils v0.0.0-20230726121419-3b25d923346b
)

require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/evanphx/json-patch v5.6.0+incompatible // indirect
github.com/go-logr/logr v1.2.2 // indirect
github.com/emicklei/go-restful/v3 v3.9.0 // indirect
github.com/evanphx/json-patch v4.12.0+incompatible // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-openapi/jsonpointer v0.19.6 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
github.com/go-openapi/swag v0.22.3 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/go-cmp v0.5.7 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/gnostic-models v0.6.8 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/googleapis/gnostic v0.5.5 // indirect
github.com/imdario/mergo v0.3.12 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 // indirect
github.com/imdario/mergo v0.3.6 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/klauspost/compress v1.17.0 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/nats-io/nats-server/v2 v2.7.4 // indirect
github.com/nats-io/nats-streaming-server v0.24.3 // indirect
github.com/nats-io/nkeys v0.3.0 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/nats-io/nats-server/v2 v2.10.3 // indirect
github.com/nats-io/nats-streaming-server v0.25.5 // indirect
github.com/nats-io/nkeys v0.4.5 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.12.1 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16 // indirect
github.com/prometheus/common v0.44.0 // indirect
github.com/prometheus/procfs v0.11.1 // indirect
github.com/spf13/pflag v1.0.5 // indirect
go.opentelemetry.io/otel/internal/metric v0.27.0 // indirect
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa // indirect
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd // indirect
golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 // indirect
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0 // indirect
go.opentelemetry.io/proto/otlp v1.0.0 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/oauth2 v0.11.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/term v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/time v0.3.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20220215190005-e57b466719ef // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230822172742-b8732ec3820d // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
k8s.io/kube-openapi v0.0.0-20220124234850-424119656bbf // indirect
sigs.k8s.io/json v0.0.0-20211208200746-9f7c6b3444d2 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
sigs.k8s.io/yaml v1.3.0 // indirect
)
Loading

0 comments on commit f1b1a82

Please sign in to comment.