Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
fix
Browse files Browse the repository at this point in the history
Signed-off-by: husharp <[email protected]>
HuSharp committed Nov 10, 2023
1 parent 6de4871 commit 672470f
Showing 8 changed files with 445 additions and 207 deletions.
16 changes: 14 additions & 2 deletions cmd/http-service/idl/api/service.proto
Original file line number Diff line number Diff line change
@@ -1066,7 +1066,13 @@ message StopBackupReq {
string backup_id = 2 [(grpc.gateway.protoc_gen_openapiv2.options.openapiv2_field) = {description: "The unique ID of the backup."}];
}

message StopBackupResp {}
message StopBackupResp {
bool success = 1 [(grpc.gateway.protoc_gen_openapiv2.options.openapiv2_field) = {
description: "Whether the request is ssuccessful.",
example: "true"
}];
optional string message = 2 [(grpc.gateway.protoc_gen_openapiv2.options.openapiv2_field) = {description: "The message of the response."}];
}

message StopRestoreReq {
option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_schema) = {
@@ -1084,7 +1090,13 @@ message StopRestoreReq {
string restore_id = 2 [(grpc.gateway.protoc_gen_openapiv2.options.openapiv2_field) = {description: "The unique ID of the restore."}];
}

message StopRestoreResp {}
message StopRestoreResp {
bool success = 1 [(grpc.gateway.protoc_gen_openapiv2.options.openapiv2_field) = {
description: "Whether the request is ssuccessful.",
example: "true"
}];
optional string message = 2 [(grpc.gateway.protoc_gen_openapiv2.options.openapiv2_field) = {description: "The message of the response."}];
}

message DeleteBackupReq {
option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_schema) = {
429 changes: 242 additions & 187 deletions cmd/http-service/pbgen/api/service.pb.go

Large diffs are not rendered by default.

26 changes: 24 additions & 2 deletions cmd/http-service/pbgen/oas/openapi-spec.swagger.json
Original file line number Diff line number Diff line change
@@ -1239,10 +1239,32 @@
"title": "ResumeClusterResp"
},
"apiStopBackupResp": {
"type": "object"
"type": "object",
"properties": {
"success": {
"type": "boolean",
"example": true,
"description": "Whether the request is ssuccessful."
},
"message": {
"type": "string",
"description": "The message of the response."
}
}
},
"apiStopRestoreResp": {
"type": "object"
"type": "object",
"properties": {
"success": {
"type": "boolean",
"example": true,
"description": "Whether the request is ssuccessful."
},
"message": {
"type": "string",
"description": "The message of the response."
}
}
},
"apiTiDBMember": {
"type": "object",
151 changes: 149 additions & 2 deletions cmd/http-service/server/backup.go
Original file line number Diff line number Diff line change
@@ -359,6 +359,20 @@ func (s *ClusterServer) GetBackup(ctx context.Context, req *api.GetBackupReq) (*
}

info := convertToBackupInfo(backup)
// get job status
kubeCli := s.KubeClient.GetKubeClient(k8sID)
_, err = kubeCli.BatchV1().Jobs(backup.GetNamespace()).Get(ctx, backup.GetBackupJobName(), metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
logger.Info("Backup Job not found, Backup Job is Stopped.", zap.Error(err))
} else {
logger.Error("Get backup job failed", zap.Error(err))
message := fmt.Sprintf("Get backup job failed: %s", err.Error())
setResponseStatusCodes(ctx, http.StatusInternalServerError)
return &api.GetBackupResp{Success: false, Message: &message}, nil
}
}

return &api.GetBackupResp{Success: true, Data: info}, nil
}

@@ -409,6 +423,20 @@ func (s *ClusterServer) GetRestore(ctx context.Context, req *api.GetRestoreReq)
}

info := convertToRestoreInfo(restore)
// get job status
kubeCli := s.KubeClient.GetKubeClient(k8sID)
_, err = kubeCli.BatchV1().Jobs(restore.GetNamespace()).Get(ctx, restore.GetRestoreJobName(), metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
logger.Info("Restore Job not found, Restore Job is Stopped.", zap.Error(err))
} else {
logger.Error("Get Restore job failed", zap.Error(err))
message := fmt.Sprintf("Get restore job failed: %s", err.Error())
setResponseStatusCodes(ctx, http.StatusInternalServerError)
return &api.GetRestoreResp{Success: false, Message: &message}, nil
}
}

return &api.GetRestoreResp{Success: true, Data: info}, nil
}

@@ -431,11 +459,130 @@ func convertToRestoreInfo(restore *v1alpha1.Restore) *api.RestoreInfo {
}

func (s *ClusterServer) StopBackup(ctx context.Context, req *api.StopBackupReq) (*api.StopBackupResp, error) {
return nil, errors.New("StopBackup not implemented")
k8sID := getKubernetesID(ctx)
opCli := s.KubeClient.GetOperatorClient(k8sID)
kubeCli := s.KubeClient.GetKubeClient(k8sID)
logger := log.L().With(zap.String("request", "StopBackup"), zap.String("k8sID", k8sID),
zap.String("clusterID", req.ClusterId), zap.String("backupID", req.BackupId))
if opCli == nil || kubeCli == nil {
logger.Error("K8s client not found")
message := fmt.Sprintf("no %s is specified in the request header or the kubeconfig context not exists", HeaderKeyKubernetesID)
setResponseStatusCodes(ctx, http.StatusBadRequest)
return &api.StopBackupResp{Success: false, Message: &message}, nil
}

// check whether the backup exists
backup, err := opCli.PingcapV1alpha1().Backups(req.ClusterId).Get(ctx, req.BackupId, metav1.GetOptions{})
if err != nil {
logger.Error("Backup not found", zap.Error(err))
message := fmt.Sprintf("Backup %s not found", req.BackupId)
setResponseStatusCodes(ctx, http.StatusBadRequest)
return &api.StopBackupResp{Success: false, Message: &message}, nil
}

// stop backup
if backup.Spec.Mode == v1alpha1.BackupModeLog {
backup.Spec.LogStop = true
_, err = opCli.PingcapV1alpha1().Backups(req.ClusterId).Update(ctx, backup, metav1.UpdateOptions{})
if err != nil {
logger.Error("Stop log backup failed", zap.Error(err))
message := fmt.Sprintf("Stop log backup failed: %s", err.Error())
setResponseStatusCodes(ctx, http.StatusInternalServerError)
return &api.StopBackupResp{Success: false, Message: &message}, nil
}
} else {
_, err := kubeCli.BatchV1().Jobs(backup.GetNamespace()).Get(ctx, backup.GetBackupJobName(), metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
logger.Warn("Backup is already Stopped", zap.Error(err))
message := fmt.Sprintf("Backup %s is already Stopped", req.BackupId)
setResponseStatusCodes(ctx, http.StatusNotFound)
return &api.StopBackupResp{Success: false, Message: &message}, nil
}
logger.Error("Get backup job failed", zap.Error(err))
message := fmt.Sprintf("Get backup job failed: %s", err.Error())
setResponseStatusCodes(ctx, http.StatusInternalServerError)
return &api.StopBackupResp{Success: false, Message: &message}, nil
}

err = kubeCli.BatchV1().Jobs(backup.GetNamespace()).Delete(ctx, backup.GetBackupJobName(), metav1.DeleteOptions{})
if err != nil {
logger.Error("Stop backup failed", zap.Error(err))
message := fmt.Sprintf("Stop backup failed: %s", err.Error())
setResponseStatusCodes(ctx, http.StatusInternalServerError)
return &api.StopBackupResp{Success: false, Message: &message}, nil
}
}

// update backup status
backup.Status.Phase = v1alpha1.BackupStopped
_, err = opCli.PingcapV1alpha1().Backups(req.ClusterId).Update(ctx, backup, metav1.UpdateOptions{})
if err != nil {
logger.Error("Backup not found", zap.Error(err))
message := fmt.Sprintf("Backup %s not found", req.BackupId)
setResponseStatusCodes(ctx, http.StatusBadRequest)
return &api.StopBackupResp{Success: false, Message: &message}, nil
}

return &api.StopBackupResp{Success: true}, nil
}

func (s *ClusterServer) StopRestore(ctx context.Context, req *api.StopRestoreReq) (*api.StopRestoreResp, error) {
return nil, errors.New("StopRestore not implemented")
k8sID := getKubernetesID(ctx)
opCli := s.KubeClient.GetOperatorClient(k8sID)
kubeCli := s.KubeClient.GetKubeClient(k8sID)
logger := log.L().With(zap.String("request", "StopRestore"), zap.String("k8sID", k8sID),
zap.String("clusterID", req.ClusterId), zap.String("storeID", req.RestoreId))
if opCli == nil || kubeCli == nil {
logger.Error("K8s client not found")
message := fmt.Sprintf("no %s is specified in the request header or the kubeconfig context not exists", HeaderKeyKubernetesID)
setResponseStatusCodes(ctx, http.StatusBadRequest)
return &api.StopRestoreResp{Success: false, Message: &message}, nil
}

// check whether the restore exists
restore, err := opCli.PingcapV1alpha1().Restores(req.ClusterId).Get(ctx, req.RestoreId, metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
logger.Warn("Restore is already Stopped", zap.Error(err))
message := fmt.Sprintf("Restore %s is already Stopped", req.RestoreId)
setResponseStatusCodes(ctx, http.StatusNotFound)
return &api.StopRestoreResp{Success: false, Message: &message}, nil
}
logger.Error("Restore not found", zap.Error(err))
message := fmt.Sprintf("Restore %s not found", req.RestoreId)
setResponseStatusCodes(ctx, http.StatusBadRequest)
return &api.StopRestoreResp{Success: false, Message: &message}, nil
}

// stop restore
_, err = kubeCli.BatchV1().Jobs(restore.GetNamespace()).Get(ctx, restore.GetRestoreJobName(), metav1.GetOptions{})
if err != nil {
logger.Error("Get restore job failed", zap.Error(err))
message := fmt.Sprintf("Get restore job failed: %s", err.Error())
setResponseStatusCodes(ctx, http.StatusInternalServerError)
return &api.StopRestoreResp{Success: false, Message: &message}, nil
}

err = kubeCli.BatchV1().Jobs(restore.GetNamespace()).Delete(ctx, restore.GetRestoreJobName(), metav1.DeleteOptions{})
if err != nil {
logger.Error("Stop restore failed", zap.Error(err))
message := fmt.Sprintf("Stop restore failed: %s", err.Error())
setResponseStatusCodes(ctx, http.StatusInternalServerError)
return &api.StopRestoreResp{Success: false, Message: &message}, nil
}

// update restore status
restore.Status.Phase = v1alpha1.RestoreStopped
_, err = opCli.PingcapV1alpha1().Restores(req.ClusterId).Update(ctx, restore, metav1.UpdateOptions{})
if err != nil {
logger.Error("Restore not found", zap.Error(err))
message := fmt.Sprintf("Restore %s not found", req.RestoreId)
setResponseStatusCodes(ctx, http.StatusBadRequest)
return &api.StopRestoreResp{Success: false, Message: &message}, nil
}

return &api.StopRestoreResp{Success: true}, nil
}

func (s *ClusterServer) DeleteBackup(ctx context.Context, req *api.DeleteBackupReq) (*api.DeleteBackupResp, error) {
16 changes: 8 additions & 8 deletions cmd/http-service/server/cluster.go
Original file line number Diff line number Diff line change
@@ -42,17 +42,17 @@ import (
type ClusterStatus string

const (
// the cluster is still being created
// ClusterStatusCreating the cluster is still being created
ClusterStatusCreating ClusterStatus = "creating"
// all components are running
// ClusterStatusRunning all components are running
ClusterStatusRunning ClusterStatus = "running"
// some components are deleting
// ClusterStatusDeleting some components are deleting
ClusterStatusDeleting ClusterStatus = "deleting"
// some components are scaling
// ClusterStatusScaling some components are scaling
ClusterStatusScaling ClusterStatus = "scaling"
// some components are upgrading
// ClusterStatusUpgrading some components are upgrading
ClusterStatusUpgrading ClusterStatus = "upgrading"
// some components are unavailable
// ClusterStatusUnavailable some components are unavailable
ClusterStatusUnavailable ClusterStatus = "unavailable"

helperImage = "busybox:1.36"
@@ -860,7 +860,7 @@ func convertToClusterInfo(logger *zap.Logger, kubeCli kubernetes.Interface, tc *
Name: strings.TrimPrefix(member.PodName, namePrefix),
Id: id,
StartTime: getPodStartTime(podList, member.PodName),
State: string(member.State),
State: member.State,
})
}

@@ -900,7 +900,7 @@ func convertToClusterInfo(logger *zap.Logger, kubeCli kubernetes.Interface, tc *
Name: strings.TrimPrefix(member.PodName, namePrefix),
Id: id,
StartTime: getPodStartTime(podList, member.PodName),
State: string(member.State),
State: member.State,
})
if member.State == v1alpha1.TiKVStateUp {
tiflashReadyCount++
4 changes: 2 additions & 2 deletions pkg/apis/pingcap/v1alpha1/backup.go
Original file line number Diff line number Diff line change
@@ -309,7 +309,7 @@ func NeedNotClean(backup *Backup) bool {
return backup.Spec.CleanPolicy == CleanPolicyTypeOnFailure && !IsBackupFailed(backup)
}

// ParseLogBackupSubCommand parse the log backup subcommand from cr.
// ParseLogBackupSubcommand parse the log backup subcommand from cr.
// The parse priority of the command is stop > truncate > start.
func ParseLogBackupSubcommand(backup *Backup) LogSubCommandType {
if backup.Spec.Mode != BackupModeLog {
@@ -382,7 +382,7 @@ func IsLogBackupAlreadyTruncate(backup *Backup) bool {
return specTS <= startCommitTS || specTS <= successedTS
}

// IsLogBackupAlreadyStop return whether log backup has already stoped.
// IsLogBackupAlreadyStop return whether log backup has already stopped.
func IsLogBackupAlreadyStop(backup *Backup) bool {
return backup.Spec.Mode == BackupModeLog && backup.Status.Phase == BackupStopped
}
2 changes: 2 additions & 0 deletions pkg/apis/pingcap/v1alpha1/types.go
Original file line number Diff line number Diff line change
@@ -2383,6 +2383,8 @@ const (
RestoreRetryFailed RestoreConditionType = "RetryFailed"
// RestoreInvalid means invalid restore CR.
RestoreInvalid RestoreConditionType = "Invalid"
// RestoreStopped means the restore was stopped.
RestoreStopped RestoreConditionType = "Stopped"
)

// RestoreCondition describes the observed state of a Restore at a certain point.
8 changes: 4 additions & 4 deletions pkg/pdapi/pdapi.go
Original file line number Diff line number Diff line change
@@ -64,7 +64,7 @@ type PDClient interface {
GetTombStoneStores() (*StoresInfo, error)
// GetStore gets a TiKV store for a specific store id from cluster
GetStore(storeID uint64) (*StoreInfo, error)
// storeLabelsEqualNodeLabels compares store labels with node labels
// SetStoreLabels compares store labels with node labels
// for historic reasons, PD stores TiKV labels as []*StoreLabel which is a key-value pair slice
SetStoreLabels(storeID uint64, labels map[string]string) (bool, error)
// UpdateReplicationConfig updates the replication config
@@ -196,7 +196,7 @@ type MembersInfo struct {

// below copied from github.com/tikv/pd/pkg/autoscaling

// Strategy within a HTTP request provides rules and resources to help make decision for auto scaling.
// Strategy within an HTTP request provides rules and resources to help make decision for auto scaling.
type Strategy struct {
Rules []*Rule `json:"rules"`
Resources []*Resource `json:"resources"`
@@ -258,7 +258,7 @@ func (c *pdClient) GetHealth() (*HealthInfo, error) {
if err != nil {
return nil, err
}
healths := []MemberHealth{}
var healths []MemberHealth
err = json.Unmarshal(body, &healths)
if err != nil {
return nil, err
@@ -378,7 +378,7 @@ func (c *pdClient) DeleteStore(storeID uint64) error {
}
defer httputil.DeferClose(res.Body)

// Remove an offline store should returns http.StatusOK
// Remove an offline store should return http.StatusOK
if res.StatusCode == http.StatusOK || res.StatusCode == http.StatusNotFound {
return nil
}

0 comments on commit 672470f

Please sign in to comment.