opt: make clear logs (selectdb#394)
w41ter authored and wyxxxcat committed Jan 22, 2025
1 parent 54838a6 commit 005db63
Showing 11 changed files with 158 additions and 150 deletions.
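As context for the diff below, here is a minimal, hypothetical Go sketch (not part of this commit) of the leveled-logging pattern the changed files follow, assuming a logrus-compatible logger; the import path, helper function, and values are assumptions for illustration only, not code from the repository: Tracef for fine-grained progress tagged with the transaction id, Debugf for generated SQL text, and Infof for milestone events.

package main

import (
	log "github.com/sirupsen/logrus"
)

// reportBackupProgress is a hypothetical helper that mirrors the logging levels
// used in the diff: Trace for routine progress (tagged with the txn id),
// Debug for the SQL that is about to run, Info for milestone events.
func reportBackupProgress(txnId int64, database, snapshotName, backupSQL string) {
	log.Tracef("txn %d: check backup state of snapshot %s", txnId, snapshotName)
	log.Debugf("show backup state sql: %s", backupSQL)
	log.Infof("create partial snapshot %s.%s, backup snapshot sql: %s",
		database, snapshotName, backupSQL)
}

func main() {
	log.SetLevel(log.TraceLevel) // enable trace output so all three lines print
	reportBackupProgress(42, "demo_db", "snapshot_20250122",
		"BACKUP SNAPSHOT demo_db.snapshot_20250122 TO `__keep_on_local__`")
}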
2 changes: 1 addition & 1 deletion cmd/ccr_syncer/ccr_syncer.go
@@ -108,7 +108,7 @@ func parseConfigFile() error {
continue
}

- log.Infof("config %s=%s", key, value)
+ log.Infof("force set config %s=%s", key, value)
if err := flag.Set(key, value); err != nil {
return fmt.Errorf("set flag key value '%s': %v", line, err)
}
39 changes: 18 additions & 21 deletions pkg/ccr/base/spec.go
@@ -392,7 +392,7 @@ func (s *Spec) CheckTablePropertyValid() ([]string, error) {
}

func (s *Spec) IsEnableRestoreSnapshotCompression() (bool, error) {
- log.Debugf("check frontend enable restore snapshot compression")
+ log.Tracef("check frontend enable restore snapshot compression")

db, err := s.Connect()
if err != nil {
@@ -429,7 +429,7 @@ func (s *Spec) IsEnableRestoreSnapshotCompression() (bool, error) {
}

func (s *Spec) GetAllTables() ([]string, error) {
- log.Debugf("get all tables in database %s", s.Database)
+ log.Tracef("get all tables in database %s", s.Database)

db, err := s.Connect()
if err != nil {
@@ -496,7 +496,7 @@ func (s *Spec) queryResult(querySQL string, queryColumn string, errMsg string) (
}

func (s *Spec) GetAllViewsFromTable(tableName string) ([]string, error) {
- log.Debugf("get all view from table %s", tableName)
+ log.Tracef("get all view from table %s", tableName)

var results []string
// first, query information_schema.tables with table_schema and table_type, get all views' name
@@ -635,7 +635,7 @@ func (s *Spec) CreateTableOrView(createTable *record.CreateTable, srcDatabase st
// When create view, the db name of sql is source db name, we should use dest db name to create view
createSql := createTable.Sql
if createTable.IsCreateView() {
- log.Debugf("create view, use dest db name to replace source db name")
+ log.Tracef("create view, use dest db name to replace source db name")

// replace `internal`.`source_db_name`. or `default_cluster:source_db_name`. to `internal`.`dest_db_name`.
originalNameNewStyle := "`internal`.`" + strings.TrimSpace(srcDatabase) + "`."
@@ -733,7 +733,7 @@ func (s *Spec) CheckTableExistsByName(tableName string) (bool, error) {
}

func (s *Spec) CancelRestoreIfExists(snapshotName string) error {
- log.Debugf("cancel restore %s, db name: %s", snapshotName, s.Database)
+ log.Tracef("cancel restore %s, db name: %s", snapshotName, s.Database)

db, err := s.Connect()
if err != nil {
@@ -806,11 +806,6 @@ func (s *Spec) CreatePartialSnapshot(snapshotName, table string, partitions []st
return xerror.Errorf(xerror.Normal, "source db is empty! you should have at least one table")
}

- // table refs = table
- tableRef := utils.FormatKeywordName(table)
-
- log.Infof("create partial snapshot %s.%s", s.Database, snapshotName)
-
db, err := s.Connect()
if err != nil {
return err
@@ -820,10 +815,12 @@ func (s *Spec) CreatePartialSnapshot(snapshotName, table string, partitions []st
if len(partitions) > 0 {
partitionRefs = " PARTITION (`" + strings.Join(partitions, "`,`") + "`)"
}
+ tableRef := utils.FormatKeywordName(table)
backupSnapshotSql := fmt.Sprintf(
"BACKUP SNAPSHOT %s.%s TO `__keep_on_local__` ON (%s%s) PROPERTIES (\"type\" = \"full\")",
utils.FormatKeywordName(s.Database), snapshotName, tableRef, partitionRefs)
- log.Debugf("backup partial snapshot sql: %s", backupSnapshotSql)
+ log.Infof("create partial snapshot %s.%s, backup snapshot sql: %s",
+ s.Database, snapshotName, backupSnapshotSql)
_, err = db.Exec(backupSnapshotSql)
if err != nil {
if strings.Contains(err.Error(), "Unknown table") {
@@ -840,7 +837,7 @@ func (s *Spec) CreatePartialSnapshot(snapshotName, table string, partitions []st

// TODO: Add TaskErrMsg
func (s *Spec) checkBackupFinished(snapshotName string) (BackupState, string, error) {
- log.Debugf("check backup state of snapshot %s", snapshotName)
+ log.Tracef("check backup state of snapshot %s", snapshotName)

db, err := s.Connect()
if err != nil {
@@ -878,7 +875,7 @@ func (s *Spec) checkBackupFinished(snapshotName string) (BackupState, string, er
}

func (s *Spec) CheckBackupFinished(snapshotName string) (bool, error) {
- log.Debugf("check backup state, spec: %s, snapshot: %s", s.String(), snapshotName)
+ log.Tracef("check backup state, spec: %s, snapshot: %s", s.String(), snapshotName)

// Retry network related error to avoid full sync when the target network is interrupted, process is restarted.
if backupState, status, err := s.checkBackupFinished(snapshotName); err != nil && !isNetworkRelated(err) {
@@ -899,7 +896,7 @@ func (s *Spec) CheckBackupFinished(snapshotName string) (bool, error) {
// Get the valid (running or finished) backup job with a unique prefix to indicate
// if a backup job needs to be issued again.
func (s *Spec) GetValidBackupJob(snapshotNamePrefix string) (string, error) {
- log.Debugf("get valid backup job if exists, database: %s, label prefix: %s", s.Database, snapshotNamePrefix)
+ log.Tracef("get valid backup job if exists, database: %s, label prefix: %s", s.Database, snapshotNamePrefix)

db, err := s.Connect()
if err != nil {
@@ -908,7 +905,7 @@ func (s *Spec) GetValidBackupJob(snapshotNamePrefix string) (string, error) {

query := fmt.Sprintf("SHOW BACKUP FROM %s WHERE SnapshotName LIKE \"%s%%\"",
utils.FormatKeywordName(s.Database), snapshotNamePrefix)
- log.Infof("show backup state sql: %s", query)
+ log.Debugf("show backup state sql: %s", query)
rows, err := db.Query(query)
if err != nil {
return "", xerror.Wrap(err, xerror.Normal, "query backup state failed")
@@ -952,7 +949,7 @@ func (s *Spec) GetValidBackupJob(snapshotNamePrefix string) (string, error) {
// Get the valid (running or finished) restore job with a unique prefix to indicate
// if a restore job needs to be issued again.
func (s *Spec) GetValidRestoreJob(snapshotNamePrefix string) (string, error) {
- log.Debugf("get valid restore job if exists, label prefix: %s", snapshotNamePrefix)
+ log.Tracef("get valid restore job if exists, label prefix: %s", snapshotNamePrefix)

db, err := s.Connect()
if err != nil {
@@ -961,7 +958,7 @@ func (s *Spec) GetValidRestoreJob(snapshotNamePrefix string) (string, error) {

query := fmt.Sprintf("SHOW RESTORE FROM %s WHERE Label LIKE \"%s%%\"",
utils.FormatKeywordName(s.Database), snapshotNamePrefix)
- log.Infof("show restore state sql: %s", query)
+ log.Debugf("show restore state sql: %s", query)
rows, err := db.Query(query)
if err != nil {
return "", xerror.Wrap(err, xerror.Normal, "query restore state failed")
@@ -1039,7 +1036,7 @@ func (s *Spec) queryRestoreInfo(db *sql.DB, snapshotName string) (*RestoreInfo,
}

func (s *Spec) checkRestoreFinished(snapshotName string) (RestoreState, string, error) {
- log.Debugf("check restore state %s", snapshotName)
+ log.Tracef("check restore state %s", snapshotName)

db, err := s.Connect()
if err != nil {
@@ -1059,7 +1056,7 @@ func (s *Spec) checkRestoreFinished(snapshotName string) (RestoreState, string,
}

func (s *Spec) CheckRestoreFinished(snapshotName string) (bool, error) {
- log.Debugf("check restore state is finished, spec: %s, snapshot: %s", s.String(), snapshotName)
+ log.Tracef("check restore state is finished, spec: %s, snapshot: %s", s.String(), snapshotName)

// Retry network related error to avoid full sync when the target network is interrupted, process is restarted.
if restoreState, status, err := s.checkRestoreFinished(snapshotName); err != nil && !isNetworkRelated(err) {
@@ -1080,7 +1077,7 @@ func (s *Spec) CheckRestoreFinished(snapshotName string) (bool, error) {
}

func (s *Spec) GetRestoreSignatureNotMatchedTableOrView(snapshotName string) (string, bool, error) {
- log.Debugf("get restore signature not matched table, spec: %s, snapshot: %s", s.String(), snapshotName)
+ log.Tracef("get restore signature not matched table, spec: %s, snapshot: %s", s.String(), snapshotName)

for i := 0; i < MAX_CHECK_RETRY_TIMES; i++ {
if restoreState, status, err := s.checkRestoreFinished(snapshotName); err != nil {
@@ -1226,7 +1223,7 @@ func (s *Spec) Update(event SpecEvent) {
}

func (s *Spec) LightningSchemaChange(srcDatabase, tableAlias string, lightningSchemaChange *record.ModifyTableAddOrDropColumns) error {
- log.Debugf("lightningSchemaChange %v", lightningSchemaChange)
+ log.Tracef("lighting schema change %v", lightningSchemaChange)

rawSql := lightningSchemaChange.RawSql

40 changes: 20 additions & 20 deletions pkg/ccr/ingest_binlog_job.go
@@ -115,10 +115,11 @@ type tabletIngestBinlogHandler struct {
// handle Replica
func (h *tabletIngestBinlogHandler) handleReplica(srcReplica, destReplica *ReplicaMeta) bool {
destReplicaId := destReplica.Id
- log.Debugf("handle dest replica id: %d", destReplicaId)
+ log.Tracef("txn %d tablet ingest binlog: handle dest replica id: %d, dest tablet id %d",
+ h.ingestJob.txnId, destReplicaId, h.destTablet.Id)

if h.cancel.Load() {
- log.Infof("job canceled, replica id: %d", destReplicaId)
+ log.Infof("txn %d job canceled, replica id: %d", h.ingestJob.txnId, destReplicaId)
return true
}

@@ -188,7 +189,7 @@ func (h *tabletIngestBinlogHandler) handleReplica(srcReplica, destReplica *Repli
return
}

- log.Debugf("ingest resp: %v", resp)
+ log.Tracef("txn %d tablet ingest binlog resp: %v", j.txnId, resp)
if !resp.IsSetStatus() {
err = xerror.Errorf(xerror.BE, "ingest resp status not set, req: %+v", req)
j.setError(err)
@@ -211,7 +212,8 @@ func (h *tabletIngestBinlogHandler) handleReplica(srcReplica, destReplica *Repli
}

func (h *tabletIngestBinlogHandler) handle() {
- log.Debugf("handle tablet ingest binlog, src tablet id: %d, dest tablet id: %d", h.srcTablet.Id, h.destTablet.Id)
+ log.Tracef("txn %d, tablet ingest binlog, src tablet id: %d, dest tablet id: %d, total %d replicas",
+ h.ingestJob.txnId, h.srcTablet.Id, h.destTablet.Id, h.srcTablet.ReplicaMetas.Len())

// all src replicas version > binlogVersion
srcReplicas := make([]*ReplicaMeta, 0, h.srcTablet.ReplicaMetas.Len())
@@ -365,10 +367,10 @@ type prepareIndexArg struct {
}

func (j *IngestBinlogJob) prepareIndex(arg *prepareIndexArg) {
- log.Debugf("prepareIndex: %v", arg)
+ log.Tracef("txn %d ingest binlog: prepare index %s, src %d, dest %d",
+ j.txnId, arg.srcIndexMeta.Name, arg.srcIndexMeta.Id, arg.destIndexMeta.Id)

// Step 1: check tablets
- log.Debugf("arg %+v", arg)
srcTablets, err := j.srcMeta.GetTablets(arg.srcTableId, arg.srcPartitionId, arg.srcIndexMeta.Id)
if err != nil {
j.setError(err)
@@ -387,7 +389,7 @@ func (j *IngestBinlogJob) prepareIndex(arg *prepareIndexArg) {
}

if srcTablets.Len() == 0 {
- log.Warn("src tablets length: 0, skip")
+ log.Warnf("txn %d ingest binlog: src tablets length: 0, skip", j.txnId)
return
}

@@ -430,7 +432,7 @@ func (j *IngestBinlogJob) prepareIndex(arg *prepareIndexArg) {
}

func (j *IngestBinlogJob) preparePartition(srcTableId, destTableId int64, partitionRecord record.PartitionRecord, indexIds []int64) {
- log.Debugf("partitionRecord: %v", partitionRecord)
+ log.Tracef("txn %d ingest binlog: prepare partition: %v", j.txnId, partitionRecord)
// preparePartition is deprecated; the index handling above is the actual implementation here
// still need to require the index length to be aligned with the downstream; this cannot be recovered
// think about which parts are needed for recovery; it is mainly the tablet part
@@ -491,7 +493,6 @@ func (j *IngestBinlogJob) preparePartition(srcTableId, destTableId int64, partit
}

srcIndexName := getSrcIndexName(job, srcIndexMeta)
- log.Debugf("src idx id %d, name %s", indexId, srcIndexName)
if _, ok := destIndexNameMap[srcIndexName]; !ok {
j.setError(xerror.Errorf(xerror.Meta,
"index name %v not found in dest meta, is base index: %t, src index id: %d",
@@ -511,12 +512,12 @@ func (j *IngestBinlogJob) preparePartition(srcTableId, destTableId int64, partit
}
for _, indexId := range indexIds {
if j.srcMeta.IsIndexDropped(indexId) {
- log.Infof("skip the dropped index %d", indexId)
+ log.Infof("txn %d ingest binlog: skip the dropped index %d", j.txnId, indexId)
continue
}
if featureFilterShadowIndexesUpsert {
if _, ok := j.ccrJob.progress.ShadowIndexes[indexId]; ok {
- log.Infof("skip the shadow index %d", indexId)
+ log.Infof("txn %d ingest binlog: skip the shadow index %d", j.txnId, indexId)
continue
}
}
@@ -530,9 +531,9 @@ func (j *IngestBinlogJob) preparePartition(srcTableId, destTableId int64, partit
}

func (j *IngestBinlogJob) prepareTable(tableRecord *record.TableRecord) {
- log.Debugf("tableRecord: %v", tableRecord)
+ log.Tracef("txn %d ingest binlog: prepare table: %d", j.txnId, tableRecord.Id)
if j.srcMeta.IsTableDropped(tableRecord.Id) {
- log.Infof("skip the dropped table %d", tableRecord.Id)
+ log.Infof("txn %d ingest binlog: skip the dropped table %d", j.txnId, tableRecord.Id)
return
}

@@ -604,16 +605,16 @@ func (j *IngestBinlogJob) prepareTable(tableRecord *record.TableRecord) {
continue
}
if j.srcMeta.IsPartitionDropped(partitionRecord.Id) {
- log.Infof("skip the dropped partition %d, range: %s, version: %d",
- partitionRecord.Id, partitionRecord.Range, partitionRecord.Version)
+ log.Infof("txn %d skip the dropped partition %d, range: %s, version: %d",
+ j.txnId, partitionRecord.Id, partitionRecord.Range, partitionRecord.Version)
continue
}
j.preparePartition(srcTableId, destTableId, partitionRecord, tableRecord.IndexIds)
}
}

func (j *IngestBinlogJob) prepareBackendMap() {
- log.Debug("prepareBackendMap")
+ log.Tracef("txn %d ingest binlog: prepare backend map", j.txnId)

var err error
j.srcBackendMap, err = j.srcMeta.GetBackendMap()
Expand All @@ -630,7 +631,7 @@ func (j *IngestBinlogJob) prepareBackendMap() {
}

func (j *IngestBinlogJob) prepareTabletIngestJobs() {
- log.Debugf("prepareTabletIngestJobs, table length: %d", len(j.tableRecords))
+ log.Tracef("txn %d ingest binlog: prepare tablet ingest jobs, table length: %d", j.txnId, len(j.tableRecords))

j.tabletIngestJobs = make([]*tabletIngestBinlogHandler, 0)
for _, tableRecord := range j.tableRecords {
@@ -642,8 +643,7 @@ func (j *IngestBinlogJob) prepareTabletIngestJobs() {
}

func (j *IngestBinlogJob) runTabletIngestJobs() {
- log.Debugf("runTabletIngestJobs, job length: %d", len(j.tabletIngestJobs))
-
+ log.Infof("txn %d ingest binlog: run %d tablet ingest jobs", j.txnId, len(j.tabletIngestJobs))
for _, tabletIngestJob := range j.tabletIngestJobs {
j.wg.Add(1)
go func(tabletIngestJob *tabletIngestBinlogHandler) {
@@ -655,7 +655,7 @@ func (j *IngestBinlogJob) runTabletIngestJobs() {
}

func (j *IngestBinlogJob) prepareMeta() {
- log.Debug("prepareMeta")
+ log.Tracef("txn %d ingest binlog: prepare meta with %d table records", j.txnId, len(j.tableRecords))
srcTableIds := make([]int64, 0, len(j.tableRecords))
job := j.ccrJob
factory := j.factory