Skip to content

Commit

Permalink
feat: Add OTLP log endpoints (gRPC & HTTP) (#1187)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?

Adds support for receiving OTLP logs. Only logs with a trace ID
associated are handled and are sampled along with spans. Logs without a
trace ID are forwarded to Honeycomb immediately like events without
TraceIDs are.

Further work to add support for configurable correlation IDs will be
done in later PRs.

**Note**: This is the 2.x version of adding OTLP Logs, see the following
PR that tracks adding to 3.x on main branch:
- #1168 

## Short description of the changes
- Add OTLP endpoints gRPC & HTTP (http/proto & http/json) including
support for gzip & zstd
- Adds missing API key check for trace gRPC handler with tests
- Add unit tests to verify endpoints and different encodings are handled
- Add unit tests to verify logs with a Trace ID are added to collector
and logs without a Trace ID are sent to libhoney for dispatch
- Rename processTraceRequest to `processOTLPRequest` and use in
otlp_trace
- Rename OTLP trace endpoint func postOTLP to postOTLPTraces
- Add MockCollector to aid with testing

---------

Co-authored-by: Pierre Tessier <[email protected]>
Co-authored-by: Kent Quirk <[email protected]>
  • Loading branch information
3 people authored Jun 11, 2024
1 parent dc11e02 commit a1088e7
Show file tree
Hide file tree
Showing 6 changed files with 605 additions and 54 deletions.
49 changes: 49 additions & 0 deletions collect/mockCollector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package collect

import (
"github.com/honeycombio/refinery/types"
)

type MockCollector struct {
Spans chan *types.Span
}

func NewMockCollector() *MockCollector {
return &MockCollector{
Spans: make(chan *types.Span, 100),
}
}

func (m *MockCollector) AddSpan(span *types.Span) error {
m.Spans <- span
return nil
}

func (m *MockCollector) AddSpanFromPeer(span *types.Span) error {
m.Spans <- span
return nil
}

func (m *MockCollector) GetStressedSampleRate(traceID string) (rate uint, keep bool, reason string) {
return 0, false, ""
}

func (m *MockCollector) ProcessSpanImmediately(sp *types.Span, keep bool, sampleRate uint, reason string) {
m.Spans <- sp
}

func (m *MockCollector) Stressed() bool {
return false
}

func (m *MockCollector) Flush() {
for {
select {
case <-m.Spans:
default:
return
}
}
}

var _ Collector = (*MockCollector)(nil)
77 changes: 77 additions & 0 deletions route/otlp_logs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package route

import (
"context"
"errors"
"fmt"
"net/http"

huskyotlp "github.com/honeycombio/husky/otlp"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

collectorlogs "go.opentelemetry.io/proto/otlp/collector/logs/v1"
)

func (r *Router) postOTLPLogs(w http.ResponseWriter, req *http.Request) {
ri := huskyotlp.GetRequestInfoFromHttpHeaders(req.Header)

if err := ri.ValidateLogsHeaders(); err != nil {
if errors.Is(err, huskyotlp.ErrInvalidContentType) {
r.handlerReturnWithError(w, ErrInvalidContentType, err)
} else {
r.handleOTLPFailureResponse(w, req, huskyotlp.OTLPError{Message: err.Error(), HTTPStatusCode: http.StatusUnauthorized})
}
return
}

if !r.Config.IsAPIKeyValid(ri.ApiKey) {
r.handleOTLPFailureResponse(w, req, huskyotlp.OTLPError{Message: fmt.Sprintf("api key %s not found in list of authorized keys", ri.ApiKey), HTTPStatusCode: http.StatusUnauthorized})
return
}

result, err := huskyotlp.TranslateLogsRequestFromReader(req.Context(), req.Body, ri)
if err != nil {
r.handleOTLPFailureResponse(w, req, huskyotlp.OTLPError{Message: err.Error(), HTTPStatusCode: http.StatusInternalServerError})
return
}

if err := r.processOTLPRequest(req.Context(), result.Batches, ri.ApiKey); err != nil {
r.handleOTLPFailureResponse(w, req, huskyotlp.OTLPError{Message: err.Error(), HTTPStatusCode: http.StatusInternalServerError})
return
}

_ = huskyotlp.WriteOtlpHttpTraceSuccessResponse(w, req)
}

type LogsServer struct {
router *Router
collectorlogs.UnimplementedLogsServiceServer
}

func NewLogsServer(router *Router) *LogsServer {
logsServer := LogsServer{router: router}
return &logsServer
}

func (l *LogsServer) Export(ctx context.Context, req *collectorlogs.ExportLogsServiceRequest) (*collectorlogs.ExportLogsServiceResponse, error) {
ri := huskyotlp.GetRequestInfoFromGrpcMetadata(ctx)
if err := ri.ValidateLogsHeaders(); err != nil {
return nil, huskyotlp.AsGRPCError(err)
}

if !l.router.Config.IsAPIKeyValid(ri.ApiKey) {
return nil, status.Error(codes.Unauthenticated, fmt.Sprintf("api key %s not found in list of authorized keys", ri.ApiKey))
}

result, err := huskyotlp.TranslateLogsRequest(ctx, req, ri)
if err != nil {
return nil, huskyotlp.AsGRPCError(err)
}

if err := l.router.processOTLPRequest(ctx, result.Batches, ri.ApiKey); err != nil {
return nil, huskyotlp.AsGRPCError(err)
}

return &collectorlogs.ExportLogsServiceResponse{}, nil
}
Loading

0 comments on commit a1088e7

Please sign in to comment.