diff --git a/pkg/ccr/job.go b/pkg/ccr/job.go
index e4b71e46..cfaaca73 100644
--- a/pkg/ccr/job.go
+++ b/pkg/ccr/job.go
@@ -66,6 +66,7 @@ var (
     featureSkipRollupBinlogs bool
     featureTxnInsert bool
     featureFilterStorageMedium bool
+    featureRestoreReplaceDiffSchema bool

     ErrMaterializedViewTable = xerror.NewWithoutStack(xerror.Meta, "Not support table type: materialized view")
 )
@@ -95,6 +96,8 @@ func init() {
         "enable txn insert support")
     flag.BoolVar(&featureFilterStorageMedium, "feature_filter_storage_medium", true,
         "enable filter storage medium property")
+    flag.BoolVar(&featureRestoreReplaceDiffSchema, "feature_restore_replace_diff_schema", true,
+        "replace the table with different schema during restore")
 }

 type SyncType int
@@ -1057,6 +1060,9 @@ func (j *Job) fullSync() error {
         if featureAtomicRestore {
             restoreReq.AtomicRestore = true
         }
+        if featureRestoreReplaceDiffSchema {
+            restoreReq.ForceReplace = true
+        }
         restoreResp, err := destRpc.RestoreSnapshot(dest, &restoreReq)
         if err != nil {
             return err
diff --git a/pkg/rpc/fe.go b/pkg/rpc/fe.go
index b7ae4f59..2fede107 100644
--- a/pkg/rpc/fe.go
+++ b/pkg/rpc/fe.go
@@ -95,6 +95,7 @@ type RestoreSnapshotRequest struct {
     CleanPartitions bool
     CleanTables     bool
     Compress        bool
+    ForceReplace    bool
 }

 type IFeRpc interface {
@@ -800,14 +801,15 @@ func (rpc *singleFeClient) RestoreSnapshot(spec *base.Spec, restoreReq *RestoreS
         CleanPartitions: &restoreReq.CleanPartitions,
         AtomicRestore:   &restoreReq.AtomicRestore,
         Compressed:      utils.ThriftValueWrapper(restoreReq.Compress),
+        ForceReplace:    &restoreReq.ForceReplace,
     }
     setAuthInfo(req, spec)

     // NOTE: ignore meta, because it's too large
-    log.Debugf("RestoreSnapshotRequest user %s, db %s, table %s, label name %s, properties %v, clean tables: %t, clean partitions: %t, atomic restore: %t, compressed: %t",
+    log.Debugf("RestoreSnapshotRequest user %s, db %s, table %s, label name %s, properties %v, clean tables: %t, clean partitions: %t, atomic restore: %t, compressed: %t, forceReplace: %t",
         req.GetUser(), req.GetDb(), req.GetTable(), req.GetLabelName(), properties,
         restoreReq.CleanTables, restoreReq.CleanPartitions, restoreReq.AtomicRestore,
-        req.GetCompressed())
+        req.GetCompressed(), restoreReq.ForceReplace)

     if resp, err := client.RestoreSnapshot(context.Background(), req); err != nil {
         return nil, xerror.Wrapf(err, xerror.RPC, "RestoreSnapshot failed")
diff --git a/pkg/rpc/kitex_gen/frontendservice/FrontendService.go b/pkg/rpc/kitex_gen/frontendservice/FrontendService.go
index 159b689c..af554df5 100644
--- a/pkg/rpc/kitex_gen/frontendservice/FrontendService.go
+++ b/pkg/rpc/kitex_gen/frontendservice/FrontendService.go
@@ -57603,6 +57603,7 @@ type TRestoreSnapshotRequest struct {
     CleanPartitions *bool `thrift:"clean_partitions,14,optional" frugal:"14,optional,bool" json:"clean_partitions,omitempty"`
     AtomicRestore *bool `thrift:"atomic_restore,15,optional" frugal:"15,optional,bool" json:"atomic_restore,omitempty"`
     Compressed *bool `thrift:"compressed,16,optional" frugal:"16,optional,bool" json:"compressed,omitempty"`
+    ForceReplace *bool `thrift:"force_replace,17,optional" frugal:"17,optional,bool" json:"force_replace,omitempty"`
 }

 func NewTRestoreSnapshotRequest() *TRestoreSnapshotRequest {
@@ -57755,6 +57756,15 @@ func (p *TRestoreSnapshotRequest) GetCompressed() (v bool) {
     }
     return *p.Compressed
 }
+
+var TRestoreSnapshotRequest_ForceReplace_DEFAULT bool
+
+func (p *TRestoreSnapshotRequest) GetForceReplace() (v bool) {
+    if !p.IsSetForceReplace() {
+        return TRestoreSnapshotRequest_ForceReplace_DEFAULT
+    }
+    return *p.ForceReplace
+}
 func (p *TRestoreSnapshotRequest) SetCluster(val *string) {
     p.Cluster = val
 }
@@ -57803,6 +57813,9 @@ func (p *TRestoreSnapshotRequest) SetAtomicRestore(val *bool) {
 func (p *TRestoreSnapshotRequest) SetCompressed(val *bool) {
     p.Compressed = val
 }
+func (p *TRestoreSnapshotRequest) SetForceReplace(val *bool) {
+    p.ForceReplace = val
+}

 var fieldIDToName_TRestoreSnapshotRequest = map[int16]string{
     1: "cluster",
@@ -57821,6 +57834,7 @@ var fieldIDToName_TRestoreSnapshotRequest = map[int16]string{
     14: "clean_partitions",
     15: "atomic_restore",
     16: "compressed",
+    17: "force_replace",
 }

 func (p *TRestoreSnapshotRequest) IsSetCluster() bool {
@@ -57887,6 +57901,10 @@ func (p *TRestoreSnapshotRequest) IsSetCompressed() bool {
     return p.Compressed != nil
 }

+func (p *TRestoreSnapshotRequest) IsSetForceReplace() bool {
+    return p.ForceReplace != nil
+}
+
 func (p *TRestoreSnapshotRequest) Read(iprot thrift.TProtocol) (err error) {

     var fieldTypeId thrift.TType
@@ -58034,6 +58052,14 @@ func (p *TRestoreSnapshotRequest) Read(iprot thrift.TProtocol) (err error) {
             } else if err = iprot.Skip(fieldTypeId); err != nil {
                 goto SkipFieldError
             }
+        case 17:
+            if fieldTypeId == thrift.BOOL {
+                if err = p.ReadField17(iprot); err != nil {
+                    goto ReadFieldError
+                }
+            } else if err = iprot.Skip(fieldTypeId); err != nil {
+                goto SkipFieldError
+            }
         default:
             if err = iprot.Skip(fieldTypeId); err != nil {
                 goto SkipFieldError
@@ -58269,6 +58295,17 @@ func (p *TRestoreSnapshotRequest) ReadField16(iprot thrift.TProtocol) error {
     p.Compressed = _field
     return nil
 }
+func (p *TRestoreSnapshotRequest) ReadField17(iprot thrift.TProtocol) error {
+
+    var _field *bool
+    if v, err := iprot.ReadBool(); err != nil {
+        return err
+    } else {
+        _field = &v
+    }
+    p.ForceReplace = _field
+    return nil
+}

 func (p *TRestoreSnapshotRequest) Write(oprot thrift.TProtocol) (err error) {

     var fieldId int16
@@ -58340,6 +58377,10 @@ func (p *TRestoreSnapshotRequest) Write(oprot thrift.TProtocol) (err error) {
             fieldId = 16
             goto WriteFieldError
         }
+        if err = p.writeField17(oprot); err != nil {
+            fieldId = 17
+            goto WriteFieldError
+        }
     }
     if err = oprot.WriteFieldStop(); err != nil {
         goto WriteFieldStopError
@@ -58681,6 +58722,25 @@ WriteFieldEndError:
     return thrift.PrependError(fmt.Sprintf("%T write field 16 end error: ", p), err)
 }

+func (p *TRestoreSnapshotRequest) writeField17(oprot thrift.TProtocol) (err error) {
+    if p.IsSetForceReplace() {
+        if err = oprot.WriteFieldBegin("force_replace", thrift.BOOL, 17); err != nil {
+            goto WriteFieldBeginError
+        }
+        if err := oprot.WriteBool(*p.ForceReplace); err != nil {
+            return err
+        }
+        if err = oprot.WriteFieldEnd(); err != nil {
+            goto WriteFieldEndError
+        }
+    }
+    return nil
+WriteFieldBeginError:
+    return thrift.PrependError(fmt.Sprintf("%T write field 17 begin error: ", p), err)
+WriteFieldEndError:
+    return thrift.PrependError(fmt.Sprintf("%T write field 17 end error: ", p), err)
+}
+
 func (p *TRestoreSnapshotRequest) String() string {
     if p == nil {
         return ""
@@ -58743,6 +58803,9 @@ func (p *TRestoreSnapshotRequest) DeepEqual(ano *TRestoreSnapshotRequest) bool {
     if !p.Field16DeepEqual(ano.Compressed) {
         return false
     }
+    if !p.Field17DeepEqual(ano.ForceReplace) {
+        return false
+    }
     return true
 }

@@ -58930,6 +58993,18 @@ func (p *TRestoreSnapshotRequest) Field16DeepEqual(src *bool) bool {
     }
     return true
 }
+func (p *TRestoreSnapshotRequest) Field17DeepEqual(src *bool) bool {
+
+    if p.ForceReplace == src {
+        return true
+    } else if p.ForceReplace == nil || src == nil {
+        return false
+    }
+    if *p.ForceReplace != *src {
+        return false
+    }
+    return true
+}

 type TRestoreSnapshotResult_ struct {
     Status *status.TStatus `thrift:"status,1,optional" frugal:"1,optional,status.TStatus" json:"status,omitempty"`
diff --git a/pkg/rpc/kitex_gen/frontendservice/k-FrontendService.go b/pkg/rpc/kitex_gen/frontendservice/k-FrontendService.go
index aa1acef3..22f95d15 100644
--- a/pkg/rpc/kitex_gen/frontendservice/k-FrontendService.go
+++ b/pkg/rpc/kitex_gen/frontendservice/k-FrontendService.go
@@ -42106,6 +42106,20 @@ func (p *TRestoreSnapshotRequest) FastRead(buf []byte) (int, error) {
                     goto SkipFieldError
                 }
             }
+        case 17:
+            if fieldTypeId == thrift.BOOL {
+                l, err = p.FastReadField17(buf[offset:])
+                offset += l
+                if err != nil {
+                    goto ReadFieldError
+                }
+            } else {
+                l, err = bthrift.Binary.Skip(buf[offset:], fieldTypeId)
+                offset += l
+                if err != nil {
+                    goto SkipFieldError
+                }
+            }
         default:
             l, err = bthrift.Binary.Skip(buf[offset:], fieldTypeId)
             offset += l
@@ -42392,6 +42406,19 @@ func (p *TRestoreSnapshotRequest) FastReadField16(buf []byte) (int, error) {
     return offset, nil
 }

+func (p *TRestoreSnapshotRequest) FastReadField17(buf []byte) (int, error) {
+    offset := 0
+
+    if v, l, err := bthrift.Binary.ReadBool(buf[offset:]); err != nil {
+        return offset, err
+    } else {
+        offset += l
+        p.ForceReplace = &v
+
+    }
+    return offset, nil
+}
+
 // for compatibility
 func (p *TRestoreSnapshotRequest) FastWrite(buf []byte) int {
     return 0
@@ -42405,6 +42432,7 @@ func (p *TRestoreSnapshotRequest) FastWriteNocopy(buf []byte, binaryWriter bthri
         offset += p.fastWriteField14(buf[offset:], binaryWriter)
         offset += p.fastWriteField15(buf[offset:], binaryWriter)
         offset += p.fastWriteField16(buf[offset:], binaryWriter)
+        offset += p.fastWriteField17(buf[offset:], binaryWriter)
         offset += p.fastWriteField1(buf[offset:], binaryWriter)
         offset += p.fastWriteField2(buf[offset:], binaryWriter)
         offset += p.fastWriteField3(buf[offset:], binaryWriter)
@@ -42443,6 +42471,7 @@ func (p *TRestoreSnapshotRequest) BLength() int {
         l += p.field14Length()
         l += p.field15Length()
         l += p.field16Length()
+        l += p.field17Length()
     }
     l += bthrift.Binary.FieldStopLength()
     l += bthrift.Binary.StructEndLength()
@@ -42643,6 +42672,17 @@ func (p *TRestoreSnapshotRequest) fastWriteField16(buf []byte, binaryWriter bthr
     return offset
 }

+func (p *TRestoreSnapshotRequest) fastWriteField17(buf []byte, binaryWriter bthrift.BinaryWriter) int {
+    offset := 0
+    if p.IsSetForceReplace() {
+        offset += bthrift.Binary.WriteFieldBegin(buf[offset:], "force_replace", thrift.BOOL, 17)
+        offset += bthrift.Binary.WriteBool(buf[offset:], *p.ForceReplace)
+
+        offset += bthrift.Binary.WriteFieldEnd(buf[offset:])
+    }
+    return offset
+}
+
 func (p *TRestoreSnapshotRequest) field1Length() int {
     l := 0
     if p.IsSetCluster() {
@@ -42829,6 +42869,17 @@ func (p *TRestoreSnapshotRequest) field16Length() int {
     return l
 }

+func (p *TRestoreSnapshotRequest) field17Length() int {
+    l := 0
+    if p.IsSetForceReplace() {
+        l += bthrift.Binary.FieldBeginLength("force_replace", thrift.BOOL, 17)
+        l += bthrift.Binary.BoolLength(*p.ForceReplace)
+
+        l += bthrift.Binary.FieldEndLength()
+    }
+    return l
+}
+
 func (p *TRestoreSnapshotResult_) FastRead(buf []byte) (int, error) {
     var err error
     var offset int
diff --git a/pkg/rpc/thrift/FrontendService.thrift b/pkg/rpc/thrift/FrontendService.thrift
index c1a4d106..bc9537dc 100644
--- a/pkg/rpc/thrift/FrontendService.thrift
+++ b/pkg/rpc/thrift/FrontendService.thrift
@@ -1388,6 +1388,7 @@ struct TRestoreSnapshotRequest {
     14: optional bool clean_partitions
     15: optional bool atomic_restore
     16: optional bool compressed;
+    17: optional bool force_replace
 }

 struct TRestoreSnapshotResult {
diff --git a/regression-test/suites/db_sync/table/diff_schema/test_ds_tbl_diff_schema.groovy b/regression-test/suites/db_sync/table/diff_schema/test_ds_tbl_diff_schema.groovy
new file mode 100644
index 00000000..cf64b7f2
--- /dev/null
+++ b/regression-test/suites/db_sync/table/diff_schema/test_ds_tbl_diff_schema.groovy
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_ds_tbl_diff_schema") {
+    def helper = new GroovyShell(new Binding(['suite': delegate]))
+        .evaluate(new File("${context.config.suitePath}/../common", "helper.groovy"))
+
+    def dbName = context.dbName
+    def tableName = "tbl_" + helper.randomSuffix()
+
+    def exist = { res -> Boolean
+        return res.size() != 0
+    }
+
+    def notExist = { res -> Boolean
+        return res.size() == 0
+    }
+
+    sql "DROP TABLE IF EXISTS ${tableName}"
+    target_sql "DROP TABLE IF EXISTS ${tableName}"
+
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName}
+        (
+            `test` INT,
+            `id` INT
+        )
+        ENGINE=OLAP
+        PARTITION BY RANGE(`test`, `id`)
+        (
+        )
+        DISTRIBUTED BY HASH(id) BUCKETS 1
+        PROPERTIES (
+            "replication_allocation" = "tag.location.default: 1",
+            "binlog.enable" = "true"
+        )
+    """
+
+    target_sql """
+        CREATE TABLE IF NOT EXISTS ${tableName}
+        (
+            `test` INT,
+            `id` INT
+        )
+        ENGINE=OLAP
+        PARTITION BY LIST(`test`, `id`)
+        (
+        )
+        DISTRIBUTED BY HASH(id) BUCKETS 1
+        PROPERTIES (
+            "replication_allocation" = "tag.location.default: 1",
+            "binlog.enable" = "true"
+        )
+    """
+
+    helper.ccrJobDelete()
+    helper.ccrJobCreate()
+
+    assertTrue(helper.checkRestoreFinishTimesOf("${tableName}", 30))
+
+    assertTrue(helper.checkShowTimesOf("SHOW TABLES LIKE \"${tableName}\"", exist, 60, "sql"))
+
+    assertTrue(helper.checkShowTimesOf("SHOW TABLES LIKE \"${tableName}\"", exist, 60, "target"))
+}
\ No newline at end of file
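
Reviewer note: the change is pure plumbing. The new -feature_restore_replace_diff_schema flag (default true) only toggles ForceReplace on the restore request that fullSync builds, and singleFeClient.RestoreSnapshot forwards it to the FE as the new optional thrift field force_replace (id 17). The sketch below restates that flow in isolation so it can be read without the generated code; it is a minimal, self-contained illustration, and the restoreSnapshotRequest struct and buildRestoreRequest helper are hypothetical stand-ins for rpc.RestoreSnapshotRequest and the wiring in pkg/ccr/job.go, not part of this change.

    // Illustrative sketch only; stand-ins for the real types in pkg/rpc and pkg/ccr.
    package main

    import "fmt"

    // Mirrors only the RestoreSnapshotRequest fields this diff touches.
    type restoreSnapshotRequest struct {
        AtomicRestore bool
        ForceReplace  bool // sent to the FE as optional thrift field force_replace (id 17)
    }

    // Stand-ins for the package-level feature flags registered in pkg/ccr/job.go;
    // featureRestoreReplaceDiffSchema models the new flag, which defaults to true.
    var (
        featureAtomicRestore            = true
        featureRestoreReplaceDiffSchema = true
    )

    // buildRestoreRequest mirrors the decision fullSync makes: the feature flag
    // simply toggles ForceReplace, which RestoreSnapshot later copies into
    // TRestoreSnapshotRequest via &restoreReq.ForceReplace.
    func buildRestoreRequest() restoreSnapshotRequest {
        req := restoreSnapshotRequest{}
        if featureAtomicRestore {
            req.AtomicRestore = true
        }
        if featureRestoreReplaceDiffSchema {
            req.ForceReplace = true
        }
        return req
    }

    func main() {
        // With both flags at their defaults this prints {AtomicRestore:true ForceReplace:true},
        // i.e. the request shape a full sync now sends so the restore can replace a
        // downstream table whose schema differs (per the flag's help text).
        fmt.Printf("%+v\n", buildRestoreRequest())
    }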