Skip to content

Commit

Permalink
Issue#398:fix the bug that decreasing replicas will make zookeeper un…
Browse files Browse the repository at this point in the history
…recoverable when zookeeper not running.
  • Loading branch information
hongchunhua committed Oct 14, 2021
1 parent d8b22d8 commit 019b3ec
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 13 deletions.
31 changes: 23 additions & 8 deletions pkg/controller/zookeepercluster/zookeepercluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,8 @@ func (r *ReconcileZookeeperCluster) reconcileStatefulSet(instance *zookeeperv1be
foundSTSSize := *foundSts.Spec.Replicas
newSTSSize := *sts.Spec.Replicas
if newSTSSize != foundSTSSize {
// If zookeeper is not running, it must stop update replicas.
// Until zookeeper is running and the client connect it successfully, decreasing Replicas will take effect.
zkUri := utils.GetZkServiceUri(instance)
err = r.zkClient.Connect(zkUri)
if err != nil {
Expand All @@ -300,16 +302,25 @@ func (r *ReconcileZookeeperCluster) reconcileStatefulSet(instance *zookeeperv1be
if err != nil {
return fmt.Errorf("Error updating cluster size %s: %v", path, err)
}
// #398 if decrease node, remove node immediately after update node successfully.
// #398 if decrease node, remove node immediately after updating node successfully.
if newSTSSize < foundSTSSize {
var removes []string
config, _, err := r.zkClient.GetConfig()
if err != nil {
return fmt.Errorf("Error GetConfig %v", err)
}
r.log.Info("Get zookeeper config.", "Config: ", config)
for myid := newSTSSize + 1; myid <= foundSTSSize; myid++ {
removes = append(removes, strconv.Itoa(int(myid)))
if strings.Contains(config, "server."+strconv.Itoa(int(myid))+"=") {
removes = append(removes, strconv.Itoa(int(myid)))
}
}
r.log.Info("It will do reconfig to remove id:%s", strings.Join(removes, ","))
err := r.zkClient.ReconfigToRemove(removes)
// The node that have been removed witch reconfig alse can still provide services for all online clients.
// So We can remove it firstly, it will avoid to error that client maybe can't connect to server on preStop.
r.log.Info("Do reconfig to remove node.", "Remove ids", strings.Join(removes, ","))
err = r.zkClient.IncReconfig(nil, removes, -1)
if err != nil {
return fmt.Errorf("Error reconfig remove id:%s", strings.Join(removes, ","))
return fmt.Errorf("Error reconfig remove id:%s, %v", strings.Join(removes, ","), err)
}
}
}
Expand Down Expand Up @@ -632,7 +643,9 @@ func (r *ReconcileZookeeperCluster) reconcileClusterStatus(instance *zookeeperv1
instance.Status.Members.Unready = unreadyMembers

//If Cluster is in a ready state...
if instance.Spec.Replicas == instance.Status.ReadyReplicas && (!instance.Status.MetaRootCreated) {
// instance.Spec.Replicas is just an expected value that we set it, but it maybe not take effect by k8s.
// So we should check that instance.Status.Replicas is equal to ReadyReplicas, which means true true status of pods.
if instance.Status.Replicas == instance.Status.ReadyReplicas && (!instance.Status.MetaRootCreated) {
r.log.Info("Cluster is Ready, Creating ZK Metadata...")
zkUri := utils.GetZkServiceUri(instance)
err := r.zkClient.Connect(zkUri)
Expand All @@ -651,7 +664,7 @@ func (r *ReconcileZookeeperCluster) reconcileClusterStatus(instance *zookeeperv1
r.log.Info("Updating zookeeper status",
"StatefulSet.Namespace", instance.Namespace,
"StatefulSet.Name", instance.Name)
if instance.Status.ReadyReplicas == instance.Spec.Replicas {
if instance.Status.ReadyReplicas == instance.Status.Replicas {
instance.Status.SetPodsReadyConditionTrue()
} else {
instance.Status.SetPodsReadyConditionFalse()
Expand Down Expand Up @@ -781,7 +794,9 @@ func (r *ReconcileZookeeperCluster) getPVCCount(instance *zookeeperv1beta1.Zooke

func (r *ReconcileZookeeperCluster) cleanupOrphanPVCs(instance *zookeeperv1beta1.ZookeeperCluster) (err error) {
// this check should make sure we do not delete the PVCs before the STS has scaled down
if instance.Status.ReadyReplicas == instance.Spec.Replicas {
// instance.Spec.Replicas is just an expected value that we set it, but it maybe not take effect by k8s.
// So we should check that instance.Status.Replicas is equal to ReadyReplicas, which means true true status of pods.
if instance.Status.ReadyReplicas == instance.Status.Replicas {
pvcCount, err := r.getPVCCount(instance)
if err != nil {
return err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,14 @@ func (client *MockZookeeperClient) NodeExists(zNodePath string) (version int32,
return 0, nil
}

func (client *MockZookeeperClient) ReconfigToRemove(leaving []string) (err error) {
func (client *MockZookeeperClient) IncReconfig(joining []string, leaving []string, version int64) (err error) {
return nil
}

func (client *MockZookeeperClient) GetConfig() (config string, version int32, err error) {
return "", 0, nil
}

func (client *MockZookeeperClient) Close() {
return
}
Expand Down
20 changes: 16 additions & 4 deletions pkg/zk/zookeeper_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,18 @@ type ZookeeperClient interface {
CreateNode(*v1beta1.ZookeeperCluster, string) error
NodeExists(string) (int32, error)
UpdateNode(string, string, int32) error
ReconfigToRemove([]string) error
IncReconfig([]string, []string, int64) error
GetConfig() (string, int32, error)
Close()
}

type DefaultZookeeperClient struct {
conn *zk.Conn
}

// zookeeper configure path
const ZOO_CONFIG_PATH = "/zookeeper/config"

func (client *DefaultZookeeperClient) Connect(zkUri string) (err error) {
host := []string{zkUri}
conn, _, err := zk.Connect(host, time.Second*5)
Expand Down Expand Up @@ -75,13 +79,21 @@ func (client *DefaultZookeeperClient) NodeExists(zNodePath string) (version int3
return zNodeStat.Version, err
}

func (client *DefaultZookeeperClient) ReconfigToRemove(leaving []string) (err error) {
if _, err := client.conn.IncrementalReconfig(nil, leaving, -1); err != nil {
return fmt.Errorf("Failed to remove node:%s, err:%v", strings.Join(leaving, ","), err)
func (client *DefaultZookeeperClient) IncReconfig(joining []string, leaving []string, version int64) (err error) {
if _, err := client.conn.IncrementalReconfig(joining, leaving, version); err != nil {
return fmt.Errorf("Failed to reconfig node:%s, err:%v", strings.Join(leaving, ","), err)
}
return nil
}

func (client *DefaultZookeeperClient) GetConfig() (config string, version int32, err error) {
data, stat, err := client.conn.Get(ZOO_CONFIG_PATH)
if err != nil {
return "", -1, fmt.Errorf("Get config %s error, err:%v", ZOO_CONFIG_PATH, err)
}
return string(data), stat.Version, nil
}

func (client *DefaultZookeeperClient) Close() {
client.conn.Close()
}

0 comments on commit 019b3ec

Please sign in to comment.