-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhandle_cancel.go
33 lines (31 loc) · 1.04 KB
/
handle_cancel.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package main
import (
pb "esdeath_go/esdeath_proto"
"esdeath_go/miku/persistent"
log "github.com/sirupsen/logrus"
)
// cancelReqHandle handles a request to cancel a delayed message.
//
// Storage keys are built as MsgId + keyDelimiter + consumerGroup. The key for
// defConsumerGroup is parsed with newKeyEntityFromStr to obtain the topic; a
// parse error is returned unchanged. The collected keys are then handed to
// rkv.Delete in a single call, and any error from it is returned unchanged.
//
// On success it returns a CancelDelayMsgResult whose BaseResult comes from
// successResult(); on failure the result is nil.
func cancelReqHandle(cancel *pb.DelayMsgCancel) (*pb.CancelDelayMsgResult, error) {
	defaultKey := cancel.MsgId + keyDelimiter + defConsumerGroup
	entity, parseErr := newKeyEntityFromStr(defaultKey)
	if parseErr != nil {
		return nil, parseErr
	}

	var toDelete []*persistent.KVPair
	// enqueue wraps a key in a KVPair, logs it, and schedules it for deletion.
	enqueue := func(key string) {
		kv := persistent.NewKPair(key)
		log.Debugln("取消延迟消息:", kv.Key)
		toDelete = append(toDelete, kv)
	}

	groups, found := topicToManyConsumerGroupsMap.Get(entity.topic)
	if !found {
		// No consumer groups are registered for the topic: only the
		// default-group key is targeted.
		enqueue(defaultKey)
	} else {
		// If a topic has multiple consumer groups, the message is deleted
		// for every one of them.
		for _, group := range groups.Keys() {
			enqueue(cancel.MsgId + keyDelimiter + group)
		}
	}

	if delErr := rkv.Delete(toDelete); delErr != nil {
		return nil, delErr
	}

	log.Debugln("取消延迟消息:", cancel.MsgId)
	result := &pb.CancelDelayMsgResult{BaseResult: successResult()}
	return result, nil
}