From b458e9da8b5778a61e8b11ea47681e5bc2c2f1bd Mon Sep 17 00:00:00 2001 From: Bob Melander Date: Tue, 2 Jul 2024 15:18:18 +0200 Subject: [PATCH] feat: adds periodic health reporting to the k8s inventory agent Addresses: Enterprise-2483 Signed-off-by: Bob Melander --- anchore-k8s-inventory.yaml | 14 + cmd/root.go | 27 +- go.mod | 3 +- go.sum | 2 + internal/anchore/anchoreclient.go | 294 ++++++ internal/anchore/anchoreclient_test.go | 647 ++++++++++++ internal/config/config.go | 15 +- .../snapshot/TestDefaultConfigString.golden | 5 + .../snapshot/TestEmptyConfigString.golden | 5 + .../snapshot/TestSensitiveConfigJSON.golden | 3 + .../snapshot/TestSensitiveConfigString.golden | 5 + internal/time/time.go | 42 + internal/time/time_test.go | 154 +++ pkg/healthreporter/healthreporter.go | 186 ++++ pkg/healthreporter/healthreporter_test.go | 234 +++++ pkg/integration/integration.go | 409 ++++++++ pkg/integration/integration_test.go | 919 ++++++++++++++++++ pkg/lib.go | 58 +- skaffold.yaml | 1 + 19 files changed, 3007 insertions(+), 16 deletions(-) create mode 100644 internal/anchore/anchoreclient.go create mode 100644 internal/anchore/anchoreclient_test.go create mode 100644 internal/time/time.go create mode 100644 internal/time/time_test.go create mode 100644 pkg/healthreporter/healthreporter.go create mode 100644 pkg/healthreporter/healthreporter_test.go create mode 100644 pkg/integration/integration.go create mode 100644 pkg/integration/integration_test.go diff --git a/anchore-k8s-inventory.yaml b/anchore-k8s-inventory.yaml index 6445dd6..17f693a 100644 --- a/anchore-k8s-inventory.yaml +++ b/anchore-k8s-inventory.yaml @@ -14,6 +14,17 @@ log: # enable/disable checking for application updates on startup check-for-app-update: true +anchore-registration: + # The id to register the agent as with Enterprise, so Enterprise can map the agent to its integration uuid. 
+ # If left unspecified, the agent will attempt to set registration-id to the uid of the K8s Deployment for the agent. + # If that fails (e.g., if the agent is not deployed on K8s), the agent will generate a UUID to use as registration-id. + registration-id: + # The name that the agent should have. If left unspecified, the agent will attempt to set it to the name of the K8s + # Deployment for the agent. If that fails it will be empty. + integration-name: + # A short description for the agent + integration-description: + kubeconfig: path: cluster: docker-desktop @@ -117,6 +128,9 @@ ignore-not-running: true # Only respected if mode is periodic polling-interval-seconds: 300 +# Only respected if mode is periodic +health-report-interval-seconds: 60 + # Batch Request configuration inventory-report-limits: namespaces: 0 # default of 0 means no limit per report diff --git a/cmd/root.go b/cmd/root.go index aaa151f..bc61d7b 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -3,11 +3,12 @@ package cmd import ( "errors" "fmt" - "os" - "runtime/pprof" - + "github.com/anchore/k8s-inventory/pkg/healthreporter" + "github.com/anchore/k8s-inventory/pkg/integration" "github.com/anchore/k8s-inventory/pkg/mode" "github.com/anchore/k8s-inventory/pkg/reporter" + "os" + "runtime/pprof" "github.com/spf13/cobra" "github.com/spf13/viper" @@ -47,7 +48,20 @@ var rootCmd = &cobra.Command{ switch appConfig.RunMode { case mode.PeriodicPolling: - pkg.PeriodicallyGetInventoryReport(appConfig) + neverDone := make(chan bool, 1) + + ch := integration.GetChannels() + gatedReportInfo := healthreporter.GetGatedReportInfo() + + go healthreporter.PeriodicallySendHealthReport(appConfig, ch, gatedReportInfo) + go pkg.PeriodicallyGetInventoryReport(appConfig, ch, gatedReportInfo) + + _, err := integration.PerformRegistration(appConfig, ch) + if err != nil { + os.Exit(1) + } + + <-neverDone default: reports, err := pkg.GetInventoryReports(appConfig) if appConfig.Dev.ProfileCPU { @@ -58,10 +72,11 @@ var rootCmd = 
&cobra.Command{ os.Exit(1) } anErrorOccurred := false + reportInfo := healthreporter.InventoryReportInfo{} for account, reportsForAccount := range reports { for count, report := range reportsForAccount { log.Infof("Sending Inventory Report to Anchore Account %s, %d of %d", account, count+1, len(reportsForAccount)) - err = pkg.HandleReport(report, appConfig, account) + err = pkg.HandleReport(report, &reportInfo, appConfig, account) if errors.Is(err, reporter.ErrAnchoreAccountDoesNotExist) { // Retry with default account retryAccount := appConfig.AnchoreDetails.Account @@ -69,7 +84,7 @@ var rootCmd = &cobra.Command{ retryAccount = appConfig.AccountRouteByNamespaceLabel.DefaultAccount } log.Warnf("Error sending to Anchore Account %s, sending to default account", account) - err = pkg.HandleReport(report, appConfig, retryAccount) + err = pkg.HandleReport(report, &reportInfo, appConfig, retryAccount) } if err != nil { log.Errorf("Failed to handle Image Results: %+v", err) diff --git a/go.mod b/go.mod index 07da192..b2c36cf 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,9 @@ go 1.22.5 require ( github.com/adrg/xdg v0.5.0 github.com/anchore/go-testutils v0.0.0-20200925183923-d5f45b0d3c04 + github.com/google/uuid v1.6.0 github.com/h2non/gock v1.2.0 + github.com/hashicorp/go-version v1.7.0 github.com/mitchellh/go-homedir v1.1.0 github.com/nsf/jsondiff v0.0.0-20230430225905-43f6cf3098c1 github.com/sirupsen/logrus v1.9.3 @@ -33,7 +35,6 @@ require ( github.com/google/gnostic-models v0.6.8 // indirect github.com/google/go-cmp v0.6.0 // indirect github.com/google/gofuzz v1.2.0 // indirect - github.com/google/uuid v1.6.0 // indirect github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/imdario/mergo v0.3.6 // indirect diff --git a/go.sum b/go.sum index bb90673..46da7d5 100644 --- a/go.sum +++ b/go.sum @@ -61,6 +61,8 @@ github.com/h2non/gock v1.2.0 h1:K6ol8rfrRkUOefooBC8elXoaNGYkpp7y2qcxGG6BzUE= 
github.com/h2non/gock v1.2.0/go.mod h1:tNhoxHYW2W42cYkYb1WqzdbYIieALC99kpYr7rH/BQk= github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542 h1:2VTzZjLZBgl62/EtslCrtky5vbi9dd7HrQPQIx6wqiw= github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542/go.mod h1:Ow0tF8D4Kplbc8s8sSb3V2oUCygFHVp8gC3Dn6U4MNI= +github.com/hashicorp/go-version v1.7.0 h1:5tqGy27NaOTB8yJKUZELlFAS/LTKJkrmONwQKeRZfjY= +github.com/hashicorp/go-version v1.7.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= diff --git a/internal/anchore/anchoreclient.go b/internal/anchore/anchoreclient.go new file mode 100644 index 0000000..d19819e --- /dev/null +++ b/internal/anchore/anchoreclient.go @@ -0,0 +1,294 @@ +package anchore + +import ( + "bytes" + "crypto/tls" + "encoding/json" + "errors" + "fmt" + "github.com/anchore/k8s-inventory/internal/config" + "github.com/anchore/k8s-inventory/internal/log" + "github.com/anchore/k8s-inventory/internal/tracker" + "github.com/h2non/gock" + "io" + "net/http" + "net/url" + "os" + "strings" + "syscall" + "time" +) + +type Version struct { + API struct { + Version string `json:"version"` + } `json:"api"` + DB struct { + SchemaVersion string `json:"schema_version"` + } `json:"db"` + Service struct { + Version string `json:"version"` + } `json:"service"` +} + +type ControllerErrorDetails struct { + Type string `json:"type"` + Title string `json:"title"` + Detail string `json:"detail"` + Status int `json:"status"` +} + +type APIErrorDetails struct { + Message string `json:"message"` + Detail map[string]interface{} `json:"detail"` + HTTPCode int `json:"httpcode"` +} + +type APIClientError struct { + HTTPStatusCode int + Message string + Path string + Method string + Body *[]byte + APIErrorDetails *APIErrorDetails + 
ControllerErrorDetails *ControllerErrorDetails +} + +func (e *APIClientError) Error() string { + return fmt.Sprintf("API errorMsg(%d): %s Path: %q %v %v", e.HTTPStatusCode, e.Message, e.Path, + e.APIErrorDetails, e.ControllerErrorDetails) +} + +func GetVersion(anchoreDetails config.AnchoreInfo) (*Version, error) { + operation := "version get" + defer tracker.TrackFunctionTime(time.Now(), fmt.Sprintf("Sent %s request to Anchore", operation)) + + log.Debug("Determining Anchore service version") + + client := getClient(anchoreDetails) + + response, err := client.Get(anchoreDetails.URL + "/version") + if err != nil { + return nil, err + } + defer response.Body.Close() + + err = checkHTTPErrors(response, operation) + if err != nil { + return nil, err + } + + responseBody, err := getBody(response, operation) + if err != nil { + return nil, err + } + + ver := Version{} + err = json.Unmarshal(*responseBody, &ver) + if err != nil { + return nil, fmt.Errorf("failed to parse API version: %w", err) + } + return &ver, nil +} + +func Post(requestBody []byte, id string, path string, anchoreDetails config.AnchoreInfo, operation string) (*[]byte, error) { + defer tracker.TrackFunctionTime(time.Now(), fmt.Sprintf("Sent %s request to Anchore", operation)) + + log.Debugf("Performing %s to Anchore using endpoint: %s", operation, strings.Replace(path, "{{id}}", id, 1)) + + client := getClient(anchoreDetails) + + anchoreURL, err := getURL(anchoreDetails, path, id) + if err != nil { + return nil, err + } + + request, err := getPostRequest(anchoreDetails, anchoreURL, requestBody, operation) + if err != nil { + return nil, err + } + + return doPost(client, request, operation) +} + +func getClient(anchoreDetails config.AnchoreInfo) *http.Client { + tr := &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: anchoreDetails.HTTP.Insecure}, + } // #nosec G402 + + client := &http.Client{ + Transport: tr, + Timeout: time.Duration(anchoreDetails.HTTP.TimeoutSeconds) * time.Second, + 
} + gock.InterceptClient(client) // Required to use gock for testing custom client + + return client +} + +func getURL(anchoreDetails config.AnchoreInfo, path string, id string) (string, error) { + anchoreURL, err := url.Parse(anchoreDetails.URL) + if err != nil { + return "", fmt.Errorf("failed to build path (%s) url: %w", path, err) + } + + anchoreURL.Path += strings.Replace(path, "{{id}}", id, 1) + return anchoreURL.String(), nil +} + +func getPostRequest(anchoreDetails config.AnchoreInfo, endpointURL string, reqBody []byte, operation string) (*http.Request, error) { + request, err := http.NewRequest("POST", endpointURL, bytes.NewBuffer(reqBody)) + if err != nil { + return nil, fmt.Errorf("failed to prepare %s request to Anchore: %w", operation, err) + } + + request.SetBasicAuth(anchoreDetails.User, anchoreDetails.Password) + request.Header.Set("Content-Type", "application/json") + request.Header.Set("x-anchore-account", anchoreDetails.Account) + return request, nil +} + +func doPost(client *http.Client, request *http.Request, operation string) (*[]byte, error) { + response, err := client.Do(request) + if err != nil { + return nil, err + } + defer response.Body.Close() + + err = checkHTTPErrors(response, operation) + if err != nil { + return nil, err + } + + responseBody, err := getBody(response, operation) + return responseBody, err +} + +func checkHTTPErrors(response *http.Response, operation string) error { + switch { + case response.StatusCode >= 400 && response.StatusCode <= 599: + msg := fmt.Sprintf("%s response from Anchore (during %s)", response.Status, operation) + log.Errorf(msg) + + respBody, _ := getBody(response, operation) + if respBody == nil { + return &APIClientError{Message: msg, Path: response.Request.URL.Path, Method: response.Request.Method, + Body: nil, HTTPStatusCode: response.StatusCode} + } + + // Depending on where an errorMsg is discovered during request processing on the server, the + // errorMsg information in the response will be 
either an APIErrorDetails or a ControllerErrorDetails + apiError := APIErrorDetails{} + err := json.Unmarshal(*respBody, &apiError) + if err == nil { + return &APIClientError{Message: msg, Path: response.Request.URL.Path, Method: response.Request.Method, + Body: nil, HTTPStatusCode: response.StatusCode, APIErrorDetails: &apiError} + } + + controllerError := ControllerErrorDetails{} + err = json.Unmarshal(*respBody, &controllerError) + if err == nil { + return &APIClientError{Message: msg, Path: response.Request.URL.Path, Method: response.Request.Method, + Body: nil, HTTPStatusCode: response.StatusCode, ControllerErrorDetails: &controllerError} + } + + return &APIClientError{Message: msg, Path: response.Request.URL.Path, Method: response.Request.Method, + Body: nil, HTTPStatusCode: response.StatusCode} + case response.StatusCode < 200 || response.StatusCode > 299: + msg := fmt.Sprintf("failed to perform %s to Anchore: %+v", operation, response) + log.Debugf(msg) + return &APIClientError{Message: msg, Path: response.Request.URL.Path, Method: response.Request.Method, + Body: nil, HTTPStatusCode: response.StatusCode} + } + return nil +} + +func getBody(response *http.Response, operation string) (*[]byte, error) { + responseBody, err := io.ReadAll(response.Body) + if err != nil { + errMsg := fmt.Sprintf("failed to read %s response body from Anchore:", operation) + log.Debugf("%s %v", operation, errMsg) + return nil, fmt.Errorf("%s %w", errMsg, err) + } + + // Check we received a valid JSON response from Anchore, this will help catch + // any redirect responses where it returns HTML login pages e.g. 
Enterprise + // running behind cloudflare where a login page is returned with the status 200 + if len(responseBody) > 0 && !json.Valid(responseBody) { + log.Debugf("Anchore %s response body: %s", operation, string(responseBody)) + return nil, fmt.Errorf("%s response from Anchore is not valid json: %+v", operation, response) + } + return &responseBody, nil +} + +func ServerIsOffline(err error) bool { + if os.IsTimeout(err) { + return true + } + + if errors.Is(err, syscall.ECONNREFUSED) { + return true + } + + if errors.Is(err, syscall.ECONNRESET) { + return true + } + + var apiClientError *APIClientError + if errors.As(err, &apiClientError) { + if apiClientError.HTTPStatusCode == http.StatusBadGateway || + apiClientError.HTTPStatusCode == http.StatusServiceUnavailable || + apiClientError.HTTPStatusCode == http.StatusGatewayTimeout { + return true + } + } + + return false +} + +func ServerLacksAgentHealthAPISupport(err error) bool { + var apiClientError *APIClientError + if errors.As(err, &apiClientError) { + if apiClientError.ControllerErrorDetails == nil { + return false + } + + if apiClientError.HTTPStatusCode == http.StatusNotFound && + strings.Contains(apiClientError.ControllerErrorDetails.Detail, "The requested URL was not found") { + return true + } + + if apiClientError.HTTPStatusCode == http.StatusMethodNotAllowed && + apiClientError.ControllerErrorDetails.Detail == "Method Not Allowed" { + return true + } + } + + return false +} + +func UserLacksAPIPrivileges(err error) bool { + var apiClientError *APIClientError + + if errors.As(err, &apiClientError) { + if apiClientError.APIErrorDetails == nil { + return false + } + + if apiClientError.HTTPStatusCode == http.StatusForbidden && + strings.Contains(apiClientError.APIErrorDetails.Message, "Not authorized. 
Requires permissions") { + return true + } + } + return false +} + +func IncorrectCredentials(err error) bool { + // This covers user that does not exist or incorrect password for user + var apiClientError *APIClientError + + if errors.As(err, &apiClientError) && apiClientError.HTTPStatusCode == http.StatusUnauthorized { + return true + } + + return false +} diff --git a/internal/anchore/anchoreclient_test.go b/internal/anchore/anchoreclient_test.go new file mode 100644 index 0000000..a4a74b3 --- /dev/null +++ b/internal/anchore/anchoreclient_test.go @@ -0,0 +1,647 @@ +package anchore + +import ( + "fmt" + "github.com/anchore/k8s-inventory/internal/config" + "github.com/h2non/gock" + "github.com/stretchr/testify/assert" + "net" + "net/http" + "net/url" + "os" + "syscall" + "testing" +) + +type httpError struct { + err string + timeout bool +} + +func (e *httpError) Error() string { return e.err } +func (e *httpError) Timeout() bool { return e.timeout } + +var ( + version = map[string]interface{}{ + "service": map[string]interface{}{ + "version": "5.11.0", + }, + "api": map[string]interface{}{ + "version": "2", + }, + "db": map[string]interface{}{ + "schema_version": "5110", + }, + } + + versionObj = Version{ + API: struct { + Version string `json:"version"` + }(struct { + Version string + }{"2"}), + DB: struct { + SchemaVersion string `json:"schema_version"` + }(struct { + SchemaVersion string + }{"5110"}), + Service: struct { + Version string `json:"version"` + }(struct { + Version string + }{"5.11.0"}), + } + + integration = map[string]interface{}{ + "uuid": "000d1e60-cb05-4cce-8d1e-60cb052cce1f", + "type": "k8s_inventory_agent", + "name": "k8s-inv-agent", + "description": "k8s-agent with health reporting", + "version": "2.0", + "reported_status": map[string]interface{}{"state": "HEALTHY"}, + "integration_status": map[string]interface{}{"state": "REGISTERED"}, + "started_at": "2024-04-10T12:14:16Z", + // "last_seen": nil + "uptime": "2.04", + "username": "admin", 
+ "account_name": "admin", + "explicitly_account_bound": []interface{}{}, + "accounts": []interface{}{}, + "namespaces": []interface{}{}, + "cluster_name": "Docker-Desktop", + "namespace": "default", + "health_report_interval": 60, + "registration_id": "de2c3c58-4c20-4d87-ac3c-584c201d875a", + "registration_instance_id": "45743315", + } + + errOther = fmt.Errorf("other errorMsg") + + connectionTimeoutError = url.Error{ + Op: "Post", + URL: "http://127.0.0.1:8228/v2/system/integrations/registration", + Err: &httpError{err: "net/http: timeout awaiting response headers", timeout: true}, + } + + connectionRefusedError = url.Error{ + Op: "Post", + URL: "http://127.0.0.1:8228/v2/system/integrations/registration", + Err: &net.OpError{ + Op: "dial", + Net: "tcp", + Source: nil, + Addr: &net.TCPAddr{ + IP: net.ParseIP("127.0.0.1"), + Port: 8228, + Zone: "", + }, + Err: &os.SyscallError{ + Syscall: "connect", + Err: syscall.ECONNREFUSED, + }, + }, + } + + connectionResetError = url.Error{ + Op: "Post", + URL: "http://127.0.0.1:8228/v2/system/integrations/registration", + Err: &net.OpError{ + Op: "read", + Net: "tcp", + Source: &net.TCPAddr{ + IP: net.ParseIP("127.0.0.1"), + Port: 62122, + Zone: "", + }, + Addr: &net.TCPAddr{ + IP: net.ParseIP("127.0.0.1"), + Port: 8228, + Zone: "", + }, + Err: &os.SyscallError{ + Syscall: "read", + Err: syscall.ECONNRESET, + }, + }, + } + + badGatewayError = APIClientError{ + HTTPStatusCode: http.StatusBadGateway, + Message: "Bad Gateway", + Path: "/v2/system/integrations/registration", + Method: "POST", + } + + serviceUnavailableError = APIClientError{ + HTTPStatusCode: http.StatusServiceUnavailable, + Message: "Service Unavailable", + Path: "/v2/system/integrations/registration", + Method: "POST", + } + + gatewayTimeoutError = APIClientError{ + HTTPStatusCode: http.StatusGatewayTimeout, + Message: "Gateway Timeout", + Path: "/v2/system/integrations/registration", + Method: "POST", + } + + urlNotFoundError = APIClientError{ + 
HTTPStatusCode: http.StatusNotFound, + Message: "404 Not Found response from Anchore (during integration registration)", + Path: "/v2/system/integrations/registration", + Method: "POST", + ControllerErrorDetails: &ControllerErrorDetails{ + Type: "about:blank", + Title: "Not Found", + Detail: "The requested URL was not found on the server. If you entered the URL manually please check your spelling and try again.", + Status: http.StatusNotFound, + }, + } + + methodNotAllowedError = APIClientError{ + HTTPStatusCode: http.StatusMethodNotAllowed, + Message: "405 Method Not Allowed response from Anchore (during integration registration)", + Path: "/v2/system/integrations/registration", + Method: "POST", + ControllerErrorDetails: &ControllerErrorDetails{ + Type: "about:blank", + Title: "Method Not Allowed", + Detail: "Method Not Allowed", + Status: http.StatusMethodNotAllowed, + }, + } + + unAuthorizedError = APIClientError{ + HTTPStatusCode: http.StatusUnauthorized, + Message: "401 Unauthorized response from Anchore (during integration registration)", + Path: "/v2/system/integrations/registration", + Method: "POST", + } + + insufficientPrivilegeError = APIClientError{ + HTTPStatusCode: http.StatusForbidden, + Message: "403 Forbidden response from Anchore (during integration registration)", + Path: "/v2/system/integrations/registration", + Method: "POST", + APIErrorDetails: &APIErrorDetails{ + Message: "Not authorized. 
Requires permissions: domain=account0 action=registerIntegration target=", + Detail: map[string]interface{}{}, + HTTPCode: http.StatusForbidden, + }, + } + + anchoreDetails = config.AnchoreInfo{ + URL: "https://ancho.re", + User: "admin", + Password: "foobar", + Account: "account0", + } +) + +func TestGetVersion(t *testing.T) { + defer gock.Off() + + type args struct { + anchoreDetails config.AnchoreInfo + } + tests := []struct { + name string + args args + want *Version + wantErr bool + }{ + { + name: "successful get version", + args: args{ + anchoreDetails: anchoreDetails, + }, + want: &versionObj, + wantErr: false, + }, + { + name: "bad json response", + args: args{ + anchoreDetails: anchoreDetails, + }, + want: nil, + wantErr: true, + }, + { + name: "other error", + args: args{ + anchoreDetails: anchoreDetails, + }, + want: nil, + wantErr: true, + }, + { + name: "missing", + args: args{ + anchoreDetails: anchoreDetails, + }, + want: nil, + wantErr: true, + }, + } + for _, tt := range tests { + gock.Flush() + + switch tt.name { + case "successful get version": + gock.New("https://ancho.re"). + Get("/version"). + Reply(200). + JSON(version) + case "bad json response": + gock.New("https://ancho.re"). + Get("/version"). + Reply(200). + BodyString("bad json") + case "other error": + gock.New("https://ancho.re"). + Get("/version"). + Reply(http.StatusBadRequest) + case "missing": + gock.New("https://ancho.re"). + Get("/vversion"). 
+ Reply(200) + } + t.Run(tt.name, func(t *testing.T) { + result, err := GetVersion(tt.args.anchoreDetails) + if tt.wantErr { + assert.Error(t, err) + assert.Nil(t, result) + } else { + assert.NoError(t, err) + assert.Equal(t, tt.want, result) + } + }) + } +} + +func TestPost(t *testing.T) { + defer gock.Off() + + type args struct { + requestBody []byte + id string + path string + anchoreDetails config.AnchoreInfo + operation string + } + tests := []struct { + name string + args args + wantErr error + }{ + { + name: "successful registration", + args: args{ + requestBody: []byte(`{"id":"1"}`), + id: "", + path: "v2/system/integrations/registration", + anchoreDetails: anchoreDetails, + operation: "integration registration", + }, + }, + { + name: "401 error", + args: args{ + requestBody: []byte(`{"id":"1"}`), + id: "", + path: "v2/system/integrations/registration", + anchoreDetails: anchoreDetails, + operation: "integration registration", + }, + wantErr: &unAuthorizedError, + }, + { + name: "403 error", + args: args{ + requestBody: []byte(`{"id":"1"}`), + id: "", + path: "v2/system/integrations/registration", + anchoreDetails: anchoreDetails, + operation: "integration registration", + }, + wantErr: &insufficientPrivilegeError, + }, + { + name: "404 error", + args: args{ + requestBody: []byte(`{"id":"1"}`), + id: "", + path: "v2/system/integrations/registration", + anchoreDetails: anchoreDetails, + operation: "integration registration", + }, + wantErr: &urlNotFoundError, + }, + } + for _, tt := range tests { + switch tt.name { + case "successful registration": + gock.New("https://ancho.re"). + Post("v2/system/integrations/registration"). + Reply(200). + JSON(integration) + case "401 error": + gock.New("https://ancho.re"). + Post("v2/system/integrations/registration"). + Reply(401) + case "403 error": + gock.New("https://ancho.re"). + Post("v2/system/integrations/registration"). + Reply(403). + JSON(map[string]interface{}{ + "message": "Not authorized. 
Requires permissions: domain=account0 action=registerIntegration target=", + "detail": map[string]interface{}{}, + "httpcode": http.StatusForbidden, + }) + case "404 error": + gock.New("https://ancho.re"). + Post("v2/system/integrations/registration"). + Reply(404). + JSON(map[string]interface{}{ + "type": "about:blank", + "title": "Not Found", + "detail": "The requested URL was not found on the server. If you entered the URL manually please check your spelling and try again.", + "status": http.StatusNotFound, + }) + } + t.Run(tt.name, func(t *testing.T) { + result, err := Post(tt.args.requestBody, tt.args.id, tt.args.path, tt.args.anchoreDetails, tt.args.operation) + if tt.wantErr != nil { + assert.Error(t, err) + assert.Equal(t, tt.wantErr, err) + assert.Nil(t, result) + } else { + assert.NoError(t, err) + assert.NotNil(t, result) + } + }) + } +} + +func TestGetUrl(t *testing.T) { + type args struct { + anchoreDetails config.AnchoreInfo + url string + uuid string + } + type want struct { + expectedURL string + errorMsg string + } + tests := []struct { + name string + args args + want want + }{ + { + name: "Registration url", + args: args{ + anchoreDetails: anchoreDetails, + url: "v2/system/integrations/registration", + uuid: "", + }, + want: want{ + expectedURL: "https://ancho.re/v2/system/integrations/registration", + errorMsg: "", + }, + }, + { + name: "Health report url", + args: args{ + anchoreDetails: anchoreDetails, + url: "v2/system/integrations/{{id}}/health-report", + uuid: "0ec44439-d091-4bf2-8444-39d0916bf220", + }, + want: want{ + expectedURL: "https://ancho.re/v2/system/integrations/0ec44439-d091-4bf2-8444-39d0916bf220/health-report", + errorMsg: "", + }, + }, + { + name: "faulty url", + args: args{ + anchoreDetails: config.AnchoreInfo{ + URL: "htt$ps://ancho.re", + User: "admin", + Password: "foobar", + }, + url: "v2/system/integrations/{{id}}/health-report", + uuid: "0ec44439-d091-4bf2-8444-39d0916bf220", + }, + want: want{ + expectedURL: "", + 
errorMsg: "failed to build path (v2/system/integrations/{{id}}/health-report)", + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result, err := getURL(tt.args.anchoreDetails, tt.args.url, tt.args.uuid) + assert.Equal(t, tt.want.expectedURL, result) + if tt.want.errorMsg != "" { + assert.ErrorContains(t, err, tt.want.errorMsg) + } else { + assert.NoError(t, err) + } + }) + } +} + +func TestGetPostRequest(t *testing.T) { + type args struct { + url string + reqBody []byte + } + tests := []struct { + name string + args args + want error + }{ + { + name: "Good path", + args: args{ + url: "http://localhost:8228/v2/system/integrations/registration", + reqBody: make([]byte, 0), + }, + want: nil, + }, + { + name: "Bad path", + args: args{ + url: "_http://localhost:8228/v2/system/integrations/registration", + reqBody: nil, + }, + want: &url.Error{}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result, err := getPostRequest(anchoreDetails, tt.args.url, tt.args.reqBody, "register integration") + if tt.want != nil { + assert.Nil(t, result) + assert.Error(t, err, tt.want) + } else { + assert.NoError(t, err) + assert.Equal(t, result.Header.Get("content-type"), "application/json") + assert.Contains(t, result.Header.Get("Authorization"), "Basic") + assert.Contains(t, result.Header.Get("X-Anchore-Account"), "account0") + } + }) + } +} + +func TestAnchoreIsOffline(t *testing.T) { + tests := []struct { + name string + err error + want bool + }{ + { + name: "Connection timeout returns true", + err: &connectionTimeoutError, + want: true, + }, + { + name: "Connection refused errorMsg returns true", + err: &connectionRefusedError, + want: true, + }, + { + name: "Connection reset errorMsg returns true", + err: &connectionResetError, + want: true, + }, + { + name: "AnchoreAPIClientError with 502 http_status returns true", + err: &badGatewayError, + want: true, + }, + { + name: "AnchoreAPIClientError with 503 http_status 
returns true", + err: &serviceUnavailableError, + want: true, + }, + { + name: "AnchoreAPIClientError with 504 http_status returns true", + err: &gatewayTimeoutError, + want: true, + }, + { + name: "AnchoreAPIClientError with 401 http_status returns false", + err: &unAuthorizedError, + want: false, + }, + { + name: "Other errorMsg returns false", + err: errOther, + want: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := ServerIsOffline(tt.err) + assert.Equal(t, tt.want, result) + }) + } +} + +func TestAnchoreLacksAgentHealthAPISupport(t *testing.T) { + tests := []struct { + name string + err error + want bool + }{ + { + name: "AnchoreAPIClientError with 404 http_status returns true", + err: &urlNotFoundError, + want: true, + }, + { + name: "AnchoreAPIClientError with 405 http_status returns true", + err: &methodNotAllowedError, + want: true, + }, + { + name: "AnchoreAPIClientError with 401 http_status returns false", + err: &unAuthorizedError, + want: false, + }, + { + name: "Other errorMsg returns false", + err: errOther, + want: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := ServerLacksAgentHealthAPISupport(tt.err) + assert.Equal(t, tt.want, result) + }) + } +} + +func TestUserLacksAPIPrivileges(t *testing.T) { + tests := []struct { + name string + err error + want bool + }{ + { + name: "AnchoreAPIClientError with 403 http_status returns true", + err: &insufficientPrivilegeError, + want: true, + }, + { + name: "AnchoreAPIClientError with 401 http_status returns false", + err: &unAuthorizedError, + want: false, + }, + { + name: "Other errorMsg returns false", + err: errOther, + want: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := UserLacksAPIPrivileges(tt.err) + assert.Equal(t, tt.want, result) + }) + } +} + +func TestIncorrectCredentials(t *testing.T) { + tests := []struct { + name string + err error + want bool + }{ + { + 
name: "AnchoreAPIClientError with 401 http_status returns true", + err: &unAuthorizedError, + want: true, + }, + { + name: "AnchoreAPIClientError with non 403 http_status returns false", + err: &insufficientPrivilegeError, + want: false, + }, + { + name: "Other errorMsg returns false", + err: errOther, + want: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := IncorrectCredentials(tt.err) + assert.Equal(t, tt.want, result) + }) + } +} diff --git a/internal/config/config.go b/internal/config/config.go index 9332532..a282c76 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -15,7 +15,6 @@ import ( "path" "strings" - "github.com/anchore/k8s-inventory/pkg/mode" "gopkg.in/yaml.v2" "github.com/adrg/xdg" @@ -24,6 +23,7 @@ import ( "github.com/spf13/viper" "github.com/anchore/k8s-inventory/internal" + "github.com/anchore/k8s-inventory/pkg/mode" ) const redacted = "******" @@ -37,8 +37,9 @@ type CliOnlyOptions struct { // All Application configurations type Application struct { ConfigPath string - Quiet bool `mapstructure:"quiet" json:"quiet,omitempty" yaml:"quiet"` - Log Logging `mapstructure:"log" json:"log,omitempty" yaml:"log"` + Quiet bool `mapstructure:"quiet" json:"quiet,omitempty" yaml:"quiet"` + Log Logging `mapstructure:"log" json:"log,omitempty" yaml:"log"` + Registration RegistrationOptions `mapstructure:"anchore-registration" json:"anchore-registration,omitempty" yaml:"anchore-registration"` CliOptions CliOnlyOptions Dev Development `mapstructure:"dev" json:"dev,omitempty" yaml:"dev"` KubeConfig KubeConf `mapstructure:"kubeconfig" json:"kubeconfig,omitempty" yaml:"kubeconfig"` @@ -54,12 +55,19 @@ type Application struct { Mode string `mapstructure:"mode" json:"mode,omitempty" yaml:"mode"` IgnoreNotRunning bool `mapstructure:"ignore-not-running" json:"ignore-not-running,omitempty" yaml:"ignore-not-running"` PollingIntervalSeconds int `mapstructure:"polling-interval-seconds" 
// RegistrationOptions holds the settings found under the `anchore-registration`
// configuration key (see the Registration field of Application). The fallback
// behaviour described per field below is taken from the comments in the sample
// anchore-k8s-inventory.yaml; the code implementing it lives elsewhere.
type RegistrationOptions struct {
	// RegistrationID is the id the agent registers as with Enterprise. Per the
	// sample config: when empty, the agent tries the uid of its K8s Deployment
	// and otherwise generates a UUID.
	RegistrationID string `mapstructure:"registration-id" json:"registration-id,omitempty" yaml:"registration-id"`
	// IntegrationName is the name the agent should have. Per the sample config:
	// when empty, the agent tries the name of its K8s Deployment.
	IntegrationName string `mapstructure:"integration-name" json:"integration-name,omitempty" yaml:"integration-name"`
	// IntegrationDescription is a short description for the agent.
	IntegrationDescription string `mapstructure:"integration-description" json:"integration-description,omitempty" yaml:"integration-description"`
}
a/internal/config/test-fixtures/snapshot/TestDefaultConfigString.golden +++ b/internal/config/test-fixtures/snapshot/TestDefaultConfigString.golden @@ -5,6 +5,10 @@ log: levelopt: debug level: debug file: ./anchore-k8s-inventory.log +anchore-registration: + registration-id: "" + integration-name: "" + integration-description: "" clioptions: configpath: ../../anchore-k8s-inventory.yaml verbosity: 0 @@ -44,6 +48,7 @@ runmode: 0 mode: adhoc ignore-not-running: true polling-interval-seconds: 300 +health-report-interval-seconds: 60 inventory-report-limits: namespaces: 0 metadata-collection: diff --git a/internal/config/test-fixtures/snapshot/TestEmptyConfigString.golden b/internal/config/test-fixtures/snapshot/TestEmptyConfigString.golden index 8f85b8b..171051e 100644 --- a/internal/config/test-fixtures/snapshot/TestEmptyConfigString.golden +++ b/internal/config/test-fixtures/snapshot/TestEmptyConfigString.golden @@ -5,6 +5,10 @@ log: levelopt: panic level: "" file: "" +anchore-registration: + registration-id: "" + integration-name: "" + integration-description: "" clioptions: configpath: "" verbosity: 0 @@ -44,6 +48,7 @@ runmode: 0 mode: "" ignore-not-running: false polling-interval-seconds: 0 +health-report-interval-seconds: 0 inventory-report-limits: namespaces: 0 metadata-collection: diff --git a/internal/config/test-fixtures/snapshot/TestSensitiveConfigJSON.golden b/internal/config/test-fixtures/snapshot/TestSensitiveConfigJSON.golden index b9da0c5..a788487 100644 --- a/internal/config/test-fixtures/snapshot/TestSensitiveConfigJSON.golden +++ b/internal/config/test-fixtures/snapshot/TestSensitiveConfigJSON.golden @@ -5,6 +5,8 @@ "level": "debug", "file": "./anchore-k8s-inventory.log" }, + "anchore-registration": { + }, "CliOptions": { "ConfigPath": "../../anchore-k8s-inventory.yaml", "Verbosity": 0 @@ -50,6 +52,7 @@ "mode": "adhoc", "ignore-not-running": true, "polling-interval-seconds": 300, + "health-report-interval-seconds": 60, "inventory-report-limits": {}, 
// nanoSeconds is the number of nanoseconds in one second. It converts the
// fractional seconds used in JSON into a time.Duration.
const nanoSeconds = 1000000000

// Datetime is a time.Time that marshals to JSON as an RFC 3339 string.
type Datetime struct {
	time.Time
}

// Duration is a time.Duration that travels over JSON as a number of seconds.
type Duration struct {
	time.Duration
}

// MarshalJSON renders the timestamp as a quoted RFC 3339 string (second
// precision). The time is formatted in whatever location it carries; no
// conversion to UTC is performed here.
func (d Datetime) MarshalJSON() ([]byte, error) {
	return []byte(fmt.Sprintf("\"%s\"", d.Format(time.RFC3339))), nil
}

// MarshalJSON renders the duration as seconds with six decimals.
//
// The receiver is a value (not a pointer) so that the method is part of the
// method set of both Duration and *Duration. With a pointer receiver,
// encoding/json silently ignored this method for non-addressable Duration
// values and encoded the embedded field as a struct instead.
func (d Duration) MarshalJSON() ([]byte, error) {
	return []byte(fmt.Sprintf("%f", d.Seconds())), nil
}

// UnmarshalJSON decodes a JSON number holding seconds into the duration.
// JSON null is a no-op, following the encoding/json convention for
// Unmarshalers. Any other JSON value yields an "invalid duration" error, and
// malformed JSON returns the decoder's error.
func (d *Duration) UnmarshalJSON(b []byte) error {
	var v any
	if err := json.Unmarshal(b, &v); err != nil {
		return err
	}
	switch value := v.(type) {
	case nil:
		// JSON null: leave the current value untouched.
		return nil
	case float64:
		// enterprise sends durations in seconds
		d.Duration = time.Duration(value * nanoSeconds)
		return nil
	default:
		return errors.New("invalid duration")
	}
}
+package time + +import ( + "encoding/json" + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +var ( + timestamp1 = Datetime{ + Time: time.Date(2024, time.April, 10, 12, 14, 16, 50, time.UTC), + } + timestamp2 = Datetime{ + Time: time.Date(2023, time.June, 5, 6, 7, 8, 25, time.UTC), + } + + timeDiff1, _ = time.ParseDuration("7446h7m8.000000025s") + negTimeDiff1, _ = time.ParseDuration("-7446h7m8.000000025s") + timeDiff2 = time.Second * 10 + negTimeDiff2 = time.Second * (-10) +) + +func TestDateTimeMarshalJSON(t *testing.T) { + tests := []struct { + name string + t Datetime + want []byte + }{ + { + name: "timestamp1", + t: timestamp1, + want: []byte("\"2024-04-10T12:14:16Z\""), + }, + { + name: "timestamp2", + t: timestamp2, + want: []byte("\"2023-06-05T06:07:08Z\""), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result, _ := tt.t.MarshalJSON() + assert.Equal(t, tt.want, result) + }) + } +} + +func TestDurationMarshalJSON(t *testing.T) { + tests := []struct { + name string + d *Duration + want []byte + }{ + { + name: "Year long difference", + d: &Duration{ + Duration: timeDiff1, + }, + want: []byte("26806028.000000"), + }, + { + name: "Negative year long difference", + d: &Duration{ + Duration: negTimeDiff1, + }, + want: []byte("-26806028.000000"), + }, + { + name: "Sub-minute long difference", + d: &Duration{ + Duration: timeDiff2, + }, + want: []byte("10.000000"), + }, + { + name: "Negative sub-minute long difference", + d: &Duration{ + Duration: negTimeDiff2, + }, + want: []byte("-10.000000"), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result, _ := tt.d.MarshalJSON() + assert.Equal(t, tt.want, result) + }) + } +} + +func TestDurationUnmarshalJSON(t *testing.T) { + timeDiff1, _ = time.ParseDuration("7446h7m8.000000000s") + negTimeDiff1, _ = time.ParseDuration("-7446h7m8.000000000s") + + type want struct { + d Duration + err error + } + tests := []struct { + name string + dbytes 
[]byte + want want + }{ + { + name: "Year long difference, precision capped", + dbytes: []byte("26806028.000000"), + want: want{ + d: Duration{ + Duration: timeDiff1, + }, + err: nil, + }, + }, + { + name: "Negative year long difference, precision capped", + dbytes: []byte("-26806028.000000"), + want: want{ + d: Duration{ + Duration: negTimeDiff1, + }, + err: nil, + }, + }, + { + name: "Sub-minute long difference", + dbytes: []byte("10.000000"), + want: want{ + d: Duration{ + Duration: timeDiff2, + }, + err: nil, + }, + }, + { + name: "Negative sub-minute long difference", + dbytes: []byte("-10.000000"), + want: want{ + d: Duration{ + Duration: negTimeDiff2, + }, + err: nil, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var d Duration + resultErr := json.Unmarshal(tt.dbytes, &d) + assert.Equal(t, tt.want.err, resultErr) + assert.Equal(t, tt.want.d, d) + }) + } +} diff --git a/pkg/healthreporter/healthreporter.go b/pkg/healthreporter/healthreporter.go new file mode 100644 index 0000000..f9fcddb --- /dev/null +++ b/pkg/healthreporter/healthreporter.go @@ -0,0 +1,186 @@ +package healthreporter + +import ( + "encoding/json" + "sync" + "time" + + "github.com/google/uuid" + + "github.com/anchore/k8s-inventory/internal/anchore" + "github.com/anchore/k8s-inventory/internal/config" + "github.com/anchore/k8s-inventory/internal/log" + jstime "github.com/anchore/k8s-inventory/internal/time" + intg "github.com/anchore/k8s-inventory/pkg/integration" +) + +const healthProtocolVersion = 1 +const healthDataVersion = 1 +const healthDataType = "k8s_inventory_agent" +const HealthReportAPIPathV2 = "v2/system/integrations/{{id}}/health-report" + +type HealthReport struct { + UUID string `json:"uuid,omitempty"` // uuid for this health report + ProtocolVersion int `json:"protocol_version,omitempty"` // protocol version for "common" part of health reporting + Timestamp jstime.Datetime `json:"timestamp,omitempty"` // timestamp for this health report in 
UTC().Format(time.RFC3339) + Uptime *jstime.Duration `json:"uptime,omitempty"` // running time of integration instance + HealthReportInterval int `json:"health_report_interval,omitempty"` // time in seconds between health reports + HealthData HealthData `json:"health_data,omitempty"` // K8s-inventory agent specific health data +} + +type HealthData struct { + Type string `json:"type,omitempty"` // type of health data + Version int `json:"version,omitempty"` // format version + Errors HealthReportErrors `json:"errors,omitempty"` // list of errors + // Anything below this line is specific to k8s-inventory-agent + AccountK8sInventoryReports AccountK8SInventoryReports `json:"account_k8s_inventory_reports,omitempty"` // latest inventory reports per account +} + +type HealthReportErrors []string + +// AccountK8SInventoryReports holds per account information about latest inventory reports from the same batch set +type AccountK8SInventoryReports map[string]InventoryReportInfo + +type InventoryReportInfo struct { + ReportTimestamp string `json:"report_timestamp"` // Timestamp for the inventory report that was batched + Account string `json:"account_name"` // Name of account to which the inventory report belongs + SentAsUser string `json:"sent_as_user"` // User that the inventory report was sent as + BatchSize int `json:"batch_size"` // Number of batches that the inventory report was sent in + LastSuccessfulIndex int `json:"last_successful_index"` // Index of last successfully sent batch, -1 if none + HasErrors bool `json:"has_errors"` // HasErrors is true if any of the batches had an error, false otherwise + Batches []BatchInfo `json:"batches"` // Information about each inventory report batch +} + +type BatchInfo struct { + BatchIndex int `json:"batch_index,omitempty"` // Index of this inventory report batch item + SendTimestamp jstime.Datetime `json:"send_timestamp,omitempty"` // Timestamp when the batch was sent, in UTC().Format(time.RFC3339) + Error string 
`json:"error,omitempty"` // Any error this batch encountered when sent +} + +// GatedReportInfo The go routine that generates the inventory report must inform the go routine +// that sends health reports about the *latest* sent inventory reports. +// A buffered channel is FIFO so the earliest inserted items are returned first. No new items can +// be added when the buffer is full. This means that the information about the latest sent health +// reports will have to be dropped in such situations. We would rather drop the information about +// the *oldest* sent health reports. +// We therefore use a map (key'ed by account) to store information about the latest sent inventory +// reports This map is shared by the go routine that generates inventory reports and the go +// routine that sends health reports. Access to the map is coordinated by a mutex. +type GatedReportInfo struct { + AccessGate sync.RWMutex + AccountInventoryReports AccountK8SInventoryReports +} + +type _NewUUID func() uuid.UUID + +type _Now func() time.Time + +func GetGatedReportInfo() *GatedReportInfo { + return &GatedReportInfo{ + AccountInventoryReports: make(AccountK8SInventoryReports), + } +} + +func PeriodicallySendHealthReport(cfg *config.Application, ch intg.Channels, gatedReportInfo *GatedReportInfo) { + // Wait for registration with Enterprise to be completed + integration := <-ch.IntegrationObj + log.Info("Health reporting started") + + ticker := time.NewTicker(time.Duration(cfg.HealthReportIntervalSeconds) * time.Second) + + for { + log.Infof("Waiting %d seconds to send health report...", cfg.HealthReportIntervalSeconds) + + _, _ = sendHealthReport(cfg, integration, gatedReportInfo, uuid.New, time.Now) + // log.Debugf("Start new health report: %s", <-ticker.C) + <-ticker.C + } +} + +func sendHealthReport(cfg *config.Application, integration *intg.Integration, gatedReportInfo *GatedReportInfo, newUUID _NewUUID, _now _Now) (*HealthReport, error) { + healthReportID := newUUID().String() + 
lastReports := GetAccountReportInfoNoBlocking(gatedReportInfo, cfg, _now) + + now := _now().UTC() + integration.Uptime = &jstime.Duration{Duration: now.Sub(integration.StartedAt.Time)} + healthReport := HealthReport{ + UUID: healthReportID, + ProtocolVersion: healthProtocolVersion, + Timestamp: jstime.Datetime{Time: now}, + Uptime: integration.Uptime, + HealthData: HealthData{ + Type: healthDataType, + Version: healthDataVersion, + Errors: make(HealthReportErrors, 0), + AccountK8sInventoryReports: lastReports, + }, + HealthReportInterval: cfg.HealthReportIntervalSeconds, + } + + log.Infof("Sending health report (uuid:%s) covering %d accounts", healthReport.UUID, len(healthReport.HealthData.AccountK8sInventoryReports)) + requestBody, err := json.Marshal(healthReport) + if err != nil { + log.Errorf("failed to serialize integration registration as JSON: %v", err) + return nil, err + } + _, err = anchore.Post(requestBody, integration.UUID, HealthReportAPIPathV2, cfg.AnchoreDetails, "health report") + if err != nil { + log.Errorf("Failed to send health report to Anchore: %v", err) + return nil, err + } + return &healthReport, nil +} + +func GetAccountReportInfoNoBlocking(gatedReportInfo *GatedReportInfo, cfg *config.Application, _now _Now) AccountK8SInventoryReports { + locked := gatedReportInfo.AccessGate.TryLock() + + if locked { + defer gatedReportInfo.AccessGate.Unlock() + + log.Debugf("Removing inventory report info for accounts that are no longer active") + accountsToRemove := make(map[string]bool) + now := _now().UTC() + inactiveAge := 2 * float64(cfg.PollingIntervalSeconds) + + for account, reportInfo := range gatedReportInfo.AccountInventoryReports { + for _, batchInfo := range reportInfo.Batches { + log.Debugf("Last inv.report (time:%s, account:%s, batch:%d/%d, sent:%s error:'%s')", + reportInfo.ReportTimestamp, account, batchInfo.BatchIndex, reportInfo.BatchSize, + batchInfo.SendTimestamp, batchInfo.Error) + reportTime, err := time.Parse(time.RFC3339, 
reportInfo.ReportTimestamp) + if err != nil { + log.Errorf("failed to parse report_timestamp: %v", err) + continue + } + if now.Sub(reportTime).Seconds() > inactiveAge { + accountsToRemove[account] = true + } + } + } + + for accountToRemove := range accountsToRemove { + log.Debugf("Accounts no longer considered active: %s", accountToRemove) + delete(gatedReportInfo.AccountInventoryReports, accountToRemove) + } + + return gatedReportInfo.AccountInventoryReports + } + log.Debugf("Unable to obtain mutex lock to get aocount inventory report information. Continuing.") + return AccountK8SInventoryReports{} +} + +func SetReportInfoNoBlocking(accountName string, count int, reportInfo InventoryReportInfo, gatedReportInfo *GatedReportInfo) { + log.Debugf("Setting report (%s) for account name '%s': %d/%d %s %s", reportInfo.ReportTimestamp, accountName, + reportInfo.Batches[count].BatchIndex, reportInfo.BatchSize, reportInfo.Batches[count].SendTimestamp, + reportInfo.Batches[count].Error) + locked := gatedReportInfo.AccessGate.TryLock() + if locked { + defer gatedReportInfo.AccessGate.Unlock() + gatedReportInfo.AccountInventoryReports[accountName] = reportInfo + } else { + // we prioritize no blocking over actually bookkeeping info for every sent inventory report + log.Debugf("Unable to obtain mutex lock to include inventory report timestamped %s for %s: %d/%d %s in health report. 
Continuing.", + reportInfo.ReportTimestamp, accountName, reportInfo.Batches[count].BatchIndex, reportInfo.BatchSize, + reportInfo.Batches[count].SendTimestamp) + } +} diff --git a/pkg/healthreporter/healthreporter_test.go b/pkg/healthreporter/healthreporter_test.go new file mode 100644 index 0000000..8a3d100 --- /dev/null +++ b/pkg/healthreporter/healthreporter_test.go @@ -0,0 +1,234 @@ +package healthreporter + +import ( + "fmt" + "github.com/anchore/k8s-inventory/internal/anchore" + "github.com/anchore/k8s-inventory/internal/config" + jstime "github.com/anchore/k8s-inventory/internal/time" + "github.com/anchore/k8s-inventory/pkg/integration" + "github.com/google/uuid" + "github.com/h2non/gock" + "github.com/stretchr/testify/assert" + "net/http" + "reflect" + "testing" + "time" +) + +const mutexLocked = int64(1 << iota) // mutex is locked + +var ( + now = time.Date(2024, 10, 4, 10, 11, 12, 0, time.Local) + timestamps = []time.Time{now.Add(time.Millisecond * 10), now.Add(time.Millisecond * 20), now.Add(time.Millisecond * 30)} + uuids = []uuid.UUID{uuid.New(), uuid.New()} + + reportInfo = InventoryReportInfo{ + ReportTimestamp: now.UTC().Format(time.RFC3339), + Account: "testAccount", + SentAsUser: "testAccountUser", + BatchSize: 1, + LastSuccessfulIndex: 1, + HasErrors: false, + Batches: []BatchInfo{ + { + BatchIndex: 0, + SendTimestamp: jstime.Datetime{Time: time.Now().UTC()}, + Error: "", + }, + }, + } + reportInfoExpired = InventoryReportInfo{ + ReportTimestamp: now.Add(time.Second * (-3800)).UTC().Format(time.RFC3339), + Account: "testAccount2", + SentAsUser: "testAccount2User", + BatchSize: 1, + LastSuccessfulIndex: 1, + HasErrors: false, + Batches: []BatchInfo{ + { + BatchIndex: 0, + SendTimestamp: jstime.Datetime{Time: time.Now().UTC()}, + Error: "", + }, + }, + } +) + +func TestSendHealthReport(t *testing.T) { + defer gock.Off() + + integrationUUID := uuid.New().String() + postURL := fmt.Sprintf("/v2/system/integrations/%s/health-report", integrationUUID) + 
type want struct { + healthReport *HealthReport + err error + } + tests := []struct { + name string + want want + }{ + { + name: "successful health report", + want: want{ + healthReport: &HealthReport{ + UUID: uuids[0].String(), + ProtocolVersion: 1, + Timestamp: jstime.Datetime{Time: timestamps[1].UTC()}, + Uptime: &jstime.Duration{Duration: time.Millisecond * 20}, + HealthData: HealthData{ + Type: healthDataType, + Version: healthDataVersion, + Errors: make(HealthReportErrors, 0), + AccountK8sInventoryReports: AccountK8SInventoryReports{ + reportInfo.Account: reportInfo, + }, + }, + HealthReportInterval: 60, + }, + err: nil, + }, + }, + { + name: "failed health report", + want: want{ + healthReport: nil, + err: &anchore.APIClientError{ + HTTPStatusCode: http.StatusUnauthorized, + Message: "401 Unauthorized response from Anchore (during health report)", + Path: postURL, + Method: "POST", + }, + }, + }, + } + for _, tt := range tests { + cfg := config.Application{ + AnchoreDetails: config.AnchoreInfo{ + URL: "https://ancho.re", + User: "admin", + }, + PollingIntervalSeconds: 30 * 60, + HealthReportIntervalSeconds: 60, + } + integrationInstance := &integration.Integration{ + UUID: integrationUUID, + StartedAt: jstime.Datetime{Time: now.UTC()}, + Uptime: &jstime.Duration{Duration: time.Millisecond * 20}, + HealthReportInterval: 60, + } + gatedReportInfo := GetGatedReportInfo() + gatedReportInfo.AccountInventoryReports["testAccount"] = reportInfo + i := 0 + newUUIDMock := func() uuid.UUID { + _uuid := uuids[i] + i++ + return _uuid + } + j := 0 + nowMock := func() time.Time { + timestamp := timestamps[j] + j++ + return timestamp + } + switch tt.name { + case "successful health report": + gock.New("https://ancho.re"). + Post(postURL). + Reply(200) + case "failed health report": + gock.New("https://ancho.re"). + Post(postURL). 
+ Reply(http.StatusUnauthorized) + } + t.Run(tt.name, func(t *testing.T) { + result, resultErr := sendHealthReport(&cfg, integrationInstance, gatedReportInfo, newUUIDMock, nowMock) + if tt.want.err != nil { + assert.Equal(t, tt.want.err, resultErr) + assert.Nil(t, result) + } else { + assert.NoError(t, resultErr) + assert.Equal(t, tt.want.healthReport, result) + } + }) + } +} + +func TestGetAccountReportInfoNoBlockingWhenObtainingLockRemovesExpired(t *testing.T) { + gatedReportInfo := GatedReportInfo{ + AccountInventoryReports: make(AccountK8SInventoryReports, 2), + } + gatedReportInfo.AccountInventoryReports[reportInfo.Account] = reportInfo + gatedReportInfo.AccountInventoryReports[reportInfoExpired.Account] = reportInfoExpired + + cfg := config.Application{ + PollingIntervalSeconds: 30 * 60, + } + + nowMock := func() time.Time { + return now + } + + result := GetAccountReportInfoNoBlocking(&gatedReportInfo, &cfg, nowMock) + assert.Equal(t, len(result), 1) + assert.Contains(t, result, reportInfo.Account) + assert.Equal(t, len(gatedReportInfo.AccountInventoryReports), 1) + assert.Contains(t, gatedReportInfo.AccountInventoryReports, reportInfo.Account) +} + +func TestGetAccountReportInfoBlockingWhenNotObtainingLockExpiredUnaffected(t *testing.T) { + gatedReportInfo := GatedReportInfo{ + AccountInventoryReports: make(AccountK8SInventoryReports, 2), + } + gatedReportInfo.AccountInventoryReports[reportInfo.Account] = reportInfo + gatedReportInfo.AccountInventoryReports[reportInfoExpired.Account] = reportInfoExpired + gatedReportInfo.AccessGate.Lock() + + cfg := config.Application{ + PollingIntervalSeconds: 3 * 60, + } + + nowMock := func() time.Time { + return now + } + + result := GetAccountReportInfoNoBlocking(&gatedReportInfo, &cfg, nowMock) + assert.Equal(t, len(result), 0) + assert.Equal(t, len(gatedReportInfo.AccountInventoryReports), 2) + assert.Contains(t, gatedReportInfo.AccountInventoryReports, reportInfo.Account) + assert.Contains(t, 
gatedReportInfo.AccountInventoryReports, reportInfoExpired.Account) + // check mutex is still locked after operation + mutexState := reflect.ValueOf(&gatedReportInfo.AccessGate).Elem().FieldByName("w").FieldByName("state") + assert.Equal(t, mutexState.Int()&mutexLocked, mutexLocked) +} + +func TestSetReportInfoNoBlockingSetsWhenObtainingLock(t *testing.T) { + gatedReportInfo := GatedReportInfo{ + AccountInventoryReports: make(AccountK8SInventoryReports, 1), + } + accountName := "testAccount" + count := 0 + + SetReportInfoNoBlocking(accountName, count, reportInfo, &gatedReportInfo) + + assert.Equal(t, reportInfo, gatedReportInfo.AccountInventoryReports[accountName]) + // check mutex is unlocked after operation + mutexState := reflect.ValueOf(&gatedReportInfo.AccessGate).Elem().FieldByName("w").FieldByName("state") + assert.Equal(t, mutexState.Int()&mutexLocked, int64(0)) +} + +func TestSetReportInfoNoBlockingSkipsWhenLockAlreadyTaken(t *testing.T) { + gatedReportInfo := GatedReportInfo{ + AccountInventoryReports: make(AccountK8SInventoryReports, 1), + } + gatedReportInfo.AccessGate.Lock() + + accountName := "testAccount" + count := 0 + + SetReportInfoNoBlocking(accountName, count, reportInfo, &gatedReportInfo) + + assert.NotContains(t, gatedReportInfo.AccountInventoryReports, accountName) + // check mutex is still locked after operation + mutexState := reflect.ValueOf(&gatedReportInfo.AccessGate).Elem().FieldByName("w").FieldByName("state") + assert.Equal(t, mutexState.Int()&mutexLocked, mutexLocked) +} diff --git a/pkg/integration/integration.go b/pkg/integration/integration.go new file mode 100644 index 0000000..79e55dd --- /dev/null +++ b/pkg/integration/integration.go @@ -0,0 +1,409 @@ +package integration + +import ( + "context" + "encoding/json" + "fmt" + "github.com/anchore/k8s-inventory/pkg/client" + "github.com/google/uuid" + "github.com/hashicorp/go-version" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "os" + "time" + + 
"github.com/anchore/k8s-inventory/internal/anchore" + "github.com/anchore/k8s-inventory/internal/config" + "github.com/anchore/k8s-inventory/internal/log" + jstime "github.com/anchore/k8s-inventory/internal/time" +) + +var requiredAnchoreVersion, _ = version.NewVersion("5.11") + +var inventoryReportingActive = false + +const Type = "k8s_inventory_agent" +const RegisterAPIPathV2 = "v2/system/integrations/registration" +const AppVersionLabel = "app.kubernetes.io/version" + +type Channels struct { + IntegrationObj chan *Integration + HealthReportingEnabled chan bool + InventoryReportingEnabled chan bool +} + +// HealthStatus reflects the state of the Integration wrt any errors +// encountered when performing its tasks +type HealthStatus struct { + State string `json:"state,omitempty"` // state of the integration HEALTHY or UNHEALTHY + Reason string `json:"reason,omitempty"` + Details any `json:"details,omitempty"` +} + +// LifeCycleStatus reflects the state of the Integration from the perspective of Enterprise +type LifeCycleStatus struct { + State string `json:"state,omitempty"` // lifecycle state REGISTERED, ACTIVE, DEGRADED, DEACTIVATED + Reason string `json:"reason,omitempty"` + Details any `json:"details,omitempty"` + UpdatedAt jstime.Datetime `json:"updated_at,omitempty"` +} + +type Integration struct { + UUID string `json:"uuid,omitempty"` // uuid provided to this integration instance during + Type string `json:"type,omitempty"` // type of integration (e.g., 'anchore-k8s-agent') + Name string `json:"name,omitempty"` // name of the integration instance (e.g., k8s-agent-admin') + Description string `json:"description,omitempty"` // short description of integration instance + Version string `json:"version,omitempty"` // version of the integration instance + ReportedStatus *HealthStatus `json:"reported_status,omitempty"` // health status of the integration (Read-only) + IntegrationStatus *LifeCycleStatus `json:"integration_status,omitempty"` // lifecycle status of 
// Registration describes this agent instance when it registers with Enterprise;
// it is the value handed to register/doRegister.
type Registration struct {
	RegistrationID         string              `json:"registration_id,omitempty"`          // uuid that identifies integration during registration
	RegistrationInstanceID string              `json:"registration_instance_id,omitempty"` // identifier that makes integration instance unique among its replicas during registration
	Type                   string              `json:"type,omitempty"`                     // type of integration (e.g., 'anchore-k8s-agent')
	Name                   string              `json:"name,omitempty"`                     // name of the integration instance (e.g., 'k8s-agent-admin')
	Description            string              `json:"description,omitempty"`              // short description of integration instance
	Version                string              `json:"version,omitempty"`                  // version of the integration instance
	StartedAt              jstime.Datetime     `json:"started_at,omitempty"`               // timestamp when integration instance was started in UTC().Format(time.RFC3339)
	Uptime                 *jstime.Duration    `json:"uptime,omitempty"`                   // running time of integration instance
	Username               string              `json:"username,omitempty"`                 // user that the integration instance authenticates as during registration
	ExplicitlyAccountBound []string            `json:"explicitly_account_bound,omitempty"` // accounts that the integration instance is explicitly configured to handle
	Namespaces             []string            `json:"namespaces,omitempty"`               // namespaces that the integration instance is explicitly configured to handle
	Configuration          *config.Application `json:"configuration,omitempty"`            // configuration for the integration instance
	ClusterName            string              `json:"cluster_name,omitempty"`             // name of cluster where the integration instance runs
	Namespace              string              `json:"namespace,omitempty"`                // uuid for namespace that the integration instance belongs to
	HealthReportInterval   int                 `json:"health_report_interval,omitempty"`   // time in seconds between health reports
}
// awaitVersion polls Enterprise for its service version until that version is
// at least requiredAnchoreVersion, then returns it.
//
// Between attempts it sleeps for startBackoff, doubling the delay after every
// retry until it reaches maxBackoff. A negative maxRetry means "retry forever";
// otherwise an error is returned once more than maxRetry attempts were made.
//
// Retries happen when the version string cannot be parsed, when the version is
// older than required, or when anchore.ServerIsOffline reports the server as
// offline. Any other error from anchore.GetVersion is returned immediately.
//
// Side effect: the first time an older-than-required version is seen, inventory
// reporting is enabled via enableInventoryReporting(ch) so the agent keeps
// reporting inventory while it waits.
func awaitVersion(anchoreDetails config.AnchoreInfo, ch Channels, maxRetry int, startBackoff, maxBackoff time.Duration) (*anchore.Version, error) {
	attempt := 0
	for {
		// retry is decided per iteration; when it stays false the error is final
		retry := false

		anchoreVersion, err := anchore.GetVersion(anchoreDetails)
		if err == nil {
			ver, vErr := version.NewVersion(anchoreVersion.Service.Version)
			if vErr != nil {
				log.Errorf("Failed to parse received service version: %v. Will try again in %s", vErr, startBackoff)
				retry = true
			} else {
				log.Infof("Successfully determined service version: %s for Enterprise: %s",
					anchoreVersion.Service.Version, anchoreDetails.URL)
				if ver.GreaterThanOrEqual(requiredAnchoreVersion) {
					log.Infof("Proceeding with integration registration since Enterprise v%s supports that", anchoreVersion.Service.Version)
					return anchoreVersion, nil
				}
				// too old: make sure inventory reporting runs (only signalled once)
				// and keep polling with backoff until the version is new enough
				if !inventoryReportingActive {
					log.Infof("Proceeding without integration registration and health reporting since Enterprise v%s does not support that",
						anchoreVersion.Service.Version)
					enableInventoryReporting(ch)
				}
				retry = true
			}
		}

		// the retry budget is checked before deciding whether to retry, so it
		// applies to every retry cause (parse failure, old version, offline)
		attempt++
		if maxRetry >= 0 && attempt > maxRetry {
			log.Infof("Failed to get Enterprise version after %d attempts", attempt)
			return nil, fmt.Errorf("failed to get Enterprise version after %d attempts", attempt)
		}

		if anchore.ServerIsOffline(err) {
			log.Infof("Anchore is offline. Will try again in %s", startBackoff)
			retry = true
		}

		if retry {
			time.Sleep(startBackoff)
			// exponential backoff capped at maxBackoff
			if startBackoff < maxBackoff {
				startBackoff = min(startBackoff*2, maxBackoff)
			}
			continue
		}

		// err is non-nil here and is not an "offline" error: give up
		log.Errorf("Failed to get service version for Enterprise: %s, %v", anchoreDetails.URL, err)
		return nil, err
	}
}
doRegister(registrationInfo, anchoreDetails, now) + if err == nil { + log.Infof("Successfully registered %s agent: %s (registration_id:%s / registration_instance_id:%s) with %s", + registrationInfo.Type, registrationInfo.Name, registrationInfo.RegistrationID, + registrationInfo.RegistrationInstanceID, anchoreDetails.URL) + log.Infof("This agent's integration uuid is %s", registeredIntegration.UUID) + return registeredIntegration, nil + } + + attempt++ + if maxRetry >= 0 && attempt > maxRetry { + log.Errorf("Failed to register agent (registration_id:%s / registration_instance_id:%s) after %d attempts", + registrationInfo.RegistrationID, registrationInfo.RegistrationInstanceID, attempt) + return nil, fmt.Errorf("failed to register after %d attempts", attempt) + } + + if anchore.ServerIsOffline(err) { + log.Infof("Anchore is offline. Will try again in %s", startBackoff) + time.Sleep(startBackoff) + if startBackoff < maxBackoff { + startBackoff = min(startBackoff*2, maxBackoff) + } + continue + } + + if anchore.UserLacksAPIPrivileges(err) { + log.Errorf("Specified user lacks required privileges to register and send health reports %v", err) + return nil, err + } + + if anchore.IncorrectCredentials(err) { + log.Errorf("Failed to register due to invalid credentials (wrong username or password") + return nil, err + } + + log.Errorf("Failed to register integration agent (registration_id:%s / regitration_instance_id:%s): %v", + registrationInfo.RegistrationID, registrationInfo.RegistrationInstanceID, err) + return nil, err + } +} + +func doRegister(registrationInfo *Registration, anchoreDetails config.AnchoreInfo, now _Now) (*Integration, error) { + log.Infof("Registering %s agent: %s (registration_id:%s / regitration_instance_id:%s) with %s", + registrationInfo.Type, registrationInfo.Name, registrationInfo.RegistrationID, + registrationInfo.RegistrationInstanceID, anchoreDetails.URL) + + registrationInfo.Uptime = &jstime.Duration{Duration: 
now().UTC().Sub(registrationInfo.StartedAt.Time)} + requestBody, err := json.Marshal(registrationInfo) + if err != nil { + return nil, fmt.Errorf("failed to serialize integration registration as JSON: %w", err) + } + responseBody, err := anchore.Post(requestBody, "", RegisterAPIPathV2, anchoreDetails, "integration registration") + if err != nil { + return nil, err + } + registeredIntegration := Integration{} + err = json.Unmarshal(*responseBody, &registeredIntegration) + return &registeredIntegration, err +} + +func getRegistrationInfo(appConfig *config.Application, k8sClient *client.Client, + namespace string, name string, newUUID _NewUUID, now _Now) *Registration { + var registrationID, registrationInstanceID, instanceName, appVersion, description string + + log.Debugf("Attempting to determine values from K8s Deployment for Pod: %s in Namespace: %s", + name, namespace) + registrationID, instanceName, appVersion = getInstanceDataFromK8s(k8sClient, namespace, name) + + if appConfig.Registration.RegistrationID != "" { + log.Debugf("Using registration_id specified in config: %s", appConfig.Registration.RegistrationID) + registrationID = appConfig.Registration.RegistrationID + } + + if registrationID == "" { + log.Debugf("The registration_id value is not valid.
Generating UUIDv4 to use as registration_id") + registrationID = newUUID().String() + } + + if name != "" { + log.Debugf("Using registration_instance_id: %s", name) + registrationInstanceID = name + } else { + log.Debugf("Generating UUIDv4 to use as registration_instance_id") + registrationInstanceID = newUUID().String() + } + + if appConfig.Registration.IntegrationName != "" { + log.Debugf("Using name for integration specified in config: %s", appConfig.Registration.IntegrationName) + instanceName = appConfig.Registration.IntegrationName + } + + if appConfig.Registration.IntegrationDescription != "" { + log.Debugf("Using description for integration specified in config: %s", + appConfig.Registration.IntegrationDescription) + description = appConfig.Registration.IntegrationDescription + } + + log.Debugf("Integration registration_id: %s, registration_instance_id: %s, name: %s, description: %s", + registrationID, registrationInstanceID, instanceName, description) + + explicitlyAccountBound, namespaces := getAccountsAndNamespacesForAgent(appConfig) + + instance := Registration{ + RegistrationID: registrationID, + RegistrationInstanceID: registrationInstanceID, + Type: Type, + Name: instanceName, + Description: description, + Version: appVersion, + StartedAt: jstime.Datetime{Time: now().UTC()}, + Uptime: new(jstime.Duration), + Username: appConfig.AnchoreDetails.User, + ExplicitlyAccountBound: explicitlyAccountBound, + Namespaces: namespaces, + Configuration: nil, + ClusterName: appConfig.KubeConfig.Cluster, + Namespace: namespace, + HealthReportInterval: appConfig.HealthReportIntervalSeconds, + } + return &instance +} + +func getInstanceDataFromK8s(k8sClient *client.Client, namespace string, podName string) (string, string, string) { + if k8sClient == nil { + log.Errorf("Kubernetes client not initialized. 
Unable to interact with K8s cluster.") + return "", "", "" + } + opts := metav1.GetOptions{} + pod, err := k8sClient.Clientset.CoreV1().Pods(namespace).Get(context.Background(), podName, opts) + if err != nil { + log.Errorf("failed to get pod: %v", err) + return "", "", "" + } + replicaSetName := pod.ObjectMeta.OwnerReferences[0].Name + replicaSet, err := k8sClient.Clientset.AppsV1().ReplicaSets(namespace).Get(context.Background(), replicaSetName, opts) + if err != nil { + log.Errorf("failed to get replica set: %v", err) + return "", "", "" + } + deploymentName := replicaSet.ObjectMeta.OwnerReferences[0].Name + deployment, err := k8sClient.Clientset.AppsV1().Deployments(namespace).Get(context.Background(), deploymentName, opts) + if err != nil { + log.Errorf("failed to get deployment: %v", err) + return "", "", "" + } + + appVersion := deployment.Labels[AppVersionLabel] + registrationID := fmt.Sprint("", deployment.ObjectMeta.UID) + instanceName := deploymentName + log.Debugf("Determined integration values for agent from K8s, registration_id: %s, instance_name: %s, appVersion: %s", + registrationID, instanceName, appVersion) + return registrationID, instanceName, appVersion +} + +func getAccountsAndNamespacesForAgent(appConfig *config.Application) ([]string, []string) { + accountSet := make(map[string]bool) + namespaceSet := make(map[string]bool) + + // pick up accounts that are explicitly listed in the config + for account, accountRouteDetails := range appConfig.AccountRoutes { + accountSet[account] = true + for _, namespace := range accountRouteDetails.Namespaces { + namespaceSet[namespace] = true + } + } + accounts := make([]string, 0, len(accountSet)) + for account := range accountSet { + accounts = append(accounts, account) + } + + namespaces := make([]string, 0, len(namespaceSet)) + for namespace := range namespaceSet { + namespaces = append(namespaces, namespace) + } + + return accounts, namespaces +} diff --git a/pkg/integration/integration_test.go 
b/pkg/integration/integration_test.go new file mode 100644 index 0000000..fa744ab --- /dev/null +++ b/pkg/integration/integration_test.go @@ -0,0 +1,919 @@ +package integration + +import ( + "fmt" + "github.com/anchore/k8s-inventory/internal/anchore" + "github.com/anchore/k8s-inventory/internal/config" + jstime "github.com/anchore/k8s-inventory/internal/time" + "github.com/anchore/k8s-inventory/pkg/client" + "github.com/google/uuid" + "github.com/h2non/gock" + "github.com/stretchr/testify/assert" + appsv1 "k8s.io/api/apps/v1" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/fake" + "net" + "net/http" + "net/url" + "os" + "slices" + "syscall" + "testing" + "time" +) + +var ( + versionMap511 = map[string]interface{}{ + "service": map[string]interface{}{ + "version": "5.11.0", + }, + "api": map[string]interface{}{ + "version": "2", + }, + "db": map[string]interface{}{ + "schema_version": "5110", + }, + } + + versionMap510 = map[string]interface{}{ + "service": map[string]interface{}{ + "version": "5.10.0", + }, + "api": map[string]interface{}{ + "version": "2", + }, + "db": map[string]interface{}{ + "schema_version": "510", + }, + } + + versionObj511 = anchore.Version{ + API: struct { + Version string `json:"version"` + }(struct { + Version string + }{"2"}), + DB: struct { + SchemaVersion string `json:"schema_version"` + }(struct { + SchemaVersion string + }{"5110"}), + Service: struct { + Version string `json:"version"` + }(struct { + Version string + }{"5.11.0"}), + } + + now = time.Date(2024, 10, 4, 10, 11, 12, 0, time.Local) + timestamps = []time.Time{now.Add(time.Second * 2), now.Add(time.Second * 4), now.Add(time.Second * 6)} + + integration = map[string]interface{}{ + "uuid": "000d1e60-cb05-4cce-8d1e-60cb052cce1f", + "type": "k8s_inventory_agent", + "name": "k8s-inv-agent", + "description": "k8s-agent with health reporting", + "version": "2.0", + "reported_status": map[string]interface{}{"state": 
"HEALTHY"}, + "integration_status": map[string]interface{}{"state": "REGISTERED"}, + "started_at": now.UTC().Format(time.RFC3339), + "last_seen": nil, + "uptime": 2, + "username": "account0User", + "account_name": "account0", + "explicitly_account_bound": []interface{}{}, + "accounts": []interface{}{}, + "namespaces": []interface{}{}, + "cluster_name": "Docker-Desktop", + "namespace": "default", + "health_report_interval": 60, + "registration_id": "de2c3c58-4c20-4d87-ac3c-584c201d875a", + "registration_instance_id": "45743315", + } + + integrationInstance = Integration{ + UUID: "000d1e60-cb05-4cce-8d1e-60cb052cce1f", + Type: "k8s_inventory_agent", + Name: "k8s-inv-agent", + Description: "k8s-agent with health reporting", + Version: "2.0", + ReportedStatus: nil, + IntegrationStatus: nil, + StartedAt: jstime.Datetime{Time: now.UTC()}, + LastSeen: nil, + Uptime: nil, + Username: "account0User", + AccountName: "account0", + ExplicitlyAccountBound: []string{}, + Accounts: []string{}, + Namespaces: []string{}, + ClusterName: "Docker-Desktop", + Namespace: "default", + HealthReportInterval: 60, + RegistrationID: "de2c3c58-4c20-4d87-ac3c-584c201d875a", + RegistrationInstanceID: "45743315", + } + + pod = v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod", + UID: "test-pod-uid", + Annotations: map[string]string{ + "test-annotation": "test-value", + }, + Labels: map[string]string{ + "app.kubernetes.io/managed-by": "Helm", + "app.kubernetes.io/name": "k8s-inventory", + "app.kubernetes.io/version": "1.7.0", + "helm.sh/chart": "k8s-inventory-0.5.0", + }, + Namespace: "test-namespace", + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "apps/v1", + Kind: "ReplicaSet", + Name: "test-replicaset", + UID: "test-replicaset-uid", + }, + }, + }, + } + + replicaSet = appsv1.ReplicaSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-replicaset", + UID: "test-replicaset-uid", + Annotations: map[string]string{ + "meta.helm.sh/release-name": "my-k8s-inventory-release", + 
"meta.helm.sh/release-namespace": "test-namespace", + }, + Labels: map[string]string{ + "app.kubernetes.io/managed-by": "Helm", + "app.kubernetes.io/name": "k8s-inventory", + "app.kubernetes.io/version": "1.7.0", + "helm.sh/chart": "k8s-inventory-0.5.0", + }, + Namespace: "test-namespace", + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "apps/v1", + Name: "test-deployment-k8s-inventory", + Kind: "Deployment", + UID: "test-deployment-uid", + }, + }, + }, + } + + deployment = appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-deployment-k8s-inventory", + UID: "test-deployment-uid", + Annotations: map[string]string{ + "meta.helm.sh/release-name": "my-k8s-inventory-release", + "meta.helm.sh/release-namespace": "test-namespace", + }, + Labels: map[string]string{ + "app.kubernetes.io/managed-by": "Helm", + "app.kubernetes.io/name": "k8s-inventory", + "app.kubernetes.io/version": "1.7.0", + "helm.sh/chart": "k8s-inventory-0.5.0", + }, + Namespace: "test-namespace", + }, + } +) + +func TestAwaitVersion(t *testing.T) { + defer gock.Off() + + anchoreDetails := config.AnchoreInfo{ + URL: "https://ancho.re", + User: "admin", + } + type want struct { + version *anchore.Version + err error + } + tests := []struct { + name string + want want + }{ + { + name: "successful await version, 5.11.0", + want: want{ + version: &versionObj511, + err: nil, + }, + }, + { + name: "successful await version, <5.11.0", + want: want{ + version: nil, + err: fmt.Errorf("failed to get Enterprise version after 2 attempts"), + }, + }, + { + name: "enterprise offline on first attempt", + want: want{ + version: &versionObj511, + err: nil, + }, + }, + { + name: "other error", + want: want{ + version: nil, + err: &anchore.APIClientError{ + HTTPStatusCode: http.StatusBadRequest, + Message: "400 Bad Request response from Anchore (during version get)", + Path: "/version", + Method: "GET", + }, + }, + }, + } + for _, tt := range tests { + gock.Flush() + + ch := GetChannels() + 
valueSet := false + var inventoryReportingEnabledValue bool + + switch tt.name { + case "successful await version, 5.11.0": + gock.New("https://ancho.re"). + Get("/version"). + Reply(200). + JSON(versionMap511) + case "successful await version, <5.11.0": + gock.New("https://ancho.re"). + Get("/version"). + Persist(). + Reply(200). + JSON(versionMap510) + go func() { + var setBeforeClosed bool + inventoryReportingEnabledValue, setBeforeClosed = <-ch.InventoryReportingEnabled + valueSet = true + assert.True(t, setBeforeClosed) + }() + case "enterprise offline on first attempt": + gock.New("https://ancho.re"). + Get("/version"). + ReplyError(&url.Error{ + Op: "Post", + URL: "https://ancho.re/version", + Err: &net.OpError{ + Op: "dial", + Net: "tcp", + Source: nil, + Addr: &net.TCPAddr{ + IP: net.ParseIP("127.0.0.1"), + Port: 8228, + Zone: "", + }, + Err: &os.SyscallError{ + Syscall: "connect", + Err: syscall.ECONNREFUSED, + }, + }, + }) + gock.New("https://ancho.re"). + Get("/version"). + Reply(200). + JSON(versionMap511) + case "other error": + gock.New("https://ancho.re"). + Get("/version"). 
+ Reply(http.StatusBadRequest) + } + t.Run(tt.name, func(t *testing.T) { + version, err := awaitVersion(anchoreDetails, ch, 1, 500*time.Millisecond, 10*time.Minute) + if tt.want.err != nil { + assert.Error(t, err) + assert.Equal(t, err, tt.want.err) + if tt.name == "successful await version, <5.11.0" { + // this is not a perfect way of testing pre 5.11 release case but better than nothing + assert.True(t, valueSet) + assert.True(t, inventoryReportingEnabledValue) + } + assert.Nil(t, version) + } else { + assert.NoError(t, err) + assert.Equal(t, tt.want.version, version) + } + }) + } +} + +func TestCloseChannels(t *testing.T) { + ch := GetChannels() + closeChannels(ch) + result1, isNotClosed := <-ch.IntegrationObj + assert.Nil(t, result1) + assert.False(t, isNotClosed) + result2, isNotClosed := <-ch.HealthReportingEnabled + assert.False(t, result2) + assert.False(t, isNotClosed) + result3, isNotClosed := <-ch.InventoryReportingEnabled + assert.False(t, result3) + assert.False(t, isNotClosed) +} + +func TestEnableHealthReporting(t *testing.T) { + ch := GetChannels() + integrationInstance := &Integration{ + UUID: "some uuid", + } + go enableHealthReporting(ch, integrationInstance) + result1 := <-ch.IntegrationObj + result2 := <-ch.HealthReportingEnabled + assert.Equal(t, integrationInstance, result1) + assert.True(t, result2) +} + +func TestEnableInventoryReporting(t *testing.T) { + ch := GetChannels() + go enableInventoryReporting(ch) + result := <-ch.InventoryReportingEnabled + assert.True(t, result) +} + +func TestRegister(t *testing.T) { + defer gock.Off() + + type args struct { + uptime float64 + } + type want struct { + integration Integration + ReportedStatus HealthStatus + IntegrationStatus LifeCycleStatus + Uptime jstime.Duration + err error + nilIntegration bool + } + tests := []struct { + name string + args args + want want + }{ + { + name: "successful registration on first attempt", + args: args{ + uptime: 2.0, + }, + want: want{ + integration: 
integrationInstance, + ReportedStatus: HealthStatus{ + State: "HEALTHY", + }, + IntegrationStatus: LifeCycleStatus{ + State: "REGISTERED", + }, + Uptime: jstime.Duration{Duration: time.Second * 2}, + err: nil, + }, + }, + { + name: "enterprise offline on first attempt", + args: args{ + uptime: 4.0, + }, + want: want{ + integration: integrationInstance, + ReportedStatus: HealthStatus{ + State: "HEALTHY", + }, + IntegrationStatus: LifeCycleStatus{ + State: "REGISTERED", + }, + Uptime: jstime.Duration{Duration: time.Second * 4}, + err: nil, + }, + }, + { + name: "abort registration on max attempts", + args: args{ + uptime: 4.0, + }, + want: want{ + integration: Integration{}, + ReportedStatus: HealthStatus{}, + IntegrationStatus: LifeCycleStatus{}, + Uptime: jstime.Duration{Duration: time.Second * 4}, + err: fmt.Errorf("failed to register after %d attempts", 2), + }, + }, + { + name: "user lacks api privileges", + args: args{ + uptime: 2.0, + }, + want: want{ + integration: Integration{}, + ReportedStatus: HealthStatus{}, + IntegrationStatus: LifeCycleStatus{}, + Uptime: jstime.Duration{Duration: time.Second * 2}, + err: &anchore.APIClientError{ + HTTPStatusCode: http.StatusForbidden, + Message: "403 Forbidden response from Anchore (during integration registration)", + Path: "/v2/system/integrations/registration", + Method: "POST", + APIErrorDetails: &anchore.APIErrorDetails{ + Message: "Not authorized. 
Requires permissions: domain=account0 action=registerIntegration target=", + Detail: map[string]interface{}{}, + HTTPCode: http.StatusForbidden, + }, + }, + }, + }, + { + name: "wrong user credentials", + args: args{ + uptime: 2.0, + }, + want: want{ + integration: Integration{}, + ReportedStatus: HealthStatus{}, + IntegrationStatus: LifeCycleStatus{}, + Uptime: jstime.Duration{Duration: time.Second * 2}, + err: &anchore.APIClientError{ + HTTPStatusCode: http.StatusUnauthorized, + Message: "401 Unauthorized response from Anchore (during integration registration)", + Path: "/v2/system/integrations/registration", + Method: "POST", + }, + }, + }, + { + name: "other error", + args: args{ + uptime: 2.0, + }, + want: want{ + integration: Integration{}, + ReportedStatus: HealthStatus{}, + IntegrationStatus: LifeCycleStatus{}, + Uptime: jstime.Duration{Duration: time.Second * 2}, + err: &anchore.APIClientError{ + HTTPStatusCode: http.StatusBadRequest, + Message: "400 Bad Request response from Anchore (during integration registration)", + Path: "/v2/system/integrations/registration", + Method: "POST", + }, + }, + }, + } + for _, tt := range tests { + gock.Flush() + + anchoreDetails := config.AnchoreInfo{ + URL: "https://ancho.re", + User: "admin", + } + registrationInfo := &Registration{ + RegistrationID: "test-registration-id", + RegistrationInstanceID: "1111223344", + Type: Type, + Name: "test k8s inventory", + Description: "Description from config", + Version: "", + StartedAt: jstime.Datetime{Time: now.UTC()}, + Uptime: new(jstime.Duration), + Username: "admin", + ExplicitlyAccountBound: []string{"account3"}, + Namespaces: []string{"ns3"}, + Configuration: nil, + ClusterName: "k8s-cluster1", + Namespace: "test-namespace", + HealthReportInterval: 60, + } + integration["uptime"] = tt.args.uptime + + j := 0 + nowMock := func() time.Time { + timestamp := timestamps[j] + j++ + return timestamp + } + + switch tt.name { + case "successful registration on first attempt": + 
gock.New("https://ancho.re"). + Post("v2/system/integrations/registration"). + Reply(200). + JSON(integration) + case "enterprise offline on first attempt": + gock.New("https://ancho.re"). + Post("v2/system/integrations/registration"). + ReplyError(&url.Error{ + Op: "Post", + URL: "http://127.0.0.1:8228/v2/system/integrations/registration", + Err: &net.OpError{ + Op: "dial", + Net: "tcp", + Source: nil, + Addr: &net.TCPAddr{ + IP: net.ParseIP("127.0.0.1"), + Port: 8228, + Zone: "", + }, + Err: &os.SyscallError{ + Syscall: "connect", + Err: syscall.ECONNREFUSED, + }, + }, + }) + gock.New("https://ancho.re"). + Post("v2/system/integrations/registration"). + Reply(200). + JSON(integration) + case "abort registration on max attempts": + gock.New("https://ancho.re"). + Post("v2/system/integrations/registration"). + Persist(). + ReplyError(&url.Error{ + Op: "Post", + URL: "http://127.0.0.1:8228/v2/system/integrations/registration", + Err: &net.OpError{ + Op: "dial", + Net: "tcp", + Source: nil, + Addr: &net.TCPAddr{ + IP: net.ParseIP("127.0.0.1"), + Port: 8228, + Zone: "", + }, + Err: &os.SyscallError{ + Syscall: "connect", + Err: syscall.ECONNREFUSED, + }, + }, + }) + case "user lacks api privileges": + gock.New("https://ancho.re"). + Post("v2/system/integrations/registration"). + Reply(http.StatusForbidden). + JSON(map[string]interface{}{ + "message": "Not authorized. Requires permissions: domain=account0 action=registerIntegration target=", + "detail": map[string]interface{}{}, + "httpcode": http.StatusForbidden, + }) + case "wrong user credentials": + gock.New("https://ancho.re"). + Post("v2/system/integrations/registration"). + Reply(http.StatusUnauthorized) + case "other error": + gock.New("https://ancho.re"). + Post("v2/system/integrations/registration"). 
+ Reply(http.StatusBadRequest) + } + t.Run(tt.name, func(t *testing.T) { + registeredIntegration, err := register(registrationInfo, anchoreDetails, 1, + 500*time.Millisecond, 10*time.Minute, nowMock) + if tt.want.err != nil { + assert.Error(t, err) + assert.Equal(t, err, tt.want.err) + assert.Nil(t, registeredIntegration) + } else { + assert.NoError(t, err) + if tt.want.nilIntegration { + assert.Nil(t, registeredIntegration) + } else { + integrationStatus := registeredIntegration.IntegrationStatus + registeredIntegration.IntegrationStatus = nil + reportedStatus := registeredIntegration.ReportedStatus + registeredIntegration.ReportedStatus = nil + uptime := registeredIntegration.Uptime + registeredIntegration.Uptime = nil + assert.Equal(t, tt.want.IntegrationStatus, *integrationStatus) + assert.Equal(t, tt.want.ReportedStatus, *reportedStatus) + assert.Equal(t, tt.want.Uptime, *uptime) + assert.Equal(t, tt.want.integration, *registeredIntegration) + } + assert.Equal(t, tt.want.Uptime, *registrationInfo.Uptime) + } + }) + } +} + +func TestGetRegistrationInfo(t *testing.T) { + uuids := []uuid.UUID{uuid.New(), uuid.New()} + timestamps := []time.Time{time.Now()} + + type args struct { + config *config.Application + c *client.Client + namespace string + name string + } + tests := []struct { + name string + args args + want *Registration + }{ + { + name: "Generates UUIDs for registration-id and registration-instance-id", + args: args{ + config: &config.Application{ + AnchoreDetails: config.AnchoreInfo{ + User: "admin", + }, + AccountRoutes: config.AccountRoutes{ + "account3": config.AccountRouteDetails{ + Namespaces: []string{"ns3"}, + }, + }, + KubeConfig: config.KubeConf{ + Cluster: "k8s-cluster1", + }, + HealthReportIntervalSeconds: 60, + }, + c: nil, + namespace: "test-namespace", + name: "", + }, + want: &Registration{ + RegistrationID: uuids[0].String(), + RegistrationInstanceID: uuids[1].String(), + Type: Type, + Name: "", + Description: "", + Version: "", + 
StartedAt: jstime.Datetime{Time: timestamps[0].UTC()}, + Uptime: new(jstime.Duration), + Username: "admin", + ExplicitlyAccountBound: []string{"account3"}, + Namespaces: []string{"ns3"}, + Configuration: nil, + ClusterName: "k8s-cluster1", + Namespace: "test-namespace", + HealthReportInterval: 60, + }, + }, + { + name: "Values from anchore-registration in app config", + args: args{ + config: &config.Application{ + AnchoreDetails: config.AnchoreInfo{ + User: "admin", + }, + AccountRoutes: config.AccountRoutes{ + "account3": config.AccountRouteDetails{ + Namespaces: []string{"ns3"}, + }, + }, + KubeConfig: config.KubeConf{ + Cluster: "k8s-cluster1", + }, + Registration: config.RegistrationOptions{ + RegistrationID: "test-registration-id", + IntegrationName: "test k8s inventory", + IntegrationDescription: "Description from config", + }, + HealthReportIntervalSeconds: 60, + }, + c: nil, + namespace: "test-namespace", + name: "1111223344", + }, + want: &Registration{ + RegistrationID: "test-registration-id", + RegistrationInstanceID: "1111223344", + Type: Type, + Name: "test k8s inventory", + Description: "Description from config", + Version: "", + StartedAt: jstime.Datetime{Time: timestamps[0].UTC()}, + Uptime: new(jstime.Duration), + Username: "admin", + ExplicitlyAccountBound: []string{"account3"}, + Namespaces: []string{"ns3"}, + Configuration: nil, + ClusterName: "k8s-cluster1", + Namespace: "test-namespace", + HealthReportInterval: 60, + }, + }, + { + name: "Values from k8s", + args: args{ + config: &config.Application{ + AnchoreDetails: config.AnchoreInfo{ + User: "admin", + }, + AccountRoutes: config.AccountRoutes{ + "account3": config.AccountRouteDetails{ + Namespaces: []string{"ns3"}, + }, + }, + KubeConfig: config.KubeConf{ + Cluster: "k8s-cluster1", + }, + Registration: config.RegistrationOptions{}, + HealthReportIntervalSeconds: 60, + }, + c: &client.Client{ + Clientset: fake.NewSimpleClientset(&pod, &replicaSet, &deployment), + }, + namespace: 
"test-namespace", + name: "test-pod", + }, + want: &Registration{ + RegistrationID: "test-deployment-uid", + RegistrationInstanceID: "test-pod", + Type: Type, + Name: "test-deployment-k8s-inventory", + Description: "", + Version: "1.7.0", + StartedAt: jstime.Datetime{Time: timestamps[0].UTC()}, + Uptime: new(jstime.Duration), + Username: "admin", + ExplicitlyAccountBound: []string{"account3"}, + Namespaces: []string{"ns3"}, + Configuration: nil, + ClusterName: "k8s-cluster1", + Namespace: "test-namespace", + HealthReportInterval: 60, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + i := 0 + NewUUIDMock := func() uuid.UUID { + _uuid := uuids[i] + i++ + return _uuid + } + j := 0 + nowMock := func() time.Time { + timestamp := timestamps[j] + j++ + return timestamp + } + result := getRegistrationInfo(tt.args.config, tt.args.c, tt.args.namespace, + tt.args.name, NewUUIDMock, nowMock) + assert.NotNil(t, result) + assert.Equal(t, tt.want, result) + }) + } +} + +func TestGetInstanceDataFromK8s(t *testing.T) { + type args struct { + c *client.Client + namespace string + podName string + } + type want struct { + registrationID string + instanceName string + appVersion string + } + tests := []struct { + name string + args args + want want + }{ + { + name: "successful get instance data from k8s", + args: args{ + c: &client.Client{ + Clientset: fake.NewSimpleClientset(&pod, &replicaSet, &deployment), + }, + namespace: "test-namespace", + podName: "test-pod", + }, + want: want{ + registrationID: "test-deployment-uid", + instanceName: "k8s-inventory", + appVersion: "1.7.0", + }, + }, + { + name: "nil client", + args: args{ + c: nil, + namespace: "test-namespace", + podName: "test-pod", + }, + want: want{ + registrationID: "", + instanceName: "", + appVersion: "", + }, + }, + { + name: "no pod", + args: args{ + c: &client.Client{ + Clientset: fake.NewSimpleClientset(), + }, + namespace: "test-namespace", + podName: "test-pod", + }, + want: want{ + 
registrationID: "", + instanceName: "", + appVersion: "", + }, + }, + { + name: "no replicaSet", + args: args{ + c: &client.Client{ + Clientset: fake.NewSimpleClientset(&pod), + }, + namespace: "test-namespace", + podName: "test-pod", + }, + want: want{ + registrationID: "", + instanceName: "", + appVersion: "", + }, + }, + { + name: "no deployment", + args: args{ + c: &client.Client{ + Clientset: fake.NewSimpleClientset(&pod, &replicaSet), + }, + namespace: "test-namespace", + podName: "test-pod", + }, + want: want{ + registrationID: "", + instanceName: "", + appVersion: "", + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + resultRegID, resultInstName, resultAppVersion := getInstanceDataFromK8s(tt.args.c, tt.args.namespace, tt.args.podName) + assert.Equal(t, tt.want.registrationID, resultRegID) + assert.NotNil(t, tt.want.instanceName, resultInstName) + assert.NotNil(t, tt.want.appVersion, resultAppVersion) + }) + } +} + +func TestGetAccountsAndNamespacesForAgent(t *testing.T) { + type args struct { + config *config.Application + } + type want struct { + accountNames []string + namespaces []string + } + tests := []struct { + name string + args + want want + }{ + { + name: "empty account routes", + args: args{ + config: &config.Application{}, + }, + want: want{ + accountNames: []string{}, + namespaces: []string{}, + }, + }, + { + name: "populated account routes", + args: args{ + config: &config.Application{ + AccountRoutes: config.AccountRoutes{ + "account1": config.AccountRouteDetails{ + Namespaces: []string{"ns1", "ns2"}, + }, + "account2": config.AccountRouteDetails{}, + "account3": config.AccountRouteDetails{ + Namespaces: []string{"ns3"}, + }, + }, + }, + }, + want: want{ + accountNames: []string{"account1", "account2", "account3"}, + namespaces: []string{"ns1", "ns2", "ns3"}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + resultAccountNames, resultNamespaces := 
getAccountsAndNamespacesForAgent(tt.args.config) + slices.Sort(resultAccountNames) + assert.Equal(t, tt.want.accountNames, resultAccountNames) + slices.Sort(resultNamespaces) + assert.Equal(t, tt.want.namespaces, resultNamespaces) + }) + } +} diff --git a/pkg/lib.go b/pkg/lib.go index dfb8b53..32dc467 100644 --- a/pkg/lib.go +++ b/pkg/lib.go @@ -7,20 +7,22 @@ import ( "encoding/json" "errors" "fmt" + jstime "github.com/anchore/k8s-inventory/internal/time" + "github.com/anchore/k8s-inventory/pkg/integration" "os" "regexp" "time" - "github.com/anchore/k8s-inventory/pkg/reporter" - "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "github.com/anchore/k8s-inventory/internal/config" "github.com/anchore/k8s-inventory/internal/log" "github.com/anchore/k8s-inventory/pkg/client" + "github.com/anchore/k8s-inventory/pkg/healthreporter" "github.com/anchore/k8s-inventory/pkg/inventory" "github.com/anchore/k8s-inventory/pkg/logger" + "github.com/anchore/k8s-inventory/pkg/reporter" ) type ReportItem struct { @@ -49,7 +51,7 @@ func reportToStdout(report inventory.Report) error { return nil } -func HandleReport(report inventory.Report, cfg *config.Application, account string) error { +func HandleReport(report inventory.Report, reportInfo *healthreporter.InventoryReportInfo, cfg *config.Application, account string) error { if cfg.VerboseInventoryReports { err := reportToStdout(report) if err != nil { @@ -76,6 +78,7 @@ func HandleReport(report inventory.Report, cfg *config.Application, account stri } if anchoreDetails.IsValid() { + reportInfo.SentAsUser = anchoreDetails.User if err := reporter.Post(report, anchoreDetails); err != nil { if errors.Is(err, reporter.ErrAnchoreAccountDoesNotExist) { return err @@ -91,7 +94,14 @@ func HandleReport(report inventory.Report, cfg *config.Application, account stri // PeriodicallyGetInventoryReport periodically retrieve image results and report/output them according to the configuration. 
// Note: Errors do not cause the function to exit, since this is periodically running -func PeriodicallyGetInventoryReport(cfg *config.Application) { +// +//nolint:funlen, gocognit +func PeriodicallyGetInventoryReport(cfg *config.Application, ch integration.Channels, gatedReportInfo *healthreporter.GatedReportInfo) { + // Wait for registration with Enterprise to be disabled or completed + <-ch.InventoryReportingEnabled + log.Info("Inventory reporting started") + healthReportingEnabled := false + // Fire off a ticker that reports according to a configurable polling interval ticker := time.NewTicker(time.Duration(cfg.PollingIntervalSeconds) * time.Second) @@ -101,20 +111,56 @@ func PeriodicallyGetInventoryReport(cfg *config.Application) { log.Errorf("Failed to get Inventory Report: %w", err) } else { for account, reportsForAccount := range reports { + reportInfo := healthreporter.InventoryReportInfo{ + Account: account, + BatchSize: len(reportsForAccount), + LastSuccessfulIndex: -1, + Batches: make([]healthreporter.BatchInfo, 0), + HasErrors: false, + } for count, report := range reportsForAccount { log.Infof("Sending Inventory Report to Anchore Account %s, %d of %d", account, count+1, len(reportsForAccount)) - err := HandleReport(report, cfg, account) + + reportInfo.ReportTimestamp = report.Timestamp + batchInfo := healthreporter.BatchInfo{ + SendTimestamp: jstime.Datetime{Time: time.Now().UTC()}, + BatchIndex: count + 1, + } + + err := HandleReport(report, &reportInfo, cfg, account) if errors.Is(err, reporter.ErrAnchoreAccountDoesNotExist) { + // record this error for the health report even if the retry works + batchInfo.Error = fmt.Sprintf("%s (%s) | ", err.Error(), account) + reportInfo.HasErrors = true + // Retry with default account retryAccount := cfg.AnchoreDetails.Account if cfg.AccountRouteByNamespaceLabel.DefaultAccount != "" { retryAccount = cfg.AccountRouteByNamespaceLabel.DefaultAccount } log.Warnf("Error sending to Anchore Account %s, sending to 
default account", account) - err = HandleReport(report, cfg, retryAccount) + err = HandleReport(report, &reportInfo, cfg, retryAccount) } if err != nil { log.Errorf("Failed to handle Inventory Report: %w", err) + // append the error to any error that happened during a retry, so we record both failures + batchInfo.Error += err.Error() + reportInfo.HasErrors = true + } else { + reportInfo.LastSuccessfulIndex = count + 1 + } + + select { + case isEnabled, isNotClosed := <-ch.HealthReportingEnabled: + if isNotClosed { + healthReportingEnabled = isEnabled + } + log.Infof("Health reporting enabled: %t", healthReportingEnabled) + default: + } + if healthReportingEnabled { + reportInfo.Batches = append(reportInfo.Batches, batchInfo) + healthreporter.SetReportInfoNoBlocking(account, count, reportInfo, gatedReportInfo) } } } diff --git a/skaffold.yaml b/skaffold.yaml index 4072691..f669bf2 100644 --- a/skaffold.yaml +++ b/skaffold.yaml @@ -23,6 +23,7 @@ deploy: k8sInventory.quiet: false k8sInventory.verboseInventoryReports: true k8sInventory.pollingIntervalSeconds: 60 + k8sInventory.healthReportIntervalSeconds: 60 k8sInventory.anchore.url: "http://host.docker.internal:8228" k8sInventory.anchore.user: "admin" k8sInventory.anchore.password: "foobar"