From 0c31928357fa88eb76137e4ca013b9cecbcb52f1 Mon Sep 17 00:00:00 2001 From: zack olson Date: Tue, 17 Dec 2024 16:19:07 -0500 Subject: [PATCH 01/13] rework interfaces for runner change detection --- cmd/launcher/launcher.go | 2 +- ee/agent/knapsack/knapsack.go | 20 ++++++++++++++------ ee/agent/types/knapsack.go | 2 +- ee/agent/types/mocks/knapsack.go | 26 ++++++++++++++++++++++---- ee/agent/types/registration.go | 3 ++- ee/agent/types/runner.go | 13 +++++++++++++ pkg/osquery/runtime/runner.go | 5 +++++ 7 files changed, 58 insertions(+), 13 deletions(-) create mode 100644 ee/agent/types/runner.go diff --git a/cmd/launcher/launcher.go b/cmd/launcher/launcher.go index 6175ee713..36165bbc6 100644 --- a/cmd/launcher/launcher.go +++ b/cmd/launcher/launcher.go @@ -373,7 +373,7 @@ func runLauncher(ctx context.Context, cancel func(), multiSlogger, systemMultiSl osqueryruntime.WithAugeasLensFunction(augeas.InstallLenses), ) runGroup.Add("osqueryRunner", osqueryRunner.Run, osqueryRunner.Interrupt) - k.SetInstanceQuerier(osqueryRunner) + k.SetInstanceRunner(osqueryRunner) versionInfo := version.Version() k.SystemSlogger().Log(ctx, slog.LevelInfo, diff --git a/ee/agent/knapsack/knapsack.go b/ee/agent/knapsack/knapsack.go index f52c63ce1..e28dad5b1 100644 --- a/ee/agent/knapsack/knapsack.go +++ b/ee/agent/knapsack/knapsack.go @@ -39,7 +39,7 @@ type knapsack struct { slogger, systemSlogger *multislogger.MultiSlogger - querier types.InstanceQuerier + osqRunner types.OsqRunner // This struct is a work in progress, and will be iteratively added to as needs arise. } @@ -87,9 +87,9 @@ func (k *knapsack) AddSlogHandler(handler ...slog.Handler) { k.systemSlogger.AddHandler(handler...) } -// Osquery instance querier -func (k *knapsack) SetInstanceQuerier(q types.InstanceQuerier) { - k.querier = q +// Osquery instance runner +func (k *knapsack) SetInstanceRunner(r types.OsqRunner) { + k.osqRunner = r } // RegistrationTracker interface methods @@ -97,13 +97,21 @@ func (k *knapsack) RegistrationIDs() []string { return []string{types.DefaultRegistrationID} } +func (k *knapsack) SetRegistrationIDs(registrationIDs []string) error { + if k.osqRunner == nil { + return nil + } + + return k.osqRunner.UpdateRegistrationIDs(registrationIDs) +} + // InstanceStatuses returns the current status of each osquery instance. // It performs a healthcheck against each existing instance. func (k *knapsack) InstanceStatuses() map[string]types.InstanceStatus { - if k.querier == nil { + if k.osqRunner == nil { return nil } - return k.querier.InstanceStatuses() + return k.osqRunner.InstanceStatuses() } // BboltDB interface methods diff --git a/ee/agent/types/knapsack.go b/ee/agent/types/knapsack.go index 45a441cb1..d00f7c602 100644 --- a/ee/agent/types/knapsack.go +++ b/ee/agent/types/knapsack.go @@ -11,7 +11,7 @@ type Knapsack interface { Slogger RegistrationTracker InstanceQuerier - SetInstanceQuerier(q InstanceQuerier) + SetInstanceRunner(r OsqRunner) // LatestOsquerydPath finds the path to the latest osqueryd binary, after accounting for updates. LatestOsquerydPath(ctx context.Context) string // ReadEnrollSecret returns the enroll secret value, checking in various locations. diff --git a/ee/agent/types/mocks/knapsack.go b/ee/agent/types/mocks/knapsack.go index 58bea406d..bfa7688dd 100644 --- a/ee/agent/types/mocks/knapsack.go +++ b/ee/agent/types/mocks/knapsack.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.45.0. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. 
package mocks @@ -1570,9 +1570,9 @@ func (_m *Knapsack) SetInsecureTransportTLS(insecure bool) error { return r0 } -// SetInstanceQuerier provides a mock function with given fields: q -func (_m *Knapsack) SetInstanceQuerier(q types.InstanceQuerier) { - _m.Called(q) +// SetInstanceRunner provides a mock function with given fields: r +func (_m *Knapsack) SetInstanceRunner(r types.OsqRunner) { + _m.Called(r) } // SetKolideServerURL provides a mock function with given fields: url @@ -1760,6 +1760,24 @@ func (_m *Knapsack) SetPinnedOsquerydVersion(version string) error { return r0 } +// SetRegistrationIDs provides a mock function with given fields: registrationIDs +func (_m *Knapsack) SetRegistrationIDs(registrationIDs []string) error { + ret := _m.Called(registrationIDs) + + if len(ret) == 0 { + panic("no return value specified for SetRegistrationIDs") + } + + var r0 error + if rf, ok := ret.Get(0).(func([]string) error); ok { + r0 = rf(registrationIDs) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // SetSystrayRestartEnabled provides a mock function with given fields: enabled func (_m *Knapsack) SetSystrayRestartEnabled(enabled bool) error { ret := _m.Called(enabled) diff --git a/ee/agent/types/registration.go b/ee/agent/types/registration.go index 979889e6c..0d8a91022 100644 --- a/ee/agent/types/registration.go +++ b/ee/agent/types/registration.go @@ -9,4 +9,5 @@ const ( // data may be provided by e.g. a control server subsystem. type RegistrationTracker interface { RegistrationIDs() []string -} + SetRegistrationIDs(registrationIDs []string) error +} \ No newline at end of file diff --git a/ee/agent/types/runner.go b/ee/agent/types/runner.go new file mode 100644 index 000000000..36768fba6 --- /dev/null +++ b/ee/agent/types/runner.go @@ -0,0 +1,13 @@ +package types + +type ( + // RegistrationChangeHandler is implemented by pkg/osquery/runtime/runner.go + RegistrationChangeHandler interface { + UpdateRegistrationIDs(registrationIDs []string) error + } + + OsqRunner interface { + RegistrationChangeHandler + InstanceQuerier + } +) diff --git a/pkg/osquery/runtime/runner.go b/pkg/osquery/runtime/runner.go index 3f926c46d..fe6493faf 100644 --- a/pkg/osquery/runtime/runner.go +++ b/pkg/osquery/runtime/runner.go @@ -333,3 +333,8 @@ func (r *Runner) InstanceStatuses() map[string]types.InstanceStatus { return instanceStatuses } + +func (r *Runner) UpdateRegistrationIDs(registrationIDs []string) error { + // TODO: detect any difference in reg IDs and shut down/spin up instances accordingly + return nil +} From 81e4209c5164d555b09614d16bd6b0941b0feb27 Mon Sep 17 00:00:00 2001 From: zack olson Date: Tue, 17 Dec 2024 16:36:05 -0500 Subject: [PATCH 02/13] gofmt --- ee/agent/types/registration.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ee/agent/types/registration.go b/ee/agent/types/registration.go index 0d8a91022..1c39ef1f7 100644 --- a/ee/agent/types/registration.go +++ b/ee/agent/types/registration.go @@ -10,4 +10,4 @@ const ( type RegistrationTracker interface { RegistrationIDs() []string SetRegistrationIDs(registrationIDs []string) error -} \ No newline at end of file +} From e028a65dad2f1e467fa4475c29ded81c915647ba Mon Sep 17 00:00:00 2001 From: zack olson Date: Wed, 18 Dec 2024 17:02:10 -0500 Subject: [PATCH 03/13] restarts seem to be working this way --- ee/agent/types/registration.go | 1 + pkg/osquery/runtime/runner.go | 68 ++++++++++++- pkg/osquery/runtime/runtime_test.go | 151 ++++++++++++++++++++++++++++ 3 files changed, 218 insertions(+), 2 deletions(-) diff 
--git a/ee/agent/types/registration.go b/ee/agent/types/registration.go index 1c39ef1f7..e6563e195 100644 --- a/ee/agent/types/registration.go +++ b/ee/agent/types/registration.go @@ -11,3 +11,4 @@ type RegistrationTracker interface { RegistrationIDs() []string SetRegistrationIDs(registrationIDs []string) error } + \ No newline at end of file diff --git a/pkg/osquery/runtime/runner.go b/pkg/osquery/runtime/runner.go index fe6493faf..424bdd524 100644 --- a/pkg/osquery/runtime/runner.go +++ b/pkg/osquery/runtime/runner.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "log/slog" + "slices" "sync" "time" @@ -27,6 +28,7 @@ type Runner struct { serviceClient service.KolideService // shared service client for communication between osquery instance and Kolide SaaS opts []OsqueryInstanceOption // global options applying to all osquery instances shutdown chan struct{} + rerunRequired bool interrupted bool } @@ -38,6 +40,7 @@ func New(k types.Knapsack, serviceClient service.KolideService, opts ...OsqueryI knapsack: k, serviceClient: serviceClient, shutdown: make(chan struct{}), + rerunRequired: false, opts: opts, } @@ -49,6 +52,31 @@ func New(k types.Knapsack, serviceClient service.KolideService, opts ...OsqueryI } func (r *Runner) Run() error { + for { + // if our instances ever exit unexpectedly, return immediately + if err := r.runRegisteredInstances(); err != nil { + return err + } + + // if we're in a state that required re-running all registered instances, + // reset the field and do that + if r.rerunRequired { + r.rerunRequired = false + continue + } + + // otherwise, exit cleanly + return nil + } +} + +func (r *Runner) runRegisteredInstances() error { + // clear the internal instances to add back in fresh as we runInstance, + // this prevents old instances from sticking around if a registrationID is ever removed + r.instanceLock.Lock() + r.instances = make(map[string]*OsqueryInstance) + r.instanceLock.Unlock() + // Create a group to track the workers running each instance wg, ctx := errgroup.WithContext(context.Background()) @@ -334,7 +362,43 @@ func (r *Runner) InstanceStatuses() map[string]types.InstanceStatus { return instanceStatuses } -func (r *Runner) UpdateRegistrationIDs(registrationIDs []string) error { - // TODO: detect any difference in reg IDs and shut down/spin up instances accordingly +func (r *Runner) UpdateRegistrationIDs(newRegistrationIDs []string) error { + slices.Sort(newRegistrationIDs) + existingRegistrationIDs := r.registrationIds + slices.Sort(existingRegistrationIDs) + + if slices.Equal(newRegistrationIDs, existingRegistrationIDs) { + r.slogger.Log(context.TODO(), slog.LevelDebug, + "skipping runner restarts for updated registration IDs, no changes detected", + ) + + return nil + } + + r.slogger.Log(context.TODO(), slog.LevelDebug, + "detected changes to registrationIDs, will restart runner instances", + "previous_registration_ids", existingRegistrationIDs, + "new_registration_ids", newRegistrationIDs, + ) + + // we know there are changes, safe to update the internal registrationIDs now + r.registrationIds = newRegistrationIDs + // mark rerun as required so that we can safely shutdown all workers and have the changes + // picked back up from within the main Run function + r.rerunRequired = true + + if err := r.Shutdown(); err != nil { + r.slogger.Log(context.TODO(), slog.LevelWarn, + "could not shut down runner instances for restart after registration changes", + "err", err, + ) + + return err + } + + // reset the shutdown channel and interrupted state + r.shutdown = 
make(chan struct{}) + r.interrupted = false + return nil } diff --git a/pkg/osquery/runtime/runtime_test.go b/pkg/osquery/runtime/runtime_test.go index 498cb22c3..8b61d1e46 100644 --- a/pkg/osquery/runtime/runtime_test.go +++ b/pkg/osquery/runtime/runtime_test.go @@ -593,6 +593,157 @@ func TestExtensionIsCleanedUp(t *testing.T) { <-timer1.C } +func TestMultipleInstancesWithUpdatedRegistrationIDs(t *testing.T) { + t.Parallel() + rootDirectory := testRootDirectory(t) + + logBytes, slogger := setUpTestSlogger() + + k := typesMocks.NewKnapsack(t) + k.On("RegistrationIDs").Return([]string{types.DefaultRegistrationID}) + k.On("OsqueryHealthcheckStartupDelay").Return(0 * time.Second).Maybe() + k.On("WatchdogEnabled").Return(false) + k.On("RegisterChangeObserver", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything) + k.On("Slogger").Return(slogger) + k.On("LatestOsquerydPath", mock.Anything).Return(testOsqueryBinaryDirectory) + k.On("RootDirectory").Return(rootDirectory).Maybe() + k.On("OsqueryFlags").Return([]string{}) + k.On("OsqueryVerbose").Return(true) + k.On("LoggingInterval").Return(5 * time.Minute).Maybe() + k.On("LogMaxBytesPerBatch").Return(0).Maybe() + k.On("Transport").Return("jsonrpc").Maybe() + k.On("ReadEnrollSecret").Return("", nil).Maybe() + setUpMockStores(t, k) + serviceClient := mockServiceClient() + + runner := New(k, serviceClient) + + // Start the instance + go runner.Run() + waitHealthy(t, runner, logBytes) + + // Confirm the default instance was started + require.Contains(t, runner.instances, types.DefaultRegistrationID) + require.NotNil(t, runner.instances[types.DefaultRegistrationID].stats) + require.NotEmpty(t, runner.instances[types.DefaultRegistrationID].stats.StartTime, "start time should be added to default instance stats on start up") + require.NotEmpty(t, runner.instances[types.DefaultRegistrationID].stats.ConnectTime, "connect time should be added to default instance stats on start up") + + // confirm only the default instance has started + require.Equal(t, 1, len(runner.instances)) + + // Confirm instance statuses are reported correctly + instanceStatuses := runner.InstanceStatuses() + require.Contains(t, instanceStatuses, types.DefaultRegistrationID) + require.Equal(t, instanceStatuses[types.DefaultRegistrationID], types.InstanceStatusHealthy) + + // Add in an extra instance + extraRegistrationId := ulid.New() + runner.UpdateRegistrationIDs([]string{types.DefaultRegistrationID, extraRegistrationId}) + waitHealthy(t, runner, logBytes) + updatedInstanceStatuses := runner.InstanceStatuses() + // verify that rerunRequired has been reset for any future changes + require.False(t, runner.rerunRequired) + // now verify both instances are reported + require.Equal(t, 2, len(runner.instances)) + require.Contains(t, updatedInstanceStatuses, types.DefaultRegistrationID) + require.Contains(t, updatedInstanceStatuses, extraRegistrationId) + // Confirm the additional instance was started and is healthy + require.NotNil(t, runner.instances[extraRegistrationId].stats) + require.NotEmpty(t, runner.instances[extraRegistrationId].stats.StartTime, "start time should be added to secondary instance stats on start up") + require.NotEmpty(t, runner.instances[extraRegistrationId].stats.ConnectTime, "connect time should be added to secondary instance stats on start up") + require.Equal(t, updatedInstanceStatuses[extraRegistrationId], types.InstanceStatusHealthy) + + // update registration IDs one more time, this time removing the additional registration + 
originalDefaultInstanceStartTime := runner.instances[extraRegistrationId].stats.StartTime + runner.UpdateRegistrationIDs([]string{types.DefaultRegistrationID}) + waitHealthy(t, runner, logBytes) + + // now verify only the default instance remains + require.Equal(t, 1, len(runner.instances)) + // Confirm the default instance was started and is healthy + require.Contains(t, runner.instances, types.DefaultRegistrationID) + require.NotNil(t, runner.instances[types.DefaultRegistrationID].stats) + require.NotEmpty(t, runner.instances[types.DefaultRegistrationID].stats.StartTime, "start time should be added to default instance stats on start up") + require.NotEmpty(t, runner.instances[types.DefaultRegistrationID].stats.ConnectTime, "connect time should be added to default instance stats on start up") + // verify that rerunRequired has been reset for any future changes + require.False(t, runner.rerunRequired) + // verify the default instance was restarted + require.NotEqual(t, originalDefaultInstanceStartTime, runner.instances[types.DefaultRegistrationID].stats.StartTime) + + waitShutdown(t, runner, logBytes) + + // Confirm both instances exited + require.Contains(t, runner.instances, types.DefaultRegistrationID) + require.NotNil(t, runner.instances[types.DefaultRegistrationID].stats) + require.NotEmpty(t, runner.instances[types.DefaultRegistrationID].stats.ExitTime, "exit time should be added to default instance stats on shutdown") +} + +func TestUpdatingRegistrationIDsOnlyRestartsForChanges(t *testing.T) { + t.Parallel() + rootDirectory := testRootDirectory(t) + + logBytes, slogger := setUpTestSlogger() + extraRegistrationId := ulid.New() + + k := typesMocks.NewKnapsack(t) + k.On("RegistrationIDs").Return([]string{types.DefaultRegistrationID, extraRegistrationId}) + k.On("OsqueryHealthcheckStartupDelay").Return(0 * time.Second).Maybe() + k.On("WatchdogEnabled").Return(false) + k.On("RegisterChangeObserver", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything) + k.On("Slogger").Return(slogger) + k.On("LatestOsquerydPath", mock.Anything).Return(testOsqueryBinaryDirectory) + k.On("RootDirectory").Return(rootDirectory).Maybe() + k.On("OsqueryFlags").Return([]string{}) + k.On("OsqueryVerbose").Return(true) + k.On("LoggingInterval").Return(5 * time.Minute).Maybe() + k.On("LogMaxBytesPerBatch").Return(0).Maybe() + k.On("Transport").Return("jsonrpc").Maybe() + k.On("ReadEnrollSecret").Return("", nil).Maybe() + setUpMockStores(t, k) + serviceClient := mockServiceClient() + + runner := New(k, serviceClient) + + // Start the instance + go runner.Run() + waitHealthy(t, runner, logBytes) + + require.Equal(t, 2, len(runner.instances)) + // Confirm the default instance was started + require.Contains(t, runner.instances, types.DefaultRegistrationID) + require.NotNil(t, runner.instances[types.DefaultRegistrationID].stats) + require.NotEmpty(t, runner.instances[types.DefaultRegistrationID].stats.StartTime, "start time should be added to default instance stats on start up") + require.NotEmpty(t, runner.instances[types.DefaultRegistrationID].stats.ConnectTime, "connect time should be added to default instance stats on start up") + // note the original start time + defaultInstanceStartTime := runner.instances[types.DefaultRegistrationID].stats.StartTime + + // Confirm the extra instance was started + require.Contains(t, runner.instances, extraRegistrationId) + require.NotNil(t, runner.instances[extraRegistrationId].stats) + require.NotEmpty(t, 
runner.instances[extraRegistrationId].stats.StartTime, "start time should be added to extra instance stats on start up") + require.NotEmpty(t, runner.instances[extraRegistrationId].stats.ConnectTime, "connect time should be added to extra instance stats on start up") + // note the original start time + extraInstanceStartTime := runner.instances[extraRegistrationId].stats.StartTime + + // rerun with identical registrationIDs in swapped order and verify that the instances are not restarted + runner.UpdateRegistrationIDs([]string{extraRegistrationId, types.DefaultRegistrationID}) + waitHealthy(t, runner, logBytes) + + require.Equal(t, 2, len(runner.instances)) + require.Equal(t, extraInstanceStartTime, runner.instances[extraRegistrationId].stats.StartTime) + require.Equal(t, defaultInstanceStartTime, runner.instances[types.DefaultRegistrationID].stats.StartTime) + + waitShutdown(t, runner, logBytes) + + // Confirm both instances exited + require.Contains(t, runner.instances, types.DefaultRegistrationID) + require.NotNil(t, runner.instances[types.DefaultRegistrationID].stats) + require.NotEmpty(t, runner.instances[types.DefaultRegistrationID].stats.ExitTime, "exit time should be added to default instance stats on shutdown") + require.Contains(t, runner.instances, extraRegistrationId) + require.NotNil(t, runner.instances[extraRegistrationId].stats) + require.NotEmpty(t, runner.instances[extraRegistrationId].stats.ExitTime, "exit time should be added to secondary instance stats on shutdown") +} + // sets up an osquery instance with a running extension to be used in tests. func setupOsqueryInstanceForTests(t *testing.T) (runner *Runner, logBytes *threadsafebuffer.ThreadSafeBuffer, teardown func()) { rootDirectory := testRootDirectory(t) From b9e19181812046c9311bf31329261e3c396e4c29 Mon Sep 17 00:00:00 2001 From: zack olson Date: Thu, 19 Dec 2024 12:35:13 -0500 Subject: [PATCH 04/13] shift to using buffered shutdown channel, rework --- ee/agent/types/registration.go | 1 - pkg/osquery/runtime/runner.go | 41 ++++++++++++++++------------- pkg/osquery/runtime/runtime_test.go | 13 +++++---- 3 files changed, 30 insertions(+), 25 deletions(-) diff --git a/ee/agent/types/registration.go b/ee/agent/types/registration.go index e6563e195..1c39ef1f7 100644 --- a/ee/agent/types/registration.go +++ b/ee/agent/types/registration.go @@ -11,4 +11,3 @@ type RegistrationTracker interface { RegistrationIDs() []string SetRegistrationIDs(registrationIDs []string) error } - \ No newline at end of file diff --git a/pkg/osquery/runtime/runner.go b/pkg/osquery/runtime/runner.go index 424bdd524..94b3c996c 100644 --- a/pkg/osquery/runtime/runner.go +++ b/pkg/osquery/runtime/runner.go @@ -7,6 +7,7 @@ import ( "log/slog" "slices" "sync" + "sync/atomic" "time" "github.com/kolide/launcher/ee/agent/flags/keys" @@ -27,9 +28,9 @@ type Runner struct { knapsack types.Knapsack serviceClient service.KolideService // shared service client for communication between osquery instance and Kolide SaaS opts []OsqueryInstanceOption // global options applying to all osquery instances - shutdown chan struct{} - rerunRequired bool - interrupted bool + shutdown chan struct{} // buffered shutdown channel for to enable shutting down to restart or exit + rerunRequired atomic.Bool + interrupted atomic.Bool } func New(k types.Knapsack, serviceClient service.KolideService, opts ...OsqueryInstanceOption) *Runner { @@ -39,9 +40,9 @@ func New(k types.Knapsack, serviceClient service.KolideService, opts ...OsqueryI slogger: k.Slogger().With("component", 
"osquery_runner"), knapsack: k, serviceClient: serviceClient, - shutdown: make(chan struct{}), - rerunRequired: false, - opts: opts, + // the buffer length is arbitrarily set at 100, this number just needs to be higher than the total possible instances + shutdown: make(chan struct{}, 100), + opts: opts, } k.RegisterChangeObserver(runner, @@ -60,8 +61,8 @@ func (r *Runner) Run() error { // if we're in a state that required re-running all registered instances, // reset the field and do that - if r.rerunRequired { - r.rerunRequired = false + if r.rerunRequired.Load() { + r.rerunRequired.Store(false) continue } @@ -214,6 +215,13 @@ func (r *Runner) Query(query string) ([]map[string]string, error) { } func (r *Runner) Interrupt(_ error) { + if r.interrupted.Load() { + // Already shut down, nothing else to do + return + } + + r.interrupted.Store(true) + if err := r.Shutdown(); err != nil { r.slogger.Log(context.TODO(), slog.LevelWarn, "could not shut down runner on interrupt", @@ -225,13 +233,12 @@ func (r *Runner) Interrupt(_ error) { // Shutdown instructs the runner to permanently stop the running instance (no // restart will be attempted). func (r *Runner) Shutdown() error { - if r.interrupted { - // Already shut down, nothing else to do - return nil + // ensure one shutdown is sent for each instance to read + r.instanceLock.Lock() + for range r.instances { + r.shutdown <- struct{}{} } - - r.interrupted = true - close(r.shutdown) + r.instanceLock.Unlock() if err := r.triggerShutdownForInstances(); err != nil { return fmt.Errorf("triggering shutdown for instances during runner shutdown: %w", err) @@ -385,7 +392,7 @@ func (r *Runner) UpdateRegistrationIDs(newRegistrationIDs []string) error { r.registrationIds = newRegistrationIDs // mark rerun as required so that we can safely shutdown all workers and have the changes // picked back up from within the main Run function - r.rerunRequired = true + r.rerunRequired.Store(true) if err := r.Shutdown(); err != nil { r.slogger.Log(context.TODO(), slog.LevelWarn, @@ -396,9 +403,5 @@ func (r *Runner) UpdateRegistrationIDs(newRegistrationIDs []string) error { return err } - // reset the shutdown channel and interrupted state - r.shutdown = make(chan struct{}) - r.interrupted = false - return nil } diff --git a/pkg/osquery/runtime/runtime_test.go b/pkg/osquery/runtime/runtime_test.go index 8b61d1e46..eb8a8fa17 100644 --- a/pkg/osquery/runtime/runtime_test.go +++ b/pkg/osquery/runtime/runtime_test.go @@ -638,11 +638,12 @@ func TestMultipleInstancesWithUpdatedRegistrationIDs(t *testing.T) { // Add in an extra instance extraRegistrationId := ulid.New() - runner.UpdateRegistrationIDs([]string{types.DefaultRegistrationID, extraRegistrationId}) + updateErr := runner.UpdateRegistrationIDs([]string{types.DefaultRegistrationID, extraRegistrationId}) + require.NoError(t, updateErr) waitHealthy(t, runner, logBytes) updatedInstanceStatuses := runner.InstanceStatuses() // verify that rerunRequired has been reset for any future changes - require.False(t, runner.rerunRequired) + require.False(t, runner.rerunRequired.Load()) // now verify both instances are reported require.Equal(t, 2, len(runner.instances)) require.Contains(t, updatedInstanceStatuses, types.DefaultRegistrationID) @@ -655,7 +656,8 @@ func TestMultipleInstancesWithUpdatedRegistrationIDs(t *testing.T) { // update registration IDs one more time, this time removing the additional registration originalDefaultInstanceStartTime := runner.instances[extraRegistrationId].stats.StartTime - 
runner.UpdateRegistrationIDs([]string{types.DefaultRegistrationID}) + updateErr = runner.UpdateRegistrationIDs([]string{types.DefaultRegistrationID}) + require.NoError(t, updateErr) waitHealthy(t, runner, logBytes) // now verify only the default instance remains @@ -666,7 +668,7 @@ func TestMultipleInstancesWithUpdatedRegistrationIDs(t *testing.T) { require.NotEmpty(t, runner.instances[types.DefaultRegistrationID].stats.StartTime, "start time should be added to default instance stats on start up") require.NotEmpty(t, runner.instances[types.DefaultRegistrationID].stats.ConnectTime, "connect time should be added to default instance stats on start up") // verify that rerunRequired has been reset for any future changes - require.False(t, runner.rerunRequired) + require.False(t, runner.rerunRequired.Load()) // verify the default instance was restarted require.NotEqual(t, originalDefaultInstanceStartTime, runner.instances[types.DefaultRegistrationID].stats.StartTime) @@ -726,7 +728,8 @@ func TestUpdatingRegistrationIDsOnlyRestartsForChanges(t *testing.T) { extraInstanceStartTime := runner.instances[extraRegistrationId].stats.StartTime // rerun with identical registrationIDs in swapped order and verify that the instances are not restarted - runner.UpdateRegistrationIDs([]string{extraRegistrationId, types.DefaultRegistrationID}) + updateErr := runner.UpdateRegistrationIDs([]string{extraRegistrationId, types.DefaultRegistrationID}) + require.NoError(t, updateErr) waitHealthy(t, runner, logBytes) require.Equal(t, 2, len(runner.instances)) From 2d74749d97b6e8a3993df70d2e48cd84fb8d98b1 Mon Sep 17 00:00:00 2001 From: zack olson Date: Thu, 19 Dec 2024 13:26:22 -0500 Subject: [PATCH 05/13] cleanup and add comment --- pkg/osquery/runtime/runner.go | 2 ++ pkg/osquery/runtime/runtime_test.go | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/osquery/runtime/runner.go b/pkg/osquery/runtime/runner.go index 94b3c996c..6f0250794 100644 --- a/pkg/osquery/runtime/runner.go +++ b/pkg/osquery/runtime/runner.go @@ -369,6 +369,8 @@ func (r *Runner) InstanceStatuses() map[string]types.InstanceStatus { return instanceStatuses } +// UpdateRegistrationIDs detects any changes between the new and stored registration IDs, +// and resets the runner instances for the new registrationIDs if required func (r *Runner) UpdateRegistrationIDs(newRegistrationIDs []string) error { slices.Sort(newRegistrationIDs) existingRegistrationIDs := r.registrationIds diff --git a/pkg/osquery/runtime/runtime_test.go b/pkg/osquery/runtime/runtime_test.go index eb8a8fa17..5b764c94e 100644 --- a/pkg/osquery/runtime/runtime_test.go +++ b/pkg/osquery/runtime/runtime_test.go @@ -674,7 +674,7 @@ func TestMultipleInstancesWithUpdatedRegistrationIDs(t *testing.T) { waitShutdown(t, runner, logBytes) - // Confirm both instances exited + // Confirm instance exited require.Contains(t, runner.instances, types.DefaultRegistrationID) require.NotNil(t, runner.instances[types.DefaultRegistrationID].stats) require.NotEmpty(t, runner.instances[types.DefaultRegistrationID].stats.ExitTime, "exit time should be added to default instance stats on shutdown") From db0b4344a1518c0720f58c74b34b66cba0604113 Mon Sep 17 00:00:00 2001 From: zack olson Date: Fri, 20 Dec 2024 10:53:54 -0500 Subject: [PATCH 06/13] don't parallel test for runtime --- pkg/osquery/runtime/runtime_test.go | 37 ++++++++++------------------- 1 file changed, 12 insertions(+), 25 deletions(-) diff --git a/pkg/osquery/runtime/runtime_test.go b/pkg/osquery/runtime/runtime_test.go index 
5b764c94e..b06ab9482 100644 --- a/pkg/osquery/runtime/runtime_test.go +++ b/pkg/osquery/runtime/runtime_test.go @@ -114,8 +114,7 @@ func downloadOsqueryInBinDir(binDirectory string) error { return nil } -func TestBadBinaryPath(t *testing.T) { - t.Parallel() +func TestBadBinaryPath(t *testing.T) { //nolint:paralleltest rootDirectory := t.TempDir() logBytes, slogger := setUpTestSlogger() @@ -151,8 +150,7 @@ func TestBadBinaryPath(t *testing.T) { k.AssertExpectations(t) } -func TestWithOsqueryFlags(t *testing.T) { - t.Parallel() +func TestWithOsqueryFlags(t *testing.T) { //nolint:paralleltest rootDirectory := testRootDirectory(t) logBytes, slogger := setUpTestSlogger() @@ -179,9 +177,7 @@ func TestWithOsqueryFlags(t *testing.T) { waitShutdown(t, runner, logBytes) } -func TestFlagsChanged(t *testing.T) { - t.Parallel() - +func TestFlagsChanged(t *testing.T) { //nolint:paralleltest rootDirectory := testRootDirectory(t) logBytes, slogger := setUpTestSlogger() @@ -316,8 +312,7 @@ func waitHealthy(t *testing.T, runner *Runner, logBytes *threadsafebuffer.Thread time.Sleep(2 * time.Second) } -func TestSimplePath(t *testing.T) { - t.Parallel() +func TestSimplePath(t *testing.T) { //nolint:paralleltest rootDirectory := testRootDirectory(t) logBytes, slogger := setUpTestSlogger() @@ -349,8 +344,7 @@ func TestSimplePath(t *testing.T) { waitShutdown(t, runner, logBytes) } -func TestMultipleInstances(t *testing.T) { - t.Parallel() +func TestMultipleInstances(t *testing.T) { //nolint:paralleltest rootDirectory := testRootDirectory(t) logBytes, slogger := setUpTestSlogger() @@ -411,8 +405,7 @@ func TestMultipleInstances(t *testing.T) { require.NotEmpty(t, runner.instances[extraRegistrationId].stats.ExitTime, "exit time should be added to secondary instance stats on shutdown") } -func TestRunnerHandlesImmediateShutdownWithMultipleInstances(t *testing.T) { - t.Parallel() +func TestRunnerHandlesImmediateShutdownWithMultipleInstances(t *testing.T) { //nolint:paralleltest rootDirectory := testRootDirectory(t) logBytes, slogger := setUpTestSlogger() @@ -462,8 +455,7 @@ func TestRunnerHandlesImmediateShutdownWithMultipleInstances(t *testing.T) { require.NotEmpty(t, runner.instances[extraRegistrationId].stats.ExitTime, "exit time should be added to secondary instance stats on shutdown") } -func TestMultipleShutdowns(t *testing.T) { - t.Parallel() +func TestMultipleShutdowns(t *testing.T) { //nolint:paralleltest rootDirectory := testRootDirectory(t) logBytes, slogger := setUpTestSlogger() @@ -494,8 +486,7 @@ func TestMultipleShutdowns(t *testing.T) { } } -func TestOsqueryDies(t *testing.T) { - t.Parallel() +func TestOsqueryDies(t *testing.T) { //nolint:paralleltest rootDirectory := testRootDirectory(t) logBytes, slogger := setUpTestSlogger() @@ -538,8 +529,7 @@ func TestOsqueryDies(t *testing.T) { waitShutdown(t, runner, logBytes) } -func TestNotStarted(t *testing.T) { - t.Parallel() +func TestNotStarted(t *testing.T) { //nolint:paralleltest rootDirectory := t.TempDir() k := typesMocks.NewKnapsack(t) @@ -566,9 +556,8 @@ func WithStartFunc(f func(cmd *exec.Cmd) error) OsqueryInstanceOption { // TestExtensionIsCleanedUp tests that the osquery extension cleans // itself up. Unfortunately, this test has proved very flakey on // circle-ci, but just fine on laptops. 
-func TestExtensionIsCleanedUp(t *testing.T) { +func TestExtensionIsCleanedUp(t *testing.T) { //nolint:paralleltest t.Skip("https://github.com/kolide/launcher/issues/478") - t.Parallel() runner, logBytes, teardown := setupOsqueryInstanceForTests(t) defer teardown() @@ -593,8 +582,7 @@ func TestExtensionIsCleanedUp(t *testing.T) { <-timer1.C } -func TestMultipleInstancesWithUpdatedRegistrationIDs(t *testing.T) { - t.Parallel() +func TestMultipleInstancesWithUpdatedRegistrationIDs(t *testing.T) { //nolint:paralleltest rootDirectory := testRootDirectory(t) logBytes, slogger := setUpTestSlogger() @@ -680,8 +668,7 @@ func TestMultipleInstancesWithUpdatedRegistrationIDs(t *testing.T) { require.NotEmpty(t, runner.instances[types.DefaultRegistrationID].stats.ExitTime, "exit time should be added to default instance stats on shutdown") } -func TestUpdatingRegistrationIDsOnlyRestartsForChanges(t *testing.T) { - t.Parallel() +func TestUpdatingRegistrationIDsOnlyRestartsForChanges(t *testing.T) { //nolint:paralleltest rootDirectory := testRootDirectory(t) logBytes, slogger := setUpTestSlogger() From 81d84d91fdd384f8894a6e75ee6c552a0e8cc0ec Mon Sep 17 00:00:00 2001 From: zack olson Date: Fri, 20 Dec 2024 12:33:41 -0500 Subject: [PATCH 07/13] PR feedback: add registrationID lock, and fix error handling flow when reload is required --- pkg/osquery/runtime/runner.go | 36 +++++++++++++++++++++++++++-------- 1 file changed, 28 insertions(+), 8 deletions(-) diff --git a/pkg/osquery/runtime/runner.go b/pkg/osquery/runtime/runner.go index 6f0250794..d304e44a2 100644 --- a/pkg/osquery/runtime/runner.go +++ b/pkg/osquery/runtime/runner.go @@ -22,6 +22,7 @@ const ( type Runner struct { registrationIds []string // we expect to run one instance per registration ID + regIDLock sync.Mutex // locks access to registrationIds instances map[string]*OsqueryInstance // maps registration ID to currently-running instance instanceLock sync.Mutex // locks access to `instances` to avoid e.g. 
restarting an instance that isn't running yet slogger *slog.Logger @@ -54,9 +55,13 @@ func New(k types.Knapsack, serviceClient service.KolideService, opts ...OsqueryI func (r *Runner) Run() error { for { - // if our instances ever exit unexpectedly, return immediately - if err := r.runRegisteredInstances(); err != nil { - return err + err := r.runRegisteredInstances() + if err != nil { + // log any errors but continue, in case we intend to reload + r.slogger.Log(context.TODO(), slog.LevelWarn, + "runRegisteredInstances terminated with error", + "err", err, + ) } // if we're in a state that required re-running all registered instances, @@ -66,8 +71,7 @@ func (r *Runner) Run() error { continue } - // otherwise, exit cleanly - return nil + return err } } @@ -82,7 +86,11 @@ func (r *Runner) runRegisteredInstances() error { wg, ctx := errgroup.WithContext(context.Background()) // Start each worker for each instance - for _, registrationId := range r.registrationIds { + r.regIDLock.Lock() + regIDs := r.registrationIds + r.regIDLock.Unlock() + + for _, registrationId := range regIDs { id := registrationId wg.Go(func() error { if err := r.runInstance(id); err != nil { @@ -327,7 +335,11 @@ func (r *Runner) Healthy() error { defer r.instanceLock.Unlock() healthcheckErrs := make([]error, 0) - for _, registrationId := range r.registrationIds { + r.regIDLock.Lock() + regIDs := r.registrationIds + r.regIDLock.Unlock() + + for _, registrationId := range regIDs { instance, ok := r.instances[registrationId] if !ok { healthcheckErrs = append(healthcheckErrs, fmt.Errorf("running instance does not exist for %s", registrationId)) @@ -350,8 +362,11 @@ func (r *Runner) InstanceStatuses() map[string]types.InstanceStatus { r.instanceLock.Lock() defer r.instanceLock.Unlock() + r.regIDLock.Lock() + regIDs := r.registrationIds + r.regIDLock.Unlock() instanceStatuses := make(map[string]types.InstanceStatus) - for _, registrationId := range r.registrationIds { + for _, registrationId := range regIDs { instance, ok := r.instances[registrationId] if !ok { instanceStatuses[registrationId] = types.InstanceStatusNotStarted @@ -373,7 +388,10 @@ func (r *Runner) InstanceStatuses() map[string]types.InstanceStatus { // and resets the runner instances for the new registrationIDs if required func (r *Runner) UpdateRegistrationIDs(newRegistrationIDs []string) error { slices.Sort(newRegistrationIDs) + + r.regIDLock.Lock() existingRegistrationIDs := r.registrationIds + r.regIDLock.Unlock() slices.Sort(existingRegistrationIDs) if slices.Equal(newRegistrationIDs, existingRegistrationIDs) { @@ -391,7 +409,9 @@ func (r *Runner) UpdateRegistrationIDs(newRegistrationIDs []string) error { ) // we know there are changes, safe to update the internal registrationIDs now + r.regIDLock.Lock() r.registrationIds = newRegistrationIDs + r.regIDLock.Unlock() // mark rerun as required so that we can safely shutdown all workers and have the changes // picked back up from within the main Run function r.rerunRequired.Store(true) From c20b160bc3dba0a768d952a52efcdfc438ab4fbe Mon Sep 17 00:00:00 2001 From: zack olson Date: Fri, 20 Dec 2024 12:53:39 -0500 Subject: [PATCH 08/13] put tests back to parallel --- pkg/osquery/runtime/runtime_test.go | 36 +++++++++++++++++++---------- 1 file changed, 24 insertions(+), 12 deletions(-) diff --git a/pkg/osquery/runtime/runtime_test.go b/pkg/osquery/runtime/runtime_test.go index b06ab9482..a526cbe90 100644 --- a/pkg/osquery/runtime/runtime_test.go +++ b/pkg/osquery/runtime/runtime_test.go @@ -114,7 +114,8 @@ func 
downloadOsqueryInBinDir(binDirectory string) error { return nil } -func TestBadBinaryPath(t *testing.T) { //nolint:paralleltest +func TestBadBinaryPath(t *testing.T) { + t.Parallel() rootDirectory := t.TempDir() logBytes, slogger := setUpTestSlogger() @@ -150,7 +151,8 @@ func TestBadBinaryPath(t *testing.T) { //nolint:paralleltest k.AssertExpectations(t) } -func TestWithOsqueryFlags(t *testing.T) { //nolint:paralleltest +func TestWithOsqueryFlags(t *testing.T) { + t.Parallel() rootDirectory := testRootDirectory(t) logBytes, slogger := setUpTestSlogger() @@ -177,7 +179,8 @@ func TestWithOsqueryFlags(t *testing.T) { //nolint:paralleltest waitShutdown(t, runner, logBytes) } -func TestFlagsChanged(t *testing.T) { //nolint:paralleltest +func TestFlagsChanged(t *testing.T) { + t.Parallel() rootDirectory := testRootDirectory(t) logBytes, slogger := setUpTestSlogger() @@ -312,7 +315,8 @@ func waitHealthy(t *testing.T, runner *Runner, logBytes *threadsafebuffer.Thread time.Sleep(2 * time.Second) } -func TestSimplePath(t *testing.T) { //nolint:paralleltest +func TestSimplePath(t *testing.T) { + t.Parallel() rootDirectory := testRootDirectory(t) logBytes, slogger := setUpTestSlogger() @@ -344,7 +348,8 @@ func TestSimplePath(t *testing.T) { //nolint:paralleltest waitShutdown(t, runner, logBytes) } -func TestMultipleInstances(t *testing.T) { //nolint:paralleltest +func TestMultipleInstances(t *testing.T) { + t.Parallel() rootDirectory := testRootDirectory(t) logBytes, slogger := setUpTestSlogger() @@ -405,7 +410,8 @@ func TestMultipleInstances(t *testing.T) { //nolint:paralleltest require.NotEmpty(t, runner.instances[extraRegistrationId].stats.ExitTime, "exit time should be added to secondary instance stats on shutdown") } -func TestRunnerHandlesImmediateShutdownWithMultipleInstances(t *testing.T) { //nolint:paralleltest +func TestRunnerHandlesImmediateShutdownWithMultipleInstances(t *testing.T) { + t.Parallel() rootDirectory := testRootDirectory(t) logBytes, slogger := setUpTestSlogger() @@ -455,7 +461,8 @@ func TestRunnerHandlesImmediateShutdownWithMultipleInstances(t *testing.T) { //n require.NotEmpty(t, runner.instances[extraRegistrationId].stats.ExitTime, "exit time should be added to secondary instance stats on shutdown") } -func TestMultipleShutdowns(t *testing.T) { //nolint:paralleltest +func TestMultipleShutdowns(t *testing.T) { + t.Parallel() rootDirectory := testRootDirectory(t) logBytes, slogger := setUpTestSlogger() @@ -486,7 +493,8 @@ func TestMultipleShutdowns(t *testing.T) { //nolint:paralleltest } } -func TestOsqueryDies(t *testing.T) { //nolint:paralleltest +func TestOsqueryDies(t *testing.T) { + t.Parallel() rootDirectory := testRootDirectory(t) logBytes, slogger := setUpTestSlogger() @@ -529,7 +537,8 @@ func TestOsqueryDies(t *testing.T) { //nolint:paralleltest waitShutdown(t, runner, logBytes) } -func TestNotStarted(t *testing.T) { //nolint:paralleltest +func TestNotStarted(t *testing.T) { + t.Parallel() rootDirectory := t.TempDir() k := typesMocks.NewKnapsack(t) @@ -556,8 +565,9 @@ func WithStartFunc(f func(cmd *exec.Cmd) error) OsqueryInstanceOption { // TestExtensionIsCleanedUp tests that the osquery extension cleans // itself up. Unfortunately, this test has proved very flakey on // circle-ci, but just fine on laptops. 
-func TestExtensionIsCleanedUp(t *testing.T) { //nolint:paralleltest +func TestExtensionIsCleanedUp(t *testing.T) { t.Skip("https://github.com/kolide/launcher/issues/478") + t.Parallel() runner, logBytes, teardown := setupOsqueryInstanceForTests(t) defer teardown() @@ -582,7 +592,8 @@ func TestExtensionIsCleanedUp(t *testing.T) { //nolint:paralleltest <-timer1.C } -func TestMultipleInstancesWithUpdatedRegistrationIDs(t *testing.T) { //nolint:paralleltest +func TestMultipleInstancesWithUpdatedRegistrationIDs(t *testing.T) { + t.Parallel() rootDirectory := testRootDirectory(t) logBytes, slogger := setUpTestSlogger() @@ -668,7 +679,8 @@ func TestMultipleInstancesWithUpdatedRegistrationIDs(t *testing.T) { //nolint:pa require.NotEmpty(t, runner.instances[types.DefaultRegistrationID].stats.ExitTime, "exit time should be added to default instance stats on shutdown") } -func TestUpdatingRegistrationIDsOnlyRestartsForChanges(t *testing.T) { //nolint:paralleltest +func TestUpdatingRegistrationIDsOnlyRestartsForChanges(t *testing.T) { + t.Parallel() rootDirectory := testRootDirectory(t) logBytes, slogger := setUpTestSlogger() From c20d34afc0a8acc1aa40ba1fce7e92456f1ed487 Mon Sep 17 00:00:00 2001 From: zackattack01 Date: Mon, 23 Dec 2024 16:32:49 -0500 Subject: [PATCH 09/13] update var names from rebase --- pkg/osquery/runtime/runtime_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/osquery/runtime/runtime_test.go b/pkg/osquery/runtime/runtime_test.go index bfe7bd315..8ed38e70c 100644 --- a/pkg/osquery/runtime/runtime_test.go +++ b/pkg/osquery/runtime/runtime_test.go @@ -611,7 +611,7 @@ func TestMultipleInstancesWithUpdatedRegistrationIDs(t *testing.T) { k.On("WatchdogEnabled").Return(false) k.On("RegisterChangeObserver", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything) k.On("Slogger").Return(slogger) - k.On("LatestOsquerydPath", mock.Anything).Return(testOsqueryBinaryDirectory) + k.On("LatestOsquerydPath", mock.Anything).Return(testOsqueryBinary) k.On("RootDirectory").Return(rootDirectory).Maybe() k.On("OsqueryFlags").Return([]string{}) k.On("OsqueryVerbose").Return(true) @@ -699,7 +699,7 @@ func TestUpdatingRegistrationIDsOnlyRestartsForChanges(t *testing.T) { k.On("WatchdogEnabled").Return(false) k.On("RegisterChangeObserver", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything) k.On("Slogger").Return(slogger) - k.On("LatestOsquerydPath", mock.Anything).Return(testOsqueryBinaryDirectory) + k.On("LatestOsquerydPath", mock.Anything).Return(testOsqueryBinary) k.On("RootDirectory").Return(rootDirectory).Maybe() k.On("OsqueryFlags").Return([]string{}) k.On("OsqueryVerbose").Return(true) From b6029da268eb7e7a1f9c00eb4802323f56eb06b5 Mon Sep 17 00:00:00 2001 From: zack olson Date: Wed, 15 Jan 2025 11:23:58 -0500 Subject: [PATCH 10/13] update tests for new mocking and cleanup patterns --- pkg/osquery/runtime/runtime_test.go | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/pkg/osquery/runtime/runtime_test.go b/pkg/osquery/runtime/runtime_test.go index cbc5dafe4..18e6f43d2 100644 --- a/pkg/osquery/runtime/runtime_test.go +++ b/pkg/osquery/runtime/runtime_test.go @@ -778,10 +778,18 @@ func TestMultipleInstancesWithUpdatedRegistrationIDs(t *testing.T) { k.On("LogMaxBytesPerBatch").Return(0).Maybe() k.On("Transport").Return("jsonrpc").Maybe() k.On("ReadEnrollSecret").Return("", nil).Maybe() + k.On("InModernStandby").Return(false).Maybe() + k.On("RegisterChangeObserver", mock.Anything, 
keys.UpdateChannel).Maybe() + k.On("RegisterChangeObserver", mock.Anything, keys.PinnedLauncherVersion).Maybe() + k.On("RegisterChangeObserver", mock.Anything, keys.PinnedOsquerydVersion).Maybe() + k.On("UpdateChannel").Return("stable").Maybe() + k.On("PinnedLauncherVersion").Return("").Maybe() + k.On("PinnedOsquerydVersion").Return("").Maybe() setUpMockStores(t, k) - serviceClient := mockServiceClient() + serviceClient := mockServiceClient(t) runner := New(k, serviceClient) + ensureShutdownOnCleanup(t, runner, logBytes) // Start the instance go runner.Run() @@ -866,10 +874,18 @@ func TestUpdatingRegistrationIDsOnlyRestartsForChanges(t *testing.T) { k.On("LogMaxBytesPerBatch").Return(0).Maybe() k.On("Transport").Return("jsonrpc").Maybe() k.On("ReadEnrollSecret").Return("", nil).Maybe() + k.On("InModernStandby").Return(false).Maybe() + k.On("RegisterChangeObserver", mock.Anything, keys.UpdateChannel).Maybe() + k.On("RegisterChangeObserver", mock.Anything, keys.PinnedLauncherVersion).Maybe() + k.On("RegisterChangeObserver", mock.Anything, keys.PinnedOsquerydVersion).Maybe() + k.On("UpdateChannel").Return("stable").Maybe() + k.On("PinnedLauncherVersion").Return("").Maybe() + k.On("PinnedOsquerydVersion").Return("").Maybe() setUpMockStores(t, k) - serviceClient := mockServiceClient() + serviceClient := mockServiceClient(t) runner := New(k, serviceClient) + ensureShutdownOnCleanup(t, runner, logBytes) // Start the instance go runner.Run() From bccda002ba0b049bceff51c1919522f653740139 Mon Sep 17 00:00:00 2001 From: zack olson Date: Fri, 24 Jan 2025 12:08:08 -0500 Subject: [PATCH 11/13] fix data races, add comments --- pkg/osquery/runtime/runner.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/pkg/osquery/runtime/runner.go b/pkg/osquery/runtime/runner.go index 6e3702dcb..6cd01c21f 100644 --- a/pkg/osquery/runtime/runner.go +++ b/pkg/osquery/runtime/runner.go @@ -54,6 +54,20 @@ func New(k types.Knapsack, serviceClient service.KolideService, opts ...OsqueryI return runner } +// String method is only added to runner because it is often used in our runtime tests as an argument +// passed to mocked knapsack calls. when we AssertExpectations, the runner struct is traversed by the +// Diff logic inside testify. This causes data races to be incorrectly reported for structs containing mutexes +// (the second read is coming from testify). Implementing the stringer interface with locks acquired prevents these +// races. see (one of) the issues here for additional context https://github.com/stretchr/testify/issues/1597 +func (r *Runner) String() string { + r.instanceLock.Lock() + defer r.instanceLock.Unlock() + r.regIDLock.Lock() + defer r.regIDLock.Unlock() + + return fmt.Sprintf("%#v", r) +} + func (r *Runner) Run() error { for { err := r.runRegisteredInstances() From f551f6b9455fc534bab65fd0698a72da822baa3b Mon Sep 17 00:00:00 2001 From: zack olson Date: Tue, 28 Jan 2025 09:39:32 -0500 Subject: [PATCH 12/13] don't acquire locks for runner's String method --- pkg/osquery/runtime/runner.go | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/pkg/osquery/runtime/runner.go b/pkg/osquery/runtime/runner.go index 0a139f3b7..678278142 100644 --- a/pkg/osquery/runtime/runner.go +++ b/pkg/osquery/runtime/runner.go @@ -63,16 +63,14 @@ func New(k types.Knapsack, serviceClient service.KolideService, settingsWriter s // String method is only added to runner because it is often used in our runtime tests as an argument // passed to mocked knapsack calls. 
when we AssertExpectations, the runner struct is traversed by the -// Diff logic inside testify. This causes data races to be incorrectly reported for structs containing mutexes -// (the second read is coming from testify). Implementing the stringer interface with locks acquired prevents these -// races. see (one of) the issues here for additional context https://github.com/stretchr/testify/issues/1597 +// Diff logic inside testify. This causes data races to be incorrectly reported for structs containing mutexes- +// the second read is coming from testify. +// see (one of) the issues here for additional context https://github.com/stretchr/testify/issues/1597 +// If we really needed to expose more here, we could acquire all locks and return fmt.Sprintf("%#v", r). but given +// that we do not, it seems safer to avoid introducing any additional lock contention in case our Stringer call +// is invoked someday in a production flow func (r *Runner) String() string { - r.instanceLock.Lock() - defer r.instanceLock.Unlock() - r.regIDLock.Lock() - defer r.regIDLock.Unlock() - - return fmt.Sprintf("%#v", r) + return "runtime.Runner{}" } func (r *Runner) Run() error { From 658307c0a8c987b871704814925eabb3b15c0aa4 Mon Sep 17 00:00:00 2001 From: Zack Olson Date: Tue, 28 Jan 2025 11:15:31 -0500 Subject: [PATCH 13/13] Update pkg/osquery/runtime/runner.go Co-authored-by: Rebecca Mahany-Horton --- pkg/osquery/runtime/runner.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/osquery/runtime/runner.go b/pkg/osquery/runtime/runner.go index 678278142..2dd063a70 100644 --- a/pkg/osquery/runtime/runner.go +++ b/pkg/osquery/runtime/runner.go @@ -36,7 +36,7 @@ type Runner struct { serviceClient service.KolideService // shared service client for communication between osquery instance and Kolide SaaS settingsWriter settingsStoreWriter // writes to startup settings store opts []OsqueryInstanceOption // global options applying to all osquery instances - shutdown chan struct{} // buffered shutdown channel for to enable shutting down to restart or exit + shutdown chan struct{} // buffered shutdown channel to enable shutting down to restart or exit rerunRequired atomic.Bool interrupted atomic.Bool }
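The heart of the series is the order-insensitive comparison that UpdateRegistrationIDs (patch 03, later guarded by regIDLock in patch 07) performs before deciding whether to tear down and relaunch instances. The following is a minimal standalone Go sketch of that comparison only — it assumes Go 1.21+ for the slices package, is not launcher's actual code, and sorts copies rather than the stored slices:

// Standalone sketch of the change-detection idea: sort both ID lists and
// compare, so an identical set in a different order does not trigger a restart.
package main

import (
	"fmt"
	"slices"
)

// registrationIDsChanged reports whether newIDs differs from currentIDs,
// ignoring order. It sorts clones so the callers' slices are left untouched.
func registrationIDsChanged(currentIDs, newIDs []string) bool {
	cur := slices.Clone(currentIDs)
	next := slices.Clone(newIDs)
	slices.Sort(cur)
	slices.Sort(next)
	return !slices.Equal(cur, next)
}

func main() {
	current := []string{"default", "secondary"}

	// Same IDs in a different order: no restart needed.
	fmt.Println(registrationIDsChanged(current, []string{"secondary", "default"})) // false

	// An ID was removed: the runner would mark rerunRequired and shut down
	// its instances so Run() can bring up a fresh set.
	fmt.Println(registrationIDsChanged(current, []string{"default"})) // true
}

In the patches themselves, when a difference is detected the runner stores the new IDs, sets rerunRequired, and calls Shutdown(); the loop added to Run() in patch 03 then clears the flag and calls runRegisteredInstances() again so every instance is recreated against the updated registration set.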