Skip to content

Commit

Permalink
more wip
Browse files Browse the repository at this point in the history
Signed-off-by: Ivan Milchev <[email protected]>
  • Loading branch information
imilchev committed Jan 28, 2025
1 parent 626c8fc commit 7f73217
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 33 deletions.
13 changes: 6 additions & 7 deletions explorer/scan/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func DiscoverAssets(ctx context.Context, req *AssetDiscoveryRequest) (*Discovere
}

// for all discovered assets, we apply mondoo-specific labels and annotations that come from the root asset
discoverAssets(rootAssetWithRuntime, resolvedRootAsset, discoveredAssets, runtimeLabels, req.Upstream, req.Recording)
discoverAssets(rootAssetWithRuntime, resolvedRootAsset, discoveredAssets, runtimeLabels, req.Upstream, req.Recording, req.Depth, 0)
}

// if there is exactly one asset, assure that the --asset-name is used
Expand All @@ -173,19 +173,19 @@ func DiscoverAssets(ctx context.Context, req *AssetDiscoveryRequest) (*Discovere
return discoveredAssets, nil
}

func discoverAssets(rootAssetWithRuntime *AssetWithRuntime, resolvedRootAsset *inventory.Asset, discoveredAssets *DiscoveredAssets, runtimeLabels map[string]string, upstream *upstream.UpstreamConfig, recording llx.Recording) {
func discoverAssets(rootAssetWithRuntime *AssetWithRuntime, resolvedRootAsset *inventory.Asset, discoveredAssets *DiscoveredAssets, runtimeLabels map[string]string, upstream *upstream.UpstreamConfig, recording llx.Recording, depth uint, maxDepth uint) {
defer logger.FuncDur(time.Now(), "explorer.discoverAssets")

// It is possible that we did not discover any assets under the root asset. In that case the inventory
// would be nil and we can return
if rootAssetWithRuntime.Runtime.Provider.Connection.Inventory == nil {
if rootAssetWithRuntime.Runtime.Provider.Connection.Inventory == nil || maxDepth == depth {
return
}

pool := workerpool.New[*AssetWithRuntime](workers)
pool.Start()
defer pool.Close()

depth++
// for all discovered assets, we apply mondoo-specific labels and annotations that come from the root asset
for _, asset := range rootAssetWithRuntime.Runtime.Provider.Connection.Inventory.Spec.Assets {
pool.Submit(func() (*AssetWithRuntime, error) {
Expand Down Expand Up @@ -220,10 +220,9 @@ func discoverAssets(rootAssetWithRuntime *AssetWithRuntime, resolvedRootAsset *i
assetWithRuntime.Runtime.Close()
continue
}

discoverAssets(assetWithRuntime, resolvedRootAsset, discoveredAssets, runtimeLabels, upstream, recording)
discoverAssets(assetWithRuntime, resolvedRootAsset, discoveredAssets, runtimeLabels, upstream, recording, depth, maxDepth)
} else {
discoverAssets(assetWithRuntime, resolvedRootAsset, discoveredAssets, runtimeLabels, upstream, recording)
discoverAssets(assetWithRuntime, resolvedRootAsset, discoveredAssets, runtimeLabels, upstream, recording, depth, maxDepth)
assetWithRuntime.Runtime.Close()
}
}
Expand Down
54 changes: 31 additions & 23 deletions explorer/scan/local_scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,14 @@ func (s *LocalScanner) loopyLoop(ctx context.Context, job *Job, processedAssets
Inv: job.Inventory,
Upstream: upstream,
Recording: s.recording,
Depth: 1,
}

assetNames := []string{}
for _, a := range job.Inventory.GetSpec().GetAssets() {
assetNames = append(assetNames, a.Name)
}
log.Warn().Msgf("discovering assets for %s...", strings.Join(assetNames, ", "))
discoveredAssets, err := DiscoverAssets(ctx, discoveryReq)
if err != nil {
return err
Expand All @@ -253,43 +260,27 @@ func (s *LocalScanner) loopyLoop(ctx context.Context, job *Job, processedAssets
newAssets.Add(asset.Asset, asset.Runtime)
}

log.Warn().Msgf("discovered %d assets", len(newAssets.Assets))

if len(newAssets.Assets) == 0 {
return nil
}

if err := s.scanAssets(ctx, newAssets, job, reporter, upstream); err != nil {
if err := s.scanAssets(ctx, newAssets, processedAssets, job, reporter, upstream); err != nil {
return err
}
for pid := range discoveredAssets.platformIds {
processedAssets[pid] = struct{}{}
}

for _, asset := range newAssets.Assets {
err := s.loopyLoop(ctx, &Job{
Inventory: &inventory.Inventory{
Spec: &inventory.InventorySpec{
Assets: []*inventory.Asset{asset.Asset},
},
},
Bundle: job.Bundle,
DoRecord: job.DoRecord,
QueryPackFilters: job.QueryPackFilters,
Props: job.Props,
}, processedAssets, reporter, upstream)

// Close the runtime for the asset once we are done with it
if asset.Runtime != nil {
asset.Runtime.Close()
}
if err != nil {
return err
}
}
// for _, asset := range newAssets.Assets {

// }

return nil
}

func (s *LocalScanner) scanAssets(ctx context.Context, discoveredAssets *DiscoveredAssets, job *Job, reporter *AggregateReporter, upstream *upstream.UpstreamConfig) error {
func (s *LocalScanner) scanAssets(ctx context.Context, discoveredAssets *DiscoveredAssets, processedAssets map[string]struct{}, job *Job, reporter *AggregateReporter, upstream *upstream.UpstreamConfig) error {
// if we had asset errors we want to place them into the reporter
for i := range discoveredAssets.Errors {
reporter.AddScanError(discoveredAssets.Errors[i].Asset, discoveredAssets.Errors[i].Err)
Expand Down Expand Up @@ -418,6 +409,7 @@ func (s *LocalScanner) scanAssets(ctx context.Context, discoveredAssets *Discove
asset = discoveredAsset
}

log.Warn().Msgf("scanning asset %s", asset.Name)
p := &progress.MultiProgressAdapter{Key: asset.PlatformIds[0], Multi: multiprogress}
s.RunAssetJob(&AssetJob{
DoRecord: job.DoRecord,
Expand All @@ -432,8 +424,24 @@ func (s *LocalScanner) scanAssets(ctx context.Context, discoveredAssets *Discove
runtime: runtime,
})

// TODO: discover sub-assets here instead
err := s.loopyLoop(ctx, &Job{
Inventory: &inventory.Inventory{
Spec: &inventory.InventorySpec{
Assets: []*inventory.Asset{asset},
},
},
Bundle: job.Bundle,
DoRecord: job.DoRecord,
QueryPackFilters: job.QueryPackFilters,
Props: job.Props,
}, processedAssets, reporter, upstream)
// runtimes are single-use only. Close them once they are done.
runtime.Close()
if err != nil {
reporter.AddScanError(asset, err)
}

}
}()
wg.Wait()
Expand Down
2 changes: 1 addition & 1 deletion providers/k8s/connection/api/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ func (c *Connection) resources(kind string, name string, namespace string) (*sha
if err != nil {
return nil, err
}
log.Debug().Msgf("found %d resource objects", len(objs))
log.Warn().Str("kind", kind).Str("name", name).Str("namespace", namespace).Msgf("found %d resource objects", len(objs))

objs, err = resources.FilterResource(resType, objs, name, namespace)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion providers/k8s/connection/shared/resources/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (d *Discovery) GetAllResources(ctx context.Context, resTypes *ApiResourceIn

log.Debug().Msg("waiting for all queries to return")
wg.Wait()
log.Debug().Msgf("query api resources completed: objects=%d, error=%v", len(out), collectErr)
log.Warn().Msgf("query api resources completed: objects=%d, error=%v", len(out), collectErr)
return out, collectErr
}

Expand Down
8 changes: 7 additions & 1 deletion providers/k8s/resources/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ func Discover(runtime *plugin.Runtime, features cnquery.Features) (*inventory.In

invConfig := conn.InventoryConfig()

if invConfig.DelayDiscovery {
return in, nil
}

res, err := runtime.CreateResource(runtime, "k8s", nil)
if err != nil {
return nil, err
Expand Down Expand Up @@ -219,6 +223,8 @@ func discoverNamespaces(
if err != nil {
return nil, err
}
inv := invConfig.Clone()
inv.DelayDiscovery = true
assetList = append(assetList, &inventory.Asset{
PlatformIds: []string{
shared.NewNamespacePlatformId(clusterId, ns.Name, string(ns.UID)),
Expand All @@ -228,7 +234,7 @@ func discoverNamespaces(
Labels: labels,
// We don't want a parent connection so there is no central cache for the resources
// for the complete cluster. We only cache resources for a single namespace
Connections: []*inventory.Config{invConfig.Clone()},
Connections: []*inventory.Config{inv},
Category: conn.Asset().Category,
})
if od != nil {
Expand Down

0 comments on commit 7f73217

Please sign in to comment.