diff --git a/CHANGELOG.md b/CHANGELOG.md index e31c35c20..bda708d31 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## v1.10.0 +BUGFIXES +* [#1435](https://github.com/bnb-chain/greenfield-storage-provider/pull/1435) fix: gc objects + ## v1.9.0 BUGFIXES * [#1423](https://github.com/bnb-chain/greenfield-storage-provider/pull/1423) fix: no such object error code diff --git a/Makefile b/Makefile index 1f058f96f..408bed1fd 100644 --- a/Makefile +++ b/Makefile @@ -40,7 +40,7 @@ install-go-test-coverage: go install github.com/vladopajic/go-test-coverage/v2@latest install-tools: - go install go.uber.org/mock/mockgen@latest + go install go.uber.org/mock/mockgen@v0.1.0 go install github.com/bufbuild/buf/cmd/buf@v1.28.0 go install github.com/cosmos/gogoproto/protoc-gen-gocosmos@latest diff --git a/README.md b/README.md index 22e9807be..6c5c6c465 100644 --- a/README.md +++ b/README.md @@ -84,7 +84,7 @@ Failure: plugin gocosmos: could not find protoc plugin for name gocosmos - pleas GO111MODULE=on GOBIN=/usr/local/go/bin go install github.com/cosmos/gogoproto/protoc-gen-gocosmos@latest # if you want to execute unit test of sp, you should execute the following command, assumed that you installed golang in /usr/local/go/bin. Other OS are similar. -GO111MODULE=on GOBIN=/usr/local/go/bin go install go.uber.org/mock/mockgen@latest +GO111MODULE=on GOBIN=/usr/local/go/bin go install go.uber.org/mock/mockgen@v0.1.0 ``` Above error messages are due to users don't set go env correctly. More info users can search `GOROOT`, `GOPATH` and `GOBIN`. diff --git a/cmd/command/bs_data_migration/v1.0.1/fix_payment.go b/cmd/command/bs_data_migration/v1.0.1/fix_payment.go index 8fa5ca711..84d28fd21 100644 --- a/cmd/command/bs_data_migration/v1.0.1/fix_payment.go +++ b/cmd/command/bs_data_migration/v1.0.1/fix_payment.go @@ -42,7 +42,7 @@ func FixPayment(endpoint string, db *gorm.DB) error { } defer resp.Body.Close() if resp.StatusCode != 200 { - httpErr = fmt.Errorf(resp.Status) + httpErr = fmt.Errorf("%s", resp.Status) return } err := json.NewDecoder(resp.Body).Decode(&paymentResult) diff --git a/cmd/command/query.go b/cmd/command/query.go index a60abed2f..c9c65ecf2 100644 --- a/cmd/command/query.go +++ b/cmd/command/query.go @@ -218,7 +218,7 @@ func (w *CMDWrapper) queryTasksAction(ctx *cli.Context) error { fmt.Printf("failed to query task due to no task, endpoint:%v, key:%v\n", endpoint, key) } for _, info := range infos { - fmt.Printf(info + "\n") + fmt.Printf("%s"+"\n", info) } return nil } diff --git a/core/piecestore/piecestore.go b/core/piecestore/piecestore.go index cf5dace6d..e1b30a985 100644 --- a/core/piecestore/piecestore.go +++ b/core/piecestore/piecestore.go @@ -48,4 +48,7 @@ type PieceStore interface { // DeletePiece deletes the piece data from piece store, it can delete // segment or ec piece data. DeletePiece(ctx context.Context, key string) error + // DeletePiecesByPrefix deletes pieces data from piece store, it can delete + // segment or ec piece data. + DeletePiecesByPrefix(ctx context.Context, key string) (uint64, error) } diff --git a/core/piecestore/piecestore_mock.go b/core/piecestore/piecestore_mock.go index ad090f369..e5a6ee432 100644 --- a/core/piecestore/piecestore_mock.go +++ b/core/piecestore/piecestore_mock.go @@ -205,6 +205,21 @@ func (mr *MockPieceStoreMockRecorder) DeletePiece(ctx, key any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeletePiece", reflect.TypeOf((*MockPieceStore)(nil).DeletePiece), ctx, key) } +// DeletePiecesByPrefix mocks base method. +func (m *MockPieceStore) DeletePiecesByPrefix(ctx context.Context, key string) (uint64, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeletePiecesByPrefix", ctx, key) + ret0, _ := ret[0].(uint64) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// DeletePiecesByPrefix indicates an expected call of DeletePiecesByPrefix. +func (mr *MockPieceStoreMockRecorder) DeletePiecesByPrefix(ctx, key any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeletePiecesByPrefix", reflect.TypeOf((*MockPieceStore)(nil).DeletePiecesByPrefix), ctx, key) +} + // GetPiece mocks base method. func (m *MockPieceStore) GetPiece(ctx context.Context, key string, offset, limit int64) ([]byte, error) { m.ctrl.T.Helper() diff --git a/modular/executor/execute_gc.go b/modular/executor/execute_gc.go index def9827eb..7469b106a 100644 --- a/modular/executor/execute_gc.go +++ b/modular/executor/execute_gc.go @@ -3,6 +3,8 @@ package executor import ( "context" "errors" + "fmt" + "math" "strconv" "strings" "time" @@ -199,7 +201,6 @@ func (e *ExecuteModular) HandleGCObjectTask(ctx context.Context, task coretask.G currentGCBlockID uint64 currentGCObjectID uint64 responseEndBlockID uint64 - storageParams *storagetypes.Params gcObjectNumber int tryAgainLater bool taskIsCanceled bool @@ -252,30 +253,18 @@ func (e *ExecuteModular) HandleGCObjectTask(ctx context.Context, task coretask.G } for _, object := range waitingGCObjects { - if storageParams, err = e.baseApp.Consensus().QueryStorageParamsByTimestamp( - context.Background(), object.GetObjectInfo().GetCreateAt()); err != nil { - log.Errorw("failed to query storage params", "task_info", task.Info(), "error", err) - return - } - currentGCBlockID = uint64(object.GetDeleteAt()) objectInfo := object.GetObjectInfo() - objectVersion := objectInfo.Version currentGCObjectID = objectInfo.Id.Uint64() if currentGCBlockID < task.GetCurrentBlockNumber() { log.Errorw("skip gc object", "object_info", objectInfo, "task_current_gc_block_id", task.GetCurrentBlockNumber()) continue } - segmentCount := e.baseApp.PieceOp().SegmentPieceCount(objectInfo.GetPayloadSize(), - storageParams.VersionedParams.GetMaxSegmentSize()) - for segIdx := uint32(0); segIdx < segmentCount; segIdx++ { - pieceKey := e.baseApp.PieceOp().SegmentPieceKey(currentGCObjectID, segIdx, objectVersion) - // ignore this delete api error, TODO: refine gc workflow by enrich metadata index. - deleteErr := e.baseApp.PieceStore().DeletePiece(ctx, pieceKey) - log.CtxDebugw(ctx, "delete the primary sp pieces", "object_info", objectInfo, - "piece_key", pieceKey, "error", deleteErr) - } + segmentPieceKeyPrefix := fmt.Sprintf("s%d_", currentGCObjectID) + deletedSize, deleteErr := e.baseApp.PieceStore().DeletePiecesByPrefix(ctx, segmentPieceKeyPrefix) + log.CtxDebugw(ctx, "delete the primary sp pieces", "object_info", objectInfo, + "piece_key_prefix", segmentPieceKeyPrefix, "deletedSize", deletedSize, "error", deleteErr) bucketInfo, err := e.baseApp.GfSpClient().GetBucketInfoByBucketName(ctx, objectInfo.BucketName) if err != nil || bucketInfo == nil { log.Errorw("failed to get bucket by bucket name", "bucket_name", objectInfo.BucketName, "error", err) @@ -288,23 +277,30 @@ func (e *ExecuteModular) HandleGCObjectTask(ctx context.Context, task coretask.G } var redundancyIndex int32 = -1 - for rIdx, sspId := range gvg.GetSecondarySpIds() { - if spId == sspId { - redundancyIndex = int32(rIdx) - for segIdx := uint32(0); segIdx < segmentCount; segIdx++ { - pieceKey := e.baseApp.PieceOp().ECPieceKey(currentGCObjectID, segIdx, uint32(rIdx), objectVersion) - if objectInfo.GetRedundancyType() == storagetypes.REDUNDANCY_REPLICA_TYPE { - pieceKey = e.baseApp.PieceOp().SegmentPieceKey(objectInfo.Id.Uint64(), segIdx, objectVersion) - } + // since in GC the object will be completely deleted, simply find all pieces with the piece key prefix and remove them + ECPieceKeyPrefix := fmt.Sprintf("e%d_", currentGCObjectID) + if len(gvg.GetSecondarySpIds()) != 0 { + for rIdx, sspId := range gvg.GetSecondarySpIds() { + if spId == sspId { + redundancyIndex = int32(rIdx) // ignore this delete api error, TODO: refine gc workflow by enrich metadata index. - deleteErr := e.baseApp.PieceStore().DeletePiece(ctx, pieceKey) - log.CtxDebugw(ctx, "delete the secondary sp pieces", - "object_info", objectInfo, "piece_key", pieceKey, "error", deleteErr) + deletedSize, deleteErr = e.baseApp.PieceStore().DeletePiecesByPrefix(ctx, ECPieceKeyPrefix) + log.CtxDebugw(ctx, "delete the secondary sp pieces by prefix", + "object_info", objectInfo, "piece_key_prefix", ECPieceKeyPrefix, "deletedSize", deletedSize, "error", deleteErr) } } + } else { + // if failed to get secondary sps, check the current sp + deletedSize, deleteErr = e.baseApp.PieceStore().DeletePiecesByPrefix(ctx, ECPieceKeyPrefix) + log.CtxDebugw(ctx, "delete the sp pieces by prefix in current sp when secondary sp not found", + "object_info", objectInfo, "piece_key_prefix", ECPieceKeyPrefix, "deletedSize", deletedSize, "error", deleteErr) + + // signal as delete any integrity meta related with the object + redundancyIndex = math.MaxInt32 } + // ignore this delete api error, TODO: refine gc workflow by enrich metadata index - deleteErr := e.baseApp.GfSpDB().DeleteObjectIntegrity(objectInfo.Id.Uint64(), redundancyIndex) + deleteErr = e.baseApp.GfSpDB().DeleteObjectIntegrity(objectInfo.Id.Uint64(), redundancyIndex) log.CtxDebugw(ctx, "delete the object integrity meta", "object_info", objectInfo, "error", deleteErr) task.SetCurrentBlockNumber(currentGCBlockID) task.SetLastDeletedObjectId(currentGCObjectID) diff --git a/modular/executor/executor_task_test.go b/modular/executor/executor_task_test.go index b47080005..559edcd8f 100644 --- a/modular/executor/executor_task_test.go +++ b/modular/executor/executor_task_test.go @@ -520,32 +520,6 @@ func TestExecuteModular_HandleGCObjectTask(t *testing.T) { return e }, }, - { - name: "failed to query storage params", - task: &gfsptask.GfSpGCObjectTask{ - Task: &gfsptask.GfSpTask{}, - }, - fn: func() *ExecuteModular { - e := setup(t) - ctrl := gomock.NewController(t) - m := gfspclient.NewMockGfSpClientAPI(ctrl) - waitingGCObjects := []*metadatatypes.Object{ - { - ObjectInfo: &storagetypes.ObjectInfo{Id: sdkmath.NewUint(1)}, - }, - } - m.EXPECT().ListDeletedObjectsByBlockNumberRange(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), - gomock.Any()).Return(waitingGCObjects, uint64(0), nil).Times(1) - m.EXPECT().ReportTask(gomock.Any(), gomock.Any()).Return(nil).Times(1) - e.baseApp.SetGfSpClient(m) - - m1 := consensus.NewMockConsensus(ctrl) - m1.EXPECT().QueryStorageParamsByTimestamp(gomock.Any(), gomock.Any()).Return(nil, mockErr).Times(1) - m1.EXPECT().QuerySP(gomock.Any(), gomock.Any()).Return(&sptypes.StorageProvider{Id: 1}, nil).Times(1) - e.baseApp.SetConsensus(m1) - return e - }, - }, { name: "failed to get bucket by bucket name", task: &gfsptask.GfSpGCObjectTask{ @@ -568,18 +542,14 @@ func TestExecuteModular_HandleGCObjectTask(t *testing.T) { e.baseApp.SetGfSpClient(m) m1 := consensus.NewMockConsensus(ctrl) - m1.EXPECT().QueryStorageParamsByTimestamp(gomock.Any(), gomock.Any()).Return(&storagetypes.Params{ - VersionedParams: storagetypes.VersionedParams{MaxSegmentSize: 10}}, nil).Times(1) m1.EXPECT().QuerySP(gomock.Any(), gomock.Any()).Return(&sptypes.StorageProvider{Id: 1}, nil).Times(1) e.baseApp.SetConsensus(m1) m2 := piecestore.NewMockPieceOp(ctrl) - m2.EXPECT().SegmentPieceCount(gomock.Any(), gomock.Any()).Return(uint32(1)).Times(1) - m2.EXPECT().SegmentPieceKey(gomock.Any(), gomock.Any(), gomock.Any()).Return("test").Times(1) e.baseApp.SetPieceOp(m2) m3 := piecestore.NewMockPieceStore(ctrl) - m3.EXPECT().DeletePiece(gomock.Any(), gomock.Any()).Return(nil).Times(1) + m3.EXPECT().DeletePiecesByPrefix(gomock.Any(), gomock.Any()).Return(uint64(0), nil).Times(1) e.baseApp.SetPieceStore(m3) return e }, @@ -608,18 +578,14 @@ func TestExecuteModular_HandleGCObjectTask(t *testing.T) { e.baseApp.SetGfSpClient(m) m1 := consensus.NewMockConsensus(ctrl) - m1.EXPECT().QueryStorageParamsByTimestamp(gomock.Any(), gomock.Any()).Return(&storagetypes.Params{ - VersionedParams: storagetypes.VersionedParams{MaxSegmentSize: 10}}, nil).Times(1) m1.EXPECT().QuerySP(gomock.Any(), gomock.Any()).Return(&sptypes.StorageProvider{Id: 1}, nil).Times(1) e.baseApp.SetConsensus(m1) m2 := piecestore.NewMockPieceOp(ctrl) - m2.EXPECT().SegmentPieceCount(gomock.Any(), gomock.Any()).Return(uint32(1)).Times(1) - m2.EXPECT().SegmentPieceKey(gomock.Any(), gomock.Any(), gomock.Any()).Return("test").Times(1) e.baseApp.SetPieceOp(m2) m3 := piecestore.NewMockPieceStore(ctrl) - m3.EXPECT().DeletePiece(gomock.Any(), gomock.Any()).Return(nil).Times(1) + m3.EXPECT().DeletePiecesByPrefix(gomock.Any(), gomock.Any()).Return(uint64(0), nil).Times(1) e.baseApp.SetPieceStore(m3) return e }, @@ -676,19 +642,14 @@ func TestExecuteModular_HandleGCObjectTask(t *testing.T) { e.baseApp.SetGfSpClient(m) m1 := consensus.NewMockConsensus(ctrl) - m1.EXPECT().QueryStorageParamsByTimestamp(gomock.Any(), gomock.Any()).Return(&storagetypes.Params{ - VersionedParams: storagetypes.VersionedParams{MaxSegmentSize: 10}}, nil).Times(1) m1.EXPECT().QuerySP(gomock.Any(), gomock.Any()).Return(&sptypes.StorageProvider{Id: 1}, nil).Times(1) e.baseApp.SetConsensus(m1) m2 := piecestore.NewMockPieceOp(ctrl) - m2.EXPECT().SegmentPieceCount(gomock.Any(), gomock.Any()).Return(uint32(1)).Times(1) - m2.EXPECT().SegmentPieceKey(gomock.Any(), gomock.Any(), gomock.Any()).Return("test").Times(1) - m2.EXPECT().ECPieceKey(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return("test").Times(1) e.baseApp.SetPieceOp(m2) m3 := piecestore.NewMockPieceStore(ctrl) - m3.EXPECT().DeletePiece(gomock.Any(), gomock.Any()).Return(nil).Times(2) + m3.EXPECT().DeletePiecesByPrefix(gomock.Any(), gomock.Any()).Return(uint64(0), nil).Times(2) e.baseApp.SetPieceStore(m3) m4 := corespdb.NewMockSPDB(ctrl) diff --git a/store/piecestore/client/piece_store_client.go b/store/piecestore/client/piece_store_client.go index 1dc17599f..82f818d13 100644 --- a/store/piecestore/client/piece_store_client.go +++ b/store/piecestore/client/piece_store_client.go @@ -132,3 +132,28 @@ func (client *StoreClient) DeletePiece(ctx context.Context, key string) error { err = client.ps.Delete(ctx, key) return err } + +// DeletePiecesByPrefix deletes pieces by prefix from piece store. +func (client *StoreClient) DeletePiecesByPrefix(ctx context.Context, key string) (uint64, error) { + var ( + startTime = time.Now() + err error + valSize uint64 + ) + defer func() { + if err != nil { + metrics.PieceStoreCounter.WithLabelValues(PieceStoreFailureDel).Inc() + metrics.PieceStoreTime.WithLabelValues(PieceStoreFailureDel).Observe( + time.Since(startTime).Seconds()) + return + } + metrics.PieceStoreCounter.WithLabelValues(PieceStoreSuccessDel).Inc() + metrics.PieceStoreTime.WithLabelValues(PieceStoreSuccessDel).Observe( + time.Since(startTime).Seconds()) + metrics.PieceStoreUsageAmountGauge.WithLabelValues(PieceStoreSuccessDel).Add(0 - float64(valSize)) + }() + + valSize, err = client.ps.DeleteByPrefix(ctx, key) + + return valSize, err +} diff --git a/store/piecestore/piece/api.go b/store/piecestore/piece/api.go index 6d5b51aa0..ab50d8e37 100644 --- a/store/piecestore/piece/api.go +++ b/store/piecestore/piece/api.go @@ -14,6 +14,7 @@ type PieceAPI interface { Get(ctx context.Context, key string, offset, limit int64) (io.ReadCloser, error) Put(ctx context.Context, key string, reader io.Reader) error Delete(ctx context.Context, key string) error + DeleteByPrefix(ctx context.Context, key string) (uint64, error) } type PieceStore struct { @@ -35,6 +36,11 @@ func (p *PieceStore) Delete(ctx context.Context, key string) error { return p.storeAPI.DeleteObject(ctx, key) } +// DeleteByPrefix deletes several pieces in PieceStore and returns deleted size +func (p *PieceStore) DeleteByPrefix(ctx context.Context, key string) (uint64, error) { + return p.storeAPI.DeleteObjectsByPrefix(ctx, key) +} + // Head returns piece info in PieceStore func (p *PieceStore) Head(ctx context.Context, key string) (storage.Object, error) { return p.storeAPI.HeadObject(ctx, key) diff --git a/store/piecestore/piece/api_mock.go b/store/piecestore/piece/api_mock.go index cab16c92e..8621f3ea9 100644 --- a/store/piecestore/piece/api_mock.go +++ b/store/piecestore/piece/api_mock.go @@ -54,6 +54,21 @@ func (mr *MockPieceAPIMockRecorder) Delete(ctx, key any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockPieceAPI)(nil).Delete), ctx, key) } +// DeleteByPrefix mocks base method. +func (m *MockPieceAPI) DeleteByPrefix(ctx context.Context, key string) (uint64, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteByPrefix", ctx, key) + ret0, _ := ret[0].(uint64) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// DeleteByPrefix indicates an expected call of DeleteByPrefix. +func (mr *MockPieceAPIMockRecorder) DeleteByPrefix(ctx, key any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteByPrefix", reflect.TypeOf((*MockPieceAPI)(nil).DeleteByPrefix), ctx, key) +} + // Get mocks base method. func (m *MockPieceAPI) Get(ctx context.Context, key string, offset, limit int64) (io.ReadCloser, error) { m.ctrl.T.Helper() diff --git a/store/piecestore/storage/disk_file.go b/store/piecestore/storage/disk_file.go index 7852a3261..8411c52d7 100644 --- a/store/piecestore/storage/disk_file.go +++ b/store/piecestore/storage/disk_file.go @@ -5,6 +5,7 @@ import ( "context" "fmt" "io" + "io/fs" "math/rand" "os" "path/filepath" @@ -142,6 +143,44 @@ func (d *diskFileStore) DeleteObject(ctx context.Context, key string) error { return err } +func (d *diskFileStore) DeleteObjectsByPrefix(ctx context.Context, key string) (uint64, error) { + dirEntries, err := os.ReadDir(d.root) + if err != nil { + log.Errorw("DeleteObjectsByPrefix read directory error", "error", err) + return 0, err + } + + var ( + size uint64 + entryInfo fs.FileInfo + ) + + for _, dirEntry := range dirEntries { + entryName := dirEntry.Name() + if strings.HasPrefix(entryName, key) { + var curInfoSize int64 + // need to extract entry info and size first, otherwise when the object is deleted, the info can not be found + entryInfo, err = dirEntry.Info() + if entryInfo != nil { + curInfoSize = entryInfo.Size() + } + if err != nil { + log.Errorw("get dirEntry info error", "error", err) + } + err = d.DeleteObject(ctx, entryName) + if err != nil { + log.Errorw("remove single file by prefix error", "error", err) + } else { + if entryInfo != nil { + size += uint64(curInfoSize) + } + } + } + } + + return size, nil +} + func (d *diskFileStore) HeadBucket(ctx context.Context) error { if _, err := os.Stat(d.root); err != nil { if os.IsNotExist(err) { diff --git a/store/piecestore/storage/interface.go b/store/piecestore/storage/interface.go index ba459cfde..77872e8d8 100644 --- a/store/piecestore/storage/interface.go +++ b/store/piecestore/storage/interface.go @@ -21,6 +21,8 @@ type ObjectStorage interface { PutObject(ctx context.Context, key string, reader io.Reader) error // DeleteObject deletes an object DeleteObject(ctx context.Context, key string) error + // DeleteObjectsByPrefix deletes objects by prefix + DeleteObjectsByPrefix(ctx context.Context, key string) (uint64, error) // HeadBucket determines if a bucket exists and have permission to access it HeadBucket(ctx context.Context) error diff --git a/store/piecestore/storage/interface_mock.go b/store/piecestore/storage/interface_mock.go index 009050c75..5bc783759 100644 --- a/store/piecestore/storage/interface_mock.go +++ b/store/piecestore/storage/interface_mock.go @@ -69,6 +69,21 @@ func (mr *MockObjectStorageMockRecorder) DeleteObject(ctx, key any) *gomock.Call return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteObject", reflect.TypeOf((*MockObjectStorage)(nil).DeleteObject), ctx, key) } +// DeleteObjectsByPrefix mocks base method. +func (m *MockObjectStorage) DeleteObjectsByPrefix(ctx context.Context, key string) (uint64, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteObjectsByPrefix", ctx, key) + ret0, _ := ret[0].(uint64) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// DeleteObjectsByPrefix indicates an expected call of DeleteObjectsByPrefix. +func (mr *MockObjectStorageMockRecorder) DeleteObjectsByPrefix(ctx, key any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteObjectsByPrefix", reflect.TypeOf((*MockObjectStorage)(nil).DeleteObjectsByPrefix), ctx, key) +} + // GetObject mocks base method. func (m *MockObjectStorage) GetObject(ctx context.Context, key string, offset, limit int64) (io.ReadCloser, error) { m.ctrl.T.Helper() diff --git a/store/piecestore/storage/memory.go b/store/piecestore/storage/memory.go index 575214f60..14a6289ba 100644 --- a/store/piecestore/storage/memory.go +++ b/store/piecestore/storage/memory.go @@ -85,6 +85,39 @@ func (m *memoryStore) DeleteObject(ctx context.Context, key string) error { return nil } +func (m *memoryStore) DeleteObjectsByPrefix(ctx context.Context, key string) (uint64, error) { + var ( + continueDeleteObject = true + batchSize = int64(1000) + size uint64 + ) + + for continueDeleteObject { + // batch list and delete objects + objs, err := m.ListObjects(ctx, key, "", "", batchSize) + if err != nil { + log.Errorw("DeleteObjectsByPrefix read directory error", "error", err) + return size, err + } + + // if the object listed here is less than required batch size, meaning it is the last page + if int64(len(objs)) < batchSize { + continueDeleteObject = false + } + + for _, obj := range objs { + deleteErr := m.DeleteObject(ctx, obj.Key()) + if deleteErr != nil { + log.Errorw("remove single file by prefix error", "error", err) + } else { + size += uint64(obj.Size()) + } + } + } + + return size, nil +} + func (m *memoryStore) HeadBucket(ctx context.Context) error { return nil } diff --git a/store/piecestore/storage/oss.go b/store/piecestore/storage/oss.go index 98d7f313f..8f3b5d67a 100644 --- a/store/piecestore/storage/oss.go +++ b/store/piecestore/storage/oss.go @@ -73,6 +73,47 @@ func (o *ossStore) DeleteObject(ctx context.Context, key string) error { return o.bucket.DeleteObject(key, oss.GetResponseHeader(&respHeader)) } +func (o *ossStore) DeleteObjectsByPrefix(ctx context.Context, key string) (uint64, error) { + var ( + objectKeys []string + objectKeySizeMap = make(map[string]uint64) + continueDeleteObject = true + batchSize = int64(1000) + size uint64 + ) + + for continueDeleteObject { + objs, err := o.ListObjects(ctx, key, "", "", batchSize) + if err != nil { + log.Errorw("DeleteObjectsByPrefix list objects error", "error", err) + return size, err + } + + if len(objs) == 0 { + log.CtxDebugw(ctx, "No object is listed in oss by prefix", "prefix", key) + return 0, nil + } + + // if the object listed here is less than required batch size, meaning it is the last page + if int64(len(objs)) < batchSize { + continueDeleteObject = false + } + + for _, obj := range objs { + objectKeys = append(objectKeys, obj.Key()) + objectKeySizeMap[obj.Key()] = uint64(obj.Size()) + } + deletedObjResults, err := o.bucket.DeleteObjects(objectKeys) + if err != nil { + log.Errorw("DeleteObjectsByPrefix delete objects error", "error", err) + } + for _, deletedObjKey := range deletedObjResults.DeletedObjects { + size += objectKeySizeMap[deletedObjKey] + } + } + return size, nil +} + func (o *ossStore) HeadBucket(ctx context.Context) error { ok, err := o.client.IsBucketExist(o.bucket.BucketName) if !ok { diff --git a/store/piecestore/storage/s3.go b/store/piecestore/storage/s3.go index 050c26db4..0557a7b03 100644 --- a/store/piecestore/storage/s3.go +++ b/store/piecestore/storage/s3.go @@ -140,6 +140,55 @@ func (s *s3Store) DeleteObject(ctx context.Context, key string) error { return err } +func (s *s3Store) DeleteObjectsByPrefix(ctx context.Context, key string) (uint64, error) { + var ( + objectIdentifiers []*s3.ObjectIdentifier + objectKeySizeMap = make(map[string]uint64) + continueDeleteObject = true + batchSize = int64(1000) + size uint64 + ) + + for continueDeleteObject { + objs, err := s.ListObjects(ctx, key, "", "", batchSize) + if err != nil { + log.Errorw("DeleteObjectsByPrefix list objects error", "error", err) + return size, err + } + + if len(objs) == 0 { + log.CtxDebugw(ctx, "No object is listed in s3 by prefix", "prefix", key) + return 0, nil + } + + if int64(len(objs)) < batchSize { + continueDeleteObject = false + } + + for _, obj := range objs { + objKey := obj.Key() + objectIdentifiers = append(objectIdentifiers, &s3.ObjectIdentifier{Key: aws.String(objKey)}) + objectKeySizeMap[obj.Key()] = uint64(obj.Size()) + } + + deleteParams := s3.Delete{Objects: objectIdentifiers} + param := &s3.DeleteObjectsInput{ + Bucket: aws.String(s.bucketName), + Delete: &deleteParams, + } + deleteObjectsOutput, err := s.api.DeleteObjectsWithContext(ctx, param) + if err != nil { + log.Errorw("DeleteObjectsByPrefix delete objects with context error", "error", err) + } + if deleteObjectsOutput != nil { + for _, deletedObj := range deleteObjectsOutput.Deleted { + size += objectKeySizeMap[aws.StringValue(deletedObj.Key)] + } + } + } + return size, nil +} + func (s *s3Store) HeadBucket(ctx context.Context) error { if _, err := s.api.HeadBucketWithContext(ctx, &s3.HeadBucketInput{ Bucket: aws.String(s.bucketName), diff --git a/store/piecestore/storage/sharding.go b/store/piecestore/storage/sharding.go index c2e94d457..16ff5ffc3 100644 --- a/store/piecestore/storage/sharding.go +++ b/store/piecestore/storage/sharding.go @@ -6,6 +6,8 @@ import ( "hash/fnv" "io" "strings" + + "github.com/bnb-chain/greenfield-storage-provider/pkg/log" ) type sharded struct { @@ -63,6 +65,38 @@ func (s *sharded) DeleteObject(ctx context.Context, key string) error { return s.pick(key).DeleteObject(ctx, key) } +func (s *sharded) DeleteObjectsByPrefix(ctx context.Context, key string) (uint64, error) { + var ( + continueDeleteObject = true + batchSize = int64(1000) + size uint64 + ) + + for continueDeleteObject { + // batch list and delete objects + objs, err := s.pick(key).ListObjects(ctx, key, "", "", batchSize) + if err != nil { + log.Errorw("DeleteObjectsByPrefix list objects error", "error", err) + return size, err + } + + // if the object listed here is less than required batch size, meaning it is the last page + if int64(len(objs)) < batchSize { + continueDeleteObject = false + } + + for _, obj := range objs { + err = s.pick(obj.Key()).DeleteObject(ctx, obj.Key()) + if err != nil { + log.Errorw("remove single file by prefix error", "error", err) + } else { + size += uint64(obj.Size()) + } + } + } + return size, nil +} + func (s *sharded) HeadBucket(ctx context.Context) error { for _, o := range s.stores { if err := o.HeadBucket(ctx); err != nil { diff --git a/store/sqldb/object_integrity.go b/store/sqldb/object_integrity.go index f7d91ded4..e08538bf4 100644 --- a/store/sqldb/object_integrity.go +++ b/store/sqldb/object_integrity.go @@ -4,6 +4,7 @@ import ( "encoding/hex" "errors" "fmt" + "math" "sort" "time" @@ -232,10 +233,17 @@ func (s *SpDBImpl) DeleteObjectIntegrity(objectID uint64, redundancyIndex int32) }() shardTableName := GetIntegrityMetasTableName(objectID) - err = s.db.Table(shardTableName).Delete(&IntegrityMetaTable{ - ObjectID: objectID, // should be the primary key - RedundancyIndex: redundancyIndex, - }).Error + if redundancyIndex == math.MaxInt32 { + // delete any integrity meta related with object id + err = s.db.Table(shardTableName).Delete(&IntegrityMetaTable{ + ObjectID: objectID, // should be the primary key + }).Error + } else { + err = s.db.Table(shardTableName).Delete(&IntegrityMetaTable{ + ObjectID: objectID, // should be the primary key + RedundancyIndex: redundancyIndex, + }).Error + } return err }