diff --git a/CHANGELOG.md b/CHANGELOG.md index 8a65968c..47eb5a11 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - [#498](https://github.com/spegel-org/spegel/pull/498) Update to Go 1.22. - [#499](https://github.com/spegel-org/spegel/pull/499) Add paralleltest linter and set all unit tests to run in parallel. - [#501](https://github.com/spegel-org/spegel/pull/501) Rename mock router to memory router and add tests. +- [#494](https://github.com/spegel-org/spegel/pull/494) Add visualization tool to debug image sources. ### Deprecated diff --git a/charts/spegel/README.md b/charts/spegel/README.md index f7394231..3ed2f58f 100644 --- a/charts/spegel/README.md +++ b/charts/spegel/README.md @@ -101,5 +101,6 @@ spec: | spegel.registries | list | `["https://cgr.dev","https://docker.io","https://ghcr.io","https://quay.io","https://mcr.microsoft.com","https://public.ecr.aws","https://gcr.io","https://registry.k8s.io","https://k8s.gcr.io","https://lscr.io"]` | Registries for which mirror configuration will be created. | | spegel.resolveLatestTag | bool | `true` | When true latest tags will be resolved to digests. | | spegel.resolveTags | bool | `true` | When true Spegel will resolve tags to digests. | +| spegel.visualize.enabled | bool | `false` | When true registry requests will be recorded and UI will be served. | | tolerations | list | `[{"key":"CriticalAddonsOnly","operator":"Exists"},{"effect":"NoExecute","operator":"Exists"},{"effect":"NoSchedule","operator":"Exists"}]` | Tolerations for pod assignment. | | updateStrategy | object | `{}` | An update strategy to replace existing pods with new pods. 
| \ No newline at end of file diff --git a/charts/spegel/templates/daemonset.yaml b/charts/spegel/templates/daemonset.yaml index a1b01d7b..d5108e28 100644 --- a/charts/spegel/templates/daemonset.yaml +++ b/charts/spegel/templates/daemonset.yaml @@ -78,6 +78,7 @@ spec: - --registry-addr=:{{ .Values.service.registry.port }} - --router-addr=:{{ .Values.service.router.port }} - --metrics-addr=:{{ .Values.service.metrics.port }} + - --visualize-enabled={{ .Values.spegel.visualize.enabled }} {{- with .Values.spegel.registries }} - --registries {{- range . }} diff --git a/charts/spegel/values.yaml b/charts/spegel/values.yaml index 02c406fd..04b5578c 100644 --- a/charts/spegel/values.yaml +++ b/charts/spegel/values.yaml @@ -160,3 +160,6 @@ spegel: blobSpeed: "" # -- When true existing mirror configuration will be appended to instead of replaced. appendMirrors: false + visualize: + # -- When true registry requests will be recorded and UI will be served. + enabled: false diff --git a/docs/FAQ.md b/docs/FAQ.md index 89a1ae4d..2a3f08bd 100644 --- a/docs/FAQ.md +++ b/docs/FAQ.md @@ -13,20 +13,23 @@ Read the [benchmark documentation](./BENCHMARK.md) for information of expected g ## How do I know that Spegel is working? -Spegel is meant to be a painless experience to install, meaning that it may be difficult initially to know if things are working or not. Simply put a good indicator that things are working is if all Spegel pods have started and are in a ready state. -Spegel does a couple of checks on startup to verify that any required configuration is correct, if it is not it will exit with an error. While it runs it will log all received requests, both those it mirrors and it serves. +Spegel is meant to be a painless experience to install. Image pulls will fall back to the original registry if Spegel does not work, meaning that it can be difficult to determine if things are working or not. 
Spegel has a UI that visualizes incoming and outgoing requests. This will allow you to understand if images are pulled from other Spegel instances or not. -An incoming request to Spegel that is mirrored will receive the following log. +The UI is disabled by default as it adds additional overhead. To access the UI, enable the feature in the Helm values. -``` -{"level":"info","ts":1692304805.9038486,"caller":"gin@v0.0.9/logger.go:53","msg":"","path":"/v2/library/nginx/blobs/sha256:1cb127bd932119089b5ffb612ffa84537ddd1318e6784f2fce80916bbb8bd166","status":200,"method":"GET","latency":0.005075836,"ip":"172.18.0.5","handler":"mirror"} +```yaml +spegel: + visualize: + enabled: true ``` -While the Spegel instance on the other end will log. +After all Spegel instances have restarted you can port forward to one of the Spegel pods. +```shell +kubectl -n spegel port-forward ${POD_NAME} 9090 ``` -{"level":"info","ts":1692304805.9035861,"caller":"gin@v0.0.9/logger.go:53","msg":"","path":"/v2/library/nginx/blobs/sha256:1cb127bd932119089b5ffb612ffa84537ddd1318e6784f2fce80916bbb8bd166","status":200,"method":"GET","latency":0.003644997,"ip":"172.18.0.5","handler":"blob"} -``` + +Open the UI at `http://localhost:9090/visualize` in a browser. If all is configured properly you should be presented with an interface showing registry requests. ## Will image pulls break or be delayed if a spegel instance fails or is removed? 
diff --git a/main.go b/main.go index 907e844d..6c06dd36 100644 --- a/main.go +++ b/main.go @@ -29,6 +29,7 @@ import ( "github.com/spegel-org/spegel/pkg/routing" "github.com/spegel-org/spegel/pkg/state" "github.com/spegel-org/spegel/pkg/throttle" + "github.com/spegel-org/spegel/pkg/visualize" ) type ConfigurationCmd struct { @@ -63,6 +64,7 @@ type RegistryCmd struct { MirrorResolveTimeout time.Duration `arg:"--mirror-resolve-timeout,env:MIRROR_RESOLVE_TIMEOUT" default:"5s" help:"Max duration spent finding a mirror."` MirrorResolveRetries int `arg:"--mirror-resolve-retries,env:MIRROR_RESOLVE_RETRIES" default:"3" help:"Max amount of mirrors to attempt."` ResolveLatestTag bool `arg:"--resolve-latest-tag,env:RESOLVE_LATEST_TAG" default:"true" help:"When true latest tags will be resolved to digests."` + VisualizeEnabled bool `arg:"--visualize-enabled,env:VISUALIZE_ENABLED" default:"false" help:"When true visualizer will run and record events."` } type Arguments struct { @@ -141,6 +143,11 @@ func registryCommand(ctx context.Context, args *RegistryCmd) (err error) { mux.Handle("/debug/pprof/threadcreate", pprof.Handler("threadcreate")) mux.Handle("/debug/pprof/block", pprof.Handler("block")) mux.Handle("/debug/pprof/mutex", pprof.Handler("mutex")) + var eventStore visualize.EventStore + if args.VisualizeEnabled { + eventStore = visualize.NewMemoryStore() + mux.Handle("/visualize/", visualize.Handler(eventStore)) + } metricsSrv := &http.Server{ Addr: args.MetricsAddr, Handler: mux, @@ -195,6 +202,7 @@ func registryCommand(ctx context.Context, args *RegistryCmd) (err error) { registry.WithResolveTimeout(args.MirrorResolveTimeout), registry.WithLocalAddress(args.LocalAddr), registry.WithLogger(log), + registry.WithEventStore(eventStore), } if args.BlobSpeed != nil { registryOpts = append(registryOpts, registry.WithBlobSpeed(*args.BlobSpeed)) diff --git a/pkg/registry/distribution.go b/pkg/registry/distribution.go index 9a8d0113..980b7025 100644 --- 
a/pkg/registry/distribution.go +++ b/pkg/registry/distribution.go @@ -31,6 +31,13 @@ func (r reference) hasLatestTag() bool { return tag == "latest" } +func (r reference) tagOrDigest() string { + if r.name != "" { + return r.name + } + return r.dgst.String() +} + // Package is used to parse components from requests which comform with the OCI distribution spec. // https://github.com/opencontainers/distribution-spec/blob/main/spec.md // /v2//manifests/ diff --git a/pkg/registry/registry.go b/pkg/registry/registry.go index d7c6e7f8..f3edd080 100644 --- a/pkg/registry/registry.go +++ b/pkg/registry/registry.go @@ -8,6 +8,7 @@ import ( "net" "net/http" "net/http/httputil" + "net/netip" "net/url" "path" "strconv" @@ -21,6 +22,7 @@ import ( "github.com/spegel-org/spegel/pkg/oci" "github.com/spegel-org/spegel/pkg/routing" "github.com/spegel-org/spegel/pkg/throttle" + "github.com/spegel-org/spegel/pkg/visualize" ) const ( @@ -28,6 +30,7 @@ const ( ) type Registry struct { + eventStore visualize.EventStore log logr.Logger throttler *throttle.Throttler ociClient oci.Client @@ -83,6 +86,12 @@ func WithLogger(log logr.Logger) Option { } } +func WithEventStore(eventStore visualize.EventStore) Option { + return func(r *Registry) { + r.eventStore = eventStore + } +} + func NewRegistry(ociClient oci.Client, router routing.Router, opts ...Option) *Registry { r := &Registry{ ociClient: ociClient, @@ -188,6 +197,17 @@ func (r *Registry) registryHandler(rw mux.ResponseWriter, req *http.Request) str return "mirror" } + if r.eventStore != nil { + defer func() { + ip := getClientIP(req) + addr, err := netip.ParseAddr(ip) + if err != nil { + return + } + r.eventStore.RecordRequest(ref.tagOrDigest(), addr, req.Method, rw.Status(), false) + }() + } + // Serve registry endpoints. 
switch ref.kind { case referenceKindManifest: @@ -203,12 +223,7 @@ func (r *Registry) registryHandler(rw mux.ResponseWriter, req *http.Request) str } func (r *Registry) handleMirror(rw mux.ResponseWriter, req *http.Request, ref reference) { - key := ref.dgst.String() - if key == "" { - key = ref.name - } - - log := r.log.WithValues("key", key, "path", req.URL.Path, "ip", getClientIP(req)) + log := r.log.WithValues("key", ref.tagOrDigest(), "path", req.URL.Path, "ip", getClientIP(req)) isExternal := r.isExternalRequest(req) if isExternal { @@ -237,7 +252,7 @@ func (r *Registry) handleMirror(rw mux.ResponseWriter, req *http.Request, ref re resolveCtx, cancel := context.WithTimeout(req.Context(), r.resolveTimeout) defer cancel() resolveCtx = logr.NewContext(resolveCtx, log) - peerCh, err := r.router.Resolve(resolveCtx, key, isExternal, r.resolveRetries) + peerCh, err := r.router.Resolve(resolveCtx, ref.tagOrDigest(), isExternal, r.resolveRetries) if err != nil { rw.WriteError(http.StatusInternalServerError, fmt.Errorf("error occurred when attempting to resolve mirrors: %w", err)) return @@ -248,12 +263,18 @@ func (r *Registry) handleMirror(rw mux.ResponseWriter, req *http.Request, ref re select { case <-req.Context().Done(): // Request has been closed by server or client. No use continuing. - rw.WriteError(http.StatusNotFound, fmt.Errorf("mirroring for image component %s has been cancelled: %w", key, resolveCtx.Err())) + rw.WriteError(http.StatusNotFound, fmt.Errorf("mirroring for image component %s has been cancelled: %w", ref.tagOrDigest(), resolveCtx.Err())) return case ipAddr, ok := <-peerCh: // Channel closed means no more mirrors will be received and max retries has been reached. if !ok { - err = fmt.Errorf("mirror with image component %s could not be found", key) + // Register not found if no mirror attempts have been made. 
+ fmt.Println("mirror channel closed", ref.tagOrDigest(), mirrorAttempts) + if r.eventStore != nil && mirrorAttempts == 0 { + r.eventStore.RecordNoMirrors(ref.tagOrDigest()) + } + + err = fmt.Errorf("mirror with image component %s could not be found", ref.tagOrDigest()) if mirrorAttempts > 0 { err = errors.Join(err, fmt.Errorf("requests to %d mirrors failed, all attempts have been exhausted or timeout has been reached", mirrorAttempts)) } @@ -288,6 +309,12 @@ func (r *Registry) handleMirror(rw mux.ResponseWriter, req *http.Request, ref re return nil } proxy.ServeHTTP(rw, req) + + // Track image events if enabled + if r.eventStore != nil { + r.eventStore.RecordRequest(ref.tagOrDigest(), ipAddr.Addr(), req.Method, rw.Status(), true) + } + if !succeeded { break } diff --git a/pkg/visualize/store.go b/pkg/visualize/store.go new file mode 100644 index 00000000..671854b3 --- /dev/null +++ b/pkg/visualize/store.go @@ -0,0 +1,150 @@ +package visualize + +import ( + "net/http" + "net/netip" + "sync" + "time" +) + +type GraphData struct { + Nodes []Node `json:"nodes"` + Links []Link `json:"links"` +} + +type Node struct { + ID string `json:"id"` +} + +type Link struct { + ID string `json:"id"` + Source string `json:"source"` + Target string `json:"target"` + Status int `json:"status"` +} + +type EventStore interface { + RecordNoMirrors(id string) + RecordRequest(id string, peer netip.Addr, method string, status int, mirror bool) + FilterByDirection(rootIsSource bool) EventStore + Graph() GraphData + LastModified() time.Time +} + +type edge struct { + node string + id string + status int + rootIsSource bool +} + +var _ EventStore = &MemoryStore{} + +type MemoryStore struct { + lastModified time.Time + edgeIndex map[string]int + edges []edge + mx sync.RWMutex +} + +func NewMemoryStore() *MemoryStore { + return &MemoryStore{ + edges: []edge{}, + edgeIndex: map[string]int{}, + } +} + +func (m *MemoryStore) set(e edge) { + m.mx.Lock() + defer m.mx.Unlock() + + m.lastModified = 
time.Now() + if idx, ok := m.edgeIndex[e.id]; ok { + m.edges[idx] = e + return + } + m.edges = append(m.edges, e) + m.edgeIndex[e.id] = len(m.edges) - 1 +} + +func (m *MemoryStore) RecordNoMirrors(id string) { + e := edge{ + node: "Not Found", + id: id, + rootIsSource: true, + } + m.set(e) +} + +func (m *MemoryStore) RecordRequest(id string, peer netip.Addr, method string, status int, mirror bool) { + if method != http.MethodGet { + return + } + e := edge{ + node: peer.String(), + id: id, + status: status, + rootIsSource: mirror, + } + m.set(e) +} + +func (m *MemoryStore) FilterByDirection(rootIsSource bool) EventStore { //nolint: ireturn // Have to return interface to implement interface. + m.mx.RLock() + defer m.mx.RUnlock() + + f := NewMemoryStore() + f.lastModified = m.lastModified + for _, edge := range m.edges { + if edge.rootIsSource != rootIsSource { + continue + } + f.edges = append(f.edges, edge) + f.edgeIndex[edge.id] = len(f.edges) - 1 + } + return f +} + +func (m *MemoryStore) Graph() GraphData { + m.mx.RLock() + defer m.mx.RUnlock() + + gd := GraphData{ + Nodes: []Node{ + { + ID: "self", + }, + }, + Links: []Link{}, + } + nodeIndex := map[string]interface{}{} + for _, edge := range m.edges { + src := gd.Nodes[0].ID + dest := edge.node + if !edge.rootIsSource { + src = edge.node + dest = gd.Nodes[0].ID + } + link := Link{ + ID: edge.id, + Source: src, + Target: dest, + Status: edge.status, + } + gd.Links = append(gd.Links, link) + + if _, ok := nodeIndex[edge.node]; ok { + continue + } + gd.Nodes = append(gd.Nodes, Node{ID: edge.node}) + nodeIndex[edge.node] = nil + } + return gd +} + +func (m *MemoryStore) LastModified() time.Time { + m.mx.RLock() + defer m.mx.RUnlock() + + return m.lastModified +} diff --git a/pkg/visualize/store_test.go b/pkg/visualize/store_test.go new file mode 100644 index 00000000..782b2878 --- /dev/null +++ b/pkg/visualize/store_test.go @@ -0,0 +1,116 @@ +package visualize + +import ( + "net/http" + "net/netip" + "testing" + + 
"github.com/stretchr/testify/require" +) + +func TestMemoryStore(t *testing.T) { + t.Parallel() + + store := NewMemoryStore() + store.RecordRequest("one", netip.MustParseAddr("127.0.0.1"), http.MethodGet, http.StatusOK, true) + store.RecordRequest("two", netip.MustParseAddr("127.0.0.1"), http.MethodGet, http.StatusNotFound, true) + store.RecordRequest("three", netip.MustParseAddr("10.0.0.0"), http.MethodGet, http.StatusOK, false) + + tests := []struct { + name string + store EventStore + expectedNodes []Node + expectedLinks []Link + }{ + { + name: "no filter", + store: store, + expectedNodes: []Node{ + { + ID: "self", + }, + { + ID: "127.0.0.1", + }, + { + ID: "10.0.0.0", + }, + }, + expectedLinks: []Link{ + { + ID: "one", + Source: "self", + Target: "127.0.0.1", + Status: http.StatusOK, + }, + { + ID: "two", + Source: "self", + Target: "127.0.0.1", + Status: http.StatusNotFound, + }, + { + ID: "three", + Source: "10.0.0.0", + Target: "self", + Status: http.StatusOK, + }, + }, + }, + { + name: "only from root", + store: store.FilterByDirection(true), + expectedNodes: []Node{ + { + ID: "self", + }, + { + ID: "127.0.0.1", + }, + }, + expectedLinks: []Link{ + { + ID: "one", + Source: "self", + Target: "127.0.0.1", + Status: http.StatusOK, + }, + { + ID: "two", + Source: "self", + Target: "127.0.0.1", + Status: http.StatusNotFound, + }, + }, + }, + { + name: "only to root", + store: store.FilterByDirection(false), + expectedNodes: []Node{ + { + ID: "self", + }, + { + ID: "10.0.0.0", + }, + }, + expectedLinks: []Link{ + { + ID: "three", + Source: "10.0.0.0", + Target: "self", + Status: http.StatusOK, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + gd := tt.store.Graph() + require.ElementsMatch(t, tt.expectedNodes, gd.Nodes) + require.ElementsMatch(t, tt.expectedLinks, gd.Links) + }) + } +} diff --git a/pkg/visualize/visualize.go b/pkg/visualize/visualize.go new file mode 100644 index 00000000..72c4790e --- 
/dev/null +++ b/pkg/visualize/visualize.go @@ -0,0 +1,195 @@ +package visualize + +import ( + "encoding/hex" + "encoding/json" + "hash/fnv" + "net/http" + "strconv" + + "github.com/spegel-org/spegel/internal/mux" +) + +func Handler(store EventStore) http.Handler { + handler := func(rw mux.ResponseWriter, req *http.Request) { + switch req.URL.Path { + case "/visualize/": + indexHandler(rw, req) + case "/visualize/graph": + graphHandler(rw, req, store) + default: + rw.WriteHeader(http.StatusNotFound) + } + } + return mux.NewServeMux(handler) +} + +func indexHandler(rw mux.ResponseWriter, _ *http.Request) { + index := ` + + + + Spegel + + + + + + + + +
+
+

Spegel

+
+
+ Request Direction + + + + + + + + + +
+
+
+ +
+
+ + + +` + //nolint: errcheck // Ignore error. + rw.Write([]byte(index)) +} + +func graphHandler(rw mux.ResponseWriter, req *http.Request, store EventStore) { + directionFilter := req.URL.Query().Get("direction") + if directionFilter != "" { + isRootSource, err := strconv.ParseBool(directionFilter) + if err != nil { + rw.WriteError(http.StatusBadRequest, err) + return + } + store = store.FilterByDirection(isRootSource) + } + eTagValue := directionFilter + "-" + store.LastModified().String() + hash := fnv.New32a() + _, err := hash.Write([]byte(eTagValue)) + if err != nil { + rw.WriteError(http.StatusInternalServerError, err) + return + } + eTag := hex.EncodeToString(hash.Sum(nil)) + if eTag == req.Header.Get("If-None-Match") { + rw.WriteHeader(http.StatusNotModified) + return + } + rw.Header().Set("etag", eTag) + gd := store.Graph() + b, err := json.Marshal(&gd) + if err != nil { + rw.WriteError(http.StatusInternalServerError, err) + return + } + //nolint: errcheck // Ignore error. + rw.Write(b) +} diff --git a/test/e2e/e2e.sh b/test/e2e/e2e.sh index 8bd2d473..d14d3b71 100755 --- a/test/e2e/e2e.sh +++ b/test/e2e/e2e.sh @@ -44,10 +44,10 @@ else for NODE in control-plane worker2 worker3 worker4 do NAME=$KIND_NAME-$NODE - docker exec $NAME ctr -n k8s.io image rm docker.io/library/nginx:1.21.0@sha256:2f1cd90e00fe2c991e18272bb35d6a8258eeb27785d121aa4cc1ae4235167cfd - docker exec $NAME ctr -n k8s.io image rm docker.io/library/nginx:1.23.0 - docker exec $NAME ctr -n k8s.io image rm docker.io/library/nginx@sha256:b3a676a9145dc005062d5e79b92d90574fb3bf2396f4913dc1732f9065f55c4b - docker exec $NAME ctr -n k8s.io image rm mcr.microsoft.com/containernetworking/azure-cns@sha256:7944413c630746a35d5596f56093706e8d6a3db0569bec0c8e58323f965f7416 + docker exec $NAME crictl rmi docker.io/library/nginx:1.21.0@sha256:2f1cd90e00fe2c991e18272bb35d6a8258eeb27785d121aa4cc1ae4235167cfd || true + docker exec $NAME crictl rmi docker.io/library/nginx:1.23.0 || true + docker exec $NAME crictl 
rmi docker.io/library/nginx@sha256:b3a676a9145dc005062d5e79b92d90574fb3bf2396f4913dc1732f9065f55c4b || true + docker exec $NAME crictl rmi mcr.microsoft.com/containernetworking/azure-cns@sha256:7944413c630746a35d5596f56093706e8d6a3db0569bec0c8e58323f965f7416 || true done # Delete Spegel from all nodes