Skip to content

Commit

Permalink
Re-use same buffer when receiving messages in TCP collector (#399)
Browse files Browse the repository at this point in the history
Not a huge improvement all things considered, but an improvement
nonetheless and it makes the TCP collector consistent with the UDP
collector.

Signed-off-by: Antonin Bas <[email protected]>
  • Loading branch information
antoninbas authored Jan 15, 2025
1 parent 8ded9f6 commit e668a28
Showing 1 changed file with 12 additions and 2 deletions.
14 changes: 12 additions & 2 deletions pkg/collector/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ func (cp *CollectingProcess) handleTCPClient(conn net.Conn) {
go func() {
defer cp.wg.Done()
defer close(doneCh)
var b bytes.Buffer
for {
length, err := getMessageLength(reader)
if errors.Is(err, io.EOF) {
Expand All @@ -97,13 +98,21 @@ func (cp *CollectingProcess) handleTCPClient(conn net.Conn) {
klog.ErrorS(err, "Error when retrieving message length")
return
}
buff := make([]byte, length)
// Make sure we have enough capacity for the message.
b.Grow(length)
// The buff slice is guaranteed to have a capacity >= length, and will have
// a length of 0.
buff := b.AvailableBuffer()
// Increase the length of buff to fit the message. Note that slices can be
// resliced up to their capacity.
buff = buff[:length]
_, err = io.ReadFull(reader, buff)
if err != nil {
klog.ErrorS(err, "Error when reading the message")
return
}
message, err := cp.decodePacket(session, bytes.NewBuffer(buff), address)
b.Write(buff)
message, err := cp.decodePacket(session, &b, address)
if err != nil {
// This can be an invalid template record, or invalid data record.
// We close the connection, which is the best way to let the client
Expand All @@ -113,6 +122,7 @@ func (cp *CollectingProcess) handleTCPClient(conn net.Conn) {
}
klog.V(4).InfoS("Processed message from exporter",
"observationDomainID", message.GetObsDomainID(), "setType", message.GetSet().GetSetType(), "numRecords", message.GetSet().GetNumberOfRecords())
b.Reset()
}
}()
select {
Expand Down

0 comments on commit e668a28

Please sign in to comment.