From 1e0d0bc179316ce4301cf0df77e00715a816cd3b Mon Sep 17 00:00:00 2001 From: walter Date: Wed, 11 Sep 2024 20:06:40 +0800 Subject: [PATCH] Add feature atomic restore (#166) Atomic restore will replace tables instead of updating tables in place so that the read is not affected during the fullsync. --- pkg/ccr/job.go | 37 ++++++++++++++++++++++++++++++------- pkg/rpc/fe.go | 40 +++++++++++++++++++++++++++------------- 2 files changed, 57 insertions(+), 20 deletions(-) diff --git a/pkg/ccr/job.go b/pkg/ccr/job.go index dda403c6..98e031f6 100644 --- a/pkg/ccr/job.go +++ b/pkg/ccr/job.go @@ -16,6 +16,7 @@ import ( "github.com/selectdb/ccr_syncer/pkg/ccr/base" "github.com/selectdb/ccr_syncer/pkg/ccr/record" + "github.com/selectdb/ccr_syncer/pkg/rpc" "github.com/selectdb/ccr_syncer/pkg/storage" utils "github.com/selectdb/ccr_syncer/pkg/utils" "github.com/selectdb/ccr_syncer/pkg/xerror" @@ -37,6 +38,7 @@ const ( var ( featureSchemaChangePartialSync bool featureCleanTableAndPartitions bool + featureAtomicRestore bool ) func init() { @@ -46,6 +48,8 @@ func init() { // The default value is false, since clean tables will erase views unexpectedly. 
flag.BoolVar(&featureCleanTableAndPartitions, "feature_clean_table_and_partitions", false, "clean non restored tables and partitions during fullsync") + flag.BoolVar(&featureAtomicRestore, "feature_atomic_restore", true, + "replace tables in atomic during fullsync (otherwise the dest table will not be able to read).") } type SyncType int @@ -446,8 +450,17 @@ func (j *Job) partialSync() error { tableRefs = append(tableRefs, tableRef) } - cleanPartitions, cleanTables := false, false // DO NOT drop exists tables and partitions - restoreResp, err := destRpc.RestoreSnapshot(dest, tableRefs, restoreSnapshotName, snapshotResp, cleanTables, cleanPartitions) + restoreReq := rpc.RestoreSnapshotRequest{ + TableRefs: tableRefs, + SnapshotName: restoreSnapshotName, + SnapshotResult: snapshotResp, + + // DO NOT drop exists tables and partitions + CleanPartitions: false, + CleanTables: false, + AtomicRestore: false, + } + restoreResp, err := destRpc.RestoreSnapshot(dest, &restoreReq) if err != nil { return err } @@ -671,15 +684,25 @@ func (j *Job) fullSync() error { tableRefs = append(tableRefs, tableRef) } - // drop exists partitions, and drop tables if in db sync. - cleanTables, cleanPartitions := false, false + restoreReq := rpc.RestoreSnapshotRequest{ + TableRefs: tableRefs, + SnapshotName: restoreSnapshotName, + SnapshotResult: snapshotResp, + CleanPartitions: false, + CleanTables: false, + AtomicRestore: false, + } if featureCleanTableAndPartitions { - cleanPartitions = true + // drop exists partitions, and drop tables if in db sync. 
+ restoreReq.CleanPartitions = true if j.SyncType == DBSync { - cleanTables = true + restoreReq.CleanTables = true } } - restoreResp, err := destRpc.RestoreSnapshot(dest, tableRefs, restoreSnapshotName, snapshotResp, cleanTables, cleanPartitions) + if featureAtomicRestore { + restoreReq.AtomicRestore = true + } + restoreResp, err := destRpc.RestoreSnapshot(dest, &restoreReq) if err != nil { return err } diff --git a/pkg/rpc/fe.go b/pkg/rpc/fe.go index 267b6be3..8ca20955 100644 --- a/pkg/rpc/fe.go +++ b/pkg/rpc/fe.go @@ -71,6 +71,15 @@ func canUseNextAddr(err error) bool { return false } +type RestoreSnapshotRequest struct { + TableRefs []*festruct.TTableRef + SnapshotName string + SnapshotResult *festruct.TGetSnapshotResult_ + AtomicRestore bool + CleanPartitions bool + CleanTables bool +} + type IFeRpc interface { BeginTransaction(*base.Spec, string, []int64) (*festruct.TBeginTxnResult_, error) CommitTransaction(*base.Spec, int64, []*festruct_types.TTabletCommitInfo) (*festruct.TCommitTxnResult_, error) @@ -78,7 +87,7 @@ type IFeRpc interface { GetBinlog(*base.Spec, int64) (*festruct.TGetBinlogResult_, error) GetBinlogLag(*base.Spec, int64) (*festruct.TGetBinlogLagResult_, error) GetSnapshot(*base.Spec, string) (*festruct.TGetSnapshotResult_, error) - RestoreSnapshot(*base.Spec, []*festruct.TTableRef, string, *festruct.TGetSnapshotResult_, bool, bool) (*festruct.TRestoreSnapshotResult_, error) + RestoreSnapshot(*base.Spec, *RestoreSnapshotRequest) (*festruct.TRestoreSnapshotResult_, error) GetMasterToken(*base.Spec) (*festruct.TGetMasterTokenResult_, error) GetDbMeta(spec *base.Spec) (*festruct.TGetMetaResult_, error) GetTableMeta(spec *base.Spec, tableIds []int64) (*festruct.TGetMetaResult_, error) @@ -384,10 +393,9 @@ func (rpc *FeRpc) GetSnapshot(spec *base.Spec, labelName string) (*festruct.TGet return convertResult[festruct.TGetSnapshotResult_](result, err) } -func (rpc *FeRpc) RestoreSnapshot(spec *base.Spec, tableRefs []*festruct.TTableRef, label string, 
snapshotResult *festruct.TGetSnapshotResult_, cleanTables bool, cleanPartitions bool) (*festruct.TRestoreSnapshotResult_, error) { - // return rpc.masterClient.RestoreSnapshot(spec, tableRefs, label, snapshotResult) +func (rpc *FeRpc) RestoreSnapshot(spec *base.Spec, req *RestoreSnapshotRequest) (*festruct.TRestoreSnapshotResult_, error) { caller := func(client IFeRpc) (resultType, error) { - return client.RestoreSnapshot(spec, tableRefs, label, snapshotResult, cleanTables, cleanPartitions) + return client.RestoreSnapshot(spec, req) } result, err := rpc.callWithMasterRedirect(caller) return convertResult[festruct.TRestoreSnapshotResult_](result, err) @@ -661,10 +669,13 @@ func (rpc *singleFeClient) GetSnapshot(spec *base.Spec, labelName string) (*fest // 10: optional map properties // 11: optional binary meta // 12: optional binary job_info +// 13: optional bool clean_tables +// 14: optional bool clean_partitions +// 15: optional bool atomic_restore // } // // Restore Snapshot rpc -func (rpc *singleFeClient) RestoreSnapshot(spec *base.Spec, tableRefs []*festruct.TTableRef, label string, snapshotResult *festruct.TGetSnapshotResult_, cleanTables bool, cleanPartitions bool) (*festruct.TRestoreSnapshotResult_, error) { +func (rpc *singleFeClient) RestoreSnapshot(spec *base.Spec, restoreReq *RestoreSnapshotRequest) (*festruct.TRestoreSnapshotResult_, error) { // NOTE: ignore meta, because it's too large log.Debugf("Call RestoreSnapshot, addr: %s, spec: %s", rpc.Address(), spec) @@ -674,20 +685,23 @@ func (rpc *singleFeClient) RestoreSnapshot(spec *base.Spec, tableRefs []*festruc properties["reserve_replica"] = "true" req := &festruct.TRestoreSnapshotRequest{ Table: &spec.Table, - LabelName: &label, + LabelName: &restoreReq.SnapshotName, RepoName: &repoName, - TableRefs: tableRefs, + TableRefs: restoreReq.TableRefs, Properties: properties, - Meta: snapshotResult.GetMeta(), - JobInfo: snapshotResult.GetJobInfo(), - CleanTables: &cleanTables, - CleanPartitions: 
&cleanPartitions, + Meta: restoreReq.SnapshotResult.GetMeta(), + JobInfo: restoreReq.SnapshotResult.GetJobInfo(), + CleanTables: &restoreReq.CleanTables, + CleanPartitions: &restoreReq.CleanPartitions, + AtomicRestore: &restoreReq.AtomicRestore, } setAuthInfo(req, spec) // NOTE: ignore meta, because it's too large - log.Debugf("RestoreSnapshotRequest user %s, db %s, table %s, label name %s, properties %v, clean tables: %v, clean partitions: %v", - req.GetUser(), req.GetDb(), req.GetTable(), req.GetLabelName(), properties, cleanTables, cleanPartitions) + log.Debugf("RestoreSnapshotRequest user %s, db %s, table %s, label name %s, properties %v, clean tables: %t, clean partitions: %t, atomic restore: %t", + req.GetUser(), req.GetDb(), req.GetTable(), req.GetLabelName(), properties, + restoreReq.CleanTables, restoreReq.CleanPartitions, restoreReq.AtomicRestore) + if resp, err := client.RestoreSnapshot(context.Background(), req); err != nil { return nil, xerror.Wrapf(err, xerror.RPC, "RestoreSnapshot failed") } else {