diff --git a/core/xservice/client.go b/core/xservice/client.go index 62325e2..9e874ac 100644 --- a/core/xservice/client.go +++ b/core/xservice/client.go @@ -29,11 +29,8 @@ func newClient(opts *Options) Client { } if os.Getenv(core.EnvEtcd) != "" { - cli, err := serviceEtcdClient() - if err != nil { - log.Fatal("etcd client", zap.Error(err)) - } - client.resolver, err = resolver.NewBuilder(cli) + var err error + client.resolver, err = resolver.NewBuilder(serviceEtcdClient()) if err != nil { log.Fatal("endpoints manager", zap.Error(err)) } diff --git a/core/xservice/server.go b/core/xservice/server.go index 865036b..f9667d9 100644 --- a/core/xservice/server.go +++ b/core/xservice/server.go @@ -365,11 +365,7 @@ func (t *serverImpl) registerGrpcServiceEtcd() { cancel() // deregister log.Debug("deregister service") - client, err := serviceEtcdClient() - if err != nil { - return - } - + client := serviceEtcdClient() em, _ := endpoints.NewManager(client, core.ServiceRegisterKeyPrefix) for _, service := range t.grpcServices { @@ -390,11 +386,7 @@ func (t *serverImpl) doRegisterGrpcServiceEtcd(ctx context.Context) { } }() - client, err := serviceEtcdClient() - if err != nil { - l.Error("get client", zap.Error(err)) - return - } + client := serviceEtcdClient() ticker := time.NewTicker(time.Second * 5) defer ticker.Stop() @@ -437,7 +429,7 @@ func (t *serverImpl) doRegisterGrpcServiceEtcd(ctx context.Context) { } } } else { - _, err = lease.KeepAliveOnce(context.Background(), id) + _, err := lease.KeepAliveOnce(context.Background(), id) if err != nil { id = 0 } diff --git a/core/xservice/service.go b/core/xservice/service.go index 8f7ea1e..7e8c258 100644 --- a/core/xservice/service.go +++ b/core/xservice/service.go @@ -4,6 +4,7 @@ import ( "fmt" "os" "strings" + "sync" "time" clientv3 "go.etcd.io/etcd/client/v3" @@ -15,34 +16,48 @@ import ( "github.com/xinpianchang/xservice/pkg/log" ) +// serviceKey get full service key func serviceKey(serviceName string, desc *grpc.ServiceDesc) string { host, _ := os.Hostname() + pid := os.Getpid() if host == "" { - host = fmt.Sprint("unknown-host-pid-", os.Getpid()) + host = "unknown-host" } - return fmt.Sprint(serviceKeyPrefix(serviceName, desc), "/", host) + return fmt.Sprint(serviceKeyPrefix(serviceName, desc), "/", host, "-pid-", pid) } +// serviceKeyPrefix get service key prefix func serviceKeyPrefix(serviceName string, desc *grpc.ServiceDesc) string { return fmt.Sprint(core.ServiceRegisterKeyPrefix, "/", serviceName, "/", desc.ServiceName) } -func serviceEtcdClient() (*clientv3.Client, error) { - endpoints := strings.Split(os.Getenv(core.EnvEtcd), ",") +var ( + _etcdClient *clientv3.Client + _etcdClientOnce sync.Once +) - cfg := clientv3.Config{ - Endpoints: endpoints, - DialTimeout: time.Second * 5, - DialKeepAliveTime: time.Second * 5, - AutoSyncInterval: time.Second * 10, - Logger: log.Get().WithOptions(zap.IncreaseLevel(zapcore.ErrorLevel)), - } - if username := os.Getenv(core.EnvEtcdUser); username != "" { - cfg.Username = username - } - if password := os.Getenv(core.EnvEtcdPassword); password != "" { - cfg.Password = password - } - client, err := clientv3.New(cfg) - return client, err +// serviceEtcdClient lazy init etcd client +func serviceEtcdClient() *clientv3.Client { + _etcdClientOnce.Do(func() { + endpoints := strings.Split(os.Getenv(core.EnvEtcd), ",") + cfg := clientv3.Config{ + Endpoints: endpoints, + DialTimeout: time.Second * 5, + DialKeepAliveTime: time.Second * 10, + AutoSyncInterval: time.Second * 30, + Logger: log.Get().WithOptions(zap.IncreaseLevel(zapcore.ErrorLevel)), + } + if username := os.Getenv(core.EnvEtcdUser); username != "" { + cfg.Username = username + } + if password := os.Getenv(core.EnvEtcdPassword); password != "" { + cfg.Password = password + } + client, err := clientv3.New(cfg) + if err != nil { + log.Fatal("serviceEtcdClient", zap.Error(err)) + } + _etcdClient = client + }) + return _etcdClient }