Skip to content

Commit

Permalink
Merge pull request #233 from anchore/rework-batch-workflow
Browse files Browse the repository at this point in the history
feat: rework batch workflow
  • Loading branch information
bradleyjones authored Jun 5, 2024
2 parents 67398c8 + 9c58ceb commit de9bf7b
Show file tree
Hide file tree
Showing 3 changed files with 531 additions and 29 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
.tmp/
*.tmp
coverage.txt
cover.cov

# Binaries for programs and plugins
*.exe
Expand Down
119 changes: 91 additions & 28 deletions pkg/lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"encoding/json"
"errors"
"fmt"
"math"
"os"
"regexp"
"time"
Expand Down Expand Up @@ -36,7 +35,8 @@ type channels struct {
stopper chan struct{}
}

type AccountRoutedReports map[string][]inventory.Report
type AccountRoutedReports map[string]inventory.Report
type BatchedReports map[string][]inventory.Report

func reportToStdout(report inventory.Report) error {
enc := json.NewEncoder(os.Stdout)
Expand Down Expand Up @@ -342,29 +342,18 @@ func GetNamespacesBatches(namespaces []inventory.Namespace, batchSize int) [][]i
return batches
}

func GetInventoryReports(cfg *config.Application) (AccountRoutedReports, error) {
func GetInventoryReports(cfg *config.Application) (BatchedReports, error) {
log.Info("Starting image inventory collection")

reports := AccountRoutedReports{}
batchSize := cfg.InventoryReportLimits.Namespaces
if batchSize > 0 {
log.Infof("Batching namespaces into groups of %d", batchSize)
}

namespaces, _ := GetAllNamespaces(cfg)

if len(cfg.AccountRoutes) == 0 && cfg.AccountRouteByNamespaceLabel.LabelKey == "" {
totalExpectedBatches := int(math.Ceil(float64(len(namespaces)) / float64(batchSize)))
for batchCount, batch := range GetNamespacesBatches(namespaces, batchSize) {
if batchSize > 0 {
log.Infof("Collecting batch %d of %d for account %s", batchCount+1, totalExpectedBatches, cfg.AnchoreDetails.Account)
}
batchNamespacesReport, err := GetInventoryReportForNamespaces(cfg, batch)
if err != nil {
return AccountRoutedReports{}, err
}
reports[cfg.AnchoreDetails.Account] = append(reports[cfg.AnchoreDetails.Account], batchNamespacesReport)
allNamespacesReport, err := GetInventoryReportForNamespaces(cfg, namespaces)
if err != nil {
return BatchedReports{}, err
}
reports[cfg.AnchoreDetails.Account] = allNamespacesReport
} else {
accountRoutesForAllNamespaces := GetAccountRoutedNamespaces(cfg.AnchoreDetails.Account, namespaces, cfg.AccountRoutes, cfg.AccountRouteByNamespaceLabel)

Expand All @@ -378,21 +367,95 @@ func GetInventoryReports(cfg *config.Application) (AccountRoutedReports, error)

// Get inventory reports for each account
for account, namespaces := range accountRoutesForAllNamespaces {
totalExpectedBatches := int(math.Ceil(float64(len(namespaces)) / float64(batchSize)))
for batchCount, batch := range GetNamespacesBatches(namespaces, batchSize) {
if batchSize > 0 {
log.Infof("Collecting inventory batch %d of %d for account %s", batchCount+1, totalExpectedBatches, account)
}
accountReport, err := GetInventoryReportForNamespaces(cfg, batch)
if err != nil {
return AccountRoutedReports{}, err
accountReport, err := GetInventoryReportForNamespaces(cfg, namespaces)
if err != nil {
return BatchedReports{}, err
}
reports[account] = accountReport
}
}

return getBatchedInventoryReports(reports, cfg.InventoryReportLimits.Namespaces), nil
}

//nolint:gocognit,funlen
func getBatchedInventoryReports(reports AccountRoutedReports, batchSize int) BatchedReports {
batchedReports := BatchedReports{}
if batchSize <= 0 {
for account, report := range reports {
batchedReports[account] = append(batchedReports[account], report)
}
return batchedReports
}

log.Infof("Batching namespaces into groups of %d", batchSize)
for account, accountReport := range reports {
if len(accountReport.Namespaces) <= batchSize {
batchedReports[account] = append(batchedReports[account], accountReport)
continue
}
namespaceBatches := make([][]inventory.Namespace, 0)
// Construct batches of namespaces
for i := 0; i < len(accountReport.Namespaces); i += batchSize {
end := i + batchSize
if end > len(accountReport.Namespaces) {
end = len(accountReport.Namespaces)
}
namespaceBatches = append(namespaceBatches, accountReport.Namespaces[i:end])
}

nodeMap := make(map[string]inventory.Node)
for _, node := range accountReport.Nodes {
nodeMap[node.UID] = node
}
podMap := make(map[string]inventory.Pod)
for _, pod := range accountReport.Pods {
podMap[pod.UID] = pod
}
containersByPod := make(map[string][]inventory.Container)
for _, container := range accountReport.Containers {
containersByPod[container.PodUID] = append(containersByPod[container.PodUID], container)
}
podsByNamespace := make(map[string][]inventory.Pod)
for _, pod := range accountReport.Pods {
podsByNamespace[pod.NamespaceUID] = append(podsByNamespace[pod.NamespaceUID], pod)
}
containersByNamespace := make(map[string][]inventory.Container)
for _, container := range accountReport.Containers {
namespaceUID := podMap[container.PodUID].NamespaceUID
containersByNamespace[namespaceUID] = append(containersByNamespace[namespaceUID], container)
}

// Construct reports for each batch
for _, batch := range namespaceBatches {
batchedPodsSlice := []inventory.Pod{}
batchedContainersSlice := []inventory.Container{}
batchedNodesMap := map[string]inventory.Node{}
for _, ns := range batch {
batchedPodsSlice = append(batchedPodsSlice, podsByNamespace[ns.UID]...)
batchedContainersSlice = append(batchedContainersSlice, containersByNamespace[ns.UID]...)
for _, pod := range batchedPodsSlice {
batchedNodesMap[pod.NodeUID] = nodeMap[pod.NodeUID]
}
reports[account] = append(reports[account], accountReport)
}
batchedNodesSlice := []inventory.Node{}
for _, node := range batchedNodesMap {
batchedNodesSlice = append(batchedNodesSlice, node)
}
batchedReport := inventory.Report{
Timestamp: accountReport.Timestamp,
Containers: batchedContainersSlice,
Pods: batchedPodsSlice,
Namespaces: batch,
Nodes: batchedNodesSlice,
ServerVersionMetadata: accountReport.ServerVersionMetadata,
ClusterName: accountReport.ClusterName,
}
batchedReports[account] = append(batchedReports[account], batchedReport)
}
}

return reports, nil
return batchedReports
}

func processNamespace(
Expand Down
Loading

0 comments on commit de9bf7b

Please sign in to comment.