Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CP 1.0.x] Add remove leaked collection meta cmd #223

Merged
merged 1 commit into from
Dec 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions states/etcd/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ func RemoveCommand(cli clientv3.KV, instanceName, basePath string) *cobra.Comman
remove.SegmentCollectionDroppedCommand(cli, basePath),
// remove etcd-config
remove.EtcdConfigCommand(cli, instanceName),
// remove collection has been dropped
remove.CollectionCleanCommand(cli, basePath),
)

return removeCmd
Expand Down
3 changes: 3 additions & 0 deletions states/etcd/common/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@ import (
)

const (
SnapshotPrefix = "snapshots"
// CollectionMetaPrefix is prefix for rootcoord collection meta.
CollectionMetaPrefix = `root-coord/collection`
// DBCollectionMetaPrefix is prefix for rootcoord database collection meta
DBCollectionMetaPrefix = `root-coord/database/collection-info`
// FieldMetaPrefix is prefix for rootcoord collection fields meta
FieldMetaPrefix = `root-coord/fields`
// CollectionLoadPrefix is prefix for querycoord collection loaded in milvus v2.1.x
CollectionLoadPrefix = "queryCoord-collectionMeta"
// CollectionLoadPrefixV2 is prefix for querycoord collection loaded in milvus v2.2.x
Expand Down
9 changes: 5 additions & 4 deletions states/etcd/common/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ import (
)

const (
segmentMetaPrefix = "datacoord-meta/s"
SegmentMetaPrefix = "datacoord-meta/s"
SegmentStatsMetaPrefix = "datacoord-meta/statslog"
)

// ListSegmentsVersion list segment info as specified version.
func ListSegmentsVersion(ctx context.Context, cli clientv3.KV, basePath string, version string, filters ...func(*models.Segment) bool) ([]*models.Segment, error) {
prefix := path.Join(basePath, segmentMetaPrefix) + "/"
prefix := path.Join(basePath, SegmentMetaPrefix) + "/"
switch version {
case models.LTEVersion2_1:
segments, keys, err := ListProtoObjects[datapb.SegmentInfo](ctx, cli, prefix)
Expand Down Expand Up @@ -107,7 +108,7 @@ func getSegmentLazyFunc(cli clientv3.KV, basePath string, segment datapbv2.Segme
func ListSegments(cli clientv3.KV, basePath string, filter func(*datapb.SegmentInfo) bool) ([]*datapb.SegmentInfo, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
resp, err := cli.Get(ctx, path.Join(basePath, segmentMetaPrefix)+"/", clientv3.WithPrefix())
resp, err := cli.Get(ctx, path.Join(basePath, SegmentMetaPrefix)+"/", clientv3.WithPrefix())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -309,7 +310,7 @@ func RemoveSegmentByID(ctx context.Context, cli clientv3.KV, basePath string, co

func UpdateSegments(ctx context.Context, cli clientv3.KV, basePath string, collectionID int64, fn func(segment *datapbv2.SegmentInfo)) error {

prefix := path.Join(basePath, fmt.Sprintf("%s/%d", segmentMetaPrefix, collectionID)) + "/"
prefix := path.Join(basePath, fmt.Sprintf("%s/%d", SegmentMetaPrefix, collectionID)) + "/"
segments, keys, err := ListProtoObjects[datapbv2.SegmentInfo](ctx, cli, prefix)
if err != nil {
return err
Expand Down
154 changes: 154 additions & 0 deletions states/etcd/remove/collection_clean.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package remove

import (
"context"
"fmt"
"path"
"strconv"
"strings"

"github.com/samber/lo"
"github.com/spf13/cobra"
clientv3 "go.etcd.io/etcd/client/v3"

"github.com/milvus-io/birdwatcher/models"
"github.com/milvus-io/birdwatcher/states/etcd/common"
etcdversion "github.com/milvus-io/birdwatcher/states/etcd/version"
)

var paginationSize = 2000

type ExcludePrefixOptions func(string) bool

// CollectionCleanCommand returns command to remove
func CollectionCleanCommand(cli clientv3.KV, basePath string) *cobra.Command {
cmd := &cobra.Command{
Use: "collection-meta-leaked",
Short: "Remove leaked collection meta for collection has been dropped",
Run: func(cmd *cobra.Command, args []string) {
run, err := cmd.Flags().GetBool("run")
if err != nil {
fmt.Println(err.Error())
return
}

collections, err := common.ListCollectionsVersion(context.TODO(), cli, basePath, etcdversion.GetVersion())
if err != nil {
fmt.Println(err.Error())
return
}

id2Collection := lo.SliceToMap(collections, func(col *models.Collection) (string, *models.Collection) {
fmt.Printf("existing collectionID %v\n", col.ID)
return strconv.FormatInt(col.ID, 10), col
})

cleanMetaFn := func(ctx context.Context, prefix string, opts ...ExcludePrefixOptions) error {
return walkWithPrefix(ctx, cli, prefix, paginationSize, func(k []byte, v []byte) error {
sKey := string(k)
for _, opt := range opts {
if opt(sKey) {
return nil
}
}

key := sKey[len(prefix):]
ss := strings.Split(key, "/")
collectionExist := false
for _, s := range ss {
if _, ok := id2Collection[s]; ok {
collectionExist = true
}
}

if !collectionExist {
fmt.Println("clean meta key ", sKey)
if run {
_, err = cli.Delete(ctx, sKey)
return err
}
}

return nil
})
}

// remove collection meta
// meta before database
collectionMetaPrefix := path.Join(basePath, common.CollectionMetaPrefix)
// with database
dbCollectionMetaPrefix := path.Join(basePath, common.DBCollectionMetaPrefix)
// remove collection field meta
fieldsPrefix := path.Join(basePath, common.FieldMetaPrefix)
fieldsSnapShotPrefix := path.Join(basePath, common.SnapshotPrefix, common.FieldMetaPrefix)
// remove collection partition meta
partitionsPrefix := path.Join(basePath, common.PartitionPrefix)
partitionsSnapShotPrefix := path.Join(basePath, common.SnapshotPrefix, common.PartitionPrefix)
prefixes := []string{
collectionMetaPrefix,
dbCollectionMetaPrefix,
fieldsPrefix,
fieldsSnapShotPrefix,
partitionsPrefix,
partitionsSnapShotPrefix}

for _, prefix := range prefixes {
fmt.Printf("start cleaning leaked collection meta, prefix: %s\n", prefix)
err = cleanMetaFn(context.TODO(), prefix)
if err != nil {
fmt.Println(err.Error())
return
}
fmt.Printf("clean leaked collection meta done, prefix: %s\n", prefix)
}

// remove segment meta
segmentPrefix := path.Join(basePath, common.SegmentMetaPrefix)
segmentStatsPrefix := path.Join(basePath, common.SegmentStatsMetaPrefix)
fmt.Printf("start cleaning leaked segment meta, prefix: %s, exclude prefix%s\n", segmentPrefix, segmentStatsPrefix)
err = cleanMetaFn(context.TODO(), segmentPrefix, func(key string) bool {
return strings.HasPrefix(key, segmentStatsPrefix)
})
if err != nil {
fmt.Println(err.Error())
return
}

fmt.Printf("clean leaked segment meta done, prefix: %s\n", segmentPrefix)
},
}

cmd.Flags().Bool("run", false, "flags indicating whether to execute removed command")
return cmd
}

func walkWithPrefix(ctx context.Context, cli clientv3.KV, prefix string, paginationSize int, fn func([]byte, []byte) error) error {
batch := int64(paginationSize)
opts := []clientv3.OpOption{
clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend),
clientv3.WithLimit(batch),
clientv3.WithRange(clientv3.GetPrefixRangeEnd(prefix)),
}

key := prefix
for {
resp, err := cli.Get(ctx, key, opts...)
if err != nil {
return err
}

for _, kv := range resp.Kvs {
if err = fn(kv.Key, kv.Value); err != nil {
return err
}
}

if !resp.More {
break
}
// move to next key
key = string(append(resp.Kvs[len(resp.Kvs)-1].Key, 0))
}

return nil
}