From 8315e2a4444279451601eb8d3c8aef8d304ec940 Mon Sep 17 00:00:00 2001 From: Andrew Mains Date: Wed, 31 Aug 2022 11:18:21 -0400 Subject: [PATCH] =?UTF-8?q?etcd=5Fdocker=203:=20Incorporate=20docker=20bas?= =?UTF-8?q?ed=20etcd=20integration=20package=20into=E2=80=A6=20(#4147)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PR 3 for https://github.com/m3db/m3/issues/4144 The high-level approach is as described in https://github.com/m3db/m3/issues/4144 . This PR incorporates the new test package into our unit tests. Usage is via the `etcdintegration` package, which makes it transparent to the test code; it simply gets an etcd server started via different means. One piece of weirdness to call out here: the package currently relies on autosync being *disabled* on the client side. This is because the advertise client URL (aka what etcd tells clients to connect to) isn't correct for the open port on the host. That is, we have: - etcd: listen on container port 0.0.0.0:2379, advertise 0.0.0.0:2379 - docker: expose etcd port 2379 to 0.0.0.0:0 on host machine (random free port) - client: connect to etcd via host machine. We could probably make this better. 
commit-id:263fed13 --- go.mod | 2 - go.sum | 2 - .../integration/custom_aggregations_test.go | 5 +- src/aggregator/integration/election.go | 21 +- .../integration/metadata_change_test.go | 4 +- .../integration/multi_client_one_type_test.go | 4 +- .../multi_server_forwarding_pipeline_test.go | 2 +- .../integration/multi_server_resend_test.go | 2 +- .../one_client_multi_type_forwarded_test.go | 4 +- .../one_client_multi_type_timed_test.go | 4 +- .../one_client_multi_type_untimed_test.go | 14 +- .../integration/one_client_passthru_test.go | 2 +- .../integration/placement_change_test.go | 7 +- .../integration/resend_stress_test.go | 1 - .../integration/same_id_multi_type_test.go | 4 +- src/aggregator/integration/setup.go | 8 + src/cluster/client/etcd/client.go | 8 +- src/cluster/client/etcd/client_test.go | 24 +- src/cluster/client/etcd/config.go | 27 ++- src/cluster/client/etcd/config_test.go | 7 + src/cluster/client/etcd/types.go | 5 + src/cluster/etcd/watchmanager/manager_test.go | 222 +++++++++--------- src/cluster/integration/etcd/etcd.go | 80 ++----- src/cluster/integration/etcd/options.go | 4 +- src/cluster/kv/etcd/store_test.go | 4 +- .../services/heartbeat/etcd/store_test.go | 2 +- src/cluster/services/leader/client_test.go | 8 +- .../services/leader/election/client_test.go | 2 +- src/cmd/services/m3dbnode/main/main_test.go | 5 +- 29 files changed, 257 insertions(+), 227 deletions(-) diff --git a/go.mod b/go.mod index c40d0e3a82..485f992594 100644 --- a/go.mod +++ b/go.mod @@ -70,7 +70,6 @@ require ( go.etcd.io/etcd/client/pkg/v3 v3.6.0-alpha.0 go.etcd.io/etcd/client/v3 v3.6.0-alpha.0 go.etcd.io/etcd/server/v3 v3.6.0-alpha.0 - go.etcd.io/etcd/tests/v3 v3.6.0-alpha.0 go.opentelemetry.io/collector v0.45.0 go.opentelemetry.io/otel v1.4.1 go.opentelemetry.io/otel/bridge/opentracing v1.4.1 @@ -121,7 +120,6 @@ require ( github.com/go-playground/locales v0.13.0 // indirect github.com/go-playground/universal-translator v0.17.0 // indirect github.com/golang-jwt/jwt 
v3.2.2+incompatible // indirect - github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/google/btree v1.0.1 // indirect github.com/gorilla/handlers v1.5.1 // indirect github.com/gorilla/websocket v1.4.2 // indirect diff --git a/go.sum b/go.sum index 770fa1c583..22fa99d1b7 100644 --- a/go.sum +++ b/go.sum @@ -1592,8 +1592,6 @@ go.etcd.io/etcd/raft/v3 v3.6.0-alpha.0 h1:BQ6CnNP4pIpy5rusFlTBxAacDgPXhuiHFwoTsB go.etcd.io/etcd/raft/v3 v3.6.0-alpha.0/go.mod h1:/kZdrBXlc5fUgYXfIEQ0B5sb7ejXPKbtF4jWzF1exiQ= go.etcd.io/etcd/server/v3 v3.6.0-alpha.0 h1:BQUVqBqNFZZyrRbfydrRLzq9hYvCcRj97SsX1YwD7CA= go.etcd.io/etcd/server/v3 v3.6.0-alpha.0/go.mod h1:3QM2rLq3B3hSXmVEvgVt3vEEbG/AumSs0Is7EgrlKzU= -go.etcd.io/etcd/tests/v3 v3.6.0-alpha.0 h1:3qrZ3p/E7CxdV1kKtAU75hHOcUoXcSTwC7ELKWyzMJo= -go.etcd.io/etcd/tests/v3 v3.6.0-alpha.0/go.mod h1:hFQkP/cTsZIXXvUv+BsGHZ3TK+76XZMi5GToYA94iac= go.mongodb.org/mongo-driver v1.0.3/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM= go.mongodb.org/mongo-driver v1.1.1/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM= go.mongodb.org/mongo-driver v1.1.2/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM= diff --git a/src/aggregator/integration/custom_aggregations_test.go b/src/aggregator/integration/custom_aggregations_test.go index d9c58cbe41..6bae441f46 100644 --- a/src/aggregator/integration/custom_aggregations_test.go +++ b/src/aggregator/integration/custom_aggregations_test.go @@ -1,4 +1,4 @@ -// +build integration +//go:build integration // Copyright (c) 2016 Uber Technologies, Inc. // @@ -68,7 +68,6 @@ func testCustomAggregations(t *testing.T, metadataFns [4]metadataFn) { if testing.Short() { t.SkipNow() } - aggTypesOpts := aggregation.NewTypesOptions(). SetCounterTypeStringTransformFn(aggregation.SuffixTransform). SetTimerTypeStringTransformFn(aggregation.SuffixTransform). 
@@ -179,7 +178,7 @@ func testCustomAggregations(t *testing.T, metadataFns [4]metadataFn) { // must be the longer than the lowest resolution across all policies. finalTime := end.Add(6 * time.Second) clock.SetNow(finalTime) - time.Sleep(6 * time.Second) + time.Sleep(waitForDataToFlush) require.NoError(t, client.close()) diff --git a/src/aggregator/integration/election.go b/src/aggregator/integration/election.go index 1f24e02263..221996347d 100644 --- a/src/aggregator/integration/election.go +++ b/src/aggregator/integration/election.go @@ -26,9 +26,9 @@ import ( "github.com/m3db/m3/src/cluster/services" "github.com/m3db/m3/src/cluster/services/leader" + integration "github.com/m3db/m3/src/integration/resources/docker/dockerexternal/etcdintegration" "github.com/stretchr/testify/require" clientv3 "go.etcd.io/etcd/client/v3" - "go.etcd.io/etcd/tests/v3/framework/integration" ) var ( @@ -40,27 +40,38 @@ var ( ) type testCluster struct { - t *testing.T - cluster *integration.Cluster + t *testing.T + cluster *integration.Cluster + leaderService services.LeaderService } func newTestCluster(t *testing.T) *testCluster { integration.BeforeTestExternal(t) - return &testCluster{ + cluster := &testCluster{ t: t, cluster: integration.NewCluster(t, &integration.ClusterConfig{ Size: testClusterSize, + // UseBridge: true, }), } + return cluster } func (tc *testCluster) LeaderService() services.LeaderService { + if tc.leaderService != nil { + return tc.leaderService + } + svc, err := leader.NewService(tc.etcdClient(), tc.options()) require.NoError(tc.t, err) - return svc + tc.leaderService = svc + return tc.leaderService } func (tc *testCluster) Close() { + if tc.leaderService != nil { + require.NoError(tc.t, tc.leaderService.Close()) + } tc.cluster.Terminate(tc.t) } diff --git a/src/aggregator/integration/metadata_change_test.go b/src/aggregator/integration/metadata_change_test.go index fa93ce7baf..9822579101 100644 --- a/src/aggregator/integration/metadata_change_test.go +++ 
b/src/aggregator/integration/metadata_change_test.go @@ -1,4 +1,4 @@ -// +build integration +//go:build integration // Copyright (c) 2016 Uber Technologies, Inc. // @@ -138,7 +138,7 @@ func testMetadataChange(t *testing.T, oldMetadataFn, newMetadataFn metadataFn) { // must be the longer than the lowest resolution across all policies. finalTime := end.Add(6 * time.Second) clock.SetNow(finalTime) - time.Sleep(6 * time.Second) + time.Sleep(waitForDataToFlush) require.NoError(t, client.close()) diff --git a/src/aggregator/integration/multi_client_one_type_test.go b/src/aggregator/integration/multi_client_one_type_test.go index d05185a9ab..2af620205f 100644 --- a/src/aggregator/integration/multi_client_one_type_test.go +++ b/src/aggregator/integration/multi_client_one_type_test.go @@ -1,4 +1,4 @@ -// +build integration +//go:build integration // Copyright (c) 2016 Uber Technologies, Inc. // @@ -126,7 +126,7 @@ func testMultiClientOneType(t *testing.T, metadataFn metadataFn) { // must be the longer than the lowest resolution across all policies. finalTime := stop.Add(6 * time.Second) clock.SetNow(finalTime) - time.Sleep(4 * time.Second) + time.Sleep(waitForDataToFlush) for i := 0; i < numClients; i++ { require.NoError(t, clients[i].close()) diff --git a/src/aggregator/integration/multi_server_forwarding_pipeline_test.go b/src/aggregator/integration/multi_server_forwarding_pipeline_test.go index ec83c8f362..fb1cdbd407 100644 --- a/src/aggregator/integration/multi_server_forwarding_pipeline_test.go +++ b/src/aggregator/integration/multi_server_forwarding_pipeline_test.go @@ -1,4 +1,4 @@ -// +build integration +//go:build integration // Copyright (c) 2018 Uber Technologies, Inc. 
// diff --git a/src/aggregator/integration/multi_server_resend_test.go b/src/aggregator/integration/multi_server_resend_test.go index 6b7c2fb71e..7833e8d991 100644 --- a/src/aggregator/integration/multi_server_resend_test.go +++ b/src/aggregator/integration/multi_server_resend_test.go @@ -1,5 +1,4 @@ //go:build integration -// +build integration // Copyright (c) 2018 Uber Technologies, Inc. // @@ -142,6 +141,7 @@ func TestMultiServerResendAggregatedValues(t *testing.T) { // Election cluster setup. electionCluster := newTestCluster(t) + defer electionCluster.Close() // Sharding function maps all metrics to shard 0 except for the rollup metric, // which gets mapped to the last shard. diff --git a/src/aggregator/integration/one_client_multi_type_forwarded_test.go b/src/aggregator/integration/one_client_multi_type_forwarded_test.go index 9ead187784..039eeae391 100644 --- a/src/aggregator/integration/one_client_multi_type_forwarded_test.go +++ b/src/aggregator/integration/one_client_multi_type_forwarded_test.go @@ -1,4 +1,4 @@ -// +build integration +//go:build integration // Copyright (c) 2018 Uber Technologies, Inc. // @@ -121,7 +121,7 @@ func TestOneClientMultiTypeForwardedMetrics(t *testing.T) { // Move time forward and wait for flushing to happen. finalTime := stop.Add(2 * time.Second) clock.SetNow(finalTime) - time.Sleep(2 * time.Second) + time.Sleep(waitForDataToFlush) require.NoError(t, client.close()) diff --git a/src/aggregator/integration/one_client_multi_type_timed_test.go b/src/aggregator/integration/one_client_multi_type_timed_test.go index 8449f5858b..80f8d6ec3f 100644 --- a/src/aggregator/integration/one_client_multi_type_timed_test.go +++ b/src/aggregator/integration/one_client_multi_type_timed_test.go @@ -1,4 +1,4 @@ -// +build integration +//go:build integration // Copyright (c) 2018 Uber Technologies, Inc. // @@ -119,7 +119,7 @@ func TestOneClientMultiTypeTimedMetrics(t *testing.T) { // Move time forward and wait for flushing to happen. 
finalTime := stop.Add(time.Minute + 2*time.Second) clock.SetNow(finalTime) - time.Sleep(2 * time.Second) + time.Sleep(waitForDataToFlush) require.NoError(t, client.close()) diff --git a/src/aggregator/integration/one_client_multi_type_untimed_test.go b/src/aggregator/integration/one_client_multi_type_untimed_test.go index 59f6800b66..0adc3a5c2d 100644 --- a/src/aggregator/integration/one_client_multi_type_untimed_test.go +++ b/src/aggregator/integration/one_client_multi_type_untimed_test.go @@ -1,4 +1,4 @@ -// +build integration +//go:build integration // Copyright (c) 2016 Uber Technologies, Inc. // @@ -26,9 +26,17 @@ import ( "testing" "time" + "github.com/m3db/m3/src/cluster/placement" + "github.com/stretchr/testify/require" +) - "github.com/m3db/m3/src/cluster/placement" +const ( + // waitForDataToFlush is the amount of time we will wait in these tests between finishing writing data to + // the aggregator, and attempting to assert that data went through. + // The aggregator generally, and these tests specifically are quite sensitive to time. + // The tests probably need a bit of a rethink to wait on (or poll for) an actual condition instead of sleeping. + waitForDataToFlush = 10 * time.Second ) func TestOneClientMultiTypeUntimedMetricsWithStagedMetadatas(t *testing.T) { @@ -114,7 +122,7 @@ func testOneClientMultiType(t *testing.T, metadataFn metadataFn) { // must be the longer than the lowest resolution across all policies. 
finalTime := stop.Add(6 * time.Second) clock.SetNow(finalTime) - time.Sleep(4 * time.Second) + time.Sleep(waitForDataToFlush) require.NoError(t, client.close()) diff --git a/src/aggregator/integration/one_client_passthru_test.go b/src/aggregator/integration/one_client_passthru_test.go index 75a6f49e25..489730a123 100644 --- a/src/aggregator/integration/one_client_passthru_test.go +++ b/src/aggregator/integration/one_client_passthru_test.go @@ -1,4 +1,4 @@ -// +build integration +//go:build integration // Copyright (c) 2020 Uber Technologies, Inc. // diff --git a/src/aggregator/integration/placement_change_test.go b/src/aggregator/integration/placement_change_test.go index 99683d7f3d..9f798b0ee1 100644 --- a/src/aggregator/integration/placement_change_test.go +++ b/src/aggregator/integration/placement_change_test.go @@ -1,5 +1,4 @@ //go:build integration -// +build integration // Copyright (c) 2018 Uber Technologies, Inc. // @@ -227,9 +226,9 @@ func TestPlacementChange(t *testing.T) { } clock.SetNow(start2) - time.Sleep(6 * time.Second) + time.Sleep(waitForDataToFlush) setPlacement(t, placementKey, clusterClient, finalPlacement) - time.Sleep(6 * time.Second) + time.Sleep(waitForDataToFlush) for _, data := range datasets[1] { clock.SetNow(data.timestamp) @@ -245,7 +244,7 @@ func TestPlacementChange(t *testing.T) { // Move time forward and wait for flushing to happen. clock.SetNow(finalTime) - time.Sleep(6 * time.Second) + time.Sleep(waitForDataToFlush) // Remove all the topic consumers before closing clients and servers. This allows to close the // connections between servers while they still are running. 
Otherwise, during server shutdown, diff --git a/src/aggregator/integration/resend_stress_test.go b/src/aggregator/integration/resend_stress_test.go index c1510ac7ee..691edad117 100644 --- a/src/aggregator/integration/resend_stress_test.go +++ b/src/aggregator/integration/resend_stress_test.go @@ -1,5 +1,4 @@ //go:build integration -// +build integration // Copyright (c) 2018 Uber Technologies, Inc. // diff --git a/src/aggregator/integration/same_id_multi_type_test.go b/src/aggregator/integration/same_id_multi_type_test.go index 09974adb2d..fe6b70d999 100644 --- a/src/aggregator/integration/same_id_multi_type_test.go +++ b/src/aggregator/integration/same_id_multi_type_test.go @@ -1,4 +1,4 @@ -// +build integration +//go:build integration // Copyright (c) 2016 Uber Technologies, Inc. // @@ -138,7 +138,7 @@ func testSameIDMultiType(t *testing.T, metadataFn metadataFn) { // must be the longer than the lowest resolution across all policies. finalTime := stop.Add(6 * time.Second) clock.SetNow(finalTime) - time.Sleep(4 * time.Second) + time.Sleep(waitForDataToFlush) require.NoError(t, client.close()) diff --git a/src/aggregator/integration/setup.go b/src/aggregator/integration/setup.go index b0723392dc..3ed6726d6e 100644 --- a/src/aggregator/integration/setup.go +++ b/src/aggregator/integration/setup.go @@ -88,6 +88,7 @@ type testServerSetup struct { // Signals. 
doneCh chan struct{} closedCh chan struct{} + stopped bool } func newTestServerSetup(t *testing.T, opts testServerOptions) *testServerSetup { @@ -448,6 +449,10 @@ func (ts *testServerSetup) sortedResults() []aggregated.MetricWithStoragePolicy } func (ts *testServerSetup) stopServer() error { + if ts.stopped { + return nil + } + ts.stopped = true if err := ts.aggregator.Close(); err != nil { return err } @@ -460,6 +465,9 @@ func (ts *testServerSetup) stopServer() error { func (ts *testServerSetup) close() { ts.electionCluster.Close() + if err := ts.stopServer(); err != nil { + panic(err.Error()) + } } func (tss testServerSetups) newClient(t *testing.T) *client { diff --git a/src/cluster/client/etcd/client.go b/src/cluster/client/etcd/client.go index af4a71828a..6dd5f0a338 100644 --- a/src/cluster/client/etcd/client.go +++ b/src/cluster/client/etcd/client.go @@ -339,8 +339,14 @@ func newConfigFromCluster(rnd randInt63N, cluster Cluster) (clientv3.Config, err if err != nil { return clientv3.Config{}, err } + + // Support disabling autosync if a user very explicitly requests it (via negative duration). 
+ autoSyncInterval := cluster.AutoSyncInterval() + if autoSyncInterval < 0 { + autoSyncInterval = 0 + } cfg := clientv3.Config{ - AutoSyncInterval: cluster.AutoSyncInterval(), + AutoSyncInterval: autoSyncInterval, DialTimeout: cluster.DialTimeout(), DialOptions: cluster.DialOptions(), Endpoints: cluster.Endpoints(), diff --git a/src/cluster/client/etcd/client_test.go b/src/cluster/client/etcd/client_test.go index 3d1d0b60db..343842eb4e 100644 --- a/src/cluster/client/etcd/client_test.go +++ b/src/cluster/client/etcd/client_test.go @@ -25,18 +25,23 @@ import ( "testing" "time" + "github.com/m3db/m3/src/cluster/kv" + "github.com/m3db/m3/src/cluster/services" + integration "github.com/m3db/m3/src/integration/resources/docker/dockerexternal/etcdintegration" + "github.com/m3db/m3/src/x/retry" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" clientv3 "go.etcd.io/etcd/client/v3" - "go.etcd.io/etcd/tests/v3/framework/integration" "google.golang.org/grpc" - - "github.com/m3db/m3/src/cluster/kv" - "github.com/m3db/m3/src/cluster/services" ) func TestETCDClientGen(t *testing.T) { - cs, err := NewConfigServiceClient(testOptions()) + cs, err := NewConfigServiceClient( + testOptions(). + // These are error cases; don't retry for no reason. + SetRetryOptions(retry.NewOptions().SetMaxRetries(0)), + ) require.NoError(t, err) c := cs.(*csclient) @@ -414,6 +419,15 @@ func Test_newConfigFromCluster(t *testing.T) { ) }) + t.Run("negative autosync on M3 disables autosync for etcd", func(t *testing.T) { + inputCfg := newFullConfig() + inputCfg.AutoSyncInterval = -1 + etcdCfg, err := newConfigFromCluster(testRnd, inputCfg.NewCluster()) + require.NoError(t, err) + + assert.Equal(t, time.Duration(0), etcdCfg.AutoSyncInterval) + }) + // Separate test just because the assert.Equal won't work for functions. 
t.Run("passes through dial options", func(t *testing.T) { clusterCfg := newFullConfig() diff --git a/src/cluster/client/etcd/config.go b/src/cluster/client/etcd/config.go index 877f2051e5..520afa385e 100644 --- a/src/cluster/client/etcd/config.go +++ b/src/cluster/client/etcd/config.go @@ -35,12 +35,22 @@ import ( // ClusterConfig is the config for a zoned etcd cluster. type ClusterConfig struct { - Zone string `yaml:"zone"` - Endpoints []string `yaml:"endpoints"` - KeepAlive *KeepAliveConfig `yaml:"keepAlive"` - TLS *TLSConfig `yaml:"tls"` - AutoSyncInterval time.Duration `yaml:"autoSyncInterval"` - DialTimeout time.Duration `yaml:"dialTimeout"` + Zone string `yaml:"zone"` + Endpoints []string `yaml:"endpoints"` + KeepAlive *KeepAliveConfig `yaml:"keepAlive"` + TLS *TLSConfig `yaml:"tls"` + // AutoSyncInterval configures the etcd client's AutoSyncInterval + // (go.etcd.io/etcd/client/v3@v3.6.0-alpha.0/config.go:32). + // By default, it is 1m. + // + // Advanced: + // + // One important difference from the etcd config: we have autosync *on* by default (unlike etcd), meaning that + // the zero value here doesn't indicate autosync off. + // Instead, users should pass in a negative value to indicate "disable autosync" + // Only do this if you truly have a good reason for it! Most production use cases want autosync on. + AutoSyncInterval time.Duration `yaml:"autoSyncInterval"` + DialTimeout time.Duration `yaml:"dialTimeout"` DialOptions []grpc.DialOption `yaml:"-"` // nonserializable } @@ -59,7 +69,10 @@ func (c ClusterConfig) NewCluster() Cluster { SetKeepAliveOptions(keepAliveOpts). SetTLSOptions(c.TLS.newOptions()) - if c.AutoSyncInterval > 0 { + // Autosync should *always* be on, unless the user very explicitly requests it to be off. They can do this via a + // negative value (in which case we can assume they know what they're doing). + // Therefore, only update if it's nonzero, on the assumption that zero is just the empty value. 
+ if c.AutoSyncInterval != 0 { cluster = cluster.SetAutoSyncInterval(c.AutoSyncInterval) } diff --git a/src/cluster/client/etcd/config_test.go b/src/cluster/client/etcd/config_test.go index 1bc8959117..278b41bb1f 100644 --- a/src/cluster/client/etcd/config_test.go +++ b/src/cluster/client/etcd/config_test.go @@ -181,3 +181,10 @@ func TestDefaultConfig(t *testing.T) { require.Equal(t, defaultDialTimeout, cluster.DialTimeout()) require.Equal(t, defaultAutoSyncInterval, cluster.AutoSyncInterval()) } + +func TestConfig_negativeAutosync(t *testing.T) { + cluster := ClusterConfig{ + AutoSyncInterval: -5, + }.NewCluster() + require.Equal(t, time.Duration(-5), cluster.AutoSyncInterval()) +} diff --git a/src/cluster/client/etcd/types.go b/src/cluster/client/etcd/types.go index 9caf012e70..5b60832801 100644 --- a/src/cluster/client/etcd/types.go +++ b/src/cluster/client/etcd/types.go @@ -159,6 +159,11 @@ type Cluster interface { SetTLSOptions(TLSOptions) Cluster AutoSyncInterval() time.Duration + + // SetAutoSyncInterval sets the etcd client to autosync cluster endpoints periodically. This defaults to + // 1 minute (defaultAutoSyncInterval). If negative or zero, it will disable autosync. This differs slightly + // from the underlying etcd configuration it's setting, which only supports 0 for disabling. We do this because + // there's otherwise no good way to specify "disable" in our configs (which default to SetAutoSyncInterval(1m)). 
SetAutoSyncInterval(value time.Duration) Cluster DialTimeout() time.Duration diff --git a/src/cluster/etcd/watchmanager/manager_test.go b/src/cluster/etcd/watchmanager/manager_test.go index d65758e8ff..ddc8df306c 100644 --- a/src/cluster/etcd/watchmanager/manager_test.go +++ b/src/cluster/etcd/watchmanager/manager_test.go @@ -22,16 +22,15 @@ package watchmanager import ( "fmt" - "runtime" "sync/atomic" "testing" "time" + integration "github.com/m3db/m3/src/integration/resources/docker/dockerexternal/etcdintegration" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/uber-go/tally" clientv3 "go.etcd.io/etcd/client/v3" - "go.etcd.io/etcd/tests/v3/framework/integration" "golang.org/x/net/context" "github.com/m3db/m3/src/x/clock" @@ -164,114 +163,117 @@ func TestWatchRecreate(t *testing.T) { <-doneCh } -func TestWatchNoLeader(t *testing.T) { - t.Skip("flaky, started to fail very consistently on CI") - const ( - watchInitAndRetryDelay = 200 * time.Millisecond - watchCheckInterval = 50 * time.Millisecond - ) - - integration.BeforeTestExternal(t) - ecluster := integration.NewCluster(t, &integration.ClusterConfig{Size: 3}) - defer ecluster.Terminate(t) - - var ( - ec = ecluster.Client(0) - tickDuration = 10 * time.Millisecond - electionTimeout = time.Duration(3*ecluster.Members[0].ElectionTicks) * tickDuration - doneCh = make(chan struct{}, 1) - eventLog = []*clientv3.Event{} - updateCalled int32 - shouldStop int32 - ) - - opts := NewOptions(). - SetClient(ec). - SetUpdateFn( - func(_ string, e []*clientv3.Event) error { - atomic.AddInt32(&updateCalled, 1) - if len(e) > 0 { - eventLog = append(eventLog, e...) - } - return nil - }, - ). - SetTickAndStopFn( - func(string) bool { - if atomic.LoadInt32(&shouldStop) == 0 { - return false - } - - close(doneCh) - - return true - }, - ). - SetWatchChanInitTimeout(watchInitAndRetryDelay). - SetWatchChanResetInterval(watchInitAndRetryDelay). 
- SetWatchChanCheckInterval(watchCheckInterval) - - integration.WaitClientV3(t, ec) - - wh, err := NewWatchManager(opts) - require.NoError(t, err) - - go wh.Watch("foo") - - runtime.Gosched() - time.Sleep(10 * time.Millisecond) - - // there should be a valid watch now, trigger a notification - _, err = ec.Put(context.Background(), "foo", "bar") - require.NoError(t, err) - - leaderIdx := ecluster.WaitLeader(t) - require.True(t, leaderIdx >= 0 && leaderIdx < len(ecluster.Members), "got invalid leader") - - // simulate quorum loss - ecluster.Members[1].Stop(t) - ecluster.Members[2].Stop(t) - - // wait for election timeout, then member[0] will not have a leader. - time.Sleep(electionTimeout) - - require.NoError(t, ecluster.Members[1].Restart(t)) - require.NoError(t, ecluster.Members[2].Restart(t)) - - // wait for leader + election delay just in case - time.Sleep(time.Duration(3*ecluster.Members[0].ElectionTicks) * tickDuration) - - leaderIdx = ecluster.WaitLeader(t) - require.True(t, leaderIdx >= 0 && leaderIdx < len(ecluster.Members), "got invalid leader") - integration.WaitClientV3(t, ec) // wait for client to be ready again - - _, err = ec.Put(context.Background(), "foo", "baz") - require.NoError(t, err) - - // give some time for watch to be updated - require.True(t, clock.WaitUntil(func() bool { - return atomic.LoadInt32(&updateCalled) >= 2 - }, 10*time.Second)) - - updates := atomic.LoadInt32(&updateCalled) - if updates < 2 { - require.Fail(t, - "insufficient update calls", - "expected at least 2 update attempts, got %d during a partition", - updates) - } - - atomic.AddInt32(&shouldStop, 1) - <-doneCh - - require.Len(t, eventLog, 2) - require.NotNil(t, eventLog[0]) - require.Equal(t, eventLog[0].Kv.Key, []byte("foo")) - require.Equal(t, eventLog[0].Kv.Value, []byte("bar")) - require.NotNil(t, eventLog[1]) - require.Equal(t, eventLog[1].Kv.Key, []byte("foo")) - require.Equal(t, eventLog[1].Kv.Value, []byte("baz")) -} +// TODO: this test has been skipped for a 
while, and now doesn't work with the docker based etcd integration package. +// Revive it if it's useful, and make it no longer flake. +//nolint:gocritic +//func TestWatchNoLeader(t *testing.T) { +// t.Skip("flaky, started to fail very consistently on CI") +// const ( +// watchInitAndRetryDelay = 200 * time.Millisecond +// watchCheckInterval = 50 * time.Millisecond +// ) +// +// integration.BeforeTestExternal(t) +// ecluster := integration.NewCluster(t, &integration.ClusterConfig{Size: 3}) +// defer ecluster.Terminate(t) +// +// var ( +// ec = ecluster.Client(0) +// tickDuration = 10 * time.Millisecond +// electionTimeout = time.Duration(3*ecluster.Address[0].ElectionTicks) * tickDuration +// doneCh = make(chan struct{}, 1) +// eventLog = []*clientv3.Event{} +// updateCalled int32 +// shouldStop int32 +// ) +// +// opts := NewOptions(). +// SetClient(ec). +// SetUpdateFn( +// func(_ string, e []*clientv3.Event) error { +// atomic.AddInt32(&updateCalled, 1) +// if len(e) > 0 { +// eventLog = append(eventLog, e...) +// } +// return nil +// }, +// ). +// SetTickAndStopFn( +// func(string) bool { +// if atomic.LoadInt32(&shouldStop) == 0 { +// return false +// } +// +// close(doneCh) +// +// return true +// }, +// ). +// SetWatchChanInitTimeout(watchInitAndRetryDelay). +// SetWatchChanResetInterval(watchInitAndRetryDelay). 
+// SetWatchChanCheckInterval(watchCheckInterval) +// +// integration.WaitClientV3(t, ec) +// +// wh, err := NewWatchManager(opts) +// require.NoError(t, err) +// +// go wh.Watch("foo") +// +// runtime.Gosched() +// time.Sleep(10 * time.Millisecond) +// +// // there should be a valid watch now, trigger a notification +// _, err = ec.Put(context.Background(), "foo", "bar") +// require.NoError(t, err) +// +// leaderIdx := ecluster.WaitLeader(t) +// require.True(t, leaderIdx >= 0 && leaderIdx < len(ecluster.Address), "got invalid leader") +// +// // simulate quorum loss +// ecluster.Address[1].Stop(t) +// ecluster.Address[2].Stop(t) +// +// // wait for election timeout, then member[0] will not have a leader. +// time.Sleep(electionTimeout) +// +// require.NoError(t, ecluster.Address[1].Restart(t)) +// require.NoError(t, ecluster.Address[2].Restart(t)) +// +// // wait for leader + election delay just in case +// time.Sleep(time.Duration(3*ecluster.Address[0].ElectionTicks) * tickDuration) +// +// leaderIdx = ecluster.WaitLeader(t) +// require.True(t, leaderIdx >= 0 && leaderIdx < len(ecluster.Address), "got invalid leader") +// integration.WaitClientV3(t, ec) // wait for client to be ready again +// +// _, err = ec.Put(context.Background(), "foo", "baz") +// require.NoError(t, err) +// +// // give some time for watch to be updated +// require.True(t, clock.WaitUntil(func() bool { +// return atomic.LoadInt32(&updateCalled) >= 2 +// }, 10*time.Second)) +// +// updates := atomic.LoadInt32(&updateCalled) +// if updates < 2 { +// require.Fail(t, +// "insufficient update calls", +// "expected at least 2 update attempts, got %d during a partition", +// updates) +// } +// +// atomic.AddInt32(&shouldStop, 1) +// <-doneCh +// +// require.Len(t, eventLog, 2) +// require.NotNil(t, eventLog[0]) +// require.Equal(t, eventLog[0].Kv.Key, []byte("foo")) +// require.Equal(t, eventLog[0].Kv.Value, []byte("bar")) +// require.NotNil(t, eventLog[1]) +// require.Equal(t, eventLog[1].Kv.Key, 
[]byte("foo")) +// require.Equal(t, eventLog[1].Kv.Value, []byte("baz")) +//} func TestWatchCompactedRevision(t *testing.T) { wh, ec, updateCalled, shouldStop, doneCh, closer := testSetup(t) diff --git a/src/cluster/integration/etcd/etcd.go b/src/cluster/integration/etcd/etcd.go index 2c93bf1200..b836e36967 100644 --- a/src/cluster/integration/etcd/etcd.go +++ b/src/cluster/integration/etcd/etcd.go @@ -21,26 +21,24 @@ package etcd import ( + "context" "encoding/json" "fmt" "io/ioutil" "net/http" - "net/url" - "os" "strings" - "time" "github.com/m3db/m3/src/cluster/client" etcdclient "github.com/m3db/m3/src/cluster/client/etcd" "github.com/m3db/m3/src/cluster/services" - xclock "github.com/m3db/m3/src/x/clock" - "github.com/m3db/m3/src/x/errors" + "github.com/m3db/m3/src/integration/resources/docker/dockerexternal" + "github.com/m3db/m3/src/x/instrument" - "go.etcd.io/etcd/server/v3/embed" + "github.com/ory/dockertest/v3" ) type embeddedKV struct { - etcd *embed.Etcd + etcd *dockerexternal.EtcdNode opts Options dir string } @@ -51,12 +49,12 @@ func New(opts Options) (EmbeddedKV, error) { if err != nil { return nil, err } - cfg := embed.NewConfig() - cfg.Dir = dir - cfg.Logger = "zap" - setRandomPorts(cfg) - e, err := embed.StartEtcd(cfg) + pool, err := dockertest.NewPool("") + if err != nil { + return nil, fmt.Errorf("constructing dockertest.Pool for EmbeddedKV: %w", err) + } + e, err := dockerexternal.NewEtcd(pool, instrument.NewOptions()) if err != nil { return nil, fmt.Errorf("unable to start etcd, err: %v", err) } @@ -67,56 +65,16 @@ func New(opts Options) (EmbeddedKV, error) { }, nil } -func setRandomPorts(cfg *embed.Config) { - randomPortURL, err := url.Parse("http://localhost:0") - if err != nil { - panic(err.Error()) - } - - cfg.LPUrls = []url.URL{*randomPortURL} - cfg.APUrls = []url.URL{*randomPortURL} - cfg.LCUrls = []url.URL{*randomPortURL} - cfg.ACUrls = []url.URL{*randomPortURL} - - cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name) -} - func (e 
*embeddedKV) Close() error { - var multi errors.MultiError - - // see if there's any errors - select { - case err := <-e.etcd.Err(): - multi = multi.Add(err) - default: - } - - // shutdown and release - e.etcd.Server.Stop() - e.etcd.Close() - - multi = multi.Add(os.RemoveAll(e.dir)) - return multi.FinalError() + return e.etcd.Close(context.TODO()) } func (e *embeddedKV) Start() error { timeout := e.opts.InitTimeout() - select { - case <-e.etcd.Server.ReadyNotify(): - break - case <-time.After(timeout): - return fmt.Errorf("etcd server took too long to start") - } - - // ensure v3 api endpoints are available, https://github.com/coreos/etcd/pull/7075 - apiVersionEndpoint := fmt.Sprintf("http://%s/version", e.etcd.Clients[0].Addr().String()) - fn := func() bool { return version3Available(apiVersionEndpoint) } - ok := xclock.WaitUntil(fn, timeout) - if !ok { - return fmt.Errorf("api version 3 not available") - } + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() - return nil + return e.etcd.Setup(ctx) } type versionResponse struct { @@ -144,11 +102,7 @@ func version3Available(endpoint string) bool { } func (e *embeddedKV) Endpoints() []string { - addresses := make([]string, 0, len(e.etcd.Clients)) - for _, c := range e.etcd.Clients { - addresses = append(addresses, c.Addr().String()) - } - return addresses + return []string{e.etcd.Address()} } func (e *embeddedKV) ConfigServiceClient(fns ...ClientOptFn) (client.Client, error) { @@ -156,7 +110,9 @@ func (e *embeddedKV) ConfigServiceClient(fns ...ClientOptFn) (client.Client, err SetInstrumentOptions(e.opts.InstrumentOptions()). SetServicesOptions(services.NewOptions().SetInitTimeout(e.opts.InitTimeout())). SetClusters([]etcdclient.Cluster{ - etcdclient.NewCluster().SetZone(e.opts.Zone()).SetEndpoints(e.Endpoints()), + etcdclient.NewCluster().SetZone(e.opts.Zone()). + SetEndpoints(e.Endpoints()). + SetAutoSyncInterval(-1), }). SetService(e.opts.ServiceID()). SetEnv(e.opts.Environment()). 
diff --git a/src/cluster/integration/etcd/options.go b/src/cluster/integration/etcd/options.go index 2c986bebfb..4382996a3a 100644 --- a/src/cluster/integration/etcd/options.go +++ b/src/cluster/integration/etcd/options.go @@ -27,7 +27,7 @@ import ( ) const ( - defaulTimeout = 5 * time.Second + defaultTimeout = 30 * time.Second defaultDir = "etcd.dir" defaultServiceID = "integration.service" defaultEnv = "integration.env" @@ -48,7 +48,7 @@ func NewOptions() Options { return &opts{ iopts: instrument.NewOptions(), workingDir: defaultDir, - initTimeout: defaulTimeout, + initTimeout: defaultTimeout, serviceID: defaultServiceID, env: defaultEnv, zone: defaultZone, diff --git a/src/cluster/kv/etcd/store_test.go b/src/cluster/kv/etcd/store_test.go index a4f4773934..64c7e781d2 100644 --- a/src/cluster/kv/etcd/store_test.go +++ b/src/cluster/kv/etcd/store_test.go @@ -36,10 +36,10 @@ import ( "github.com/m3db/m3/src/x/retry" "github.com/golang/protobuf/proto" + integration "github.com/m3db/m3/src/integration/resources/docker/dockerexternal/etcdintegration" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" clientv3 "go.etcd.io/etcd/client/v3" - "go.etcd.io/etcd/tests/v3/framework/integration" "golang.org/x/net/context" ) @@ -513,7 +513,7 @@ func TestWatchNonBlocking(t *testing.T) { ecluster, opts, closeFn := testCluster(t) defer closeFn() - ec := ecluster.Client(0) + ec := ecluster.RandClient() opts = opts.SetWatchChanResetInterval(200 * time.Millisecond).SetWatchChanInitTimeout(500 * time.Millisecond) diff --git a/src/cluster/services/heartbeat/etcd/store_test.go b/src/cluster/services/heartbeat/etcd/store_test.go index 4950becc1e..b38025259b 100644 --- a/src/cluster/services/heartbeat/etcd/store_test.go +++ b/src/cluster/services/heartbeat/etcd/store_test.go @@ -28,9 +28,9 @@ import ( "github.com/m3db/m3/src/cluster/placement" "github.com/m3db/m3/src/cluster/services" + integration 
"github.com/m3db/m3/src/integration/resources/docker/dockerexternal/etcdintegration" "github.com/stretchr/testify/require" clientv3 "go.etcd.io/etcd/client/v3" - "go.etcd.io/etcd/tests/v3/framework/integration" ) func TestKeys(t *testing.T) { diff --git a/src/cluster/services/leader/client_test.go b/src/cluster/services/leader/client_test.go index 766d9e59fd..0fb7b75d4c 100644 --- a/src/cluster/services/leader/client_test.go +++ b/src/cluster/services/leader/client_test.go @@ -30,10 +30,10 @@ import ( "github.com/m3db/m3/src/cluster/services/leader/campaign" "github.com/m3db/m3/src/cluster/services/leader/election" + integration "github.com/m3db/m3/src/integration/resources/docker/dockerexternal/etcdintegration" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" clientv3 "go.etcd.io/etcd/client/v3" - "go.etcd.io/etcd/tests/v3/framework/integration" "golang.org/x/net/context" ) @@ -127,11 +127,15 @@ func TestNewClient(t *testing.T) { assert.NotNil(t, svc) } +// TODO: this test most likely wasn't testing what we thought it was. 
While using etcd/tests/v3/framework/integration, +// the client gets closed. func TestNewClient_BadCluster(t *testing.T) { + t.Skip("This test only works with the etcd/testing/framework/integration package, " + + "and doesn't provide much signal on correctness of our code.") tc := newTestCluster(t) cl := tc.etcdClient() tc.close() - + require.NoError(t, cl.Close()) _, err := newClient(cl, tc.options(), "") assert.Error(t, err) } diff --git a/src/cluster/services/leader/election/client_test.go b/src/cluster/services/leader/election/client_test.go index af9c0b1759..6cc2db348d 100644 --- a/src/cluster/services/leader/election/client_test.go +++ b/src/cluster/services/leader/election/client_test.go @@ -25,11 +25,11 @@ import ( "testing" "time" + integration "github.com/m3db/m3/src/integration/resources/docker/dockerexternal/etcdintegration" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/concurrency" - "go.etcd.io/etcd/tests/v3/framework/integration" ) type testCluster struct { diff --git a/src/cmd/services/m3dbnode/main/main_test.go b/src/cmd/services/m3dbnode/main/main_test.go index f488662251..f46e6c39aa 100644 --- a/src/cmd/services/m3dbnode/main/main_test.go +++ b/src/cmd/services/m3dbnode/main/main_test.go @@ -1,4 +1,6 @@ +//go:build big // +build big + // // Copyright (c) 2017 Uber Technologies, Inc. // @@ -51,7 +53,7 @@ import ( // TestConfig tests booting a server using file based configuration. func TestConfig(t *testing.T) { // Embedded kv - embeddedKV, err := etcd.New(etcd.NewOptions()) + embeddedKV, err := etcd.New(etcd.NewOptions().SetInitTimeout(30 * time.Second)) require.NoError(t, err) defer func() { require.NoError(t, embeddedKV.Close()) @@ -631,6 +633,7 @@ db: etcdClusters: - zone: {{.ServiceZone}} endpoints: {{.EtcdEndpoints}} + autoSyncInterval: -1 ` embeddedKVConfigPortion = `