diff --git a/pkg/rpc/kitex_gen/frontendservice/FrontendService.go b/pkg/rpc/kitex_gen/frontendservice/FrontendService.go index 159b689c..984920b9 100644 --- a/pkg/rpc/kitex_gen/frontendservice/FrontendService.go +++ b/pkg/rpc/kitex_gen/frontendservice/FrontendService.go @@ -62575,9 +62575,13 @@ func (p *TGetMasterTokenResult_) Field3DeepEqual(src *types.TNetworkAddress) boo } type TGetBinlogLagResult_ struct { - Status *status.TStatus `thrift:"status,1,optional" frugal:"1,optional,status.TStatus" json:"status,omitempty"` - Lag *int64 `thrift:"lag,2,optional" frugal:"2,optional,i64" json:"lag,omitempty"` - MasterAddress *types.TNetworkAddress `thrift:"master_address,3,optional" frugal:"3,optional,types.TNetworkAddress" json:"master_address,omitempty"` + Status *status.TStatus `thrift:"status,1,optional" frugal:"1,optional,status.TStatus" json:"status,omitempty"` + Lag *int64 `thrift:"lag,2,optional" frugal:"2,optional,i64" json:"lag,omitempty"` + MasterAddress *types.TNetworkAddress `thrift:"master_address,3,optional" frugal:"3,optional,types.TNetworkAddress" json:"master_address,omitempty"` + FirstCommitSeq *int64 `thrift:"first_commit_seq,4,optional" frugal:"4,optional,i64" json:"first_commit_seq,omitempty"` + LastCommitSeq *int64 `thrift:"last_commit_seq,5,optional" frugal:"5,optional,i64" json:"last_commit_seq,omitempty"` + FirstBinlogTimestamp *int64 `thrift:"first_binlog_timestamp,6,optional" frugal:"6,optional,i64" json:"first_binlog_timestamp,omitempty"` + LastBinlogTimestamp *int64 `thrift:"last_binlog_timestamp,7,optional" frugal:"7,optional,i64" json:"last_binlog_timestamp,omitempty"` } func NewTGetBinlogLagResult_() *TGetBinlogLagResult_ { @@ -62613,6 +62617,42 @@ func (p *TGetBinlogLagResult_) GetMasterAddress() (v *types.TNetworkAddress) { } return p.MasterAddress } + +var TGetBinlogLagResult__FirstCommitSeq_DEFAULT int64 + +func (p *TGetBinlogLagResult_) GetFirstCommitSeq() (v int64) { + if !p.IsSetFirstCommitSeq() { + return TGetBinlogLagResult__FirstCommitSeq_DEFAULT + } + return *p.FirstCommitSeq +} + +var TGetBinlogLagResult__LastCommitSeq_DEFAULT int64 + +func (p *TGetBinlogLagResult_) GetLastCommitSeq() (v int64) { + if !p.IsSetLastCommitSeq() { + return TGetBinlogLagResult__LastCommitSeq_DEFAULT + } + return *p.LastCommitSeq +} + +var TGetBinlogLagResult__FirstBinlogTimestamp_DEFAULT int64 + +func (p *TGetBinlogLagResult_) GetFirstBinlogTimestamp() (v int64) { + if !p.IsSetFirstBinlogTimestamp() { + return TGetBinlogLagResult__FirstBinlogTimestamp_DEFAULT + } + return *p.FirstBinlogTimestamp +} + +var TGetBinlogLagResult__LastBinlogTimestamp_DEFAULT int64 + +func (p *TGetBinlogLagResult_) GetLastBinlogTimestamp() (v int64) { + if !p.IsSetLastBinlogTimestamp() { + return TGetBinlogLagResult__LastBinlogTimestamp_DEFAULT + } + return *p.LastBinlogTimestamp +} func (p *TGetBinlogLagResult_) SetStatus(val *status.TStatus) { p.Status = val } @@ -62622,11 +62662,27 @@ func (p *TGetBinlogLagResult_) SetLag(val *int64) { func (p *TGetBinlogLagResult_) SetMasterAddress(val *types.TNetworkAddress) { p.MasterAddress = val } +func (p *TGetBinlogLagResult_) SetFirstCommitSeq(val *int64) { + p.FirstCommitSeq = val +} +func (p *TGetBinlogLagResult_) SetLastCommitSeq(val *int64) { + p.LastCommitSeq = val +} +func (p *TGetBinlogLagResult_) SetFirstBinlogTimestamp(val *int64) { + p.FirstBinlogTimestamp = val +} +func (p *TGetBinlogLagResult_) SetLastBinlogTimestamp(val *int64) { + p.LastBinlogTimestamp = val +} var fieldIDToName_TGetBinlogLagResult_ = map[int16]string{ 1: "status", 2: "lag", 3: "master_address", + 4: "first_commit_seq", + 5: "last_commit_seq", + 6: "first_binlog_timestamp", + 7: "last_binlog_timestamp", } func (p *TGetBinlogLagResult_) IsSetStatus() bool { @@ -62641,6 +62697,22 @@ func (p *TGetBinlogLagResult_) IsSetMasterAddress() bool { return p.MasterAddress != nil } +func (p *TGetBinlogLagResult_) IsSetFirstCommitSeq() bool { + return p.FirstCommitSeq != nil +} + +func (p *TGetBinlogLagResult_) IsSetLastCommitSeq() bool { + return p.LastCommitSeq != nil +} + +func (p *TGetBinlogLagResult_) IsSetFirstBinlogTimestamp() bool { + return p.FirstBinlogTimestamp != nil +} + +func (p *TGetBinlogLagResult_) IsSetLastBinlogTimestamp() bool { + return p.LastBinlogTimestamp != nil +} + func (p *TGetBinlogLagResult_) Read(iprot thrift.TProtocol) (err error) { var fieldTypeId thrift.TType @@ -62684,6 +62756,38 @@ func (p *TGetBinlogLagResult_) Read(iprot thrift.TProtocol) (err error) { } else if err = iprot.Skip(fieldTypeId); err != nil { goto SkipFieldError } + case 4: + if fieldTypeId == thrift.I64 { + if err = p.ReadField4(iprot); err != nil { + goto ReadFieldError + } + } else if err = iprot.Skip(fieldTypeId); err != nil { + goto SkipFieldError + } + case 5: + if fieldTypeId == thrift.I64 { + if err = p.ReadField5(iprot); err != nil { + goto ReadFieldError + } + } else if err = iprot.Skip(fieldTypeId); err != nil { + goto SkipFieldError + } + case 6: + if fieldTypeId == thrift.I64 { + if err = p.ReadField6(iprot); err != nil { + goto ReadFieldError + } + } else if err = iprot.Skip(fieldTypeId); err != nil { + goto SkipFieldError + } + case 7: + if fieldTypeId == thrift.I64 { + if err = p.ReadField7(iprot); err != nil { + goto ReadFieldError + } + } else if err = iprot.Skip(fieldTypeId); err != nil { + goto SkipFieldError + } default: if err = iprot.Skip(fieldTypeId); err != nil { goto SkipFieldError @@ -62740,6 +62844,50 @@ func (p *TGetBinlogLagResult_) ReadField3(iprot thrift.TProtocol) error { p.MasterAddress = _field return nil } +func (p *TGetBinlogLagResult_) ReadField4(iprot thrift.TProtocol) error { + + var _field *int64 + if v, err := iprot.ReadI64(); err != nil { + return err + } else { + _field = &v + } + p.FirstCommitSeq = _field + return nil +} +func (p *TGetBinlogLagResult_) ReadField5(iprot thrift.TProtocol) error { + + var _field *int64 + if v, err := iprot.ReadI64(); err != nil { + return err + } else { + _field = &v + } + p.LastCommitSeq = _field + return nil +} +func (p *TGetBinlogLagResult_) ReadField6(iprot thrift.TProtocol) error { + + var _field *int64 + if v, err := iprot.ReadI64(); err != nil { + return err + } else { + _field = &v + } + p.FirstBinlogTimestamp = _field + return nil +} +func (p *TGetBinlogLagResult_) ReadField7(iprot thrift.TProtocol) error { + + var _field *int64 + if v, err := iprot.ReadI64(); err != nil { + return err + } else { + _field = &v + } + p.LastBinlogTimestamp = _field + return nil +} func (p *TGetBinlogLagResult_) Write(oprot thrift.TProtocol) (err error) { var fieldId int16 @@ -62759,6 +62907,22 @@ func (p *TGetBinlogLagResult_) Write(oprot thrift.TProtocol) (err error) { fieldId = 3 goto WriteFieldError } + if err = p.writeField4(oprot); err != nil { + fieldId = 4 + goto WriteFieldError + } + if err = p.writeField5(oprot); err != nil { + fieldId = 5 + goto WriteFieldError + } + if err = p.writeField6(oprot); err != nil { + fieldId = 6 + goto WriteFieldError + } + if err = p.writeField7(oprot); err != nil { + fieldId = 7 + goto WriteFieldError + } } if err = oprot.WriteFieldStop(); err != nil { goto WriteFieldStopError @@ -62834,6 +62998,82 @@ WriteFieldEndError: return thrift.PrependError(fmt.Sprintf("%T write field 3 end error: ", p), err) } +func (p *TGetBinlogLagResult_) writeField4(oprot thrift.TProtocol) (err error) { + if p.IsSetFirstCommitSeq() { + if err = oprot.WriteFieldBegin("first_commit_seq", thrift.I64, 4); err != nil { + goto WriteFieldBeginError + } + if err := oprot.WriteI64(*p.FirstCommitSeq); err != nil { + return err + } + if err = oprot.WriteFieldEnd(); err != nil { + goto WriteFieldEndError + } + } + return nil +WriteFieldBeginError: + return thrift.PrependError(fmt.Sprintf("%T write field 4 begin error: ", p), err) +WriteFieldEndError: + return thrift.PrependError(fmt.Sprintf("%T write field 4 end error: ", p), err) +} + +func (p *TGetBinlogLagResult_) writeField5(oprot thrift.TProtocol) (err error) { + if p.IsSetLastCommitSeq() { + if err = oprot.WriteFieldBegin("last_commit_seq", thrift.I64, 5); err != nil { + goto WriteFieldBeginError + } + if err := oprot.WriteI64(*p.LastCommitSeq); err != nil { + return err + } + if err = oprot.WriteFieldEnd(); err != nil { + goto WriteFieldEndError + } + } + return nil +WriteFieldBeginError: + return thrift.PrependError(fmt.Sprintf("%T write field 5 begin error: ", p), err) +WriteFieldEndError: + return thrift.PrependError(fmt.Sprintf("%T write field 5 end error: ", p), err) +} + +func (p *TGetBinlogLagResult_) writeField6(oprot thrift.TProtocol) (err error) { + if p.IsSetFirstBinlogTimestamp() { + if err = oprot.WriteFieldBegin("first_binlog_timestamp", thrift.I64, 6); err != nil { + goto WriteFieldBeginError + } + if err := oprot.WriteI64(*p.FirstBinlogTimestamp); err != nil { + return err + } + if err = oprot.WriteFieldEnd(); err != nil { + goto WriteFieldEndError + } + } + return nil +WriteFieldBeginError: + return thrift.PrependError(fmt.Sprintf("%T write field 6 begin error: ", p), err) +WriteFieldEndError: + return thrift.PrependError(fmt.Sprintf("%T write field 6 end error: ", p), err) +} + +func (p *TGetBinlogLagResult_) writeField7(oprot thrift.TProtocol) (err error) { + if p.IsSetLastBinlogTimestamp() { + if err = oprot.WriteFieldBegin("last_binlog_timestamp", thrift.I64, 7); err != nil { + goto WriteFieldBeginError + } + if err := oprot.WriteI64(*p.LastBinlogTimestamp); err != nil { + return err + } + if err = oprot.WriteFieldEnd(); err != nil { + goto WriteFieldEndError + } + } + return nil +WriteFieldBeginError: + return thrift.PrependError(fmt.Sprintf("%T write field 7 begin error: ", p), err) +WriteFieldEndError: + return thrift.PrependError(fmt.Sprintf("%T write field 7 end error: ", p), err) +} + func (p *TGetBinlogLagResult_) String() string { if p == nil { return "" @@ -62857,6 +63097,18 @@ func (p *TGetBinlogLagResult_) DeepEqual(ano *TGetBinlogLagResult_) bool { if !p.Field3DeepEqual(ano.MasterAddress) { return false } + if !p.Field4DeepEqual(ano.FirstCommitSeq) { + return false + } + if !p.Field5DeepEqual(ano.LastCommitSeq) { + return false + } + if !p.Field6DeepEqual(ano.FirstBinlogTimestamp) { + return false + } + if !p.Field7DeepEqual(ano.LastBinlogTimestamp) { + return false + } return true } @@ -62886,6 +63138,54 @@ func (p *TGetBinlogLagResult_) Field3DeepEqual(src *types.TNetworkAddress) bool } return true } +func (p *TGetBinlogLagResult_) Field4DeepEqual(src *int64) bool { + + if p.FirstCommitSeq == src { + return true + } else if p.FirstCommitSeq == nil || src == nil { + return false + } + if *p.FirstCommitSeq != *src { + return false + } + return true +} +func (p *TGetBinlogLagResult_) Field5DeepEqual(src *int64) bool { + + if p.LastCommitSeq == src { + return true + } else if p.LastCommitSeq == nil || src == nil { + return false + } + if *p.LastCommitSeq != *src { + return false + } + return true +} +func (p *TGetBinlogLagResult_) Field6DeepEqual(src *int64) bool { + + if p.FirstBinlogTimestamp == src { + return true + } else if p.FirstBinlogTimestamp == nil || src == nil { + return false + } + if *p.FirstBinlogTimestamp != *src { + return false + } + return true +} +func (p *TGetBinlogLagResult_) Field7DeepEqual(src *int64) bool { + + if p.LastBinlogTimestamp == src { + return true + } else if p.LastBinlogTimestamp == nil || src == nil { + return false + } + if *p.LastBinlogTimestamp != *src { + return false + } + return true +} type TUpdateFollowerStatsCacheRequest struct { Key *string `thrift:"key,1,optional" frugal:"1,optional,string" json:"key,omitempty"` diff --git a/pkg/rpc/kitex_gen/frontendservice/k-FrontendService.go b/pkg/rpc/kitex_gen/frontendservice/k-FrontendService.go index aa1acef3..428ad4b1 100644 --- a/pkg/rpc/kitex_gen/frontendservice/k-FrontendService.go +++ b/pkg/rpc/kitex_gen/frontendservice/k-FrontendService.go @@ -45540,6 +45540,62 @@ func (p *TGetBinlogLagResult_) FastRead(buf []byte) (int, error) { goto SkipFieldError } } + case 4: + if fieldTypeId == thrift.I64 { + l, err = p.FastReadField4(buf[offset:]) + offset += l + if err != nil { + goto ReadFieldError + } + } else { + l, err = bthrift.Binary.Skip(buf[offset:], fieldTypeId) + offset += l + if err != nil { + goto SkipFieldError + } + } + case 5: + if fieldTypeId == thrift.I64 { + l, err = p.FastReadField5(buf[offset:]) + offset += l + if err != nil { + goto ReadFieldError + } + } else { + l, err = bthrift.Binary.Skip(buf[offset:], fieldTypeId) + offset += l + if err != nil { + goto SkipFieldError + } + } + case 6: + if fieldTypeId == thrift.I64 { + l, err = p.FastReadField6(buf[offset:]) + offset += l + if err != nil { + goto ReadFieldError + } + } else { + l, err = bthrift.Binary.Skip(buf[offset:], fieldTypeId) + offset += l + if err != nil { + goto SkipFieldError + } + } + case 7: + if fieldTypeId == thrift.I64 { + l, err = p.FastReadField7(buf[offset:]) + offset += l + if err != nil { + goto ReadFieldError + } + } else { + l, err = bthrift.Binary.Skip(buf[offset:], fieldTypeId) + offset += l + if err != nil { + goto SkipFieldError + } + } default: l, err = bthrift.Binary.Skip(buf[offset:], fieldTypeId) offset += l @@ -45614,6 +45670,58 @@ func (p *TGetBinlogLagResult_) FastReadField3(buf []byte) (int, error) { return offset, nil } +func (p *TGetBinlogLagResult_) FastReadField4(buf []byte) (int, error) { + offset := 0 + + if v, l, err := bthrift.Binary.ReadI64(buf[offset:]); err != nil { + return offset, err + } else { + offset += l + p.FirstCommitSeq = &v + + } + return offset, nil +} + +func (p *TGetBinlogLagResult_) FastReadField5(buf []byte) (int, error) { + offset := 0 + + if v, l, err := bthrift.Binary.ReadI64(buf[offset:]); err != nil { + return offset, err + } else { + offset += l + p.LastCommitSeq = &v + + } + return offset, nil +} + +func (p *TGetBinlogLagResult_) FastReadField6(buf []byte) (int, error) { + offset := 0 + + if v, l, err := bthrift.Binary.ReadI64(buf[offset:]); err != nil { + return offset, err + } else { + offset += l + p.FirstBinlogTimestamp = &v + + } + return offset, nil +} + +func (p *TGetBinlogLagResult_) FastReadField7(buf []byte) (int, error) { + offset := 0 + + if v, l, err := bthrift.Binary.ReadI64(buf[offset:]); err != nil { + return offset, err + } else { + offset += l + p.LastBinlogTimestamp = &v + + } + return offset, nil +} + // for compatibility func (p *TGetBinlogLagResult_) FastWrite(buf []byte) int { return 0 @@ -45624,6 +45732,10 @@ func (p *TGetBinlogLagResult_) FastWriteNocopy(buf []byte, binaryWriter bthrift. offset += bthrift.Binary.WriteStructBegin(buf[offset:], "TGetBinlogLagResult") if p != nil { offset += p.fastWriteField2(buf[offset:], binaryWriter) + offset += p.fastWriteField4(buf[offset:], binaryWriter) + offset += p.fastWriteField5(buf[offset:], binaryWriter) + offset += p.fastWriteField6(buf[offset:], binaryWriter) + offset += p.fastWriteField7(buf[offset:], binaryWriter) offset += p.fastWriteField1(buf[offset:], binaryWriter) offset += p.fastWriteField3(buf[offset:], binaryWriter) } @@ -45639,6 +45751,10 @@ func (p *TGetBinlogLagResult_) BLength() int { l += p.field1Length() l += p.field2Length() l += p.field3Length() + l += p.field4Length() + l += p.field5Length() + l += p.field6Length() + l += p.field7Length() } l += bthrift.Binary.FieldStopLength() l += bthrift.Binary.StructEndLength() @@ -45676,6 +45792,50 @@ func (p *TGetBinlogLagResult_) fastWriteField3(buf []byte, binaryWriter bthrift. return offset } +func (p *TGetBinlogLagResult_) fastWriteField4(buf []byte, binaryWriter bthrift.BinaryWriter) int { + offset := 0 + if p.IsSetFirstCommitSeq() { + offset += bthrift.Binary.WriteFieldBegin(buf[offset:], "first_commit_seq", thrift.I64, 4) + offset += bthrift.Binary.WriteI64(buf[offset:], *p.FirstCommitSeq) + + offset += bthrift.Binary.WriteFieldEnd(buf[offset:]) + } + return offset +} + +func (p *TGetBinlogLagResult_) fastWriteField5(buf []byte, binaryWriter bthrift.BinaryWriter) int { + offset := 0 + if p.IsSetLastCommitSeq() { + offset += bthrift.Binary.WriteFieldBegin(buf[offset:], "last_commit_seq", thrift.I64, 5) + offset += bthrift.Binary.WriteI64(buf[offset:], *p.LastCommitSeq) + + offset += bthrift.Binary.WriteFieldEnd(buf[offset:]) + } + return offset +} + +func (p *TGetBinlogLagResult_) fastWriteField6(buf []byte, binaryWriter bthrift.BinaryWriter) int { + offset := 0 + if p.IsSetFirstBinlogTimestamp() { + offset += bthrift.Binary.WriteFieldBegin(buf[offset:], "first_binlog_timestamp", thrift.I64, 6) + offset += bthrift.Binary.WriteI64(buf[offset:], *p.FirstBinlogTimestamp) + + offset += bthrift.Binary.WriteFieldEnd(buf[offset:]) + } + return offset +} + +func (p *TGetBinlogLagResult_) fastWriteField7(buf []byte, binaryWriter bthrift.BinaryWriter) int { + offset := 0 + if p.IsSetLastBinlogTimestamp() { + offset += bthrift.Binary.WriteFieldBegin(buf[offset:], "last_binlog_timestamp", thrift.I64, 7) + offset += bthrift.Binary.WriteI64(buf[offset:], *p.LastBinlogTimestamp) + + offset += bthrift.Binary.WriteFieldEnd(buf[offset:]) + } + return offset +} + func (p *TGetBinlogLagResult_) field1Length() int { l := 0 if p.IsSetStatus() { @@ -45707,6 +45867,50 @@ func (p *TGetBinlogLagResult_) field3Length() int { return l } +func (p *TGetBinlogLagResult_) field4Length() int { + l := 0 + if p.IsSetFirstCommitSeq() { + l += bthrift.Binary.FieldBeginLength("first_commit_seq", thrift.I64, 4) + l += bthrift.Binary.I64Length(*p.FirstCommitSeq) + + l += bthrift.Binary.FieldEndLength() + } + return l +} + +func (p *TGetBinlogLagResult_) field5Length() int { + l := 0 + if p.IsSetLastCommitSeq() { + l += bthrift.Binary.FieldBeginLength("last_commit_seq", thrift.I64, 5) + l += bthrift.Binary.I64Length(*p.LastCommitSeq) + + l += bthrift.Binary.FieldEndLength() + } + return l +} + +func (p *TGetBinlogLagResult_) field6Length() int { + l := 0 + if p.IsSetFirstBinlogTimestamp() { + l += bthrift.Binary.FieldBeginLength("first_binlog_timestamp", thrift.I64, 6) + l += bthrift.Binary.I64Length(*p.FirstBinlogTimestamp) + + l += bthrift.Binary.FieldEndLength() + } + return l +} + +func (p *TGetBinlogLagResult_) field7Length() int { + l := 0 + if p.IsSetLastBinlogTimestamp() { + l += bthrift.Binary.FieldBeginLength("last_binlog_timestamp", thrift.I64, 7) + l += bthrift.Binary.I64Length(*p.LastBinlogTimestamp) + + l += bthrift.Binary.FieldEndLength() + } + return l +} + func (p *TUpdateFollowerStatsCacheRequest) FastRead(buf []byte) (int, error) { var err error var offset int diff --git a/pkg/rpc/thrift/FrontendService.thrift b/pkg/rpc/thrift/FrontendService.thrift index c1a4d106..fca9c484 100644 --- a/pkg/rpc/thrift/FrontendService.thrift +++ b/pkg/rpc/thrift/FrontendService.thrift @@ -1465,6 +1465,10 @@ struct TGetBinlogLagResult { 1: optional Status.TStatus status 2: optional i64 lag 3: optional Types.TNetworkAddress master_address + 4: optional i64 first_commit_seq + 5: optional i64 last_commit_seq + 6: optional i64 first_binlog_timestamp + 7: optional i64 last_binlog_timestamp } struct TUpdateFollowerStatsCacheRequest { diff --git a/pkg/service/http_service.go b/pkg/service/http_service.go index bc155cd8..dbac7168 100644 --- a/pkg/service/http_service.go +++ b/pkg/service/http_service.go @@ -25,6 +25,7 @@ import ( "reflect" "strconv" "strings" + "time" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/selectdb/ccr_syncer/pkg/ccr" @@ -222,7 +223,12 @@ func (s *HttpService) getLagHandler(w http.ResponseWriter, r *http.Request) { type result struct { *defaultResult - Lag int64 `json:"lag"` + Lag int64 `json:"lag"` + FirstCommitSeq int64 `json:"first_commit_seq"` + LastCommitSeq int64 `json:"last_commit_seq"` + FirstBinlogTimestamp string `json:"first_binlog_timestamp"` + LastBinlogTimestamp string `json:"last_binlog_timestamp"` + TimeInterval float64 `json:"time_interval"` } var lagResult *result defer func() { writeJson(w, lagResult) }() @@ -308,10 +314,31 @@ func (s *HttpService) getLagHandler(w http.ResponseWriter, r *http.Request) { } lag := resp.GetLag() + firstCommitSeq := resp.GetFirstCommitSeq() + lastCommitSeq := resp.GetLastCommitSeq() + var firstBinlogTimestamp, lastBinlogTimestamp string + + if ts := resp.GetFirstBinlogTimestamp(); ts != -1 { + firstBinlogTimestamp = ConvertTimestampToString(resp.GetFirstBinlogTimestamp()) + } else { + firstBinlogTimestamp = "1970-01-01 08:00:00" + } + if ts := resp.GetLastBinlogTimestamp(); ts != -1 { + lastBinlogTimestamp = ConvertTimestampToString(resp.GetLastBinlogTimestamp()) + } else { + lastBinlogTimestamp = "1970-01-01 08:00:00" + } + + timeInterval := CalculateTimeDifferenceInSeconds(lastBinlogTimestamp, firstBinlogTimestamp) lagResult = &result{ - defaultResult: newSuccessResult(), - Lag: lag, + defaultResult: newSuccessResult(), + Lag: lag, + FirstCommitSeq: firstCommitSeq, + LastCommitSeq: lastCommitSeq, + FirstBinlogTimestamp: firstBinlogTimestamp, + LastBinlogTimestamp: lastBinlogTimestamp, + TimeInterval: timeInterval, } } @@ -907,3 +934,14 @@ func (s *HttpService) Stop() error { } return nil } + +func ConvertTimestampToString(timestamp int64) string { + return time.Unix(0, timestamp*int64(time.Millisecond)).Format(time.DateTime) +} + +func CalculateTimeDifferenceInSeconds(timeStr1, timeStr2 string) float64 { + t1, _ := time.Parse(time.DateTime, timeStr1) + t2, _ := time.Parse(time.DateTime, timeStr2) + diff := t1.Sub(t2) + return diff.Seconds() +}