Skip to content

Commit

Permalink
add cli command, fix migration, add ingress saving
Browse files Browse the repository at this point in the history
  • Loading branch information
TimVosch committed Jan 13, 2025
1 parent 6da6ced commit f931ecd
Show file tree
Hide file tree
Showing 9 changed files with 303 additions and 167 deletions.
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ require (
github.com/containerd/containerd v1.7.14 // indirect
github.com/containerd/log v0.1.0 // indirect
github.com/cpuguy83/dockercfg v0.3.1 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.5 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/distribution/reference v0.5.0 // indirect
github.com/docker/go-connections v0.5.0 // indirect
Expand Down Expand Up @@ -119,16 +120,19 @@ require (
github.com/prometheus/procfs v0.15.1 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/rs/xid v1.4.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/shirou/gopsutil/v3 v3.24.2 // indirect
github.com/shoenig/go-m1cpu v0.1.6 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/tklauser/go-sysconf v0.3.13 // indirect
github.com/tklauser/numcpus v0.7.0 // indirect
github.com/ulikunitz/xz v0.5.9 // indirect
github.com/urfave/cli/v2 v2.27.5 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/x448/float16 v0.8.4 // indirect
github.com/xi2/xz v0.0.0-20171230120015-48954b6210f8 // indirect
github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 // indirect
github.com/yusufpapurcu/wmi v1.2.4 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.55.0 // indirect
go.opentelemetry.io/otel/sdk v1.32.0 // indirect
Expand Down
8 changes: 8 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3
github.com/cpuguy83/dockercfg v0.3.1 h1:/FpZ+JaygUR/lZP2NlFI2DVfrOEMAIKP5wWEJdoYe9E=
github.com/cpuguy83/dockercfg v0.3.1/go.mod h1:sugsbF4//dDlL/i+S+rtpIWp+5h0BHJHfjj5/jFyUJc=
github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/cpuguy83/go-md2man/v2 v2.0.5 h1:ZtcqGrnekaHpVLArFSe4HK5DoKx1T0rq2DwVB0alcyc=
github.com/cpuguy83/go-md2man/v2 v2.0.5/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/creack/pty v1.1.18 h1:n56/Zwd5o6whRC5PMGretI4IdRLlmBXYNjScPaBgsbY=
github.com/creack/pty v1.1.18/go.mod h1:MOBLtS5ELjhRRrroQr9kyvTxUAFNvYEK993ew/Vr4O4=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down Expand Up @@ -266,6 +268,8 @@ github.com/rs/cors v1.8.2 h1:KCooALfAYGs415Cwu5ABvv9n9509fSiG5SQJn/AQo4U=
github.com/rs/cors v1.8.2/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU=
github.com/rs/xid v1.4.0 h1:qd7wPTDkN6KQx2VmMBLrpHkiyQwgFXRnkOLacUiaSNY=
github.com/rs/xid v1.4.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/russross/blackfriday v1.6.0 h1:KqfZb0pUVN2lYqZUYRddxF4OR8ZMURnJIG5Y3VRLtww=
github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/samber/lo v1.39.0 h1:4gTz1wUhNYLhFSKl6O+8peW0v2F4BCY034GRpU9WnuA=
github.com/samber/lo v1.39.0/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA=
Expand Down Expand Up @@ -306,6 +310,8 @@ github.com/twpayne/go-geom v1.5.7/go.mod h1:y4fTAQtLedXW8eG2Yo4tYrIGN1yIwwKkmA+K
github.com/ulikunitz/xz v0.5.8/go.mod h1:nbz6k7qbPmH4IRqmfOplQw/tblSgqTqBwxkY0oWt/14=
github.com/ulikunitz/xz v0.5.9 h1:RsKRIA2MO8x56wkkcd3LbtcE/uMszhb6DpRf+3uwa3I=
github.com/ulikunitz/xz v0.5.9/go.mod h1:nbz6k7qbPmH4IRqmfOplQw/tblSgqTqBwxkY0oWt/14=
github.com/urfave/cli/v2 v2.27.5 h1:WoHEJLdsXr6dDWoJgMq/CboDmyY/8HMMH1fTECbih+w=
github.com/urfave/cli/v2 v2.27.5/go.mod h1:3Sevf16NykTbInEnD0yKkjDAeZDS0A6bzhBH5hrMvTQ=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasthttp v1.30.0/go.mod h1:2rsYD01CKFrjjsvFxx75KlEUNpWNBY9JWD3K/7o2Cus=
Expand All @@ -316,6 +322,8 @@ github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM=
github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg=
github.com/xi2/xz v0.0.0-20171230120015-48954b6210f8 h1:nIPpBwaJSVYIxUFsDv3M8ofmx9yWTog9BfvIu0q41lo=
github.com/xi2/xz v0.0.0-20171230120015-48954b6210f8/go.mod h1:HUYIGzjTL3rfEspMxjDjgmT5uz5wzYJKVo23qUhYTos=
github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 h1:gEOO8jv9F4OT7lGCjxCBTO/36wtF6j2nSip77qHd4x4=
github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1/go.mod h1:Ohn+xnUBiLI6FVj/9LpzZWtj1/D6lUovWYBkxHVV3aM=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0=
Expand Down
26 changes: 26 additions & 0 deletions services/tracing/cmd/cleanDatabase.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package cmd

import (
"fmt"

"github.com/urfave/cli/v2"
"sensorbucket.nl/sensorbucket/services/tracing/tracing"
)

func cmdCleanDatabase(cmd *cli.Context) error {
db, err := createDB()
if err != nil {
return fmt.Errorf("could not create database connection: %w", err)
}

svc := tracing.Create(db)
if err := svc.PeriodicCleanup(); err != nil {
return fmt.Errorf("could not perform database cleanup: %w", err)
}

if err := db.Close(); err != nil {
return fmt.Errorf("could not properly close database: %w", err)
}

return nil
}
17 changes: 17 additions & 0 deletions services/tracing/cmd/root.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package cmd

import "github.com/urfave/cli/v2"

var App = &cli.App{
Action: cmdServe,
Commands: []*cli.Command{
{
Name: "serve",
Action: cmdServe,
},
{
Name: "cleanup",
Action: cmdCleanDatabase,
},
},
}
145 changes: 145 additions & 0 deletions services/tracing/cmd/serve.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package cmd

import (
"context"
"errors"
"fmt"
"log"
"net/http"
"os"
"os/signal"
"time"

"github.com/go-chi/chi/v5"
chimw "github.com/go-chi/chi/v5/middleware"
_ "github.com/jackc/pgx/v5/stdlib"
"github.com/jmoiron/sqlx"
"github.com/rabbitmq/amqp091-go"
"github.com/urfave/cli/v2"

"sensorbucket.nl/sensorbucket/internal/buildinfo"
"sensorbucket.nl/sensorbucket/internal/cleanupper"
"sensorbucket.nl/sensorbucket/internal/env"
"sensorbucket.nl/sensorbucket/internal/web"
"sensorbucket.nl/sensorbucket/pkg/auth"
"sensorbucket.nl/sensorbucket/pkg/healthchecker"
"sensorbucket.nl/sensorbucket/pkg/mq"
"sensorbucket.nl/sensorbucket/services/tracing/migrations"
"sensorbucket.nl/sensorbucket/services/tracing/tracing"
)

var (
HTTP_ADDR = env.Could("HTTP_ADDR", ":3000")
HTTP_BASE = env.Could("HTTP_BASE", "http://localhost:3000/api")
DB_DSN = env.Must("DB_DSN")
AMQP_HOST = env.Must("AMQP_HOST")
AMQP_QUEUE_TRACES = env.Could("AMQP_QUEUE_TRACES", "tracing.traces")
AMQP_XCHG_PIPELINEMESSAGES = env.Could("AMQP_XCHG_PIPELINEMESSAGES", "pipeline.messages")
AMQP_XCHG_PIPELINEMESSAGES_TOPIC = env.Could("AMQP_XCHG_PIPELINEMESSAGES_TOPIC", "#")
AMQP_QUEUE_INGRESS = env.Could("AMQP_QUEUE_INGRESS", "archive-ingress")
AMQP_XCHG_INGRESS = env.Could("AMQP_XCHG_INGRESS", "ingress")
AMQP_XCHG_INGRESS_TOPIC = env.Could("AMQP_XCHG_INGRESS_TOPIC", "ingress.*")
AUTH_JWKS_URL = env.Could("AUTH_JWKS_URL", "http://oathkeeper:4456/.well-known/jwks.json")
)

func cmdServe(cmd *cli.Context) error {
buildinfo.Print()
cleanup := cleanupper.Create()
defer func() {
if err := cleanup.Execute(5 * time.Second); err != nil {
log.Printf("[Warn] Cleanup error(s) occured: %s\n", err)
}
}()

// Create shutdown context
ctx, cancel := signal.NotifyContext(cmd.Context, os.Interrupt)
defer cancel()

stopProfiler, err := web.RunProfiler()
if err != nil {
log.Printf("could not setup profiler server: %s\n", err)
}
cleanup.Add(stopProfiler)

db, err := createDB()
if err != nil {
return fmt.Errorf("could not create database connection: %w", err)
}

r := chi.NewRouter()
r.Use(
chimw.Logger,
auth.Authenticate(auth.NewJWKSHttpClient(AUTH_JWKS_URL)),
auth.Protect(),
)

mqConn := mq.NewConnection(AMQP_HOST)
cleanup.Add(func(ctx context.Context) error {
mqConn.Shutdown()
return nil
})
go mqConn.Start()

svc := tracing.Create(db)
transportHTTP := tracing.CreateTransport(svc)
r.Mount("/", transportHTTP)
go mq.StartQueueProcessor(mqConn,
AMQP_QUEUE_INGRESS, AMQP_XCHG_INGRESS, AMQP_XCHG_INGRESS_TOPIC,
func() mq.ProcessorFunc {
return func(delivery amqp091.Delivery) error {
if err := tracing.ProcessIngress(svc, &delivery); err != nil {
return fmt.Errorf("process ingress message: %w", err)
}
return nil
}
},
)
go mq.StartQueueProcessor(mqConn,
AMQP_QUEUE_TRACES, AMQP_XCHG_PIPELINEMESSAGES, AMQP_XCHG_PIPELINEMESSAGES_TOPIC,
func() mq.ProcessorFunc {
return func(delivery amqp091.Delivery) error {
if err := tracing.ProcessMessage(svc, &delivery, "errors"); err != nil {
return fmt.Errorf("process ingress message: %w", err)
}
return nil
}
},
)

healthShutdown := healthchecker.Create().WithEnv().WithMessagQueue(mqConn).Start(ctx)
cleanup.Add(healthShutdown)

srv := &http.Server{
Addr: HTTP_ADDR,
WriteTimeout: 5 * time.Second,
ReadTimeout: 5 * time.Second,
Handler: r,
}
cleanup.Add(func(ctx context.Context) error {
return srv.Shutdown(ctx)
})
go func() {
if err := srv.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) && err != nil {
log.Printf("HTTP Server error: %v\n", err)
}
}()

log.Println("Server running, send interrupt (i.e. CTRL+C) to initiate shutdown")
<-ctx.Done()
log.Println("Shutting down... send another interrupt to force shutdown")

return nil
}

func createDB() (*sqlx.DB, error) {
db, err := sqlx.Open("pgx", DB_DSN)
if err != nil {
return nil, err
}
db.SetMaxIdleConns(2)
db.SetMaxOpenConns(10)
if err := migrations.MigratePostgres(db.DB); err != nil {
return nil, fmt.Errorf("failed to migrate db: %w", err)
}
return db, nil
}
Loading

0 comments on commit f931ecd

Please sign in to comment.