Skip to content

Commit

Permalink
Filter dropped tables (selectdb#123)
Browse files Browse the repository at this point in the history
  • Loading branch information
w41ter authored Jul 18, 2024
1 parent f371b7f commit 8088311
Show file tree
Hide file tree
Showing 21 changed files with 2,582 additions and 30 deletions.
4 changes: 4 additions & 0 deletions pkg/ccr/ingest_binlog_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,10 @@ func (j *IngestBinlogJob) preparePartition(srcTableId, destTableId int64, partit

func (j *IngestBinlogJob) prepareTable(tableRecord *record.TableRecord) {
log.Debugf("tableRecord: %v", tableRecord)
if j.srcMeta.IsTableDropped(tableRecord.Id) {
log.Infof("skip the dropped table %d", tableRecord.Id)
return
}

if len(tableRecord.PartitionRecords) == 0 {
j.setError(xerror.Errorf(xerror.Meta, "partition records is empty"))
Expand Down
4 changes: 4 additions & 0 deletions pkg/ccr/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -1168,3 +1168,7 @@ func (m *Meta) ClearTable(dbName string, tableName string) {
func (m *Meta) IsPartitionDropped(partitionId int64) bool {
panic("IsPartitionDropped is not supported, please use ThriftMeta instead")
}

func (m *Meta) IsTableDropped(partitionId int64) bool {
panic("IsTableDropped is not supported, please use ThriftMeta instead")
}
1 change: 1 addition & 0 deletions pkg/ccr/metaer.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type IngestBinlogMetaer interface {
GetIndexNameMap(tableId, partitionId int64) (map[string]*IndexMeta, error)
GetBackendMap() (map[int64]*base.Backend, error)
IsPartitionDropped(partitionId int64) bool
IsTableDropped(tableId int64) bool
}

type Metaer interface {
Expand Down
12 changes: 12 additions & 0 deletions pkg/ccr/thrift_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,16 +132,22 @@ func NewThriftMeta(spec *base.Spec, rpcFactory rpc.IRpcFactory, tableIds []int64
for _, partition := range dbMeta.GetDroppedPartitions() {
droppedPartitions[partition] = struct{}{}
}
droppedTables := make(map[int64]struct{})
for _, table := range dbMeta.GetDroppedTables() {
droppedTables[table] = struct{}{}
}

return &ThriftMeta{
meta: meta,
droppedPartitions: droppedPartitions,
droppedTables: droppedTables,
}, nil
}

type ThriftMeta struct {
meta *Meta
droppedPartitions map[int64]struct{}
droppedTables map[int64]struct{}
}

func (tm *ThriftMeta) GetTablets(tableId, partitionId, indexId int64) (*btree.Map[int64, *TabletMeta], error) {
Expand Down Expand Up @@ -233,3 +239,9 @@ func (tm *ThriftMeta) IsPartitionDropped(partitionId int64) bool {
_, ok := tm.droppedPartitions[partitionId]
return ok
}

// Whether the target table are dropped
func (tm *ThriftMeta) IsTableDropped(tableId int64) bool {
_, ok := tm.droppedTables[tableId]
return ok
}
Loading

0 comments on commit 8088311

Please sign in to comment.