diff --git a/server/proxy/grpcproxy/util.go b/server/proxy/grpcproxy/util.go index 7e3d3193b31..669b04b3f87 100644 --- a/server/proxy/grpcproxy/util.go +++ b/server/proxy/grpcproxy/util.go @@ -37,7 +37,8 @@ func getAuthTokenFromClient(ctx context.Context) string { func withClientAuthToken(ctx, ctxWithToken context.Context) context.Context { token := getAuthTokenFromClient(ctxWithToken) if token != "" { - ctx = context.WithValue(ctx, rpctypes.TokenFieldNameGRPCKey{}, token) + md := metadata.Pairs(rpctypes.TokenFieldNameGRPC, token) + return metadata.NewOutgoingContext(ctx, md) } return ctx } diff --git a/server/proxy/grpcproxy/watch_broadcasts.go b/server/proxy/grpcproxy/watch_broadcasts.go index dacd3007d1d..26c7459cdb8 100644 --- a/server/proxy/grpcproxy/watch_broadcasts.go +++ b/server/proxy/grpcproxy/watch_broadcasts.go @@ -70,6 +70,7 @@ func (wbs *watchBroadcasts) coalesce(wb *watchBroadcast) { wbs.watchers[w] = wbswb } wb.receivers = nil + watchersCoalescing.Inc() } wbswb.mu.Unlock() wb.mu.Unlock() diff --git a/tests/e2e/etcd_grpcproxy_test.go b/tests/e2e/etcd_grpcproxy_test.go index 02174e89f62..0e109779dd2 100644 --- a/tests/e2e/etcd_grpcproxy_test.go +++ b/tests/e2e/etcd_grpcproxy_test.go @@ -17,6 +17,8 @@ package e2e import ( "context" "strings" + "sync" + "sync/atomic" "testing" "time" @@ -142,3 +144,97 @@ func waitForEndpointInLog(ctx context.Context, proxyProc *expect.ExpectProcess, return err } + +func TestGRPCProxyWatchersAfterTokenExpiry(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + cluster, err := e2e.NewEtcdProcessCluster(ctx, t, + e2e.WithClusterSize(1), + e2e.WithAuthTokenOpts("simple"), + e2e.WithAuthTokenTTL(1), + ) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, cluster.Stop()) }) + + cli := cluster.Etcdctl() + + createUsers(ctx, t, cli) + + require.NoError(t, cli.AuthEnable(ctx)) + + var ( + node1ClientURL = cluster.Procs[0].Config().ClientURL + proxyClientURL = "127.0.0.1:42379" + ) + + proxyProc, err := e2e.SpawnCmd([]string{ + e2e.BinPath.Etcd, "grpc-proxy", "start", + "--advertise-client-url", proxyClientURL, + "--listen-addr", proxyClientURL, + "--endpoints", node1ClientURL, + }, nil) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, proxyProc.Stop()) }) + + var totalEventsCount int64 + + handler := func(events clientv3.WatchChan) { + for { + select { + case ev, open := <-events: + if !open { + return + } + if ev.Err() != nil { + t.Logf("watch response error: %s", ev.Err()) + continue + } + atomic.AddInt64(&totalEventsCount, 1) + case <-ctx.Done(): + return + } + } + } + + withAuth := e2e.WithAuth("root", "rootPassword") + withEndpoint := e2e.WithEndpoints([]string{proxyClientURL}) + + events := cluster.Etcdctl(withAuth, withEndpoint).Watch(ctx, "/test", config.WatchOptions{Prefix: true, Revision: 1}) + + wg := sync.WaitGroup{} + + wg.Add(1) + go func() { + defer wg.Done() + handler(events) + }() + + clusterCli := cluster.Etcdctl(withAuth) + require.NoError(t, clusterCli.Put(ctx, "/test/1", "test", config.PutOptions{})) + require.NoError(t, err) + + time.Sleep(time.Second * 2) + + events2 := cluster.Etcdctl(withAuth, withEndpoint).Watch(ctx, "/test", config.WatchOptions{Prefix: true, Revision: 1}) + + wg.Add(1) + go func() { + defer wg.Done() + handler(events2) + }() + + events3 := cluster.Etcdctl(withAuth, withEndpoint).Watch(ctx, "/test", config.WatchOptions{Prefix: true, Revision: 1}) + + wg.Add(1) + go func() { + defer wg.Done() + handler(events3) + }() + + time.Sleep(time.Second) + + cancel() + wg.Wait() + + assert.Equal(t, int64(3), atomic.LoadInt64(&totalEventsCount)) +} diff --git a/tests/framework/e2e/cluster.go b/tests/framework/e2e/cluster.go index 4aff11b9d6f..fe1d57b1751 100644 --- a/tests/framework/e2e/cluster.go +++ b/tests/framework/e2e/cluster.go @@ -291,6 +291,10 @@ func WithAuthTokenOpts(token string) EPClusterOption { return func(c *EtcdProcessClusterConfig) { c.ServerConfig.AuthToken = token } } +func WithAuthTokenTTL(ttl uint) EPClusterOption { + return func(c *EtcdProcessClusterConfig) { c.ServerConfig.AuthTokenTTL = ttl } +} + func WithRollingStart(rolling bool) EPClusterOption { return func(c *EtcdProcessClusterConfig) { c.RollingStart = rolling } }