From 019b3ec946388bf5a31ae3612f64f33254c9f06d Mon Sep 17 00:00:00 2001
From: hongchunhua
Date: Thu, 14 Oct 2021 15:57:46 +0800
Subject: [PATCH] Issue #398: fix the bug where decreasing replicas makes ZooKeeper unrecoverable when ZooKeeper is not running.

---
 .../zookeepercluster_controller.go            | 31 ++++++++++++++-----
 .../zookeepercluster_controller_test.go       |  6 +++-
 pkg/zk/zookeeper_client.go                    | 20 +++++++++---
 3 files changed, 44 insertions(+), 13 deletions(-)

diff --git a/pkg/controller/zookeepercluster/zookeepercluster_controller.go b/pkg/controller/zookeepercluster/zookeepercluster_controller.go
index a03afa65c..8c4edcbda 100644
--- a/pkg/controller/zookeepercluster/zookeepercluster_controller.go
+++ b/pkg/controller/zookeepercluster/zookeepercluster_controller.go
@@ -280,6 +280,8 @@ func (r *ReconcileZookeeperCluster) reconcileStatefulSet(instance *zookeeperv1be
 	foundSTSSize := *foundSts.Spec.Replicas
 	newSTSSize := *sts.Spec.Replicas
 	if newSTSSize != foundSTSSize {
+		// If ZooKeeper is not running, we must not update the replicas.
+		// Decreasing Replicas only takes effect once ZooKeeper is running and the client can connect to it.
 		zkUri := utils.GetZkServiceUri(instance)
 		err = r.zkClient.Connect(zkUri)
 		if err != nil {
@@ -300,16 +302,25 @@ func (r *ReconcileZookeeperCluster) reconcileStatefulSet(instance *zookeeperv1be
 		if err != nil {
 			return fmt.Errorf("Error updating cluster size %s: %v", path, err)
 		}
-		// #398 if decrease node, remove node immediately after update node successfully.
+		// #398: when scaling down, remove the members immediately after the size update succeeds.
 		if newSTSSize < foundSTSSize {
 			var removes []string
+			config, _, err := r.zkClient.GetConfig()
+			if err != nil {
+				return fmt.Errorf("Error getting zookeeper config: %v", err)
+			}
+			r.log.Info("Got zookeeper config", "config", config)
 			for myid := newSTSSize + 1; myid <= foundSTSSize; myid++ {
-				removes = append(removes, strconv.Itoa(int(myid)))
+				if strings.Contains(config, "server."+strconv.Itoa(int(myid))+"=") {
+					removes = append(removes, strconv.Itoa(int(myid)))
+				}
 			}
-			r.log.Info("It will do reconfig to remove id:%s", strings.Join(removes, ","))
-			err := r.zkClient.ReconfigToRemove(removes)
+			// A member removed via reconfig can still serve all of its connected clients,
+			// so removing it first avoids errors where the client cannot reach the server during preStop.
+			r.log.Info("Doing reconfig to remove nodes", "ids", strings.Join(removes, ","))
+			err = r.zkClient.IncReconfig(nil, removes, -1)
 			if err != nil {
-				return fmt.Errorf("Error reconfig remove id:%s", strings.Join(removes, ","))
+				return fmt.Errorf("Error reconfig remove id:%s, %v", strings.Join(removes, ","), err)
 			}
 		}
 	}
@@ -632,7 +643,9 @@ func (r *ReconcileZookeeperCluster) reconcileClusterStatus(instance *zookeeperv1
 	instance.Status.Members.Unready = unreadyMembers
 
 	//If Cluster is in a ready state...
-	if instance.Spec.Replicas == instance.Status.ReadyReplicas && (!instance.Status.MetaRootCreated) {
+	// instance.Spec.Replicas is only the desired value we set; Kubernetes may not have applied it yet.
+	// So we compare instance.Status.Replicas with ReadyReplicas, which reflects the true status of the pods.
+	if instance.Status.Replicas == instance.Status.ReadyReplicas && (!instance.Status.MetaRootCreated) {
 		r.log.Info("Cluster is Ready, Creating ZK Metadata...")
 		zkUri := utils.GetZkServiceUri(instance)
 		err := r.zkClient.Connect(zkUri)
@@ -651,7 +664,7 @@ func (r *ReconcileZookeeperCluster) reconcileClusterStatus(instance *zookeeperv1
 	r.log.Info("Updating zookeeper status",
 		"StatefulSet.Namespace", instance.Namespace,
 		"StatefulSet.Name", instance.Name)
-	if instance.Status.ReadyReplicas == instance.Spec.Replicas {
+	if instance.Status.ReadyReplicas == instance.Status.Replicas {
 		instance.Status.SetPodsReadyConditionTrue()
 	} else {
 		instance.Status.SetPodsReadyConditionFalse()
@@ -781,7 +794,9 @@ func (r *ReconcileZookeeperCluster) getPVCCount(instance *zookeeperv1beta1.Zooke
 
 func (r *ReconcileZookeeperCluster) cleanupOrphanPVCs(instance *zookeeperv1beta1.ZookeeperCluster) (err error) {
 	// this check should make sure we do not delete the PVCs before the STS has scaled down
-	if instance.Status.ReadyReplicas == instance.Spec.Replicas {
+	// instance.Spec.Replicas is only the desired value we set; Kubernetes may not have applied it yet.
+	// So we compare instance.Status.Replicas with ReadyReplicas, which reflects the true status of the pods.
+	if instance.Status.ReadyReplicas == instance.Status.Replicas {
 		pvcCount, err := r.getPVCCount(instance)
 		if err != nil {
 			return err
diff --git a/pkg/controller/zookeepercluster/zookeepercluster_controller_test.go b/pkg/controller/zookeepercluster/zookeepercluster_controller_test.go
index 405b04a31..9735dd40b 100644
--- a/pkg/controller/zookeepercluster/zookeepercluster_controller_test.go
+++ b/pkg/controller/zookeepercluster/zookeepercluster_controller_test.go
@@ -59,10 +59,14 @@ func (client *MockZookeeperClient) NodeExists(zNodePath string) (version int32,
 	return 0, nil
 }
 
-func (client *MockZookeeperClient) ReconfigToRemove(leaving []string) (err error) {
+func (client *MockZookeeperClient) IncReconfig(joining []string, leaving []string, version int64) (err error) {
 	return nil
 }
 
+func (client *MockZookeeperClient) GetConfig() (config string, version int32, err error) {
+	return "", 0, nil
+}
+
 func (client *MockZookeeperClient) Close() {
 	return
 }
diff --git a/pkg/zk/zookeeper_client.go b/pkg/zk/zookeeper_client.go
index b04e416d3..da1eeea9e 100644
--- a/pkg/zk/zookeeper_client.go
+++ b/pkg/zk/zookeeper_client.go
@@ -24,7 +24,8 @@ type ZookeeperClient interface {
 	CreateNode(*v1beta1.ZookeeperCluster, string) error
 	NodeExists(string) (int32, error)
 	UpdateNode(string, string, int32) error
-	ReconfigToRemove([]string) error
+	IncReconfig([]string, []string, int64) error
+	GetConfig() (string, int32, error)
 	Close()
 }
 
@@ -32,6 +33,9 @@ type DefaultZookeeperClient struct {
 	conn *zk.Conn
 }
 
+// ZOO_CONFIG_PATH is the znode that stores the ensemble's dynamic configuration.
+const ZOO_CONFIG_PATH = "/zookeeper/config"
+
 func (client *DefaultZookeeperClient) Connect(zkUri string) (err error) {
 	host := []string{zkUri}
 	conn, _, err := zk.Connect(host, time.Second*5)
@@ -75,13 +79,21 @@ func (client *DefaultZookeeperClient) NodeExists(zNodePath string) (version int3
 	return zNodeStat.Version, err
 }
 
-func (client *DefaultZookeeperClient) ReconfigToRemove(leaving []string) (err error) {
-	if _, err := client.conn.IncrementalReconfig(nil, leaving, -1); err != nil {
-		return fmt.Errorf("Failed to remove node:%s, err:%v", strings.Join(leaving, ","), err)
+func (client *DefaultZookeeperClient) IncReconfig(joining []string, leaving []string, version int64) (err error) {
+	if _, err := client.conn.IncrementalReconfig(joining, leaving, version); err != nil {
+		return fmt.Errorf("Failed to reconfig node:%s, err:%v", strings.Join(leaving, ","), err)
 	}
 	return nil
 }
 
+func (client *DefaultZookeeperClient) GetConfig() (config string, version int32, err error) {
+	data, stat, err := client.conn.Get(ZOO_CONFIG_PATH)
+	if err != nil {
+		return "", -1, fmt.Errorf("Failed to get config %s, err:%v", ZOO_CONFIG_PATH, err)
+	}
+	return string(data), stat.Version, nil
+}
+
 func (client *DefaultZookeeperClient) Close() {
 	client.conn.Close()
 }
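
Usage note (not part of the patch): the scale-down path above reduces to three client calls. The following self-contained sketch shows them against the samuel/go-zookeeper library that DefaultZookeeperClient wraps; the ensemble address "zk-client:2181" and the removed server ID "3" are invented for this example.

package main

import (
	"fmt"
	"time"

	"github.com/samuel/go-zookeeper/zk"
)

func main() {
	// If ZooKeeper is not reachable, fail here before touching replicas,
	// mirroring the early Connect check in reconcileStatefulSet.
	conn, _, err := zk.Connect([]string{"zk-client:2181"}, 5*time.Second)
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	// Read the current ensemble membership from the dynamic config znode.
	data, stat, err := conn.Get("/zookeeper/config")
	if err != nil {
		panic(err)
	}
	fmt.Printf("config (version %d):\n%s\n", stat.Version, data)

	// Remove server 3. A removed member keeps serving its connected
	// clients, so reconfiguring before pod deletion avoids preStop errors.
	if _, err := conn.IncrementalReconfig(nil, []string{"3"}, -1); err != nil {
		panic(err)
	}
}

Passing -1 as the version, as the controller does via IncReconfig, skips the config-version precondition check on the server.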
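The membership check added to the controller's loop is what makes a retried reconcile safe: an ID that an earlier, partially failed pass already removed from the config is simply skipped. A hypothetical standalone illustration of that matching logic (the helper membersToRemove and the sample config are invented for this sketch):

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// membersToRemove returns the server IDs in (newSize, oldSize] that are
// still present in the dynamic config, mirroring the strings.Contains
// check in reconcileStatefulSet.
func membersToRemove(config string, newSize, oldSize int32) []string {
	var removes []string
	for myid := newSize + 1; myid <= oldSize; myid++ {
		if strings.Contains(config, "server."+strconv.Itoa(int(myid))+"=") {
			removes = append(removes, strconv.Itoa(int(myid)))
		}
	}
	return removes
}

func main() {
	// A sample dynamic config as stored under /zookeeper/config; server.2
	// was already removed by an earlier, partially failed reconcile.
	config := "server.1=zk-0.zk-headless:2888:3888:participant;0.0.0.0:2181\n" +
		"server.3=zk-2.zk-headless:2888:3888:participant;0.0.0.0:2181\n" +
		"version=100000000"
	fmt.Println(membersToRemove(config, 1, 3)) // prints [3]; the absent server.2 is skipped
}

The "server.<id>=" prefix matches entries in ZooKeeper's dynamic configuration format, e.g. server.1=host:2888:3888:participant;0.0.0.0:2181.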