Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: 优化gke扩容时节点IP的获取 #3406

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -276,9 +276,14 @@ var (
SuccessNodeIDsKey ParamKey = "successNodeIDs"
// FailedNodeIDsKey xxx
FailedNodeIDsKey ParamKey = "failedNodeIDs"
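// FailureNodeIDsKey is the task parameter key holding the comma-separated names of instances that failed the instance check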
FailureNodeIDsKey ParamKey = "failureNodeIDs"
// TimeoutNodeIDsKey xxx
TimeoutNodeIDsKey ParamKey = "timeoutNodeIDs"

// FailureReason is the task parameter key holding the error message that explains why the step failed
FailureReason ParamKey = "failureReason"

// SuccessAddClusterNodeIDsKey xxx
// cloud cluster success & failed Instance
SuccessAddClusterNodeIDsKey ParamKey = "successAddClusterNodeIDs"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,14 +251,26 @@ func recordClusterInstanceToDB(ctx context.Context, state *cloudprovider.TaskSta
state.Task.CommonParams = make(map[string]string)
}

if len(instancesNames) > 0 {
state.Task.CommonParams[cloudprovider.SuccessNodeIDsKey.String()] = strings.Join(instancesNames, ",")
state.Task.CommonParams[cloudprovider.NodeNamesKey.String()] = strings.Join(instancesNames, ",")
state.Task.CommonParams[cloudprovider.NodeIDsKey.String()] = strings.Join(instancesNames, ",")
successIns, failureIns, err := checkInstance(client, instancesNames)
if err != nil {
_ = returnGkeInstancesAndCleanNodes(ctx, info, instancesNames)
blog.Errorf("recordClusterInstanceToDB[%s] checkInstance failed, %v, successInstances[%+v],"+
" failureInstances[%+v]", taskID, err, successIns, failureIns)
state.Task.CommonParams[cloudprovider.SuccessNodeIDsKey.String()] = strings.Join(successIns, ",")
state.Task.CommonParams[cloudprovider.FailureNodeIDsKey.String()] = strings.Join(failureIns, ",")
state.Task.CommonParams[cloudprovider.FailureReason.String()] = err.Error()
return fmt.Errorf("checkInstance failed, %v, successInstances[%+v], failureInstances[%+v]",
err, successIns, failureIns)
}

if len(successIns) > 0 {
state.Task.CommonParams[cloudprovider.SuccessNodeIDsKey.String()] = strings.Join(successIns, ",")
state.Task.CommonParams[cloudprovider.NodeNamesKey.String()] = strings.Join(successIns, ",")
state.Task.CommonParams[cloudprovider.NodeIDsKey.String()] = strings.Join(successIns, ",")
}

// record successNodes to cluster manager DB
nodeIPs, err := transInstancesToNode(ctx, instancesNames, info)
nodeIPs, err := transInstancesToNode(ctx, successIns, info)
if err != nil {
blog.Errorf("recordClusterInstanceToDB[%s] failed: %v", taskID, err)
}
Expand All @@ -271,14 +283,16 @@ func recordClusterInstanceToDB(ctx context.Context, state *cloudprovider.TaskSta
return nil
}

func checkInstance(client *api.ComputeServiceClient, ids []string) error {
func checkInstance(client *api.ComputeServiceClient, ids []string) ([]string, []string, error) {
successIns, failureIns := make([]string, 0), make([]string, 0)
timeCtx, cancel := context.WithTimeout(context.TODO(), 5*time.Minute)
defer cancel()
err := loop.LoopDoFunc(timeCtx, func() error {
running, failed := make([]string, 0), make([]string, 0)
insList, err := client.ListZoneInstanceWithFilter(context.Background(), api.InstanceNameFilter(ids))
if err != nil {
blog.Errorf("checkInstance ListZoneInstanceWithFilter failed, %s", err.Error())
return err
return nil
}

// check response data
Expand All @@ -291,18 +305,60 @@ func checkInstance(client *api.ComputeServiceClient, ids []string) error {

for _, in := range insList.Items {
if len(in.NetworkInterfaces[0].NetworkIP) == 0 {
failed = append(failed, in.Name)
blog.Warnf("checkInstance[%s] IP is still not distributed", in.Name)
return nil
continue
}
running = append(running, in.Name)
}

return loop.EndLoop
successIns = running
failureIns = failed

if len(failed) == 0 {
return loop.EndLoop
}

return nil
})
if err != nil {
return err
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
return successIns, failureIns, err
}
if errors.Is(err, context.DeadlineExceeded) {
running, failed := make([]string, 0), make([]string, 0)
insList, err := client.ListZoneInstanceWithFilter(context.Background(), api.InstanceNameFilter(ids))
if err != nil {
blog.Errorf("checkInstance ListZoneInstanceWithFilter failed, %s", err.Error())
return nil, nil, err
}

return nil
// check response data
if len(insList.Items) != len(ids) {
blog.Warnf("checkInstance desired %d, but got %d, instances[%+v]", len(ids), len(insList.Items),
insList.Items)
return nil, nil, fmt.Errorf("checkInstance desired %d, but got %d", len(ids), len(insList.Items))
}

blog.Infof("checkInstance desired %d, response %d", len(ids), len(insList.Items))

for _, in := range insList.Items {
if len(in.NetworkInterfaces[0].NetworkIP) == 0 {
failed = append(failed, in.Name)
blog.Warnf("checkInstance[%s] IP is still not distributed", in.Name)
continue
}
running = append(running, in.Name)
}

successIns = running
failureIns = failed

if len(failureIns) > 0 {
return successIns, failureIns, fmt.Errorf("failed to get all instance ip")
}
}

return successIns, failureIns, nil
}

// transInstancesToNode records the successful nodes in the cluster manager DB
Expand All @@ -314,17 +370,6 @@ func transInstancesToNode(ctx context.Context, instanceNames []string, info *clo
nodeIPs = make([]string, 0)
err error
)
client, err := api.NewComputeServiceClient(info.CmOption)
if err != nil {
blog.Errorf("transInstanceIDsToNodes create ComputeServiceClient failed, %s", err.Error())
return nil, err
}

err = checkInstance(client, instanceNames)
if err != nil {
blog.Errorf("transInstanceIDsToNodes checkInstance failed, %s", err.Error())
return nil, err
}

taskID := cloudprovider.GetTaskIDFromContext(ctx)
err = retry.Do(func() error {
Expand Down Expand Up @@ -451,7 +496,11 @@ func checkClusterInstanceStatus(ctx context.Context, info *cloudprovider.CloudDe

// set cluster node status
for _, n := range addFailureNodes {
err = cloudprovider.UpdateNodeStatusByInstanceID(n, common.StatusAddNodesFailed)
node, err := cloudprovider.GetStorageModel().GetNodeByName(ctx, info.Cluster.ClusterID, n)
if err != nil {
blog.Errorf("checkClusterInstanceStatus[%s] GetNodeByName[%s] failed: %v", taskID, n, err)
}
err = cloudprovider.UpdateNodeStatusByInstanceID(node.NodeID, common.StatusAddNodesFailed)
if err != nil {
blog.Errorf("checkClusterInstanceStatus[%s] UpdateNodeStatusByInstanceID[%s] failed: %v", taskID, n, err)
}
Expand Down
Loading