Skip to content

Commit

Permalink
Use context.WithTimeoutCause and context.WithCancelCause for better r…
Browse files Browse the repository at this point in the history
…eadability
  • Loading branch information
Amulyam24 committed Jan 22, 2025
1 parent 2f5d70a commit 56f27d6
Show file tree
Hide file tree
Showing 12 changed files with 33 additions and 31 deletions.
2 changes: 1 addition & 1 deletion cmd/clusterctl/client/repository/repository_gitlab.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (g *gitLabRepository) GetFile(ctx context.Context, version, path string) ([
return content, nil
}

timeoutctx, cancel := context.WithTimeout(ctx, 30*time.Second)
timeoutctx, cancel := context.WithTimeoutCause(ctx, 30*time.Second, errors.New("http request timeout expired"))
defer cancel()
request, err := http.NewRequestWithContext(timeoutctx, http.MethodGet, url, http.NoBody)
if err != nil {
Expand Down
12 changes: 6 additions & 6 deletions controllers/clustercache/cluster_accessor_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func createCachedClient(ctx context.Context, clusterAccessorConfig *clusterAcces

// Use a context that is independent of the passed in context, so the cache doesn't get stopped
// when the passed in context is canceled.
cacheCtx, cacheCtxCancel := context.WithCancel(context.Background())
cacheCtx, cacheCtxCancel := context.WithCancelCause(context.Background())

// We need to be able to stop the cache's shared informers, so wrap this in a stoppableCache.
cache := &stoppableCache{
Expand Down Expand Up @@ -261,7 +261,7 @@ func createCachedClient(ctx context.Context, clusterAccessorConfig *clusterAcces
go cache.Start(cacheCtx) //nolint:errcheck

// Wait until the cache is initially synced.
cacheSyncCtx, cacheSyncCtxCancel := context.WithTimeout(ctx, clusterAccessorConfig.Cache.InitialSyncTimeout)
cacheSyncCtx, cacheSyncCtxCancel := context.WithTimeoutCause(ctx, clusterAccessorConfig.Cache.InitialSyncTimeout, errors.New("initial sync timeout expired"))
defer cacheSyncCtxCancel()
if !cache.WaitForCacheSync(cacheSyncCtx) {
cache.Stop()
Expand Down Expand Up @@ -297,13 +297,13 @@ type clientWithTimeout struct {
var _ client.Client = &clientWithTimeout{}

func (c clientWithTimeout) Get(ctx context.Context, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error {
ctx, cancel := context.WithTimeout(ctx, c.timeout)
ctx, cancel := context.WithTimeoutCause(ctx, c.timeout, errors.New("call timeout expired"))
defer cancel()
return c.Client.Get(ctx, key, obj, opts...)
}

func (c clientWithTimeout) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error {
ctx, cancel := context.WithTimeout(ctx, c.timeout)
ctx, cancel := context.WithTimeoutCause(ctx, c.timeout, errors.New("call timeout expired"))
defer cancel()
return c.Client.List(ctx, list, opts...)
}
Expand All @@ -314,7 +314,7 @@ type stoppableCache struct {

lock sync.Mutex
stopped bool
cancelFunc context.CancelFunc
cancelFunc context.CancelCauseFunc
}

// Stop cancels the cache.Cache's context, unless it has already been stopped.
Expand All @@ -327,5 +327,5 @@ func (cc *stoppableCache) Stop() {
}

cc.stopped = true
cc.cancelFunc()
cc.cancelFunc(errors.New("stoppable cache context cancelled"))
}
5 changes: 3 additions & 2 deletions controllers/remote/cluster_cache_healthcheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

. "github.com/onsi/gomega"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -106,7 +107,7 @@ func TestClusterCacheHealthCheck(t *testing.T) {

testClusterKey = util.ObjectKey(testCluster)

_, cancel := context.WithCancel(ctx)
_, cancel := context.WithCancelCause(ctx)
cc = &stoppableCache{cancelFunc: cancel}
cct.clusterAccessors[testClusterKey] = &clusterAccessor{cache: cc}

Expand All @@ -123,7 +124,7 @@ func TestClusterCacheHealthCheck(t *testing.T) {
t.Log("Deleting Namespace")
g.Expect(env.Delete(ctx, ns)).To(Succeed())
t.Log("Stopping the manager")
cc.cancelFunc()
cc.cancelFunc(errors.New("context cancelled"))
mgrCancel()
}

Expand Down
8 changes: 4 additions & 4 deletions controllers/remote/cluster_cache_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ func (t *ClusterCacheTracker) createCachedClient(ctx context.Context, config *re

// Use a context that is independent of the passed in context, so the cache doesn't get stopped
// when the passed in context is canceled.
cacheCtx, cacheCtxCancel := context.WithCancel(context.Background())
cacheCtx, cacheCtxCancel := context.WithCancelCause(context.Background())

// We need to be able to stop the cache's shared informers, so wrap this in a stoppableCache.
cache := &stoppableCache{
Expand Down Expand Up @@ -547,7 +547,7 @@ func (t *ClusterCacheTracker) createCachedClient(ctx context.Context, config *re
go cache.Start(cacheCtx) //nolint:errcheck

// Wait until the cache is initially synced
cacheSyncCtx, cacheSyncCtxCancel := context.WithTimeout(ctx, initialCacheSyncTimeout)
cacheSyncCtx, cacheSyncCtxCancel := context.WithTimeoutCause(ctx, initialCacheSyncTimeout, errors.New("initial sync timeout expired"))
defer cacheSyncCtxCancel()
if !cache.WaitForCacheSync(cacheSyncCtx) {
cache.Stop()
Expand Down Expand Up @@ -774,13 +774,13 @@ type clientWithTimeout struct {
var _ client.Client = &clientWithTimeout{}

func (c clientWithTimeout) Get(ctx context.Context, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error {
ctx, cancel := context.WithTimeout(ctx, c.timeout)
ctx, cancel := context.WithTimeoutCause(ctx, c.timeout, errors.New("call timeout expired"))
defer cancel()
return c.Client.Get(ctx, key, obj, opts...)
}

func (c clientWithTimeout) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error {
ctx, cancel := context.WithTimeout(ctx, c.timeout)
ctx, cancel := context.WithTimeoutCause(ctx, c.timeout, errors.New("call timeout expired"))
defer cancel()
return c.Client.List(ctx, list, opts...)
}
5 changes: 3 additions & 2 deletions controllers/remote/stoppable_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"sync"

"github.com/pkg/errors"
"sigs.k8s.io/controller-runtime/pkg/cache"
)

Expand All @@ -29,7 +30,7 @@ type stoppableCache struct {

lock sync.Mutex
stopped bool
cancelFunc context.CancelFunc
cancelFunc context.CancelCauseFunc
}

// Stop cancels the cache.Cache's context, unless it has already been stopped.
Expand All @@ -42,5 +43,5 @@ func (cc *stoppableCache) Stop() {
}

cc.stopped = true
cc.cancelFunc()
cc.cancelFunc(errors.New("stoppable cache context cancelled"))
}
12 changes: 6 additions & 6 deletions controlplane/kubeadm/internal/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func newEtcdClient(ctx context.Context, etcdClient etcd, callTimeout time.Durati
return nil, errors.New("etcd client was not configured with any endpoints")
}

ctx, cancel := context.WithTimeout(ctx, callTimeout)
ctx, cancel := context.WithTimeoutCause(ctx, callTimeout, errors.New("call timeout expired"))
defer cancel()

status, err := etcdClient.Status(ctx, endpoints[0])
Expand All @@ -200,7 +200,7 @@ func (c *Client) Close() error {

// Members retrieves a list of etcd members.
func (c *Client) Members(ctx context.Context) ([]*Member, error) {
ctx, cancel := context.WithTimeout(ctx, c.CallTimeout)
ctx, cancel := context.WithTimeoutCause(ctx, c.CallTimeout, errors.New("call timeout expired"))
defer cancel()

response, err := c.EtcdClient.MemberList(ctx)
Expand Down Expand Up @@ -231,7 +231,7 @@ func (c *Client) Members(ctx context.Context) ([]*Member, error) {

// MoveLeader moves the leader to the provided member ID.
func (c *Client) MoveLeader(ctx context.Context, newLeaderID uint64) error {
ctx, cancel := context.WithTimeout(ctx, c.CallTimeout)
ctx, cancel := context.WithTimeoutCause(ctx, c.CallTimeout, errors.New("call timeout expired"))
defer cancel()

_, err := c.EtcdClient.MoveLeader(ctx, newLeaderID)
Expand All @@ -240,7 +240,7 @@ func (c *Client) MoveLeader(ctx context.Context, newLeaderID uint64) error {

// RemoveMember removes a given member.
func (c *Client) RemoveMember(ctx context.Context, id uint64) error {
ctx, cancel := context.WithTimeout(ctx, c.CallTimeout)
ctx, cancel := context.WithTimeoutCause(ctx, c.CallTimeout, errors.New("call timeout expired"))
defer cancel()

_, err := c.EtcdClient.MemberRemove(ctx, id)
Expand All @@ -249,7 +249,7 @@ func (c *Client) RemoveMember(ctx context.Context, id uint64) error {

// UpdateMemberPeerURLs updates the list of peer URLs.
func (c *Client) UpdateMemberPeerURLs(ctx context.Context, id uint64, peerURLs []string) ([]*Member, error) {
ctx, cancel := context.WithTimeout(ctx, c.CallTimeout)
ctx, cancel := context.WithTimeoutCause(ctx, c.CallTimeout, errors.New("call timeout expired"))
defer cancel()

response, err := c.EtcdClient.MemberUpdate(ctx, id, peerURLs)
Expand All @@ -267,7 +267,7 @@ func (c *Client) UpdateMemberPeerURLs(ctx context.Context, id uint64, peerURLs [

// Alarms retrieves all alarms on a cluster.
func (c *Client) Alarms(ctx context.Context) ([]MemberAlarm, error) {
ctx, cancel := context.WithTimeout(ctx, c.CallTimeout)
ctx, cancel := context.WithTimeoutCause(ctx, c.CallTimeout, errors.New("call timeout expired"))
defer cancel()

alarmResponse, err := c.EtcdClient.AlarmList(ctx)
Expand Down
2 changes: 1 addition & 1 deletion exp/runtime/internal/controllers/warmup.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (r *warmupRunnable) Start(ctx context.Context) error {
if r.warmupTimeout == 0 {
r.warmupTimeout = defaultWarmupTimeout
}
ctx, cancel := context.WithTimeout(ctx, r.warmupTimeout)
ctx, cancel := context.WithTimeoutCause(ctx, r.warmupTimeout, errors.New("warmup timeout expired"))
defer cancel()

err := wait.PollUntilContextTimeout(ctx, r.warmupInterval, r.warmupTimeout, true, func(ctx context.Context) (done bool, err error) {
Expand Down
2 changes: 1 addition & 1 deletion hack/tools/internal/log-push/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func getLogsFromGCS(ctx context.Context, logPath string, logFileRegex *regexp.Re
klog.Infof("Getting logs from gs://%s/%s", bucket, folder)

// Set timeout.
ctx, cancel := context.WithTimeout(ctx, 2*time.Minute)
ctx, cancel := context.WithTimeoutCause(ctx, 2*time.Minute, errors.New("new client timeout expired"))
defer cancel()

// Create GCS client.
Expand Down
6 changes: 3 additions & 3 deletions hack/tools/internal/tilt-prepare/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,8 +458,8 @@ func runTaskGroup(ctx context.Context, name string, tasks map[string]taskFunctio
}(time.Now())

// Create a context to be used for canceling all the tasks when another fails.
ctx, cancel := context.WithCancel(ctx)
defer cancel()
ctx, cancel := context.WithCancelCause(ctx)
defer cancel(errors.New("run task group context cancelled"))

// Make channels to pass fatal errors in WaitGroup
errors := make(chan error)
Expand All @@ -483,7 +483,7 @@ func runTaskGroup(ctx context.Context, name string, tasks map[string]taskFunctio
break
case err := <-errors:
// cancel all the running tasks
cancel()
cancel(err)
// consumes all the errors from the channel
errList := []error{err}
Loop:
Expand Down
2 changes: 1 addition & 1 deletion internal/controllers/machine/drain/drain.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ func (d *Helper) EvictPods(ctx context.Context, podDeleteList *PodDeleteList) Ev

// Trigger evictions for at most 10s. We'll continue on the next reconcile if we hit the timeout.
evictionTimeout := 10 * time.Second
ctx, cancel := context.WithTimeout(ctx, evictionTimeout)
ctx, cancel := context.WithTimeoutCause(ctx, evictionTimeout, errors.New("eviction timeout expired"))
defer cancel()

res := EvictionResult{
Expand Down
2 changes: 1 addition & 1 deletion internal/runtime/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ func httpCall(ctx context.Context, request, response runtime.Object, opts *httpC
extensionURL.RawQuery = values.Encode()

var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, opts.timeout)
ctx, cancel = context.WithTimeoutCause(ctx, opts.timeout, errors.New("http request timeout expired"))
defer cancel()
}

Expand Down
6 changes: 3 additions & 3 deletions internal/test/envtest/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ type Environment struct {
Config *rest.Config

env *envtest.Environment
cancelManager context.CancelFunc
cancelManager context.CancelCauseFunc
}

// newEnvironment creates a new environment spinning up a local api-server.
Expand Down Expand Up @@ -376,7 +376,7 @@ func newEnvironment(uncachedObjs ...client.Object) *Environment {

// start starts the manager.
func (e *Environment) start(ctx context.Context) {
ctx, cancel := context.WithCancel(ctx)
ctx, cancel := context.WithCancelCause(ctx)
e.cancelManager = cancel

go func() {
Expand All @@ -392,7 +392,7 @@ func (e *Environment) start(ctx context.Context) {
// stop stops the test environment.
func (e *Environment) stop() error {
fmt.Println("Stopping the test environment")
e.cancelManager()
e.cancelManager(errors.New("test environment context cancelled"))
return e.env.Stop()
}

Expand Down

0 comments on commit 56f27d6

Please sign in to comment.