Make osq runner responsive to registration updates #2007

Open

zackattack01 wants to merge 23 commits into main from zack/runner_registration_ids

Conversation

@zackattack01 (Contributor) commented Dec 17, 2024

  • reworks interfaces for runner change detection
    • knapsack's querier is currently implemented by our runner, but we need the runner to do more here. This adds an OsqRunner interface which encompasses the previous InstanceQuerier interface and adds our new RegistrationChangeHandler requirements (see the sketch after this list)
    • adds an UpdateRegistrationIDs method to the runner which will detect any changes and restart the instances accordingly
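
Roughly, the interface split could look something like this (sketch only; Query's signature is taken from the diff below, while the UpdateRegistrationIDs signature and the package layout are assumptions, not the final code):

    package types

    // InstanceQuerier is the existing querier interface that the runner already implements
    // (signature mirrored from Runner.Query in this PR).
    type InstanceQuerier interface {
        Query(query string) ([]map[string]string, error)
    }

    // RegistrationChangeHandler is the new requirement: react to registration ID updates.
    type RegistrationChangeHandler interface {
        UpdateRegistrationIDs(registrationIDs []string) error
    }

    // OsqRunner combines both, so knapsack can hold a single runner reference.
    type OsqRunner interface {
        InstanceQuerier
        RegistrationChangeHandler
    }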

To allow for a graceful restart of all known instances after the registration IDs have been updated, a few tricky changes were required that I'd like more eyes on. Some context:

  • within our Shutdown method, we currently close the shutdown channel to signal our shutdown intent. This works well but isn't compatible with needing to read from that channel again when the runner should stay up (e.g. restarts triggered by updated registration IDs). Resetting the channel did not feel correct and introduces data races due to the way we need to thread/use wait groups here
  • I initially tried reworking things to have alternate shutdown patterns for reloading/restarting vs actually shutting down, and adding a secondary reload channel, but we would still need the shutdown channel to stay open
  • updating to just send on the shutdown channel and leave it open breaks all shutdown functionality, because there is no way to sync the timing required here across multiple instances. A send blocks until it is read on the unbuffered channel, but depending on the state of any given instance at the time shutdown is called, we may not have that select block open (we block on instance.Exited() above it)
  • so we need r.shutdown to hold that message. The reason this worked as expected with close(r.shutdown) is that a closed channel reads a zero value and fires immediately whenever it is next read, even if the select wasn't open before close was called
  • the only way I could think of to get around this was to instead make that a buffered channel, and ensure we send one shutdown message per instance

If anyone has alternate suggestions please let me know!
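
To illustrate the key property, here is a self-contained sketch of the buffered-channel approach (the instance names and the overall structure are invented for illustration; only the "one buffered message per instance" idea comes from this PR):

    package main

    import (
        "fmt"
        "sync"
    )

    func main() {
        instances := []string{"default", "secondary"}

        // buffered with one slot per instance, so every send below completes
        // immediately and the message waits until the instance next reads
        shutdown := make(chan struct{}, len(instances))

        var wg sync.WaitGroup
        for _, name := range instances {
            wg.Add(1)
            go func(name string) {
                defer wg.Done()
                <-shutdown // fires even if the send happened before we got here
                fmt.Println(name, "shutting down")
            }(name)
        }

        // one shutdown message per instance; none of these sends block
        for range instances {
            shutdown <- struct{}{}
        }
        wg.Wait()
    }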

@zackattack01 force-pushed the zack/runner_registration_ids branch from a1d2603 to 01e4063 on December 19, 2024 17:35
@zackattack01 force-pushed the zack/runner_registration_ids branch from 01e4063 to 2d74749 on December 19, 2024 18:27
@@ -49,6 +53,31 @@ func New(k types.Knapsack, serviceClient service.KolideService, opts ...OsqueryI
}

func (r *Runner) Run() error {
    for {
        // if our instances ever exit unexpectedly, return immediately
Contributor

I think this comment isn't accurate? I think runRegisteredInstances only returns if a shutdown was requested. And it only returns an error if a) shutdown was requested and b) we were trying to restart one or more instances during that time and c) hadn't successfully restarted one of them yet.

Either way, why would we return the error here instead of checking rerunRequired first?
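
For illustration, checking rerunRequired before returning might look something like this (a compilable stand-in only; runRegisteredInstances and rerunRequired come from this PR, everything else is invented to make the sketch run):

    package main

    import "sync/atomic"

    type runner struct {
        rerunRequired atomic.Bool
    }

    // stand-in: in the real runner this blocks until a shutdown/restart is requested
    func (r *runner) runRegisteredInstances() error { return nil }

    func (r *runner) Run() error {
        for {
            err := r.runRegisteredInstances()

            // check rerunRequired before bailing on the error: a shutdown that was
            // requested only to reload registration IDs should loop back around
            if r.rerunRequired.Load() {
                r.rerunRequired.Store(false)
                continue
            }

            return err
        }
    }

    func main() {
        r := &runner{}
        _ = r.Run()
    }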

Contributor Author

That's a good point, thank you. We'd probably want to rerun regardless if required; I will update that comment and get this fixed up!

@zackattack01 zackattack01 marked this pull request as ready for review January 24, 2025 18:47
@RebeccaMahany (Contributor) left a comment

This makes sense to me! Really nice.

pkg/osquery/runtime/runner.go (outdated; conversation resolved)
pkg/osquery/runtime/runner.go (conversation resolved)
        return nil
    }

    r.slogger.Log(context.TODO(), slog.LevelDebug,
Contributor

Nit -- I think it's reasonable to log this at the info level (so that we ship this log to the cloud)

Contributor Author

oh good call, will do!

@@ -205,6 +253,13 @@ func (r *Runner) Query(query string) ([]map[string]string, error) {
}

func (r *Runner) Interrupt(_ error) {
if r.interrupted.Load() {
Contributor

Do you think it'd be useful to also do r.rerunRequired.Store(false) here? I'm thinking about the case where UpdateRegistrationIDs is called and then Interrupt is called while the shutdown/restart is ongoing. Could maybe add a test case for this as well?
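
For illustration, that might land roughly like this (a compilable stand-in; the interrupted guard mirrors the diff above, while the Shutdown stub and the rest are invented):

    package main

    import "sync/atomic"

    type runner struct {
        interrupted   atomic.Bool
        rerunRequired atomic.Bool
    }

    // stand-in for the real Shutdown
    func (r *runner) Shutdown() error { return nil }

    func (r *runner) Interrupt(_ error) {
        if r.interrupted.Load() {
            // already interrupted, nothing more to do
            return
        }
        r.interrupted.Store(true)

        // an interrupt should override a pending reload: without this, a restart
        // requested by UpdateRegistrationIDs while the shutdown is in flight could
        // bring the instances back up after we've been asked to exit
        r.rerunRequired.Store(false)

        _ = r.Shutdown()
    }

    func main() {
        (&runner{}).Interrupt(nil)
    }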

Contributor Author

ohh yeah that seems wise. will do both!

    instances      map[string]*OsqueryInstance // maps registration ID to currently-running instance
    instanceLock   sync.Mutex                  // locks access to `instances` to avoid e.g. restarting an instance that isn't running yet
    slogger        *slog.Logger
    knapsack       types.Knapsack
    serviceClient  service.KolideService   // shared service client for communication between osquery instance and Kolide SaaS
    settingsWriter settingsStoreWriter     // writes to startup settings store
    opts           []OsqueryInstanceOption // global options applying to all osquery instances
    shutdown       chan struct{}           // buffered shutdown channel to enable shutting down in order to restart or exit
    rerunRequired  atomic.Bool
Contributor

Maybe add a quick comment here? // if set, the runner will restart all instances after Shutdown is called, instead of exiting or similar

Co-authored-by: Rebecca Mahany-Horton <[email protected]>
@@ -27,14 +28,16 @@ type settingsStoreWriter interface {

type Runner struct {
    registrationIds []string   // we expect to run one instance per registration ID
    regIDLock       sync.Mutex // locks access to registrationIds
Contributor

I discovered that if we are locking for a single value, there is an atomic.Value or atomic.Pointer that can be used, like:

	var registrationIds atomic.Value
	registrationIds.Store([]string{"a", "b", "c"})
	theIds := registrationIds.Load().([]string)

or

	var registrationIds atomic.Pointer[[]string]
	registrationIds.Store(&[]string{"a", "b", "c"})
	theIds := registrationIds.Load()

the internets generally say that atomics are faster ... but I doubt that it makes any noticeable difference for us; also, a pointer to a slice feels weird and I'm not sure of the implications

feel free to not act on this, I don't know if it's any better

Comment on lines +278 to 280
    for range r.instances {
        r.shutdown <- struct{}{}
    }
Contributor

what about giving each instance its own shutdown channel to avoid the buffered channel?

Suggested change

    for range r.instances {
        r.shutdown <- struct{}{}
    }

    for _, instance := range r.instances {
        close(instance.shutdown)
    }

Contributor Author

Oh yeah, I'm gonna give this a try, ty! I'm actually seeing some interesting behavior while trying to write a test for Interrupt during restart per Becca's comment here, and I wonder if this wouldn't clear things up a little. The behavior I'm seeing in the test is that the interrupt can end up ignored if timed correctly, and I think it's related to the buffered channel use so far.
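
For reference, the per-instance close pattern in isolation behaves like this (sketch only; the instance struct and names here are invented and are not the PR's OsqueryInstance):

    package main

    import (
        "fmt"
        "sync"
    )

    type instance struct {
        name     string
        shutdown chan struct{}
    }

    func main() {
        instances := map[string]*instance{
            "default":   {name: "default", shutdown: make(chan struct{})},
            "secondary": {name: "secondary", shutdown: make(chan struct{})},
        }

        var wg sync.WaitGroup
        for _, i := range instances {
            wg.Add(1)
            go func(i *instance) {
                defer wg.Done()
                // a closed channel always yields immediately, whenever this read happens
                <-i.shutdown
                fmt.Println(i.name, "shutting down")
            }(i)
        }

        // close each instance's own channel: no buffer sizing up front, and no risk
        // of one instance consuming a message that was meant for another
        for _, i := range instances {
            close(i.shutdown)
        }
        wg.Wait()
    }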
