-
Notifications
You must be signed in to change notification settings - Fork 18
/
Copy pathfetch_request.go
158 lines (138 loc) · 4.84 KB
/
fetch_request.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
package healer
import (
"encoding/binary"
)
// PartitionBlock describes one partition entry of a fetch request: which
// partition to read, the offset to start from, and the per-partition byte cap.
// FetchRequest.Encode serializes each block inside its topic's partition array.
type PartitionBlock struct {
// Partition is the ID of the partition to fetch from.
Partition int32
// CurrentLeaderEpoch is the leader epoch supplied by the caller of
// addPartition. Encode writes it only for request versions that carry it.
CurrentLeaderEpoch int32
// FetchOffset is the offset at which the fetch starts.
FetchOffset int64
// LogStartOffset is written by Encode for request versions that carry it.
// addPartition does not set it, so it stays zero unless assigned directly.
LogStartOffset int64
// MaxBytes is the byte limit for this partition; addPartition also adds it
// to FetchRequest.MaxBytes.
MaxBytes int32
}
// FetchRequest holds all the parameters of a fetch request. It embeds the
// common request header and is serialized to bytes by Encode.
type FetchRequest struct {
// RequestHeader is the embedded common header; Encode overwrites its
// APIVersion with the version being encoded.
*RequestHeader
// ReplicaID is set to -1 by NewFetchRequest.
ReplicaID int32
// MaxWaitTime is supplied by the caller of NewFetchRequest.
// NOTE(review): presumably milliseconds, as in the Kafka protocol - confirm.
MaxWaitTime int32
// MinBytes is supplied by the caller of NewFetchRequest.
MinBytes int32
// MaxBytes is the request-wide byte limit. It starts at zero and grows by
// the per-partition limit on every addPartition call.
MaxBytes int32
// ISOLationLevel is encoded as a single byte. NewFetchRequest leaves it zero.
ISOLationLevel int8
// SessionID and SessionEpoch are left zero by NewFetchRequest and are
// written by Encode only for request versions that carry them.
SessionID int32
SessionEpoch int32
// Topics maps a topic name to the partitions to fetch from that topic.
Topics map[string][]*PartitionBlock
// ForgottenTopicsDatas maps a topic name to a list of partition IDs. It is
// created empty by NewFetchRequest and written by Encode only for request
// versions that carry it.
ForgottenTopicsDatas map[string][]int32
}
// NewFetchRequest returns a FetchRequest for the given client with no
// partitions added yet.
//
// The header is initialized with the fetch API key and API version 10 (Encode
// later overwrites the version with the one it is asked to encode). ReplicaID
// is set to -1, maxWaitTime and minBytes are stored as given, and both topic
// maps are allocated empty so they can be written to immediately. All other
// fields keep their zero values.
func NewFetchRequest(clientID string, maxWaitTime int32, minBytes int32) *FetchRequest {
	header := RequestHeader{
		APIKey:     API_FetchRequest,
		APIVersion: 10,
		ClientID:   &clientID,
	}

	req := &FetchRequest{
		RequestHeader: &header,
		ReplicaID:     -1,
		MaxWaitTime:   maxWaitTime,
		MinBytes:      minBytes,
	}
	req.Topics = make(map[string][]*PartitionBlock)
	req.ForgottenTopicsDatas = make(map[string][]int32)
	return req
}
// addPartition registers one partition of topic to be fetched, starting at
// fetchOffset and limited to maxBytes.
//
// The request-wide MaxBytes grows by maxBytes on every call. The new block is
// appended to the topic's list, which is created on first use. Topics must be
// a non-nil map (NewFetchRequest allocates it).
func (fetchRequest *FetchRequest) addPartition(topic string, partitionID int32, fetchOffset int64, maxBytes int32, currentLeaderEpoch int32) {
	logger.V(4).Info("add partition to fetch request", "topic", topic, "partitionID", partitionID, "fetchOffset", fetchOffset, "maxBytes", maxBytes)

	fetchRequest.MaxBytes += maxBytes

	// Indexing a missing key yields a nil slice, and append on nil allocates,
	// so the "first partition for this topic" case needs no separate branch.
	fetchRequest.Topics[topic] = append(fetchRequest.Topics[topic], &PartitionBlock{
		Partition:          partitionID,
		CurrentLeaderEpoch: currentLeaderEpoch,
		FetchOffset:        fetchOffset,
		MaxBytes:           maxBytes,
	})
}
// length returns the number of bytes Encode should allocate for this request.
//
// The result is an upper bound rather than the exact wire size: every optional
// field is counted regardless of version, and Encode trims the buffer to the
// bytes it actually wrote. version is accepted for signature compatibility and
// is not consulted.
//
// Each partition of each topic is counted individually. Charging a flat
// per-topic amount, as was done before, under-sizes the buffer as soon as a
// topic carries several partitions and makes Encode write out of range.
func (fetchRequest *FetchRequest) length(version uint16) int {
	const (
		sizePrefix    = 4 // leading request-size field
		stringLenSize = 2 // length prefix of a topic name
		arrayLenSize  = 4 // element count of an array

		// replica id, max wait, min bytes, max bytes (4 each),
		// isolation level (1), session id, session epoch (4 each).
		fixedFieldsSize = 4 + 4 + 4 + 4 + 1 + 4 + 4

		// partition (4), leader epoch (4), fetch offset (8),
		// log start offset (8), max bytes (4).
		partitionBlockSize = 4 + 4 + 8 + 8 + 4

		forgottenPartitionSize = 4 // one partition ID
	)

	length := sizePrefix + fetchRequest.RequestHeader.length()
	// Fixed fields plus the counts of the topics and forgotten-topics arrays.
	length += fixedFieldsSize + arrayLenSize + arrayLenSize

	for topicname, partitionBlocks := range fetchRequest.Topics {
		length += stringLenSize + len(topicname) + arrayLenSize + len(partitionBlocks)*partitionBlockSize
	}
	for topicname, partitionIDs := range fetchRequest.ForgottenTopicsDatas {
		length += stringLenSize + len(topicname) + arrayLenSize + len(partitionIDs)*forgottenPartitionSize
	}

	// NOTE(review): the doubling is retained from the previous implementation
	// as headroom, because the encoded size of RequestHeader cannot be checked
	// from this file. It only affects the allocation, never the output.
	return length * 2
}
// Encode serializes the request for the given API version and returns the
// encoded bytes, including the leading 4-byte size prefix (which holds the
// number of bytes that follow it).
//
// As a side effect it sets RequestHeader.APIVersion to version so the header
// written matches the body layout. Integers are written big-endian, a topic
// name is a 2-byte length followed by its bytes, and an array is a 4-byte
// element count followed by its elements.
//
// Optional fields are gated on the version in which the Kafka Fetch request
// schema introduces them (see the constants below). Output for versions 0, 7
// and 10 is the same as before; versions 3-6 and 9 now get the fields their
// schema requires. No fields beyond those listed here are written, so request
// versions whose schema needs more are not supported by this method.
//
// Topics and ForgottenTopicsDatas are maps, so the order in which topics are
// written is not deterministic.
func (fetchRequest *FetchRequest) Encode(version uint16) []byte {
	// First request version that carries each optional field.
	const (
		minVersionMaxBytes           = 3
		minVersionIsolationLevel     = 4
		minVersionLogStartOffset     = 5
		minVersionFetchSession       = 7 // session id/epoch and forgotten topics
		minVersionCurrentLeaderEpoch = 9
	)

	fetchRequest.RequestHeader.APIVersion = version

	payload := make([]byte, fetchRequest.length(version))
	offset := 4 // payload[:4] is the request size; it is filled in at the end.
	offset += fetchRequest.RequestHeader.EncodeTo(payload[offset:])

	// Small writers that append at the current offset and advance it.
	putInt32 := func(v int32) {
		binary.BigEndian.PutUint32(payload[offset:], uint32(v))
		offset += 4
	}
	putInt64 := func(v int64) {
		binary.BigEndian.PutUint64(payload[offset:], uint64(v))
		offset += 8
	}
	putArrayLen := func(n int) {
		binary.BigEndian.PutUint32(payload[offset:], uint32(n))
		offset += 4
	}
	putString := func(s string) {
		binary.BigEndian.PutUint16(payload[offset:], uint16(len(s)))
		offset += 2
		offset += copy(payload[offset:], s)
	}

	putInt32(fetchRequest.ReplicaID)
	putInt32(fetchRequest.MaxWaitTime)
	putInt32(fetchRequest.MinBytes)
	if version >= minVersionMaxBytes {
		putInt32(fetchRequest.MaxBytes)
	}
	if version >= minVersionIsolationLevel {
		payload[offset] = byte(fetchRequest.ISOLationLevel)
		offset++
	}
	if version >= minVersionFetchSession {
		putInt32(fetchRequest.SessionID)
		putInt32(fetchRequest.SessionEpoch)
	}

	putArrayLen(len(fetchRequest.Topics))
	for topicname, partitionBlocks := range fetchRequest.Topics {
		putString(topicname)
		putArrayLen(len(partitionBlocks))
		for _, partitionBlock := range partitionBlocks {
			putInt32(partitionBlock.Partition)
			if version >= minVersionCurrentLeaderEpoch {
				putInt32(partitionBlock.CurrentLeaderEpoch)
			}
			putInt64(partitionBlock.FetchOffset)
			if version >= minVersionLogStartOffset {
				putInt64(partitionBlock.LogStartOffset)
			}
			putInt32(partitionBlock.MaxBytes)
		}
	}

	if version >= minVersionFetchSession {
		putArrayLen(len(fetchRequest.ForgottenTopicsDatas))
		for topicName, partitions := range fetchRequest.ForgottenTopicsDatas {
			putString(topicName)
			putArrayLen(len(partitions))
			for _, p := range partitions {
				putInt32(p)
			}
		}
	}

	// The size prefix does not count its own 4 bytes.
	binary.BigEndian.PutUint32(payload, uint32(offset-4))
	return payload[:offset]
}