-
Notifications
You must be signed in to change notification settings - Fork 455
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
etcd_docker 2: Add a docker based etcdintegration package
PR 2 for #4144 High level approach is as described in #4144 . This PR adds: - Functions to spin up a 1 node etcd cluster using docker (in `dockerexternal`) - A drop in replacement for the etcd/integration package using `dockerexternal` commit-id:7aeae843
- Loading branch information
1 parent
c5b2c66
commit 978c1a5
Showing
11 changed files
with
1,205 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
279 changes: 279 additions & 0 deletions
279
src/integration/resources/docker/dockerexternal/etcd.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,279 @@ | ||
// Copyright (c) 2022 Uber Technologies, Inc. | ||
// | ||
// Permission is hereby granted, free of charge, to any person obtaining a copy | ||
// of this software and associated documentation files (the "Software"), to deal | ||
// in the Software without restriction, including without limitation the rights | ||
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
// copies of the Software, and to permit persons to whom the Software is | ||
// furnished to do so, subject to the following conditions: | ||
// | ||
// The above copyright notice and this permission notice shall be included in | ||
// all copies or substantial portions of the Software. | ||
// | ||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN | ||
// THE SOFTWARE. | ||
|
||
package dockerexternal | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"fmt" | ||
"math/rand" | ||
"net" | ||
"strconv" | ||
"time" | ||
|
||
"github.com/m3db/m3/src/integration/resources/docker/dockerexternal/etcdintegration/bridge" | ||
xdockertest "github.com/m3db/m3/src/x/dockertest" | ||
xerrors "github.com/m3db/m3/src/x/errors" | ||
"github.com/m3db/m3/src/x/instrument" | ||
"github.com/m3db/m3/src/x/retry" | ||
|
||
"github.com/ory/dockertest/v3" | ||
"github.com/ory/dockertest/v3/docker" | ||
clientv3 "go.etcd.io/etcd/client/v3" | ||
"go.uber.org/zap" | ||
"google.golang.org/grpc" | ||
) | ||
|
||
var ( | ||
etcdImage = xdockertest.Image{ | ||
Name: "bitnami/etcd", | ||
Tag: "3.5.4", | ||
} | ||
) | ||
|
||
// NewEtcd constructs a single etcd node, running in a docker container. | ||
func NewEtcd( | ||
pool *dockertest.Pool, | ||
instrumentOpts instrument.Options, | ||
options ...EtcdClusterOption, | ||
) (*EtcdNode, error) { | ||
logger := instrumentOpts.Logger() | ||
if logger == nil { | ||
logger = zap.NewNop() | ||
instrumentOpts = instrumentOpts.SetLogger(logger) | ||
} | ||
|
||
var opts etcdClusterOptions | ||
for _, o := range options { | ||
o.apply(&opts) | ||
} | ||
|
||
return &EtcdNode{ | ||
pool: pool, | ||
instrumentOpts: instrumentOpts, | ||
logger: logger, | ||
opts: opts, | ||
// Solely for mocking in tests--unfortunately we don't want to take in the etcd client as a dependency here | ||
// (we don't know the endpoints, and therefore need to construct it ourselves). | ||
// Thus, we do two hops (mock newClient returning mock memberClient) | ||
newClient: func(config clientv3.Config) (memberClient, error) { | ||
return clientv3.New(config) | ||
}, | ||
}, nil | ||
} | ||
|
||
// EtcdNode is a single etcd node, running via a docker container. | ||
//nolint:maligned | ||
type EtcdNode struct { | ||
instrumentOpts instrument.Options | ||
logger *zap.Logger | ||
pool *dockertest.Pool | ||
opts etcdClusterOptions | ||
|
||
// namePrefix is used to name the cluster. Exists solely for unittests in this package; otherwise a const | ||
namePrefix string | ||
newClient func(config clientv3.Config) (memberClient, error) | ||
|
||
// initialized by Setup | ||
address string | ||
resource *xdockertest.Resource | ||
etcdCli *clientv3.Client | ||
bridge *bridge.Bridge | ||
|
||
stopped bool | ||
} | ||
|
||
// Setup starts the docker container. | ||
func (c *EtcdNode) Setup(ctx context.Context) (closeErr error) { | ||
if c.resource != nil { | ||
return errors.New("etcd cluster already started") | ||
} | ||
|
||
// nolint:gosec | ||
id := rand.New(rand.NewSource(time.Now().UnixNano())).Int() | ||
|
||
namePrefix := "m3-test-etcd-" | ||
if c.namePrefix != "" { | ||
// support overriding for tests | ||
namePrefix = c.namePrefix | ||
} | ||
|
||
// Roughly, runs: | ||
// docker run --rm --env ALLOW_NONE_AUTHENTICATION=yes -it --name Etcd bitnami/etcd | ||
// Port 2379 on the container is bound to a free port on the host | ||
resource, err := xdockertest.NewDockerResource(c.pool, xdockertest.ResourceOptions{ | ||
OverrideDefaults: false, | ||
// TODO: what even is this? | ||
Source: "etcd", | ||
ContainerName: fmt.Sprintf("%s%d", namePrefix, id), | ||
Image: etcdImage, | ||
Env: []string{"ALLOW_NONE_AUTHENTICATION=yes"}, | ||
InstrumentOpts: c.instrumentOpts, | ||
PortMappings: map[docker.Port][]docker.PortBinding{ | ||
"2379/tcp": {{ | ||
HostIP: "0.0.0.0", | ||
HostPort: strconv.Itoa(c.opts.port), | ||
}}, | ||
}, | ||
NoNetworkOverlay: true, | ||
}) | ||
|
||
if err != nil { | ||
return fmt.Errorf("starting etcd container: %w", err) | ||
} | ||
|
||
container := resource.Resource().Container | ||
c.logger.Info("etcd container started", | ||
zap.String("containerID", container.ID), | ||
zap.Any("ports", container.NetworkSettings.Ports), | ||
// Uncomment if you need gory details about the container printed; equivalent of `docker inspect <id> | ||
// zap.Any("container", container), | ||
) | ||
// Extract the port on which we are listening. | ||
// This is coming from the equivalent of docker inspect <container_id> | ||
portBinds := container.NetworkSettings.Ports["2379/tcp"] | ||
|
||
// If running in a docker container e.g. on buildkite, route to etcd using the published port on the *host* machine. | ||
// See also http://github.com/m3db/m3/blob/master/docker-compose.yml#L16-L16 | ||
ipAddr := "127.0.0.1" | ||
_, err = net.ResolveIPAddr("ip4", "host.docker.internal") | ||
if err == nil { | ||
c.logger.Info("Running tests within a docker container (e.g. for buildkite. " + | ||
"Using host.docker.internal to talk to etcd") | ||
ipAddr = "host.docker.internal" | ||
} | ||
|
||
c.resource = resource | ||
c.address = fmt.Sprintf("%s:%s", ipAddr, portBinds[0].HostPort) | ||
|
||
etcdCli, err := clientv3.New( | ||
clientv3.Config{ | ||
Endpoints: []string{c.address}, | ||
DialTimeout: 5 * time.Second, | ||
DialOptions: []grpc.DialOption{grpc.WithBlock()}, | ||
Logger: c.logger, | ||
}, | ||
) | ||
if err != nil { | ||
return fmt.Errorf("constructing etcd client: %w", err) | ||
} | ||
|
||
defer func() { | ||
if err := etcdCli.Close(); err != nil { | ||
var merr xerrors.MultiError | ||
closeErr = merr. | ||
Add(closeErr). | ||
Add(fmt.Errorf("closing etcd client: %w", err)). | ||
FinalError() | ||
} | ||
}() | ||
|
||
return c.waitForHealth(ctx, etcdCli) | ||
} | ||
|
||
func (c *EtcdNode) containerHostPort() string { | ||
portBinds := c.resource.Resource().Container.NetworkSettings.Ports["2379/tcp"] | ||
|
||
return fmt.Sprintf("127.0.0.1:%s", portBinds[0].HostPort) | ||
} | ||
|
||
func (c *EtcdNode) waitForHealth(ctx context.Context, memberCli memberClient) error { | ||
retrier := retry.NewRetrier(retry.NewOptions(). | ||
SetForever(true). | ||
SetMaxBackoff(5 * time.Second), | ||
) | ||
|
||
var timeout time.Duration | ||
deadline, ok := ctx.Deadline() | ||
if ok { | ||
timeout = deadline.Sub(time.Now()) | ||
} | ||
c.logger.Info( | ||
"Waiting for etcd to report healthy (via member list)", | ||
zap.String("timeout", timeout.String()), | ||
) | ||
err := retrier.AttemptContext(ctx, func() error { | ||
_, err := memberCli.MemberList(ctx) | ||
if err != nil { | ||
c.logger.Info( | ||
"Failed connecting to etcd while waiting for container to come up", | ||
zap.Error(err), | ||
zap.String("endpoints", c.address), | ||
) | ||
} | ||
return err | ||
}) | ||
if err == nil { | ||
c.logger.Info("etcd is healthy") | ||
return nil | ||
} | ||
return fmt.Errorf("waiting for etcd to become healthy: %w", err) | ||
} | ||
|
||
// Close stops the etcd node, and removes it. | ||
func (c *EtcdNode) Close(ctx context.Context) error { | ||
var err xerrors.MultiError | ||
err = err. | ||
Add(c.resource.Close()) | ||
return err.FinalError() | ||
} | ||
|
||
// Address is the host:port of the etcd node for use by etcd clients. | ||
func (c *EtcdNode) Address() string { | ||
return c.address | ||
} | ||
|
||
// Stop stops the etcd container, but does not purge it. A stopped container can be restarted with Restart. | ||
func (c *EtcdNode) Stop(ctx context.Context) error { | ||
if c.stopped { | ||
return errors.New("etcd node is already stopped") | ||
} | ||
if err := c.pool.Client.StopContainerWithContext(c.resource.Resource().Container.ID, 0, ctx); err != nil { | ||
return err | ||
} | ||
c.stopped = true | ||
return nil | ||
} | ||
|
||
// Restart restarts the etcd container. If it isn't currently stopped, the etcd container will be stopped and then | ||
// started; else it will just be start. | ||
func (c *EtcdNode) Restart(ctx context.Context) error { | ||
if !c.stopped { | ||
c.logger.Info("Stopping etcd node") | ||
|
||
if err := c.Stop(ctx); err != nil { | ||
return fmt.Errorf("stopping etcd node for Restart: %w", err) | ||
} | ||
} | ||
err := c.pool.Client.StartContainerWithContext(c.resource.Resource().Container.ID, nil, ctx) | ||
if err != nil { | ||
return fmt.Errorf("starting etcd node for Restart: %w", err) | ||
} | ||
c.stopped = false | ||
return nil | ||
} | ||
|
||
var _ memberClient = (*clientv3.Client)(nil) | ||
|
||
// memberClient exposes just one method of *clientv3.Client, for purposes of tests. | ||
type memberClient interface { | ||
MemberList(ctx context.Context) (*clientv3.MemberListResponse, error) | ||
} |
57 changes: 57 additions & 0 deletions
57
src/integration/resources/docker/dockerexternal/etcd_options.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
// Copyright (c) 2022 Uber Technologies, Inc. | ||
// | ||
// Permission is hereby granted, free of charge, to any person obtaining a copy | ||
// of this software and associated documentation files (the "Software"), to deal | ||
// in the Software without restriction, including without limitation the rights | ||
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
// copies of the Software, and to permit persons to whom the Software is | ||
// furnished to do so, subject to the following conditions: | ||
// | ||
// The above copyright notice and this permission notice shall be included in | ||
// all copies or substantial portions of the Software. | ||
// | ||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN | ||
// THE SOFTWARE. | ||
|
||
package dockerexternal | ||
|
||
// EtcdClusterOption configure an etcd cluster. | ||
type EtcdClusterOption interface { | ||
apply(opts *etcdClusterOptions) | ||
} | ||
|
||
type etcdClusterOptions struct { | ||
useBridge bool | ||
port int | ||
} | ||
|
||
// EtcdClusterUseBridge configures an EtcdNode to insert a networking "bridge" between the etcd container and the | ||
// calling processes. The bridge intercepts network traffic, and forwards it, unless told not to via e.g. Blackhole(). | ||
// See the bridge package. | ||
// As noted in that package, this implementation is lifted directly from the etcd/integration package; all credit goes | ||
// to etcd authors for the approach. | ||
func EtcdClusterUseBridge(shouldUseBridge bool) EtcdClusterOption { | ||
return useBridge(shouldUseBridge) | ||
} | ||
|
||
type useBridge bool | ||
|
||
func (u useBridge) apply(opts *etcdClusterOptions) { | ||
opts.useBridge = bool(u) | ||
} | ||
|
||
// EtcdClusterPort sets a specific port for etcd to listen on. Default is to listen on :0 (any free port). | ||
func EtcdClusterPort(port int) EtcdClusterOption { | ||
return withPort(port) | ||
} | ||
|
||
type withPort int | ||
|
||
func (w withPort) apply(opts *etcdClusterOptions) { | ||
opts.port = int(w) | ||
} |
Oops, something went wrong.