-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathsflow.go
172 lines (142 loc) · 4.83 KB
/
sflow.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
package main
import (
"fmt"
log "github.com/sirupsen/logrus"
)
const (
vxlanPort = 4789
ipProtoUDP = 0x11
etherType8021Q = 0x8100
etherTypeIPv4 = 0x0800
etherTypeIPv6 = 0x86DD
)
// sFlow struct diagrams could be found here:
// https://sflow.org/developers/diagrams/sFlowV5Datagram.pdf
// https://sflow.org/developers/diagrams/sFlowV5Sample.pdf
// https://sflow.org/developers/diagrams/sFlowV5FlowData.pdf
func processDatagram(c *copier) {
// Do not process unsupported datagram versions
if dv := c.copyUint32(); dv != 5 {
panic(fmt.Sprintf("Unsupported datagram version %d", dv))
}
switch at := c.copyUint32(); at {
case 1:
// Copy IPv4 address (4 bytes), agentID (4 bytes),
// sequenceNumber (4 bytes), agentUptime (4 bytes)
c.copyBytes(16)
case 2:
// Copy IPv6 address (16 bytes), agentID (4 bytes),
// sequenceNumber (4 bytes), agentUptime (4 bytes)
c.copyBytes(28)
default:
panic(fmt.Sprintf("Unsupported agent address type %d", at))
}
sampleCount := c.copyUint32()
for i := uint32(0); i < sampleCount; i++ {
processSample(c)
}
}
func processSample(c *copier) {
enterpriseID, format := c.copyDataFormat()
oldSampleLength := int(c.copyUint32())
srcSampleStart := c.srcOffset()
dstSampleStart := c.dstOffset()
if enterpriseID != 0 {
log.Debugf("Skipping unsupported sample enterpriseID %d", enterpriseID)
c.copyBytesAt(oldSampleLength, srcSampleStart, dstSampleStart)
return
} else if format != 1 {
log.Debugf("Skipping unsupported sample type %d", format)
c.copyBytesAt(oldSampleLength, srcSampleStart, dstSampleStart)
return
}
// Copy sampleSequenceNumber (4 bytes),
// sampleDataSource (4 bytes), samplingRate (4 bytes),
// samplePool (4 bytes), dropped (4 bytes), inputInterface (4 bytes),
// outputInterface (4 bytes)
c.copyBytes(28)
recordCount := c.copyUint32()
for i := uint32(0); i < recordCount; i++ {
processRecord(c)
}
// Update the sample lenght field
newSampleLength := uint32(c.dstOffset() - dstSampleStart)
c.writeUint32At(newSampleLength, dstSampleStart-4)
}
func processRecord(c *copier) {
enterpriseID, format := c.copyDataFormat()
oldRecordLength := int(c.copyUint32())
srcRecordStart := c.srcOffset()
dstRecordStart := c.dstOffset()
if enterpriseID != 0 {
log.Debugf("Skipping unsupported record enterpriseID %d", enterpriseID)
c.copyBytesAt(oldRecordLength, srcRecordStart, dstRecordStart)
return
} else if format != 1 {
log.Debugf("Skipping unsupported record type %d", format)
c.copyBytesAt(oldRecordLength, srcRecordStart, dstRecordStart)
return
}
// Parse headerProtocol
if hp := c.copyUint32(); hp != 1 {
log.Debugf("Skipping unsupported frame type %d", hp)
c.copyBytesAt(oldRecordLength, srcRecordStart, dstRecordStart)
return
}
frameLength := c.copyUint32()
// Copy payloadRemoved (4 bytes)
c.copyBytes(4)
oldHeaderLength := c.copyUint32()
// Skip dstMAC (6 bytes), srcMAC (6 bytes)
c.skip(12)
var ipProto uint8
PARSE_FRAME:
switch etherType := c.readUint16(); etherType {
case etherType8021Q:
c.skip(2) // Skip VLANID
goto PARSE_FRAME // Re-parse the frame
case etherTypeIPv4:
ihl := int(c.readUint8() & 0x0F) // IP header length in 32-bit words
c.skip(8) // Skip several IPv4 header fields
ipProto = c.readUint8()
c.skip(ihl*4 - 10) // Skip all IPv4 fields left
case etherTypeIPv6:
c.skip(6) // Skip several IPv6 header fields
ipProto = c.readUint8()
c.skip(33) // Skip all IPv6 fields left
default:
log.Debugf("Skipping unsupported ethertype %d", etherType)
c.copyBytesAt(oldRecordLength, srcRecordStart, dstRecordStart)
return
}
if ipProto != ipProtoUDP {
log.Debug("Skipping non-UDP packet")
c.copyBytesAt(oldRecordLength, srcRecordStart, dstRecordStart)
return
}
c.skip(2) // skip UDP src port (2 bytes)
if dstUDPPort := c.readUint16(); dstUDPPort != vxlanPort {
log.Debug("Skipping non-VXLAN packet")
c.copyBytesAt(oldRecordLength, srcRecordStart, dstRecordStart)
return
}
c.skip(12) // Skip UDP packet length (2 bytes), UDP checksum (2 bytes), VXLAN header (8 bytes)
// Copy the rest of the frame
dstHeaderStart := c.dstOffset()
c.copyBytes(oldRecordLength - (c.srcOffset() - srcRecordStart))
// XDR format requies 4-byte alignment, and the record headers
// is the only variable-length record field
headerLength := c.dstOffset() - dstHeaderStart
if mod := headerLength % 4; mod != 0 {
c.pad(4 - mod)
headerLength += 4 - mod
}
// Update the record lenght field
newRecordLength := uint32(c.dstOffset() - dstRecordStart)
c.writeUint32At(newRecordLength, dstRecordStart-4)
// Update the frameLength to reflect the absence of the stripped headers
frameLength -= oldHeaderLength - uint32(headerLength)
c.writeUint32At(frameLength, dstRecordStart+4)
// Update the headerLength to reflect the absence of the stripped headers
c.writeUint32At(uint32(headerLength), dstRecordStart+12)
}