Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add visualization tool to debug image sources #494

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- [#498](https://github.com/spegel-org/spegel/pull/498) Update to Go 1.22.
- [#499](https://github.com/spegel-org/spegel/pull/499) Add paralleltest linter and set all unit tests to run in parallel.
- [#501](https://github.com/spegel-org/spegel/pull/501) Rename mock router to memory router and add tests.
- [#494](https://github.com/spegel-org/spegel/pull/494) Add visualization tool to debug image sources.

### Deprecated

Expand Down
1 change: 1 addition & 0 deletions charts/spegel/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,5 +101,6 @@ spec:
| spegel.registries | list | `["https://cgr.dev","https://docker.io","https://ghcr.io","https://quay.io","https://mcr.microsoft.com","https://public.ecr.aws","https://gcr.io","https://registry.k8s.io","https://k8s.gcr.io","https://lscr.io"]` | Registries for which mirror configuration will be created. |
| spegel.resolveLatestTag | bool | `true` | When true latest tags will be resolved to digests. |
| spegel.resolveTags | bool | `true` | When true Spegel will resolve tags to digests. |
| spegel.visualize.enabled | bool | `false` | When true registry requests will be recorded and UI will be served. |
| tolerations | list | `[{"key":"CriticalAddonsOnly","operator":"Exists"},{"effect":"NoExecute","operator":"Exists"},{"effect":"NoSchedule","operator":"Exists"}]` | Tolerations for pod assignment. |
| updateStrategy | object | `{}` | An update strategy to replace existing pods with new pods. |
1 change: 1 addition & 0 deletions charts/spegel/templates/daemonset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ spec:
- --registry-addr=:{{ .Values.service.registry.port }}
- --router-addr=:{{ .Values.service.router.port }}
- --metrics-addr=:{{ .Values.service.metrics.port }}
- --visualize-enabled={{ .Values.spegel.visualize.enabled }}
{{- with .Values.spegel.registries }}
- --registries
{{- range . }}
Expand Down
3 changes: 3 additions & 0 deletions charts/spegel/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -160,3 +160,6 @@ spegel:
blobSpeed: ""
# -- When true existing mirror configuration will be appended to instead of replaced.
appendMirrors: false
visualize:
# -- When true registry requests will be recorded and UI will be served.
enabled: false
19 changes: 11 additions & 8 deletions docs/FAQ.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,23 @@ Read the [benchmark documentation](./BENCHMARK.md) for information of expected g

## How do I know that Spegel is working?

Spegel is meant to be a painless experience to install, meaning that it may be difficult initially to know if things are working or not. Simply put a good indicator that things are working is if all Spegel pods have started and are in a ready state.
Spegel does a couple of checks on startup to verify that any required configuration is correct, if it is not it will exit with an error. While it runs it will log all received requests, both those it mirrors and it serves.
Spegel is meant to be a painless experience to install. Image pulls will fallback to the original registry if Spegel does not work, meaning that it can be difficult to determine if things are working or not. Spegel has a UI that visualizes incoming and outgoing requests, this will allow you understand if images are pulled from other Spegel instances or not.

An incoming request to Spegel that is mirrored will receive the following log.
The UI is disabled by default as it adds additional overhead. To access the UI enable the feature in the Helm values.

```
{"level":"info","ts":1692304805.9038486,"caller":"[email protected]/logger.go:53","msg":"","path":"/v2/library/nginx/blobs/sha256:1cb127bd932119089b5ffb612ffa84537ddd1318e6784f2fce80916bbb8bd166","status":200,"method":"GET","latency":0.005075836,"ip":"172.18.0.5","handler":"mirror"}
```yaml
spegel:
visualize:
enabled: true
```
While the Spegel instance on the other end will log.
After all Spegel instances have restarted you can port forward to one of the Spegel pods.
```shell
kubectl -n spegel port-forward ${POD_NAME} 9090
```
{"level":"info","ts":1692304805.9035861,"caller":"[email protected]/logger.go:53","msg":"","path":"/v2/library/nginx/blobs/sha256:1cb127bd932119089b5ffb612ffa84537ddd1318e6784f2fce80916bbb8bd166","status":200,"method":"GET","latency":0.003644997,"ip":"172.18.0.5","handler":"blob"}
```

Open the UI at `http://localhost:9090/visualize` in a browser. If all is configured propery you should be presented with and interface showing registry requests.

## Will image pulls break or be delayed if a spegel instance fails or is removed?

Expand Down
8 changes: 8 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/spegel-org/spegel/pkg/routing"
"github.com/spegel-org/spegel/pkg/state"
"github.com/spegel-org/spegel/pkg/throttle"
"github.com/spegel-org/spegel/pkg/visualize"
)

type ConfigurationCmd struct {
Expand Down Expand Up @@ -63,6 +64,7 @@ type RegistryCmd struct {
MirrorResolveTimeout time.Duration `arg:"--mirror-resolve-timeout,env:MIRROR_RESOLVE_TIMEOUT" default:"5s" help:"Max duration spent finding a mirror."`
MirrorResolveRetries int `arg:"--mirror-resolve-retries,env:MIRROR_RESOLVE_RETRIES" default:"3" help:"Max amount of mirrors to attempt."`
ResolveLatestTag bool `arg:"--resolve-latest-tag,env:RESOLVE_LATEST_TAG" default:"true" help:"When true latest tags will be resolved to digests."`
VisualizeEnabled bool `arg:"--visualize-enabled,env:VISUALIZE_ENABLED" default:"false" help:"When true visualizer will run and record events."`
}

type Arguments struct {
Expand Down Expand Up @@ -141,6 +143,11 @@ func registryCommand(ctx context.Context, args *RegistryCmd) (err error) {
mux.Handle("/debug/pprof/threadcreate", pprof.Handler("threadcreate"))
mux.Handle("/debug/pprof/block", pprof.Handler("block"))
mux.Handle("/debug/pprof/mutex", pprof.Handler("mutex"))
var eventStore visualize.EventStore
if args.VisualizeEnabled {
eventStore = visualize.NewMemoryStore()
mux.Handle("/visualize/", visualize.Handler(eventStore))
}
metricsSrv := &http.Server{
Addr: args.MetricsAddr,
Handler: mux,
Expand Down Expand Up @@ -195,6 +202,7 @@ func registryCommand(ctx context.Context, args *RegistryCmd) (err error) {
registry.WithResolveTimeout(args.MirrorResolveTimeout),
registry.WithLocalAddress(args.LocalAddr),
registry.WithLogger(log),
registry.WithEventStore(eventStore),
}
if args.BlobSpeed != nil {
registryOpts = append(registryOpts, registry.WithBlobSpeed(*args.BlobSpeed))
Expand Down
7 changes: 7 additions & 0 deletions pkg/registry/distribution.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ func (r reference) hasLatestTag() bool {
return tag == "latest"
}

func (r reference) tagOrDigest() string {
if r.name != "" {
return r.name
}
return r.dgst.String()
}

// Package is used to parse components from requests which comform with the OCI distribution spec.
// https://github.com/opencontainers/distribution-spec/blob/main/spec.md
// /v2/<name>/manifests/<reference>
Expand Down
45 changes: 36 additions & 9 deletions pkg/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net"
"net/http"
"net/http/httputil"
"net/netip"
"net/url"
"path"
"strconv"
Expand All @@ -21,13 +22,15 @@ import (
"github.com/spegel-org/spegel/pkg/oci"
"github.com/spegel-org/spegel/pkg/routing"
"github.com/spegel-org/spegel/pkg/throttle"
"github.com/spegel-org/spegel/pkg/visualize"
)

const (
MirroredHeaderKey = "X-Spegel-Mirrored"
)

type Registry struct {
eventStore visualize.EventStore
log logr.Logger
throttler *throttle.Throttler
ociClient oci.Client
Expand Down Expand Up @@ -83,6 +86,12 @@ func WithLogger(log logr.Logger) Option {
}
}

func WithEventStore(eventStore visualize.EventStore) Option {
return func(r *Registry) {
r.eventStore = eventStore
}
}

func NewRegistry(ociClient oci.Client, router routing.Router, opts ...Option) *Registry {
r := &Registry{
ociClient: ociClient,
Expand Down Expand Up @@ -188,6 +197,17 @@ func (r *Registry) registryHandler(rw mux.ResponseWriter, req *http.Request) str
return "mirror"
}

if r.eventStore != nil {
defer func() {
ip := getClientIP(req)
addr, err := netip.ParseAddr(ip)
if err != nil {
return
}
r.eventStore.RecordRequest(ref.tagOrDigest(), addr, req.Method, rw.Status(), false)
}()
}

// Serve registry endpoints.
switch ref.kind {
case referenceKindManifest:
Expand All @@ -203,12 +223,7 @@ func (r *Registry) registryHandler(rw mux.ResponseWriter, req *http.Request) str
}

func (r *Registry) handleMirror(rw mux.ResponseWriter, req *http.Request, ref reference) {
key := ref.dgst.String()
if key == "" {
key = ref.name
}

log := r.log.WithValues("key", key, "path", req.URL.Path, "ip", getClientIP(req))
log := r.log.WithValues("key", ref.tagOrDigest(), "path", req.URL.Path, "ip", getClientIP(req))

isExternal := r.isExternalRequest(req)
if isExternal {
Expand Down Expand Up @@ -237,7 +252,7 @@ func (r *Registry) handleMirror(rw mux.ResponseWriter, req *http.Request, ref re
resolveCtx, cancel := context.WithTimeout(req.Context(), r.resolveTimeout)
defer cancel()
resolveCtx = logr.NewContext(resolveCtx, log)
peerCh, err := r.router.Resolve(resolveCtx, key, isExternal, r.resolveRetries)
peerCh, err := r.router.Resolve(resolveCtx, ref.tagOrDigest(), isExternal, r.resolveRetries)
if err != nil {
rw.WriteError(http.StatusInternalServerError, fmt.Errorf("error occurred when attempting to resolve mirrors: %w", err))
return
Expand All @@ -248,12 +263,18 @@ func (r *Registry) handleMirror(rw mux.ResponseWriter, req *http.Request, ref re
select {
case <-req.Context().Done():
// Request has been closed by server or client. No use continuing.
rw.WriteError(http.StatusNotFound, fmt.Errorf("mirroring for image component %s has been cancelled: %w", key, resolveCtx.Err()))
rw.WriteError(http.StatusNotFound, fmt.Errorf("mirroring for image component %s has been cancelled: %w", ref.tagOrDigest(), resolveCtx.Err()))
return
case ipAddr, ok := <-peerCh:
// Channel closed means no more mirrors will be received and max retries has been reached.
if !ok {
err = fmt.Errorf("mirror with image component %s could not be found", key)
// Register not found if no mirror attempts have been made.
fmt.Println("mirror channel closed", ref.tagOrDigest(), mirrorAttempts)
if r.eventStore != nil && mirrorAttempts == 0 {
r.eventStore.RecordNoMirrors(ref.tagOrDigest())
}

err = fmt.Errorf("mirror with image component %s could not be found", ref.tagOrDigest())
if mirrorAttempts > 0 {
err = errors.Join(err, fmt.Errorf("requests to %d mirrors failed, all attempts have been exhausted or timeout has been reached", mirrorAttempts))
}
Expand Down Expand Up @@ -288,6 +309,12 @@ func (r *Registry) handleMirror(rw mux.ResponseWriter, req *http.Request, ref re
return nil
}
proxy.ServeHTTP(rw, req)

// Track image events if enabled
if r.eventStore != nil {
r.eventStore.RecordRequest(ref.tagOrDigest(), ipAddr.Addr(), req.Method, rw.Status(), true)
}

if !succeeded {
break
}
Expand Down
150 changes: 150 additions & 0 deletions pkg/visualize/store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package visualize

import (
"net/http"
"net/netip"
"sync"
"time"
)

type GraphData struct {
Nodes []Node `json:"nodes"`
Links []Link `json:"links"`
}

type Node struct {
ID string `json:"id"`
}

type Link struct {
ID string `json:"id"`
Source string `json:"source"`
Target string `json:"target"`
Status int `json:"status"`
}

type EventStore interface {
RecordNoMirrors(id string)
RecordRequest(id string, peer netip.Addr, method string, status int, mirror bool)
FilterByDirection(rootIsSource bool) EventStore
Graph() GraphData
LastModified() time.Time
}

type edge struct {
node string
id string
status int
rootIsSource bool
}

var _ EventStore = &MemoryStore{}

type MemoryStore struct {
lastModified time.Time
edgeIndex map[string]int
edges []edge
mx sync.RWMutex
}

func NewMemoryStore() *MemoryStore {
return &MemoryStore{
edges: []edge{},
edgeIndex: map[string]int{},
}
}

func (m *MemoryStore) set(e edge) {
m.mx.Lock()
defer m.mx.Unlock()

m.lastModified = time.Now()
if idx, ok := m.edgeIndex[e.id]; ok {
m.edges[idx] = e
return
}
m.edges = append(m.edges, e)
m.edgeIndex[e.id] = len(m.edges) - 1
}

func (m *MemoryStore) RecordNoMirrors(id string) {
e := edge{
node: "Not Found",
id: id,
rootIsSource: true,
}
m.set(e)
}

func (m *MemoryStore) RecordRequest(id string, peer netip.Addr, method string, status int, mirror bool) {
if method != http.MethodGet {
return
}
e := edge{
node: peer.String(),
id: id,
status: status,
rootIsSource: mirror,
}
m.set(e)
}

func (m *MemoryStore) FilterByDirection(rootIsSource bool) EventStore { //nolint: ireturn // Have to return interface to implement interface.
m.mx.RLock()
defer m.mx.RUnlock()

f := NewMemoryStore()
f.lastModified = m.lastModified
for _, edge := range m.edges {
if edge.rootIsSource != rootIsSource {
continue
}
f.edges = append(f.edges, edge)
f.edgeIndex[edge.id] = len(f.edges) - 1
}
return f
}

func (m *MemoryStore) Graph() GraphData {
m.mx.RLock()
defer m.mx.RUnlock()

gd := GraphData{
Nodes: []Node{
{
ID: "self",
},
},
Links: []Link{},
}
nodeIndex := map[string]interface{}{}
for _, edge := range m.edges {
src := gd.Nodes[0].ID
dest := edge.node
if !edge.rootIsSource {
src = edge.node
dest = gd.Nodes[0].ID
}
link := Link{
ID: edge.id,
Source: src,
Target: dest,
Status: edge.status,
}
gd.Links = append(gd.Links, link)

if _, ok := nodeIndex[edge.node]; ok {
continue
}
gd.Nodes = append(gd.Nodes, Node{ID: edge.node})
nodeIndex[edge.node] = nil
}
return gd
}

func (m *MemoryStore) LastModified() time.Time {
m.mx.RLock()
defer m.mx.RUnlock()

return m.lastModified
}
Loading
Loading