diff --git a/anchore-k8s-inventory.yaml b/anchore-k8s-inventory.yaml index 6445dd6..5641590 100644 --- a/anchore-k8s-inventory.yaml +++ b/anchore-k8s-inventory.yaml @@ -14,6 +14,17 @@ log: # enable/disable checking for application updates on startup check-for-app-update: true +registration: + # The id to register the agent as with Enterprise, so Enterprise can map the agent to its integration uuid. + # If left unspecified, the agent will attempt to set registration-id to the uid of the K8s Deployment for the agent. + # If that fails (e.g., if the agent is not deployed on K8s), the agent will generate a UUID to use as registration-id. + registration-id: + # The name that the agent should have. If left unspecified, the agent will attempt to set it to the name of the K8s + # Deployment for the agent. If that fails it will be empty. + integration-name: + # A short description for the agent + integration-description: + kubeconfig: path: cluster: docker-desktop @@ -117,6 +128,9 @@ ignore-not-running: true # Only respected if mode is periodic polling-interval-seconds: 300 +# Only respected if mode is periodic +health-report-interval-seconds: 60 + # Batch Request configuration inventory-report-limits: namespaces: 0 # default of 0 means no limit per report diff --git a/cmd/root.go b/cmd/root.go index aaa151f..bd42046 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -3,11 +3,12 @@ package cmd import ( "errors" "fmt" - "os" - "runtime/pprof" - + "github.com/anchore/k8s-inventory/pkg/healthreporter" + "github.com/anchore/k8s-inventory/pkg/integration" "github.com/anchore/k8s-inventory/pkg/mode" "github.com/anchore/k8s-inventory/pkg/reporter" + "os" + "runtime/pprof" "github.com/spf13/cobra" "github.com/spf13/viper" @@ -45,9 +46,29 @@ var rootCmd = &cobra.Command{ os.Exit(1) } + instance, err := integration.PerformRegistration(appConfig) + if err != nil { + os.Exit(1) + } + switch appConfig.RunMode { case mode.PeriodicPolling: - pkg.PeriodicallyGetInventoryReport(appConfig) + neverDone := make(chan bool, 1) + + var gatedReportInfo *healthreporter.GatedReportInfo + if !integration.HealthReportingDisabled { + gatedReportInfo = &healthreporter.GatedReportInfo{ + AccountInventoryReports: make(healthreporter.AccountK8SInventoryReports, 0), + } + + go healthreporter.PeriodicallySendHealthReport(appConfig, instance, gatedReportInfo) + } else { + gatedReportInfo = nil + } + + go pkg.PeriodicallyGetInventoryReport(appConfig, gatedReportInfo) + + <-neverDone default: reports, err := pkg.GetInventoryReports(appConfig) if appConfig.Dev.ProfileCPU { @@ -58,10 +79,11 @@ var rootCmd = &cobra.Command{ os.Exit(1) } anErrorOccurred := false + reportInfo := healthreporter.InventoryReportInfo{} for account, reportsForAccount := range reports { for count, report := range reportsForAccount { log.Infof("Sending Inventory Report to Anchore Account %s, %d of %d", account, count+1, len(reportsForAccount)) - err = pkg.HandleReport(report, appConfig, account) + err = pkg.HandleReport(report, &reportInfo, appConfig, account) if errors.Is(err, reporter.ErrAnchoreAccountDoesNotExist) { // Retry with default account retryAccount := appConfig.AnchoreDetails.Account @@ -69,7 +91,7 @@ var rootCmd = &cobra.Command{ retryAccount = appConfig.AccountRouteByNamespaceLabel.DefaultAccount } log.Warnf("Error sending to Anchore Account %s, sending to default account", account) - err = pkg.HandleReport(report, appConfig, retryAccount) + err = pkg.HandleReport(report, &reportInfo, appConfig, retryAccount) } if err != nil { log.Errorf("Failed to handle Image Results: %+v", err) diff --git a/internal/anchore/anchoreclient.go b/internal/anchore/anchoreclient.go new file mode 100644 index 0000000..3750e1c --- /dev/null +++ b/internal/anchore/anchoreclient.go @@ -0,0 +1,250 @@ +package anchore + +import ( + "bytes" + "crypto/tls" + "encoding/json" + "errors" + "fmt" + "github.com/anchore/k8s-inventory/internal/config" + "github.com/anchore/k8s-inventory/internal/log" + "github.com/anchore/k8s-inventory/internal/tracker" + "github.com/h2non/gock" + "io" + "net/http" + "net/url" + "os" + "strings" + "syscall" + "time" +) + +type ControllerErrorDetails struct { + Type string `json:"type"` + Title string `json:"title"` + Detail string `json:"detail"` + Status int `json:"status"` +} + +type APIErrorDetails struct { + Message string `json:"message"` + Detail map[string]interface{} `json:"detail"` + HTTPCode int `json:"httpcode"` +} + +type APIClientError struct { + HTTPStatusCode int + Message string + Path string + Method string + Body *[]byte + APIErrorDetails *APIErrorDetails + ControllerErrorDetails *ControllerErrorDetails +} + +func (e *APIClientError) Error() string { + return fmt.Sprintf("API errorMsg(%d): %s Path: %q %v %v", e.HTTPStatusCode, e.Message, e.Path, + e.APIErrorDetails, e.ControllerErrorDetails) +} + +func Post(requestBody []byte, id string, path string, anchoreDetails config.AnchoreInfo, operation string) (*[]byte, error) { + defer tracker.TrackFunctionTime(time.Now(), fmt.Sprintf("Sent %s request to Anchore", operation)) + + log.Debugf("Performing %s to Anchore using endpoint: %s", operation, strings.Replace(path, "{{id}}", id, 1)) + + client := getClient(anchoreDetails) + + anchoreURL, err := getURL(anchoreDetails, path, id) + if err != nil { + return nil, err + } + + request, err := getPostRequest(anchoreDetails, anchoreURL, requestBody, operation) + if err != nil { + return nil, err + } + + return doPost(client, request, operation) +} + +func getClient(anchoreDetails config.AnchoreInfo) *http.Client { + tr := &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: anchoreDetails.HTTP.Insecure}, + } // #nosec G402 + + client := &http.Client{ + Transport: tr, + Timeout: time.Duration(anchoreDetails.HTTP.TimeoutSeconds) * time.Second, + } + gock.InterceptClient(client) // Required to use gock for testing custom client + + return client +} + +func getURL(anchoreDetails config.AnchoreInfo, path string, id string) (string, error) { + anchoreURL, err := url.Parse(anchoreDetails.URL) + if err != nil { + return "", fmt.Errorf("failed to build path (%s) url: %w", path, err) + } + + anchoreURL.Path += strings.Replace(path, "{{id}}", id, 1) + return anchoreURL.String(), nil +} + +func getPostRequest(anchoreDetails config.AnchoreInfo, endpointURL string, reqBody []byte, operation string) (*http.Request, error) { + request, err := http.NewRequest("POST", endpointURL, bytes.NewBuffer(reqBody)) + if err != nil { + return nil, fmt.Errorf("failed to prepare %s request to Anchore: %w", operation, err) + } + + request.SetBasicAuth(anchoreDetails.User, anchoreDetails.Password) + request.Header.Set("Content-Type", "application/json") + request.Header.Set("x-anchore-account", anchoreDetails.Account) + return request, nil +} + +func doPost(client *http.Client, request *http.Request, operation string) (*[]byte, error) { + response, err := client.Do(request) + if err != nil { + return nil, err + } + defer response.Body.Close() + + err = checkHTTPErrors(response, operation) + if err != nil { + return nil, err + } + + responseBody, _ := getBody(response, operation) + return responseBody, nil +} + +func checkHTTPErrors(response *http.Response, operation string) error { + switch { + case response.StatusCode >= 400 && response.StatusCode <= 599: + msg := fmt.Sprintf("%s response from Anchore (during %s)", response.Status, operation) + log.Errorf(msg) + + respBody, _ := getBody(response, operation) + if respBody == nil { + return &APIClientError{Message: msg, Path: response.Request.URL.Path, Method: response.Request.Method, + Body: nil, HTTPStatusCode: response.StatusCode} + } + + // Depending on where an errorMsg is discovered during request processing on the server, the + // errorMsg information in the response will be either an APIErrorDetails or a ControllerErrorDetails + apiError := APIErrorDetails{} + err := json.Unmarshal(*respBody, &apiError) + if err == nil { + return &APIClientError{Message: msg, Path: response.Request.URL.Path, Method: response.Request.Method, + Body: nil, HTTPStatusCode: response.StatusCode, APIErrorDetails: &apiError} + } + + controllerError := ControllerErrorDetails{} + err = json.Unmarshal(*respBody, &controllerError) + if err == nil { + return &APIClientError{Message: msg, Path: response.Request.URL.Path, Method: response.Request.Method, + Body: nil, HTTPStatusCode: response.StatusCode, ControllerErrorDetails: &controllerError} + } + + return &APIClientError{Message: msg, Path: response.Request.URL.Path, Method: response.Request.Method, + Body: nil, HTTPStatusCode: response.StatusCode} + case response.StatusCode < 200 || response.StatusCode > 299: + msg := fmt.Sprintf("failed to perform %s to Anchore: %+v", operation, response) + log.Debugf(msg) + return &APIClientError{Message: msg, Path: response.Request.URL.Path, Method: response.Request.Method, + Body: nil, HTTPStatusCode: response.StatusCode} + } + return nil +} + +func getBody(response *http.Response, operation string) (*[]byte, error) { + responseBody, err := io.ReadAll(response.Body) + if err != nil { + errMsg := fmt.Sprintf("failed to read %s response body from Anchore:", operation) + log.Debugf("%s %v", operation, errMsg) + return nil, fmt.Errorf("%s %w", errMsg, err) + } + + // Check we received a valid JSON response from Anchore, this will help catch + // any redirect responses where it returns HTML login pages e.g. Enterprise + // running behind cloudflare where a login page is returned with the status 200 + if len(responseBody) > 0 && !json.Valid(responseBody) { + log.Debugf("Anchore %s response body: %s", operation, string(responseBody)) + return nil, fmt.Errorf("%s response from Anchore is not valid json: %+v", operation, response) + } + return &responseBody, nil +} + +func ServerIsOffline(err error) bool { + if os.IsTimeout(err) { + return true + } + + if errors.Is(err, syscall.ECONNREFUSED) { + return true + } + + if errors.Is(err, syscall.ECONNRESET) { + return true + } + + var apiClientError *APIClientError + if errors.As(err, &apiClientError) { + if apiClientError.HTTPStatusCode == http.StatusBadGateway || + apiClientError.HTTPStatusCode == http.StatusServiceUnavailable || + apiClientError.HTTPStatusCode == http.StatusGatewayTimeout { + return true + } + } + + return false +} + +func ServerLacksAgentHealthAPISupport(err error) bool { + var apiClientError *APIClientError + if errors.As(err, &apiClientError) { + if apiClientError.ControllerErrorDetails == nil { + return false + } + + if apiClientError.HTTPStatusCode == http.StatusNotFound && + strings.Contains(apiClientError.ControllerErrorDetails.Detail, "The requested URL was not found") { + return true + } + + if apiClientError.HTTPStatusCode == http.StatusMethodNotAllowed && + apiClientError.ControllerErrorDetails.Detail == "Method Not Allowed" { + return true + } + } + + return false +} + +func UserLacksAPIPrivileges(err error) bool { + var apiClientError *APIClientError + + if errors.As(err, &apiClientError) { + if apiClientError.APIErrorDetails == nil { + return false + } + + if apiClientError.HTTPStatusCode == http.StatusForbidden && + strings.Contains(apiClientError.APIErrorDetails.Message, "Not authorized. Requires permissions") { + return true + } + } + return false +} + +func IncorrectCredentials(err error) bool { + // This covers user that does not exist or incorrect password for user + var apiClientError *APIClientError + + if errors.As(err, &apiClientError) && apiClientError.HTTPStatusCode == http.StatusUnauthorized { + return true + } + + return false +} diff --git a/internal/anchore/anchoreclient_test.go b/internal/anchore/anchoreclient_test.go new file mode 100644 index 0000000..a6d6141 --- /dev/null +++ b/internal/anchore/anchoreclient_test.go @@ -0,0 +1,535 @@ +package anchore + +import ( + "fmt" + "github.com/anchore/k8s-inventory/internal/config" + "github.com/h2non/gock" + "github.com/stretchr/testify/assert" + "net" + "net/http" + "net/url" + "os" + "syscall" + "testing" +) + +type httpError struct { + err string + timeout bool +} + +func (e *httpError) Error() string { return e.err } +func (e *httpError) Timeout() bool { return e.timeout } + +var ( + integration = map[string]interface{}{ + "uuid": "000d1e60-cb05-4cce-8d1e-60cb052cce1f", + "type": "anchore_k8s_inventory_agent", + "name": "k8s-inv-agent", + "description": "k8s-agent with health reporting", + "version": "2.0", + "reported_status": map[string]interface{}{"state": "HEALTHY"}, + "integration_status": map[string]interface{}{"state": "REGISTERED"}, + "started_at": "2024-04-10T12:14:16Z", + // "last_seen": nil + "uptime": "2.04", + "username": "admin", + "account_name": "admin", + // "explicitly_account_bound": interface{}{}, + // "accounts":, + // "namespaces:, + "cluster_name": "Docker-Desktop", + "namespace": "default", + "health-report-interval": 60, + "registration_id": "de2c3c58-4c20-4d87-ac3c-584c201d875a", + "registration_instance_id": "45743315", + } + + errOther = fmt.Errorf("other errorMsg") + + connectionTimeoutError = url.Error{ + Op: "Post", + URL: "http://127.0.0.1:8228/v2/system/integrations/registration", + Err: &httpError{err: "net/http: timeout awaiting response headers", timeout: true}, + } + + connectionRefusedError = url.Error{ + Op: "Post", + URL: "http://127.0.0.1:8228/v2/system/integrations/registration", + Err: &net.OpError{ + Op: "dial", + Net: "tcp", + Source: nil, + Addr: &net.TCPAddr{ + IP: net.ParseIP("127.0.0.1"), + Port: 8228, + Zone: "", + }, + Err: &os.SyscallError{ + Syscall: "connect", + Err: syscall.ECONNREFUSED, + }, + }, + } + + connectionResetError = url.Error{ + Op: "Post", + URL: "http://127.0.0.1:8228/v2/system/integrations/registration", + Err: &net.OpError{ + Op: "read", + Net: "tcp", + Source: &net.TCPAddr{ + IP: net.ParseIP("127.0.0.1"), + Port: 62122, + Zone: "", + }, + Addr: &net.TCPAddr{ + IP: net.ParseIP("127.0.0.1"), + Port: 8228, + Zone: "", + }, + Err: &os.SyscallError{ + Syscall: "read", + Err: syscall.ECONNRESET, + }, + }, + } + + badGatewayError = APIClientError{ + HTTPStatusCode: http.StatusBadGateway, + Message: "Bad Gateway", + Path: "v2/system/integrations/registration", + Method: "POST", + } + + serviceUnavailableError = APIClientError{ + HTTPStatusCode: http.StatusServiceUnavailable, + Message: "Service Unavailable", + Path: "v2/system/integrations/registration", + Method: "POST", + } + + gatewayTimeoutError = APIClientError{ + HTTPStatusCode: http.StatusGatewayTimeout, + Message: "Gateway Timeout", + Path: "v2/system/integrations/registration", + Method: "POST", + } + + urlNotFoundError = APIClientError{ + HTTPStatusCode: http.StatusNotFound, + Message: "404 Not Found response from Anchore (during integration registration)", + Path: "/v2/system/integrations/registration", + Method: "POST", + ControllerErrorDetails: &ControllerErrorDetails{ + Type: "about:blank", + Title: "Not Found", + Detail: "The requested URL was not found on the server. If you entered the URL manually please check your spelling and try again.", + Status: http.StatusNotFound, + }, + } + + methodNotAllowedError = APIClientError{ + HTTPStatusCode: http.StatusMethodNotAllowed, + Message: "405 Method Not Allowed response from Anchore (during integration registration)", + Path: "v2/system/integrations/registration", + Method: "POST", + ControllerErrorDetails: &ControllerErrorDetails{ + Type: "about:blank", + Title: "Method Not Allowed", + Detail: "Method Not Allowed", + Status: http.StatusMethodNotAllowed, + }, + } + + unAuthorizedError = APIClientError{ + HTTPStatusCode: http.StatusUnauthorized, + Message: "401 Unauthorized response from Anchore (during integration registration)", + Path: "v2/system/integrations/registration", + Method: "POST", + } + + insufficientPrivilegeError = APIClientError{ + HTTPStatusCode: http.StatusForbidden, + Message: "403 Forbidden response from Anchore (during integration registration)", + Path: "v2/system/integrations/registration", + Method: "POST", + APIErrorDetails: &APIErrorDetails{ + Message: "Not authorized. Requires permissions: domain=account0 action=registerIntegration target=", + Detail: map[string]interface{}{}, + HTTPCode: http.StatusForbidden, + }, + } + + anchoreDetails = config.AnchoreInfo{ + URL: "https://ancho.re", + User: "admin", + Password: "foobar", + Account: "account0", + } +) + +func TestPost(t *testing.T) { + defer gock.Off() + + type args struct { + requestBody []byte + id string + path string + anchoreDetails config.AnchoreInfo + operation string + } + tests := []struct { + name string + args args + wantErr error + }{ + { + name: "successful registration", + args: args{ + requestBody: []byte(`{"id":"1"}`), + id: "", + path: "v2/system/integrations/registration", + anchoreDetails: anchoreDetails, + operation: "integration registration", + }, + }, + { + name: "401 error", + args: args{ + requestBody: []byte(`{"id":"1"}`), + id: "", + path: "v2/system/integrations/registration", + anchoreDetails: anchoreDetails, + operation: "integration registration", + }, + wantErr: &unAuthorizedError, + }, + { + name: "403 error", + args: args{ + requestBody: []byte(`{"id":"1"}`), + id: "", + path: "v2/system/integrations/registration", + anchoreDetails: anchoreDetails, + operation: "integration registration", + }, + wantErr: &insufficientPrivilegeError, + }, + { + name: "404 error", + args: args{ + requestBody: []byte(`{"id":"1"}`), + id: "", + path: "v2/system/integrations/registration", + anchoreDetails: anchoreDetails, + operation: "integration registration", + }, + wantErr: &urlNotFoundError, + }, + } + for _, tt := range tests { + switch tt.name { + case "successful registration": + gock.New("https://ancho.re"). + Post("v2/system/integrations/registration"). + Reply(200). + JSON(integration) + case "401 error": + gock.New("https://ancho.re"). + Post("v2/system/integrations/registration"). + Reply(401) + case "403 error": + gock.New("https://ancho.re"). + Post("v2/system/integrations/registration"). + Reply(403). + JSON(map[string]interface{}{ + "message": "Not authorized. Requires permissions: domain=account0 action=registerIntegration target=", + "detail": map[string]interface{}{}, + "httpcode": http.StatusForbidden, + }) + case "404 error": + gock.New("https://ancho.re"). + Post("v2/system/integrations/registration"). + Reply(404). + JSON(map[string]interface{}{ + "type": "about:blank", + "title": "Not Found", + "detail": "The requested URL was not found on the server. If you entered the URL manually please check your spelling and try again.", + "status": http.StatusNotFound, + }) + } + t.Run(tt.name, func(t *testing.T) { + result, err := Post(tt.args.requestBody, tt.args.id, tt.args.path, tt.args.anchoreDetails, tt.args.operation) + if tt.wantErr != nil { + assert.Error(t, err) + assert.Nil(t, result) + } else { + assert.NoError(t, err) + assert.NotNil(t, result) + } + }) + } +} + +func TestGetUrl(t *testing.T) { + type args struct { + anchoreDetails config.AnchoreInfo + url string + uuid string + } + type want struct { + expectedURL string + errorMsg string + } + tests := []struct { + name string + args args + want want + }{ + { + name: "Registration url", + args: args{ + anchoreDetails: anchoreDetails, + url: "v2/system/integrations/registration", + uuid: "", + }, + want: want{ + expectedURL: "https://ancho.re/v2/system/integrations/registration", + errorMsg: "", + }, + }, + { + name: "Health report url", + args: args{ + anchoreDetails: anchoreDetails, + url: "v2/system/integrations/{{id}}/health-report", + uuid: "0ec44439-d091-4bf2-8444-39d0916bf220", + }, + want: want{ + expectedURL: "https://ancho.re/v2/system/integrations/0ec44439-d091-4bf2-8444-39d0916bf220/health-report", + errorMsg: "", + }, + }, + { + name: "faulty url", + args: args{ + anchoreDetails: config.AnchoreInfo{ + URL: "htt$ps://ancho.re", + User: "admin", + Password: "foobar", + }, + url: "v2/system/integrations/{{id}}/health-report", + uuid: "0ec44439-d091-4bf2-8444-39d0916bf220", + }, + want: want{ + expectedURL: "", + errorMsg: "failed to build path (v2/system/integrations/{{id}}/health-report)", + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result, err := getURL(tt.args.anchoreDetails, tt.args.url, tt.args.uuid) + assert.Equal(t, tt.want.expectedURL, result) + if tt.want.errorMsg != "" { + assert.ErrorContains(t, err, tt.want.errorMsg) + } else { + assert.NoError(t, err) + } + }) + } +} + +func TestGetPostRequest(t *testing.T) { + type args struct { + url string + reqBody []byte + } + tests := []struct { + name string + args args + want error + }{ + { + name: "Good path", + args: args{ + url: "http://localhost:8228/v2/system/integrations/registration", + reqBody: make([]byte, 0), + }, + want: nil, + }, + { + name: "Bad path", + args: args{ + url: "_http://localhost:8228/v2/system/integrations/registration", + reqBody: nil, + }, + want: &url.Error{}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result, err := getPostRequest(anchoreDetails, tt.args.url, tt.args.reqBody, "register integration") + if tt.want != nil { + assert.Nil(t, result) + assert.Error(t, err, tt.want) + } else { + assert.NoError(t, err) + assert.Equal(t, result.Header.Get("content-type"), "application/json") + assert.Contains(t, result.Header.Get("Authorization"), "Basic") + assert.Contains(t, result.Header.Get("X-Anchore-Account"), "account0") + } + }) + } +} + +func TestAnchoreIsOffline(t *testing.T) { + tests := []struct { + name string + err error + want bool + }{ + { + name: "Connection timeout returns true", + err: &connectionTimeoutError, + want: true, + }, + { + name: "Connection refused errorMsg returns true", + err: &connectionRefusedError, + want: true, + }, + { + name: "Connection reset errorMsg returns true", + err: &connectionResetError, + want: true, + }, + { + name: "AnchoreAPIClientError with 502 http_status returns true", + err: &badGatewayError, + want: true, + }, + { + name: "AnchoreAPIClientError with 503 http_status returns true", + err: &serviceUnavailableError, + want: true, + }, + { + name: "AnchoreAPIClientError with 504 http_status returns true", + err: &gatewayTimeoutError, + want: true, + }, + { + name: "AnchoreAPIClientError with 401 http_status returns false", + err: &unAuthorizedError, + want: false, + }, + { + name: "Other errorMsg returns false", + err: errOther, + want: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := ServerIsOffline(tt.err) + assert.Equal(t, tt.want, result) + }) + } +} + +func TestAnchoreLacksAgentHealthAPISupport(t *testing.T) { + tests := []struct { + name string + err error + want bool + }{ + { + name: "AnchoreAPIClientError with 404 http_status returns true", + err: &urlNotFoundError, + want: true, + }, + { + name: "AnchoreAPIClientError with 405 http_status returns true", + err: &methodNotAllowedError, + want: true, + }, + { + name: "AnchoreAPIClientError with 401 http_status returns false", + err: &unAuthorizedError, + want: false, + }, + { + name: "Other errorMsg returns false", + err: errOther, + want: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := ServerLacksAgentHealthAPISupport(tt.err) + assert.Equal(t, tt.want, result) + }) + } +} + +func TestUserLacksAPIPrivileges(t *testing.T) { + tests := []struct { + name string + err error + want bool + }{ + { + name: "AnchoreAPIClientError with 403 http_status returns true", + err: &insufficientPrivilegeError, + want: true, + }, + { + name: "AnchoreAPIClientError with 401 http_status returns false", + err: &unAuthorizedError, + want: false, + }, + { + name: "Other errorMsg returns false", + err: errOther, + want: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := UserLacksAPIPrivileges(tt.err) + assert.Equal(t, tt.want, result) + }) + } +} + +func TestIncorrectCredentials(t *testing.T) { + tests := []struct { + name string + err error + want bool + }{ + { + name: "AnchoreAPIClientError with 401 http_status returns true", + err: &unAuthorizedError, + want: true, + }, + { + name: "AnchoreAPIClientError with non 403 http_status returns false", + err: &insufficientPrivilegeError, + want: false, + }, + { + name: "Other errorMsg returns false", + err: errOther, + want: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := IncorrectCredentials(tt.err) + assert.Equal(t, tt.want, result) + }) + } +} diff --git a/internal/config/config.go b/internal/config/config.go index 9332532..d01d657 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -15,7 +15,6 @@ import ( "path" "strings" - "github.com/anchore/k8s-inventory/pkg/mode" "gopkg.in/yaml.v2" "github.com/adrg/xdg" @@ -24,6 +23,7 @@ import ( "github.com/spf13/viper" "github.com/anchore/k8s-inventory/internal" + "github.com/anchore/k8s-inventory/pkg/mode" ) const redacted = "******" @@ -37,8 +37,9 @@ type CliOnlyOptions struct { // All Application configurations type Application struct { ConfigPath string - Quiet bool `mapstructure:"quiet" json:"quiet,omitempty" yaml:"quiet"` - Log Logging `mapstructure:"log" json:"log,omitempty" yaml:"log"` + Quiet bool `mapstructure:"quiet" json:"quiet,omitempty" yaml:"quiet"` + Log Logging `mapstructure:"log" json:"log,omitempty" yaml:"log"` + Registration RegistrationOptions `mapstructure:"registration" json:"registration,omitempty" yaml:"registration"` CliOptions CliOnlyOptions Dev Development `mapstructure:"dev" json:"dev,omitempty" yaml:"dev"` KubeConfig KubeConf `mapstructure:"kubeconfig" json:"kubeconfig,omitempty" yaml:"kubeconfig"` @@ -54,12 +55,19 @@ type Application struct { Mode string `mapstructure:"mode" json:"mode,omitempty" yaml:"mode"` IgnoreNotRunning bool `mapstructure:"ignore-not-running" json:"ignore-not-running,omitempty" yaml:"ignore-not-running"` PollingIntervalSeconds int `mapstructure:"polling-interval-seconds" json:"polling-interval-seconds,omitempty" yaml:"polling-interval-seconds"` + HealthReportIntervalSeconds int `mapstructure:"health-report-interval-seconds" json:"health-report-interval-seconds,omitempty" yaml:"health-report-interval-seconds"` InventoryReportLimits InventoryReportLimits `mapstructure:"inventory-report-limits" json:"inventory-report-limits,omitempty" yaml:"inventory-report-limits"` MetadataCollection MetadataCollection `mapstructure:"metadata-collection" json:"metadata-collection,omitempty" yaml:"metadata-collection"` AnchoreDetails AnchoreInfo `mapstructure:"anchore" json:"anchore,omitempty" yaml:"anchore"` VerboseInventoryReports bool `mapstructure:"verbose-inventory-reports" json:"verbose-inventory-reports,omitempty" yaml:"verbose-inventory-reports"` } +type RegistrationOptions struct { + RegistrationID string `mapstructure:"registration-id" json:"registration-id,omitempty" yaml:"registration-id"` + IntegrationName string `mapstructure:"integration-name" json:"integration-name,omitempty" yaml:"integration-name"` + IntegrationDescription string `mapstructure:"integration-description" json:"integration-description,omitempty" yaml:"integration-description"` +} + // MissingTagConf details the policy for handling missing tags when reporting images type MissingTagConf struct { Policy string `mapstructure:"policy" json:"policy,omitempty" yaml:"policy"` @@ -150,6 +158,8 @@ func setNonCliDefaultValues(v *viper.Viper) { v.SetDefault("log.level", "") v.SetDefault("log.file", "") v.SetDefault("log.structured", false) + v.SetDefault("registration.always-use-fallback-register-id", false) + v.SetDefault("registration.fallback-register-name", "anchore_k8s_inventory_agent") v.SetDefault("dev.profile-cpu", false) v.SetDefault("anchore.account", "admin") v.SetDefault("kubeconfig.anchore.account", "admin") @@ -160,6 +170,7 @@ func setNonCliDefaultValues(v *viper.Viper) { v.SetDefault("kubernetes.request-batch-size", 100) v.SetDefault("kubernetes.worker-pool-size", 100) v.SetDefault("ignore-not-running", true) + v.SetDefault("health-report-interval-seconds", 60) v.SetDefault("missing-registry-override", "") v.SetDefault("missing-tag-policy.policy", "digest") v.SetDefault("missing-tag-policy.tag", "UNKNOWN") diff --git a/internal/config/test-fixtures/snapshot/TestDefaultConfigString.golden b/internal/config/test-fixtures/snapshot/TestDefaultConfigString.golden index c3b656b..5df1c03 100644 --- a/internal/config/test-fixtures/snapshot/TestDefaultConfigString.golden +++ b/internal/config/test-fixtures/snapshot/TestDefaultConfigString.golden @@ -5,6 +5,10 @@ log: levelopt: debug level: debug file: ./anchore-k8s-inventory.log +registration: + registration-id: "" + integration-name: "" + integration-description: "" clioptions: configpath: ../../anchore-k8s-inventory.yaml verbosity: 0 @@ -44,6 +48,7 @@ runmode: 0 mode: adhoc ignore-not-running: true polling-interval-seconds: 300 +health-report-interval-seconds: 60 inventory-report-limits: namespaces: 0 metadata-collection: diff --git a/internal/config/test-fixtures/snapshot/TestEmptyConfigString.golden b/internal/config/test-fixtures/snapshot/TestEmptyConfigString.golden index 8f85b8b..9a997fa 100644 --- a/internal/config/test-fixtures/snapshot/TestEmptyConfigString.golden +++ b/internal/config/test-fixtures/snapshot/TestEmptyConfigString.golden @@ -5,6 +5,10 @@ log: levelopt: panic level: "" file: "" +registration: + registration-id: "" + integration-name: "" + integration-description: "" clioptions: configpath: "" verbosity: 0 @@ -44,6 +48,7 @@ runmode: 0 mode: "" ignore-not-running: false polling-interval-seconds: 0 +health-report-interval-seconds: 0 inventory-report-limits: namespaces: 0 metadata-collection: diff --git a/internal/config/test-fixtures/snapshot/TestSensitiveConfigJSON.golden b/internal/config/test-fixtures/snapshot/TestSensitiveConfigJSON.golden index b9da0c5..2c110e5 100644 --- a/internal/config/test-fixtures/snapshot/TestSensitiveConfigJSON.golden +++ b/internal/config/test-fixtures/snapshot/TestSensitiveConfigJSON.golden @@ -5,6 +5,8 @@ "level": "debug", "file": "./anchore-k8s-inventory.log" }, + "registration": { + }, "CliOptions": { "ConfigPath": "../../anchore-k8s-inventory.yaml", "Verbosity": 0 @@ -50,6 +52,7 @@ "mode": "adhoc", "ignore-not-running": true, "polling-interval-seconds": 300, + "health-report-interval-seconds": 60, "inventory-report-limits": {}, "metadata-collection": { "nodes": {}, diff --git a/internal/config/test-fixtures/snapshot/TestSensitiveConfigString.golden b/internal/config/test-fixtures/snapshot/TestSensitiveConfigString.golden index e72b4f9..b9af385 100644 --- a/internal/config/test-fixtures/snapshot/TestSensitiveConfigString.golden +++ b/internal/config/test-fixtures/snapshot/TestSensitiveConfigString.golden @@ -5,6 +5,10 @@ log: levelopt: debug level: debug file: ./anchore-k8s-inventory.log +registration: + registration-id: "" + integration-name: "" + integration-description: "" clioptions: configpath: ../../anchore-k8s-inventory.yaml verbosity: 0 @@ -54,6 +58,7 @@ runmode: 0 mode: adhoc ignore-not-running: true polling-interval-seconds: 300 +health-report-interval-seconds: 60 inventory-report-limits: namespaces: 0 metadata-collection: diff --git a/internal/time/time.go b/internal/time/time.go new file mode 100644 index 0000000..00ca5f4 --- /dev/null +++ b/internal/time/time.go @@ -0,0 +1,53 @@ +package time + +import ( + "encoding/json" + "errors" + "fmt" + "time" +) + +// time with json marshalling/unmarshalling support +const nanoSeconds = 1000000000 + +type Datetime struct { + time.Time +} +type Duration struct { + time.Duration +} + +func Now() Datetime { + return Datetime{time.Now()} +} + +func (d Datetime) UTC() Datetime { + return Datetime{d.Time.UTC()} +} + +func (d Datetime) Sub(d2 Datetime) *Duration { + return &Duration{d.Time.Sub(d2.Time)} +} + +func (d Datetime) MarshalJSON() ([]byte, error) { + return []byte(fmt.Sprintf("\"%s\"", d.Format(time.RFC3339))), nil +} + +func (d *Duration) MarshalJSON() ([]byte, error) { + return []byte(fmt.Sprintf("%f", d.Seconds())), nil +} + +func (d *Duration) UnmarshalJSON(b []byte) error { + var v interface{} + if err := json.Unmarshal(b, &v); err != nil { + return err + } + switch value := v.(type) { + case float64: + // enterprise sends durations in seconds + d.Duration = time.Duration(value * nanoSeconds) + return nil + default: + return errors.New("invalid duration") + } +} diff --git a/internal/time/time_test.go b/internal/time/time_test.go new file mode 100644 index 0000000..2319d8a --- /dev/null +++ b/internal/time/time_test.go @@ -0,0 +1,215 @@ +package time + +import ( + "encoding/json" + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +var ( + timestamp1 = Datetime{ + Time: time.Date(2024, time.April, 10, 12, 14, 16, 50, time.UTC), + } + timestamp2 = Datetime{ + Time: time.Date(2023, time.June, 5, 6, 7, 8, 25, time.UTC), + } + timestamp3 = Datetime{ + Time: time.Date(2024, time.April, 10, 12, 14, 6, 50, time.UTC), + } + timeDiff1, _ = time.ParseDuration("7446h7m8.000000025s") + negTimeDiff1, _ = time.ParseDuration("-7446h7m8.000000025s") + timeDiff2, _ = time.ParseDuration("10s") + negTimeDiff2, _ = time.ParseDuration("-10s") +) + +func TestSub(t *testing.T) { + type args struct { + t1 Datetime + t2 Datetime + } + tests := []struct { + name string + args args + want *Duration + }{ + { + name: "Year long difference", + args: args{ + t1: timestamp1, + t2: timestamp2, + }, + want: &Duration{ + Duration: timeDiff1, + }, + }, + { + name: "Negative year long difference", + args: args{ + t1: timestamp2, + t2: timestamp1, + }, + want: &Duration{ + Duration: negTimeDiff1, + }, + }, + { + name: "Sub-minute long difference", + args: args{ + t1: timestamp1, + t2: timestamp3, + }, + want: &Duration{ + Duration: timeDiff2, + }, + }, + { + name: "Negative sub-minute long difference", + args: args{ + t1: timestamp3, + t2: timestamp1, + }, + want: &Duration{ + Duration: negTimeDiff2, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := tt.args.t1.Sub(tt.args.t2) + assert.Equal(t, tt.want, result) + }) + } +} + +func TestDateTimeMarshalJSON(t *testing.T) { + tests := []struct { + name string + t Datetime + want []byte + }{ + { + name: "timestamp1", + t: timestamp1, + want: []byte("\"2024-04-10T12:14:16Z\""), + }, + { + name: "timestamp2", + t: timestamp2, + want: []byte("\"2023-06-05T06:07:08Z\""), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result, _ := tt.t.MarshalJSON() + assert.Equal(t, tt.want, result) + }) + } +} + +func TestDurationMarshalJSON(t *testing.T) { + tests := []struct { + name string + d *Duration + want []byte + }{ + { + name: "Year long difference", + d: &Duration{ + Duration: timeDiff1, + }, + want: []byte("26806028.000000"), + }, + { + name: "Negative year long difference", + d: &Duration{ + Duration: negTimeDiff1, + }, + want: []byte("-26806028.000000"), + }, + { + name: "Sub-minute long difference", + d: &Duration{ + Duration: timeDiff2, + }, + want: []byte("10.000000"), + }, + { + name: "Negative sub-minute long difference", + d: &Duration{ + Duration: negTimeDiff2, + }, + want: []byte("-10.000000"), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result, _ := tt.d.MarshalJSON() + assert.Equal(t, tt.want, result) + }) + } +} + +func TestDurationUnmarshalJSON(t *testing.T) { + timeDiff1, _ = time.ParseDuration("7446h7m8.000000000s") + negTimeDiff1, _ = time.ParseDuration("-7446h7m8.000000000s") + + type want struct { + d Duration + err error + } + tests := []struct { + name string + dbytes []byte + want want + }{ + { + name: "Year long difference, precision capped", + dbytes: []byte("26806028.000000"), + want: want{ + d: Duration{ + Duration: timeDiff1, + }, + err: nil, + }, + }, + { + name: "Negative year long difference, precision capped", + dbytes: []byte("-26806028.000000"), + want: want{ + d: Duration{ + Duration: negTimeDiff1, + }, + err: nil, + }, + }, + { + name: "Sub-minute long difference", + dbytes: []byte("10.000000"), + want: want{ + d: Duration{ + Duration: timeDiff2, + }, + err: nil, + }, + }, + { + name: "Negative sub-minute long difference", + dbytes: []byte("-10.000000"), + want: want{ + d: Duration{ + Duration: negTimeDiff2, + }, + err: nil, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var d Duration + resultErr := json.Unmarshal(tt.dbytes, &d) + assert.Equal(t, tt.want.err, resultErr) + assert.Equal(t, tt.want.d, d) + }) + } +} diff --git a/pkg/healthreporter/healthreporter.go b/pkg/healthreporter/healthreporter.go new file mode 100644 index 0000000..1cdec71 --- /dev/null +++ b/pkg/healthreporter/healthreporter.go @@ -0,0 +1,168 @@ +package healthreporter + +import ( + "encoding/json" + "sync" + "time" + + "github.com/google/uuid" + + "github.com/anchore/k8s-inventory/internal/anchore" + "github.com/anchore/k8s-inventory/internal/config" + "github.com/anchore/k8s-inventory/internal/log" + jstime "github.com/anchore/k8s-inventory/internal/time" + intg "github.com/anchore/k8s-inventory/pkg/integration" +) + +const healthProtocolVersion = 1 +const healthDataVersion = 1 +const healthDataType = "anchore_k8s_inventory_agent" +const HealthReportAPIPathV2 = "v2/system/integrations/{{id}}/health-report" + +type HealthReport struct { + UUID string `json:"uuid,omitempty"` // uuid for this health report + ProtocolVersion int `json:"protocol_version,omitempty"` // protocol version for "common" part of health reporting + Timestamp jstime.Datetime `json:"timestamp,omitempty"` // timestamp for this health report in UTC().Format(time.RFC3339) + Uptime *jstime.Duration `json:"uptime,omitempty"` // running time of integration instance + HealthReportInterval int `json:"health_report_interval,omitempty"` // time in seconds between health reports + HealthData HealthData `json:"health_data,omitempty"` // K8s-inventory agent specific health data +} + +type HealthData struct { + Type string `json:"type,omitempty"` // type of health data + Version int `json:"version,omitempty"` // format version + Errors HealthReportErrors `json:"errors,omitempty"` // list of errors + // Anything below this line is specific to k8s-inventory-agent + AccountK8sInventoryReports AccountK8SInventoryReports `json:"account_k8s_inventory_reports,omitempty"` // latest inventory reports per account +} + +type HealthReportErrors []string + +// AccountK8SInventoryReports holds per account information about latest inventory reports from the same batch set +type AccountK8SInventoryReports map[string]InventoryReportInfo + +type InventoryReportInfo struct { + ReportTimestamp string `json:"report_timestamp"` // Timestamp for the inventory report that was batched + Account string `json:"account_name"` // Name of account to which the inventory report belongs + SentAsUser string `json:"sent_as_user"` // User that the inventory report was sent as + BatchSize int `json:"batch_size"` // Number of batches that the inventory report was sent in + LastSuccessfulIndex int `json:"last_successful_index"` // Index of last successfully sent batch, -1 if none + HasErrors bool `json:"has_errors"` // HasErrors is true if any of the batches had an error, false otherwise + Batches []BatchInfo `json:"batches"` // Information about each inventory report batch +} + +type BatchInfo struct { + BatchIndex int `json:"batch_index,omitempty"` // Index of this inventory report batch item + SendTimestamp jstime.Datetime `json:"send_timestamp,omitempty"` // Timestamp when the batch was sent, in UTC().Format(time.RFC3339) + Error string `json:"error,omitempty"` // Any error this batch encountered when sent +} + +// GatedReportInfo The go routine that generates the inventory report must inform the go routine +// that sends health reports about the *latest* sent inventory reports. +// A buffered channel is FIFO so the earliest inserted items are returned first. No new items can +// be added when the buffer is full. This means that the information about the latest sent health +// reports will have to be dropped in such situations. We would rather drop the information about +// the *oldest* sent health reports. +// We therefore use a map (key'ed by account) to store information about the latest sent inventory +// reports This map is shared by the go routine that generates inventory reports and the go +// routine that sends health reports. Access to the map is coordinated by a mutex. +type GatedReportInfo struct { + AccessGate sync.RWMutex + AccountInventoryReports AccountK8SInventoryReports +} + +func PeriodicallySendHealthReport(cfg *config.Application, integration *intg.Integration, gatedReportInfo *GatedReportInfo) { + ticker := time.NewTicker(time.Duration(cfg.HealthReportIntervalSeconds) * time.Second) + + for { + log.Infof("Waiting %d seconds to send health report...", cfg.HealthReportIntervalSeconds) + + healthReportID := uuid.New().String() + lastReports := GetAccountReportInfoNoBlocking(gatedReportInfo, cfg) + + now := jstime.Now().UTC() + integration.Uptime = now.Sub(integration.StartedAt) + healthReport := HealthReport{ + UUID: healthReportID, + ProtocolVersion: healthProtocolVersion, + Timestamp: now, + Uptime: integration.Uptime, + HealthData: HealthData{ + Type: healthDataType, + Version: healthDataVersion, + Errors: make(HealthReportErrors, 0), // any errors are only reported in lastReports + AccountK8sInventoryReports: lastReports, + }, + HealthReportInterval: cfg.HealthReportIntervalSeconds, + } + + log.Infof("Sending health report (uuid:%s) covering %d accounts", healthReport.UUID, len(healthReport.HealthData.AccountK8sInventoryReports)) + requestBody, err := json.Marshal(healthReport) + if err != nil { + log.Errorf("failed to serialize integration registration as JSON: %v", err) + } else { + log.Debugf("Size of healthreport as marshalled json %d bytes", len(requestBody)) // Bob. End of Remove this + _, err = anchore.Post(requestBody, integration.UUID, HealthReportAPIPathV2, cfg.AnchoreDetails, "health report") + if err != nil { + log.Errorf("Failed to send health report to Anchore: %v", err) + } + } + + // log.Debugf("Start new health report: %s", <-ticker.C) + <-ticker.C + } +} + +func GetAccountReportInfoNoBlocking(gatedReportInfo *GatedReportInfo, cfg *config.Application) AccountK8SInventoryReports { + locked := gatedReportInfo.AccessGate.TryLock() + + if locked { + defer gatedReportInfo.AccessGate.Unlock() + + log.Info("Removing inventory report info for accounts that are no longer active") + accountsToRemove := make(map[string]bool) + now := jstime.Now().UTC() + inactiveAge := 2 * float64(cfg.PollingIntervalSeconds) + + for account, reportInfo := range gatedReportInfo.AccountInventoryReports { + for _, batchInfo := range reportInfo.Batches { + log.Debugf("Last inv.report (time:%s, account:%s, batch:%d/%d, sent:%s error:'%s')", + reportInfo.ReportTimestamp, account, batchInfo.BatchIndex, reportInfo.BatchSize, + batchInfo.SendTimestamp, batchInfo.Error) + reportTime, err := time.Parse(time.RFC3339, reportInfo.ReportTimestamp) + if err != nil { + log.Errorf("failed to parse report_timestamp: %v", err) + continue + } + if now.Sub(jstime.Datetime{Time: reportTime}).Seconds() > inactiveAge { + accountsToRemove[account] = true + } + } + } + + for accountToRemove := range accountsToRemove { + log.Debugf("Accounts no longer considered active: %s", accountToRemove) + delete(gatedReportInfo.AccountInventoryReports, accountToRemove) + } + + return gatedReportInfo.AccountInventoryReports + } + log.Debugf("Unable to obtain mutex lock to get aocount inventory report information. Continuing.") + return AccountK8SInventoryReports{} +} + +func SetReportInfoNoBlocking(accountName string, count int, reportInfo InventoryReportInfo, gatedReportInfo *GatedReportInfo) { + log.Debugf("Setting report (%s) for account name '%s': %d/%d %s %s", reportInfo.ReportTimestamp, accountName, + reportInfo.Batches[count].BatchIndex, reportInfo.BatchSize, reportInfo.Batches[count].SendTimestamp, + reportInfo.Batches[count].Error) + locked := gatedReportInfo.AccessGate.TryLock() + if locked { + defer gatedReportInfo.AccessGate.Unlock() + gatedReportInfo.AccountInventoryReports[accountName] = reportInfo + } else { + // we prioritize no blocking over actually bookkeeping info for every sent inventory report + log.Debugf("Unable to obtain mutex lock to include inventory report timestamped %s for %s: %d/%d %s in health report. Continuing.", + reportInfo.ReportTimestamp, accountName, reportInfo.Batches[count].BatchIndex, reportInfo.BatchSize, + reportInfo.Batches[count].SendTimestamp) + } +} diff --git a/pkg/healthreporter/healthreporter_test.go b/pkg/healthreporter/healthreporter_test.go new file mode 100644 index 0000000..bde7436 --- /dev/null +++ b/pkg/healthreporter/healthreporter_test.go @@ -0,0 +1,119 @@ +package healthreporter + +import ( + "github.com/anchore/k8s-inventory/internal/config" + jstime "github.com/anchore/k8s-inventory/internal/time" + "github.com/stretchr/testify/assert" + "reflect" + "testing" + "time" +) + +const mutexLocked = int64(1 << iota) // mutex is locked + +var now = time.Now().UTC() + +var ( + reportInfo = InventoryReportInfo{ + ReportTimestamp: now.Format(time.RFC3339), + Account: "testAccount", + SentAsUser: "testAccountUser", + BatchSize: 1, + LastSuccessfulIndex: 1, + HasErrors: false, + Batches: []BatchInfo{ + { + BatchIndex: 0, + SendTimestamp: jstime.Now(), + Error: "", + }, + }, + } + reportInfoExpired = InventoryReportInfo{ + ReportTimestamp: now.Add(time.Second * (-3800)).Format(time.RFC3339), + Account: "testAccount2", + SentAsUser: "testAccount2User", + BatchSize: 1, + LastSuccessfulIndex: 1, + HasErrors: false, + Batches: []BatchInfo{ + { + BatchIndex: 0, + SendTimestamp: jstime.Now(), + Error: "", + }, + }, + } +) + +func TestGetAccountReportInfoNoBlockingWhenObtainingLockRemovesExpired(t *testing.T) { + gatedReportInfo := GatedReportInfo{ + AccountInventoryReports: make(AccountK8SInventoryReports, 2), + } + gatedReportInfo.AccountInventoryReports[reportInfo.Account] = reportInfo + gatedReportInfo.AccountInventoryReports[reportInfoExpired.Account] = reportInfoExpired + + cfg := config.Application{ + PollingIntervalSeconds: 30 * 60, + } + + result := GetAccountReportInfoNoBlocking(&gatedReportInfo, &cfg) + assert.Equal(t, len(result), 1) + assert.Contains(t, result, reportInfo.Account) + assert.Equal(t, len(gatedReportInfo.AccountInventoryReports), 1) + assert.Contains(t, gatedReportInfo.AccountInventoryReports, reportInfo.Account) +} + +func TestGetAccountReportInfoBlockingWhenNotObtainingLockExpiredUnaffected(t *testing.T) { + gatedReportInfo := GatedReportInfo{ + AccountInventoryReports: make(AccountK8SInventoryReports, 2), + } + gatedReportInfo.AccountInventoryReports[reportInfo.Account] = reportInfo + gatedReportInfo.AccountInventoryReports[reportInfoExpired.Account] = reportInfoExpired + gatedReportInfo.AccessGate.Lock() + + cfg := config.Application{ + PollingIntervalSeconds: 3 * 60, + } + + result := GetAccountReportInfoNoBlocking(&gatedReportInfo, &cfg) + assert.Equal(t, len(result), 0) + assert.Equal(t, len(gatedReportInfo.AccountInventoryReports), 2) + assert.Contains(t, gatedReportInfo.AccountInventoryReports, reportInfo.Account) + assert.Contains(t, gatedReportInfo.AccountInventoryReports, reportInfoExpired.Account) + // check mutex is still locked after operation + mutexState := reflect.ValueOf(&gatedReportInfo.AccessGate).Elem().FieldByName("w").FieldByName("state") + assert.Equal(t, mutexState.Int()&mutexLocked, mutexLocked) +} + +func TestSetReportInfoNoBlockingSetsWhenObtainingLock(t *testing.T) { + gatedReportInfo := GatedReportInfo{ + AccountInventoryReports: make(AccountK8SInventoryReports, 1), + } + accountName := "testAccount" + count := 0 + + SetReportInfoNoBlocking(accountName, count, reportInfo, &gatedReportInfo) + + assert.Equal(t, reportInfo, gatedReportInfo.AccountInventoryReports[accountName]) + // check mutex is unlocked after operation + mutexState := reflect.ValueOf(&gatedReportInfo.AccessGate).Elem().FieldByName("w").FieldByName("state") + assert.Equal(t, mutexState.Int()&mutexLocked, int64(0)) +} + +func TestSetReportInfoNoBlockingSkipsWhenLockAlreadyTaken(t *testing.T) { + gatedReportInfo := GatedReportInfo{ + AccountInventoryReports: make(AccountK8SInventoryReports, 1), + } + gatedReportInfo.AccessGate.Lock() + + accountName := "testAccount" + count := 0 + + SetReportInfoNoBlocking(accountName, count, reportInfo, &gatedReportInfo) + + assert.NotContains(t, gatedReportInfo.AccountInventoryReports, accountName) + // check mutex is still locked after operation + mutexState := reflect.ValueOf(&gatedReportInfo.AccessGate).Elem().FieldByName("w").FieldByName("state") + assert.Equal(t, mutexState.Int()&mutexLocked, mutexLocked) +} diff --git a/pkg/integration/integration.go b/pkg/integration/integration.go new file mode 100644 index 0000000..c2ce975 --- /dev/null +++ b/pkg/integration/integration.go @@ -0,0 +1,299 @@ +package integration + +import ( + "context" + "encoding/json" + "fmt" + "github.com/anchore/k8s-inventory/pkg/client" + "github.com/google/uuid" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "os" + "time" + + "github.com/anchore/k8s-inventory/internal/anchore" + "github.com/anchore/k8s-inventory/internal/config" + "github.com/anchore/k8s-inventory/internal/log" + jstime "github.com/anchore/k8s-inventory/internal/time" +) + +var HealthReportingDisabled = false + +const Type = "anchore_k8s_inventory_agent" +const RegisterAPIPathV2 = "v2/system/integrations/registration" +const AppVersionLabel = "app.kubernetes.io/version" + +// HealthStatus reflects the state of the Integration wrt any errors +// encountered when performing its tasks +type HealthStatus struct { + State string `json:"state,omitempty"` // state of the integration HEALTHY or UNHEALTHY + Reason string `json:"reason,omitempty"` + Details any `json:"details,omitempty"` +} + +// LifeCycleStatus reflects the state of the Integration from the perspective of Enterprise +type LifeCycleStatus struct { + State string `json:"state,omitempty"` // lifecycle state REGISTERED, ACTIVE, DEGRADED, DEACTIVATED + Reason string `json:"reason,omitempty"` + Details any `json:"details,omitempty"` + UpdatedAt jstime.Datetime `json:"updated_at,omitempty"` +} + +type Integration struct { + UUID string `json:"uuid,omitempty"` // uuid provided to this integration instance during + Type string `json:"type,omitempty"` // type of integration (e.g., 'anchore-k8s-agent') + Name string `json:"name,omitempty"` // name of the integration instance (e.g., k8s-agent-admin') + Description string `json:"description,omitempty"` // short description of integration instance + Version string `json:"version,omitempty"` // version of the integration instance + ReportedStatus *HealthStatus `json:"reported_status,omitempty"` // health status of the integration (Read-only) + IntegrationStatus *LifeCycleStatus `json:"integration_status,omitempty"` // lifecycle status of the integration (Read-only) + StartedAt jstime.Datetime `json:"started_at,omitempty"` // timestamp when integration instance was started in UTC().Format(time.RFC3339) + LastSeen *jstime.Datetime `json:"last_seen,omitempty"` // timestamp of last received health report from integration instance (Read-only) + Uptime *jstime.Duration `json:"uptime,omitempty"` // running time of integration instance + Username string `json:"username,omitempty"` // user that the integration instance authenticates as during registration + AccountName string `json:"account_name,omitempty"` // default account that the integration instance authenticates as during registration + ExplicitlyAccountBound []string `json:"explicitly_account_bound,omitempty"` // accounts that the integration instance is explicitly configured to handle + Accounts []string `json:"accounts,omitempty"` // names of accounts that the integration instance handled recently + Namespaces []string `json:"namespaces,omitempty"` // namespaces that the integration instance handles + // Configuration *config.Application `json:"configuration,omitempty"` // configuration for the integration instance + ClusterName string `json:"cluster_name,omitempty"` // name of cluster where the integration instance runs + Namespace string `json:"namespace,omitempty"` // uuid for namespace that the integration instance belongs to + HealthReportInterval int `json:"health_report_interval,omitempty"` // time in seconds between health reports + RegistrationID string `json:"registration_id,omitempty"` // uuid that integration used during registration + RegistrationInstanceID string `json:"registration_instance_id,omitempty"` // instance id used by the integration during registration +} + +type Registration struct { + RegistrationID string `json:"registration_id,omitempty"` // uyid that identifies integration during registration + RegistrationInstanceID string `json:"registration_instance_id,omitempty"` // identifier that make integration instance unique among its replicas during registration + Type string `json:"type,omitempty"` // type of integration (e.g., 'anchore-k8s-agent') + Name string `json:"name,omitempty"` // name of the integration instance (e.g., k8s-agent-admin') + Description string `json:"description,omitempty"` // short description of integration instance + Version string `json:"version,omitempty"` // version of the integration instance + StartedAt jstime.Datetime `json:"started_at,omitempty"` // timestamp when integration instance was started in UTC().Format(time.RFC3339) + Uptime *jstime.Duration `json:"uptime,omitempty"` // running time of integration instance + Username string `json:"username,omitempty"` // user that the integration instance authenticates as during registration + ExplicitlyAccountBound []string `json:"explicitly_account_bound,omitempty"` // accounts that the integration instance is explicitly configured to handle + Namespaces []string `json:"namespaces,omitempty"` // namespaces that the integration instance is explicitly configured to handle + Configuration *config.Application `json:"configuration,omitempty"` // configuration for the integration instance + ClusterName string `json:"cluster_name,omitempty"` // name of cluster where the integration instance runs + Namespace string `json:"namespace,omitempty"` // uuid for namespace that the integration instance belongs to + HealthReportInterval int `json:"health_report_interval,omitempty"` // time in seconds between health reports +} + +func PerformRegistration(appConfig *config.Application) (*Integration, error) { + namespace := os.Getenv("POD_NAMESPACE") + name := os.Getenv("HOSTNAME") + + registrationInfo := getRegistrationInfo(appConfig, namespace, name) + + // Register this agent with enterprise + registeredIntegration, err := register(registrationInfo, appConfig.AnchoreDetails, -1, + 2*time.Second, 10*time.Minute) + if err != nil { + log.Errorf("Unable to register agent: %v", err) + return nil, err + } + return registeredIntegration, nil +} + +func register(registrationInfo *Registration, anchoreDetails config.AnchoreInfo, maxRetry int, + startBackoff, maxBackoff time.Duration) (*Integration, error) { + var err error + + attempt := 0 + for { + var registeredIntegration *Integration + + registeredIntegration, err = doRegister(registrationInfo, anchoreDetails) + if err == nil { + log.Infof("Successfully registered %s agent: %s (registration_id:%s / registration_instance_id:%s) with %s", + registrationInfo.Type, registrationInfo.Name, registrationInfo.RegistrationID, + registrationInfo.RegistrationInstanceID, anchoreDetails.URL) + log.Infof("This agent's integration uuid is %s", registeredIntegration.UUID) + return registeredIntegration, nil + } + + attempt++ + if maxRetry >= 0 && attempt > maxRetry { + log.Infof("Failed to register agent (registration_id:%s / registration_instance_id:%s) after %d attempts", + registrationInfo.RegistrationID, registrationInfo.RegistrationInstanceID, attempt) + return nil, fmt.Errorf("failed to register after %d attempts", attempt) + } + + if anchore.ServerIsOffline(err) { + log.Infof("Anchore is offline. Will try again in %s", startBackoff) + time.Sleep(startBackoff) + if startBackoff < maxBackoff { + startBackoff *= 2 + } + continue + } + + if anchore.ServerLacksAgentHealthAPISupport(err) { + // TBD: exit or disable health reporting and let inventory reporting start? + log.Warnf("Anchore does not support Integration registration and health reporting API") + log.Infof("Proceeding without health reporting") + HealthReportingDisabled = true + return nil, nil + } + + if anchore.UserLacksAPIPrivileges(err) { + log.Errorf("Specified user lacks required privileges to register and send health reports %v", err) + return nil, err + } + + if anchore.IncorrectCredentials(err) { + log.Errorf("Failed to register due to invalid credentials (wrong username or password") + return nil, err + } + + log.Errorf("Failed to register integration agent (registration_id:%s / regitration_instance_id:%s): %s", + registrationInfo.RegistrationID, registrationInfo.RegistrationInstanceID, err) + return nil, err + } +} + +func doRegister(registrationInfo *Registration, anchoreDetails config.AnchoreInfo) (*Integration, error) { + log.Infof("Registering %s agent: %s (registration_id:%s / regitration_instance_id:%s) with %s", + registrationInfo.Type, registrationInfo.Name, registrationInfo.RegistrationID, + registrationInfo.RegistrationInstanceID, anchoreDetails.URL) + requestBody, err := json.Marshal(registrationInfo) + if err != nil { + return nil, fmt.Errorf("failed to serialize integration registration as JSON: %w", err) + } + responseBody, err := anchore.Post(requestBody, "", RegisterAPIPathV2, anchoreDetails, "integration registration") + if err != nil { + return nil, err + } + registeredIntegration := Integration{} + err = json.Unmarshal(*responseBody, ®isteredIntegration) + return ®isteredIntegration, err +} + +func getRegistrationInfo(appConfig *config.Application, namespace string, name string) *Registration { + var registrationID, registrationInstanceID, instanceName, version, description string + + if appConfig.Registration.RegistrationID != "" { + log.Infof("Registration_id is specified in config: %s", appConfig.Registration.RegistrationID) + registrationID = appConfig.Registration.RegistrationID + } else { + log.Infof("Attempting to determine registration_id from K8s Deployment for Pod: %s in Namespace: %s", + name, namespace) + registrationID, instanceName, version = getInstanceDataFromK8s(appConfig, namespace, name) + } + if registrationID == "" { + log.Infof("The registration_id value is not valid. Generating UUIDv4 to use as registration_id") + registrationID = uuid.New().String() + } + + if name != "" { + log.Infof("Using registration_instance_id: %s", name) + registrationInstanceID = name + } else { + log.Infof("Generating UUIDv4 to use as registration_instance_id") + registrationInstanceID = uuid.New().String() + } + + if appConfig.Registration.IntegrationName != "" { + log.Infof("Name for integration is specified in config: %s", appConfig.Registration.IntegrationName) + instanceName = appConfig.Registration.IntegrationName + } + + if appConfig.Registration.IntegrationDescription != "" { + log.Infof("Description for integration is specified in config: %s", + appConfig.Registration.IntegrationDescription) + description = appConfig.Registration.IntegrationDescription + } + + log.Infof("Integration registration_id: %s, registration_instance_id: %s, name: %s, description: %s", + registrationID, registrationInstanceID, instanceName, description) + + explicitlyAccountBound, namespaces := getAccountsAndNamespacesForAgent(appConfig) + + instance := Registration{ + RegistrationID: registrationID, + RegistrationInstanceID: registrationInstanceID, + Type: Type, + Name: instanceName, + Description: description, + Version: version, + StartedAt: jstime.Now().UTC(), + Uptime: new(jstime.Duration), + Username: appConfig.AnchoreDetails.User, + ExplicitlyAccountBound: explicitlyAccountBound, + Namespaces: namespaces, + Configuration: nil, + ClusterName: appConfig.KubeConfig.Cluster, + Namespace: namespace, + HealthReportInterval: appConfig.HealthReportIntervalSeconds, + } + return &instance +} + +func getAccountsAndNamespacesForAgent(appConfig *config.Application) ([]string, []string) { + accountSet := make(map[string]bool) + namespaceSet := make(map[string]bool) + + // pick up accounts that are explicitly listed in the config + for account, accountRouteDetails := range appConfig.AccountRoutes { + accountSet[account] = true + for _, namespace := range accountRouteDetails.Namespaces { + namespaceSet[namespace] = true + } + } + accounts := make([]string, 0, len(accountSet)) + for account := range accountSet { + accounts = append(accounts, account) + } + + namespaces := make([]string, 0, len(namespaceSet)) + for namespace := range namespaceSet { + namespaces = append(namespaces, namespace) + } + + return accounts, namespaces +} + +func getInstanceDataFromK8s(appConfig *config.Application, namespace string, podName string) (string, string, string) { + kubeconfig, err := client.GetKubeConfig(appConfig) + if err != nil { + log.Errorf("Failed to get Kubernetes config: %v", err) + return "", "", "" + } + + clientset, err := client.GetClientSet(kubeconfig) + if err != nil { + log.Errorf("failed to get k8s client set: %v", err) + return "", "", "" + } + + k8sClient := client.Client{ + Clientset: clientset, + } + + opts := metav1.GetOptions{} + pod, err := k8sClient.Clientset.CoreV1().Pods(namespace).Get(context.Background(), podName, opts) + if err != nil { + log.Errorf("failed to get pod: %v", err) + return "", "", "" + } + replicaSetName := pod.ObjectMeta.OwnerReferences[0].Name + replicaSet, err := k8sClient.Clientset.AppsV1().ReplicaSets(namespace).Get(context.Background(), replicaSetName, opts) + if err != nil { + log.Errorf("failed to get replica set: %v", err) + return "", "", "" + } + deploymentName := replicaSet.ObjectMeta.OwnerReferences[0].Name + deployment, err := k8sClient.Clientset.AppsV1().Deployments(namespace).Get(context.Background(), deploymentName, opts) + if err != nil { + log.Errorf("failed to get deployment: %v", err) + return "", "", "" + } + + appVersion := deployment.Labels[AppVersionLabel] + registrationID := fmt.Sprint("", deployment.ObjectMeta.UID) + instanceName := deploymentName + log.Infof("Determined integration values for agent from K8s, registration_id: %s, instance_name: %s, appVersion: %s", + registrationID, instanceName, appVersion) + return registrationID, instanceName, appVersion +} diff --git a/pkg/lib.go b/pkg/lib.go index dfb8b53..463f930 100644 --- a/pkg/lib.go +++ b/pkg/lib.go @@ -7,20 +7,22 @@ import ( "encoding/json" "errors" "fmt" + "github.com/anchore/k8s-inventory/pkg/integration" "os" "regexp" "time" - "github.com/anchore/k8s-inventory/pkg/reporter" - "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "github.com/anchore/k8s-inventory/internal/config" "github.com/anchore/k8s-inventory/internal/log" + jstime "github.com/anchore/k8s-inventory/internal/time" "github.com/anchore/k8s-inventory/pkg/client" + "github.com/anchore/k8s-inventory/pkg/healthreporter" "github.com/anchore/k8s-inventory/pkg/inventory" "github.com/anchore/k8s-inventory/pkg/logger" + "github.com/anchore/k8s-inventory/pkg/reporter" ) type ReportItem struct { @@ -49,7 +51,7 @@ func reportToStdout(report inventory.Report) error { return nil } -func HandleReport(report inventory.Report, cfg *config.Application, account string) error { +func HandleReport(report inventory.Report, reportInfo *healthreporter.InventoryReportInfo, cfg *config.Application, account string) error { if cfg.VerboseInventoryReports { err := reportToStdout(report) if err != nil { @@ -76,6 +78,7 @@ func HandleReport(report inventory.Report, cfg *config.Application, account stri } if anchoreDetails.IsValid() { + reportInfo.SentAsUser = anchoreDetails.User if err := reporter.Post(report, anchoreDetails); err != nil { if errors.Is(err, reporter.ErrAnchoreAccountDoesNotExist) { return err @@ -91,7 +94,7 @@ func HandleReport(report inventory.Report, cfg *config.Application, account stri // PeriodicallyGetInventoryReport periodically retrieve image results and report/output them according to the configuration. // Note: Errors do not cause the function to exit, since this is periodically running -func PeriodicallyGetInventoryReport(cfg *config.Application) { +func PeriodicallyGetInventoryReport(cfg *config.Application, gatedReportInfo *healthreporter.GatedReportInfo) { // Fire off a ticker that reports according to a configurable polling interval ticker := time.NewTicker(time.Duration(cfg.PollingIntervalSeconds) * time.Second) @@ -101,20 +104,47 @@ func PeriodicallyGetInventoryReport(cfg *config.Application) { log.Errorf("Failed to get Inventory Report: %w", err) } else { for account, reportsForAccount := range reports { + reportInfo := healthreporter.InventoryReportInfo{ + Account: account, + BatchSize: len(reportsForAccount), + LastSuccessfulIndex: -1, + Batches: make([]healthreporter.BatchInfo, 0), + HasErrors: false, + } for count, report := range reportsForAccount { log.Infof("Sending Inventory Report to Anchore Account %s, %d of %d", account, count+1, len(reportsForAccount)) - err := HandleReport(report, cfg, account) + + reportInfo.ReportTimestamp = report.Timestamp + batchInfo := healthreporter.BatchInfo{ + SendTimestamp: jstime.Now().UTC(), + BatchIndex: count + 1, + } + + err := HandleReport(report, &reportInfo, cfg, account) if errors.Is(err, reporter.ErrAnchoreAccountDoesNotExist) { + // record this error for the health report even if the retry works + batchInfo.Error = fmt.Sprintf("%s (%s) | ", err.Error(), account) + reportInfo.HasErrors = true + // Retry with default account retryAccount := cfg.AnchoreDetails.Account if cfg.AccountRouteByNamespaceLabel.DefaultAccount != "" { retryAccount = cfg.AccountRouteByNamespaceLabel.DefaultAccount } log.Warnf("Error sending to Anchore Account %s, sending to default account", account) - err = HandleReport(report, cfg, retryAccount) + err = HandleReport(report, &reportInfo, cfg, retryAccount) } if err != nil { log.Errorf("Failed to handle Inventory Report: %w", err) + // append the error to any error that happened during a retry, so we record both failures + batchInfo.Error += err.Error() + reportInfo.HasErrors = true + } else { + reportInfo.LastSuccessfulIndex = count + 1 + } + if !integration.HealthReportingDisabled { + reportInfo.Batches = append(reportInfo.Batches, batchInfo) + healthreporter.SetReportInfoNoBlocking(account, count, reportInfo, gatedReportInfo) } } } diff --git a/skaffold.yaml b/skaffold.yaml index 4072691..f669bf2 100644 --- a/skaffold.yaml +++ b/skaffold.yaml @@ -23,6 +23,7 @@ deploy: k8sInventory.quiet: false k8sInventory.verboseInventoryReports: true k8sInventory.pollingIntervalSeconds: 60 + k8sInventory.healthReportIntervalSeconds: 60 k8sInventory.anchore.url: "http://host.docker.internal:8228" k8sInventory.anchore.user: "admin" k8sInventory.anchore.password: "foobar"