Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make osq runner responsive to registration updates #2007

Open
wants to merge 23 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
0c31928
rework interfaces for runner change detection
zackattack01 Dec 17, 2024
81e4209
gofmt
zackattack01 Dec 17, 2024
e028a65
restarts seem to be working this way
zackattack01 Dec 18, 2024
b9e1918
shift to using buffered shutdown channel, rework
zackattack01 Dec 19, 2024
2d74749
cleanup and add comment
zackattack01 Dec 19, 2024
db0b434
don't parallel test for runtime
zackattack01 Dec 20, 2024
81d84d9
PR feedback: add registrationID lock, and fix error handling flow whe…
zackattack01 Dec 20, 2024
a4168fb
fix merge conflict
zackattack01 Dec 20, 2024
c20b160
put tests back to parallel
zackattack01 Dec 20, 2024
b5451ff
Merge branch 'main' of https://github.com/kolide/launcher into zack/r…
zackattack01 Dec 23, 2024
c20d34a
update var names from rebase
zackattack01 Dec 23, 2024
6e9172a
Merge branch 'main' of github.com:kolide/launcher into zack/runner_re…
zackattack01 Jan 14, 2025
b6029da
update tests for new mocking and cleanup patterns
zackattack01 Jan 15, 2025
25edcce
pull in main and fix conflicts
zackattack01 Jan 24, 2025
bccda00
fix data races, add comments
zackattack01 Jan 24, 2025
506e14d
Merge branch 'main' into zack/runner_registration_ids
zackattack01 Jan 24, 2025
49654cb
Merge branch 'main' into zack/runner_registration_ids
zackattack01 Jan 24, 2025
4060380
fix up tests with new startupsettingswriter mocks
zackattack01 Jan 27, 2025
2c70a29
Merge branch 'main' into zack/runner_registration_ids
zackattack01 Jan 27, 2025
c6592cd
Merge branch 'main' into zack/runner_registration_ids
zackattack01 Jan 28, 2025
f551f6b
don't acquire locks for runner's String method
zackattack01 Jan 28, 2025
5180768
Merge branch 'main' into zack/runner_registration_ids
zackattack01 Jan 28, 2025
658307c
Update pkg/osquery/runtime/runner.go
zackattack01 Jan 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ func runLauncher(ctx context.Context, cancel func(), multiSlogger, systemMultiSl
osqueryruntime.WithAugeasLensFunction(augeas.InstallLenses),
)
runGroup.Add("osqueryRunner", osqueryRunner.Run, osqueryRunner.Interrupt)
k.SetInstanceQuerier(osqueryRunner)
k.SetInstanceRunner(osqueryRunner)

versionInfo := version.Version()
k.SystemSlogger().Log(ctx, slog.LevelInfo,
Expand Down
20 changes: 14 additions & 6 deletions ee/agent/knapsack/knapsack.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type knapsack struct {

slogger, systemSlogger *multislogger.MultiSlogger

querier types.InstanceQuerier
osqRunner types.OsqRunner

// This struct is a work in progress, and will be iteratively added to as needs arise.
}
Expand Down Expand Up @@ -87,23 +87,31 @@ func (k *knapsack) AddSlogHandler(handler ...slog.Handler) {
k.systemSlogger.AddHandler(handler...)
}

// Osquery instance querier
func (k *knapsack) SetInstanceQuerier(q types.InstanceQuerier) {
k.querier = q
// Osquery instance runner
func (k *knapsack) SetInstanceRunner(r types.OsqRunner) {
k.osqRunner = r
}

// RegistrationTracker interface methods
func (k *knapsack) RegistrationIDs() []string {
return []string{types.DefaultRegistrationID}
}

func (k *knapsack) SetRegistrationIDs(registrationIDs []string) error {
if k.osqRunner == nil {
return nil
}

return k.osqRunner.UpdateRegistrationIDs(registrationIDs)
}

// InstanceStatuses returns the current status of each osquery instance.
// It performs a healthcheck against each existing instance.
func (k *knapsack) InstanceStatuses() map[string]types.InstanceStatus {
if k.querier == nil {
if k.osqRunner == nil {
return nil
}
return k.querier.InstanceStatuses()
return k.osqRunner.InstanceStatuses()
}

// BboltDB interface methods
Expand Down
2 changes: 1 addition & 1 deletion ee/agent/types/knapsack.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ type Knapsack interface {
Slogger
RegistrationTracker
InstanceQuerier
SetInstanceQuerier(q InstanceQuerier)
SetInstanceRunner(r OsqRunner)
// LatestOsquerydPath finds the path to the latest osqueryd binary, after accounting for updates.
LatestOsquerydPath(ctx context.Context) string
// ReadEnrollSecret returns the enroll secret value, checking in various locations.
Expand Down
26 changes: 22 additions & 4 deletions ee/agent/types/mocks/knapsack.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions ee/agent/types/registration.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ const (
// data may be provided by e.g. a control server subsystem.
type RegistrationTracker interface {
RegistrationIDs() []string
SetRegistrationIDs(registrationIDs []string) error
}
13 changes: 13 additions & 0 deletions ee/agent/types/runner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package types

type (
// RegistrationChangeHandler is implemented by pkg/osquery/runtime/runner.go
RegistrationChangeHandler interface {
UpdateRegistrationIDs(registrationIDs []string) error
}

OsqRunner interface {
RegistrationChangeHandler
InstanceQuerier
}
)
94 changes: 84 additions & 10 deletions pkg/osquery/runtime/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
"errors"
"fmt"
"log/slog"
"slices"
"sync"
"sync/atomic"
"time"

"github.com/kolide/launcher/ee/agent/flags/keys"
Expand All @@ -26,8 +28,9 @@ type Runner struct {
knapsack types.Knapsack
serviceClient service.KolideService // shared service client for communication between osquery instance and Kolide SaaS
opts []OsqueryInstanceOption // global options applying to all osquery instances
shutdown chan struct{}
interrupted bool
shutdown chan struct{} // buffered shutdown channel for to enable shutting down to restart or exit
zackattack01 marked this conversation as resolved.
Show resolved Hide resolved
rerunRequired atomic.Bool
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add a quick comment here? // if set, the runner will restart all instances after Shutdown is called, instead of exiting or similar

interrupted atomic.Bool
}

func New(k types.Knapsack, serviceClient service.KolideService, opts ...OsqueryInstanceOption) *Runner {
Expand All @@ -37,8 +40,9 @@ func New(k types.Knapsack, serviceClient service.KolideService, opts ...OsqueryI
slogger: k.Slogger().With("component", "osquery_runner"),
knapsack: k,
serviceClient: serviceClient,
shutdown: make(chan struct{}),
opts: opts,
// the buffer length is arbitrarily set at 100, this number just needs to be higher than the total possible instances
shutdown: make(chan struct{}, 100),
opts: opts,
}

k.RegisterChangeObserver(runner,
Expand All @@ -49,6 +53,31 @@ func New(k types.Knapsack, serviceClient service.KolideService, opts ...OsqueryI
}

func (r *Runner) Run() error {
for {
// if our instances ever exit unexpectedly, return immediately
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this comment comment isn't accurate? I think runRegisteredInstances only returns if a shutdown was requested. And it only returns an error if a) shutdown was requested and b) we were trying to restart one or more instances during that time and c) hadn't successfully restarted one of them yet.

Either way, why would we return the error here instead of checking rerunRequired first?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's a good point thank you. we'd probably want to rerun regardless if required, I will update that comment and get this fixed up!

if err := r.runRegisteredInstances(); err != nil {
return err
}

// if we're in a state that required re-running all registered instances,
// reset the field and do that
if r.rerunRequired.Load() {
r.rerunRequired.Store(false)
continue
}

// otherwise, exit cleanly
return nil
}
}

func (r *Runner) runRegisteredInstances() error {
// clear the internal instances to add back in fresh as we runInstance,
// this prevents old instances from sticking around if a registrationID is ever removed
r.instanceLock.Lock()
r.instances = make(map[string]*OsqueryInstance)
r.instanceLock.Unlock()

// Create a group to track the workers running each instance
wg, ctx := errgroup.WithContext(context.Background())

Expand Down Expand Up @@ -186,6 +215,13 @@ func (r *Runner) Query(query string) ([]map[string]string, error) {
}

func (r *Runner) Interrupt(_ error) {
if r.interrupted.Load() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think it'd be useful to also do r.rerunRequired.Store(false) here? I'm thinking about the case where UpdateRegistrationIDs is called and then Interrupt is called while the shutdown/restart is ongoing. Could maybe add a test case for this as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ohh yeah that seems wise. will do both!

// Already shut down, nothing else to do
return
}

r.interrupted.Store(true)

if err := r.Shutdown(); err != nil {
r.slogger.Log(context.TODO(), slog.LevelWarn,
"could not shut down runner on interrupt",
Expand All @@ -197,13 +233,12 @@ func (r *Runner) Interrupt(_ error) {
// Shutdown instructs the runner to permanently stop the running instance (no
// restart will be attempted).
func (r *Runner) Shutdown() error {
if r.interrupted {
// Already shut down, nothing else to do
return nil
// ensure one shutdown is sent for each instance to read
r.instanceLock.Lock()
for range r.instances {
r.shutdown <- struct{}{}
}
Comment on lines +278 to 280
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about giving each instance it's own shutdown channel to avoid the buffered channel?

Suggested change
for range r.instances {
r.shutdown <- struct{}{}
}
for instance := range r.instances {
close(instance.shutdown)
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh yeah i'm gonna give this a try, ty! I'm actually seeing some interesting behavior while trying to write a test for Interrupt during restart per Becca's comment here and I wonder if this wouldn't clear things up a little. the behavior I'm seeing in the test is that the interrupt can end up ignored if timed correctly and I think It's related to the buffered channel use so far


r.interrupted = true
close(r.shutdown)
r.instanceLock.Unlock()

if err := r.triggerShutdownForInstances(); err != nil {
return fmt.Errorf("triggering shutdown for instances during runner shutdown: %w", err)
Expand Down Expand Up @@ -333,3 +368,42 @@ func (r *Runner) InstanceStatuses() map[string]types.InstanceStatus {

return instanceStatuses
}

// UpdateRegistrationIDs detects any changes between the new and stored registration IDs,
// and resets the runner instances for the new registrationIDs if required
func (r *Runner) UpdateRegistrationIDs(newRegistrationIDs []string) error {
slices.Sort(newRegistrationIDs)
existingRegistrationIDs := r.registrationIds
slices.Sort(existingRegistrationIDs)

if slices.Equal(newRegistrationIDs, existingRegistrationIDs) {
r.slogger.Log(context.TODO(), slog.LevelDebug,
"skipping runner restarts for updated registration IDs, no changes detected",
)

return nil
}

r.slogger.Log(context.TODO(), slog.LevelDebug,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit -- I think reasonable to log this at the info level (so that we ship this log to the cloud)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh good call, will do!

"detected changes to registrationIDs, will restart runner instances",
"previous_registration_ids", existingRegistrationIDs,
"new_registration_ids", newRegistrationIDs,
)

// we know there are changes, safe to update the internal registrationIDs now
r.registrationIds = newRegistrationIDs
RebeccaMahany marked this conversation as resolved.
Show resolved Hide resolved
// mark rerun as required so that we can safely shutdown all workers and have the changes
// picked back up from within the main Run function
r.rerunRequired.Store(true)

if err := r.Shutdown(); err != nil {
r.slogger.Log(context.TODO(), slog.LevelWarn,
"could not shut down runner instances for restart after registration changes",
"err", err,
)

return err
}

return nil
}
Loading
Loading