Skip to content

Commit

Permalink
golang: allow accessing {request,response}/{headers,trailer} in the O…
Browse files Browse the repository at this point in the history
…nLog phase (envoyproxy#34810)

Signed-off-by: spacewander <[email protected]>
  • Loading branch information
spacewander authored Aug 14, 2024
1 parent 6f2d549 commit 69ee8cf
Show file tree
Hide file tree
Showing 17 changed files with 235 additions and 50 deletions.
4 changes: 4 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ behavior_changes:
change: |
Removed support for (long deprecated) opentracing. See `issue 27401
<https://github.com/envoyproxy/envoy/issues/27401>`_ for details.
- area: golang
change: |
Change ``OnLogDownstreamStart``, ``OnLogDownstreamPeriodic`` and ``OnLog`` methods so that user can get the request/response's
headers and trailers when producing access log.
- area: http
change: |
Added HTTP1-safe option for :ref:`max_connection_duration
Expand Down
7 changes: 5 additions & 2 deletions contrib/golang/common/dso/dso.cc
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,12 @@ GoUint64 HttpFilterDsoImpl::envoyGoFilterOnHttpData(processState* p0, GoUint64 p
return envoy_go_filter_on_http_data_(p0, p1, p2, p3);
}

void HttpFilterDsoImpl::envoyGoFilterOnHttpLog(httpRequest* p0, int p1) {
void HttpFilterDsoImpl::envoyGoFilterOnHttpLog(httpRequest* p0, int p1, processState* p2,
processState* p3, GoUint64 p4, GoUint64 p5,
GoUint64 p6, GoUint64 p7, GoUint64 p8, GoUint64 p9,
GoUint64 p10, GoUint64 p11) {
ASSERT(envoy_go_filter_on_http_log_ != nullptr);
envoy_go_filter_on_http_log_(p0, GoUint64(p1));
envoy_go_filter_on_http_log_(p0, GoUint64(p1), p2, p3, p4, p5, p6, p7, p8, p9, p10, p11);
}

void HttpFilterDsoImpl::envoyGoFilterOnHttpDestroy(httpRequest* p0, int p1) {
Expand Down
13 changes: 10 additions & 3 deletions contrib/golang/common/dso/dso.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ class HttpFilterDso : public Dso {
GoUint64 p3) PURE;
virtual GoUint64 envoyGoFilterOnHttpData(processState* p0, GoUint64 p1, GoUint64 p2,
GoUint64 p3) PURE;
virtual void envoyGoFilterOnHttpLog(httpRequest* p0, int p1) PURE;
virtual void envoyGoFilterOnHttpLog(httpRequest* p0, int p1, processState* p2, processState* p3,
GoUint64 p4, GoUint64 p5, GoUint64 p6, GoUint64 p7,
GoUint64 p8, GoUint64 p9, GoUint64 p10, GoUint64 p11) PURE;
virtual void envoyGoFilterOnHttpDestroy(httpRequest* p0, int p1) PURE;
virtual void envoyGoRequestSemaDec(httpRequest* p0) PURE;
};
Expand All @@ -61,7 +63,9 @@ class HttpFilterDsoImpl : public HttpFilterDso {
GoUint64 p3) override;
GoUint64 envoyGoFilterOnHttpData(processState* p0, GoUint64 p1, GoUint64 p2,
GoUint64 p3) override;
void envoyGoFilterOnHttpLog(httpRequest* p0, int p1) override;
void envoyGoFilterOnHttpLog(httpRequest* p0, int p1, processState* p2, processState* p3,
GoUint64 p4, GoUint64 p5, GoUint64 p6, GoUint64 p7, GoUint64 p8,
GoUint64 p9, GoUint64 p10, GoUint64 p11) override;
void envoyGoFilterOnHttpDestroy(httpRequest* p0, int p1) override;
void envoyGoRequestSemaDec(httpRequest* p0) override;
void cleanup() override;
Expand All @@ -75,7 +79,10 @@ class HttpFilterDsoImpl : public HttpFilterDso {
GoUint64 p3) = {nullptr};
GoUint64 (*envoy_go_filter_on_http_data_)(processState* p0, GoUint64 p1, GoUint64 p2,
GoUint64 p3) = {nullptr};
void (*envoy_go_filter_on_http_log_)(httpRequest* p0, GoUint64 p1) = {nullptr};
void (*envoy_go_filter_on_http_log_)(httpRequest* p0, int p1, processState* p2, processState* p3,
GoUint64 p4, GoUint64 p5, GoUint64 p6, GoUint64 p7,
GoUint64 p8, GoUint64 p9, GoUint64 p10,
GoUint64 p11) = {nullptr};
void (*envoy_go_filter_on_http_destroy_)(httpRequest* p0, GoUint64 p1) = {nullptr};
void (*envoy_go_filter_go_request_sema_dec_)(httpRequest* p0) = {nullptr};
void (*envoy_go_filter_cleanup_)() = {nullptr};
Expand Down
7 changes: 6 additions & 1 deletion contrib/golang/common/dso/libgolang.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,12 @@ extern GoUint64 envoyGoFilterOnHttpData(processState* s, GoUint64 end_stream, Go

// go:linkname envoyGoFilterOnHttpLog
// github.com/envoyproxy/envoy/contrib/golang/filters/http/source/go/pkg/http.envoyGoFilterOnHttpLog
extern void envoyGoFilterOnHttpLog(httpRequest* r, GoUint64 type);
extern void envoyGoFilterOnHttpLog(httpRequest* r, GoUint64 type, processState* decoding_state,
processState* encoding_state, GoUint64 req_header_num,
GoUint64 req_header_bytes, GoUint64 req_trailer_num,
GoUint64 req_trailer_bytes, GoUint64 resp_header_num,
GoUint64 resp_header_bytes, GoUint64 resp_trailer_num,
GoUint64 resp_trailer_bytes);

// go:linkname envoyGoFilterOnHttpDestroy
// github.com/envoyproxy/envoy/contrib/golang/filters/http/source/go/pkg/http.envoyGoFilterOnHttpDestroy
Expand Down
5 changes: 4 additions & 1 deletion contrib/golang/common/dso/test/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ class MockHttpFilterDsoImpl : public HttpFilterDso {
(processState * p0, GoUint64 p1, GoUint64 p2, GoUint64 p3));
MOCK_METHOD(GoUint64, envoyGoFilterOnHttpData,
(processState * p0, GoUint64 p1, GoUint64 p2, GoUint64 p3));
MOCK_METHOD(void, envoyGoFilterOnHttpLog, (httpRequest * p0, int p1));
MOCK_METHOD(void, envoyGoFilterOnHttpLog,
(httpRequest * p0, int p1, processState* p2, processState* p3, GoUint64 p4,
GoUint64 p5, GoUint64 p6, GoUint64 p7, GoUint64 p8, GoUint64 p9, GoUint64 p10,
GoUint64 p11));
MOCK_METHOD(void, envoyGoFilterOnHttpDestroy, (httpRequest * p0, int p1));
MOCK_METHOD(void, envoyGoRequestSemaDec, (httpRequest * p0));
MOCK_METHOD(void, envoyGoFilterCleanUp, ());
Expand Down
4 changes: 3 additions & 1 deletion contrib/golang/common/dso/test/test_data/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ func envoyGoFilterOnHttpData(s *C.processState, endStream, buffer, length uint64
}

//export envoyGoFilterOnHttpLog
func envoyGoFilterOnHttpLog(r *C.httpRequest, logType uint64) {
func envoyGoFilterOnHttpLog(r *C.httpRequest, logType uint64, decodingState *C.processState, encodingState *C.processState,
reqHeaderNum, reqHeaderBytes, reqTrailerNum, reqTrailerBytes,
respHeaderNum, respHeaderBytes, respTrailerNum, respTrailerBytes uint64) {
}

//export envoyGoFilterOnHttpDestroy
Expand Down
8 changes: 8 additions & 0 deletions contrib/golang/common/go/api/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,13 @@
// NOLINT(namespace-envoy)

#ifdef __cplusplus
#include <atomic>

#define _Atomic(X) std::atomic<X>

extern "C" {
#else
#include <stdatomic.h> // NOLINT(modernize-deprecated-headers)
#endif

#include <stdint.h> // NOLINT(modernize-deprecated-headers)
Expand All @@ -27,6 +33,8 @@ typedef struct httpRequest { // NOLINT(modernize-use-using)
// The ID of the worker that is processing this request, this enables the go filter to dedicate
// memory to each worker and not require locks
uint32_t worker_id;
// This flag will be read & written by different threads, so it need to be atomic
_Atomic(int) is_golang_processing_log;
} httpRequest;

typedef struct { // NOLINT(modernize-use-using)
Expand Down
12 changes: 6 additions & 6 deletions contrib/golang/common/go/api/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,22 +81,22 @@ type StreamFilter interface {
StreamEncoderFilter

// log
OnLog()
OnLogDownstreamStart()
OnLogDownstreamPeriodic()
OnLog(RequestHeaderMap, RequestTrailerMap, ResponseHeaderMap, ResponseTrailerMap)
OnLogDownstreamStart(RequestHeaderMap)
OnLogDownstreamPeriodic(RequestHeaderMap, RequestTrailerMap, ResponseHeaderMap, ResponseTrailerMap)

// destroy filter
OnDestroy(DestroyReason)
// TODO add more for stream complete
}

func (*PassThroughStreamFilter) OnLog() {
func (*PassThroughStreamFilter) OnLog(RequestHeaderMap, RequestTrailerMap, ResponseHeaderMap, ResponseTrailerMap) {
}

func (*PassThroughStreamFilter) OnLogDownstreamStart() {
func (*PassThroughStreamFilter) OnLogDownstreamStart(RequestHeaderMap) {
}

func (*PassThroughStreamFilter) OnLogDownstreamPeriodic() {
func (*PassThroughStreamFilter) OnLogDownstreamPeriodic(RequestHeaderMap, RequestTrailerMap, ResponseHeaderMap, ResponseTrailerMap) {
}

func (*PassThroughStreamFilter) OnDestroy(DestroyReason) {
Expand Down
71 changes: 65 additions & 6 deletions contrib/golang/filters/http/source/go/pkg/http/shim.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,13 @@ func envoyGoFilterOnHttpData(s *C.processState, endStream, buffer, length uint64
}

//export envoyGoFilterOnHttpLog
func envoyGoFilterOnHttpLog(r *C.httpRequest, logType uint64) {
func envoyGoFilterOnHttpLog(r *C.httpRequest, logType uint64,
decodingStateWrapper *C.processState, encodingStateWrapper *C.processState,
reqHeaderNum, reqHeaderBytes, reqTrailerNum, reqTrailerBytes,
respHeaderNum, respHeaderBytes, respTrailerNum, respTrailerBytes uint64) {

decodingState := getOrCreateState(decodingStateWrapper)
encodingState := getOrCreateState(encodingStateWrapper)
req := getRequest(r)
if req == nil {
req = createRequest(r)
Expand All @@ -276,14 +282,67 @@ func envoyGoFilterOnHttpLog(r *C.httpRequest, logType uint64) {

v := api.AccessLogType(logType)

// Request headers must exist because the HTTP filter won't be run if the headers are
// not sent yet.
// TODO: make the headers/trailers read-only
reqHeader := &requestHeaderMapImpl{
requestOrResponseHeaderMapImpl{
headerMapImpl{
state: decodingState,
headerNum: reqHeaderNum,
headerBytes: reqHeaderBytes,
},
},
}

var reqTrailer api.RequestTrailerMap
if reqTrailerNum != 0 {
reqTrailer = &requestTrailerMapImpl{
requestOrResponseTrailerMapImpl{
headerMapImpl{
state: decodingState,
headerNum: reqTrailerNum,
headerBytes: reqTrailerBytes,
},
},
}
}

var respHeader api.ResponseHeaderMap
if respHeaderNum != 0 {
respHeader = &responseHeaderMapImpl{
requestOrResponseHeaderMapImpl{
headerMapImpl{
state: encodingState,
headerNum: respHeaderNum,
headerBytes: respHeaderBytes,
},
},
}
}

var respTrailer api.ResponseTrailerMap
if respTrailerNum != 0 {
respTrailer = &responseTrailerMapImpl{
requestOrResponseTrailerMapImpl{
headerMapImpl{
state: encodingState,
headerNum: respTrailerNum,
headerBytes: respTrailerBytes,
},
},
}
}

f := req.httpFilter

switch v {
case api.AccessLogDownstreamStart:
f.OnLogDownstreamStart()
case api.AccessLogDownstreamPeriodic:
f.OnLogDownstreamPeriodic()
case api.AccessLogDownstreamEnd:
f.OnLog()
f.OnLog(reqHeader, reqTrailer, respHeader, respTrailer)
case api.AccessLogDownstreamPeriodic:
f.OnLogDownstreamPeriodic(reqHeader, reqTrailer, respHeader, respTrailer)
case api.AccessLogDownstreamStart:
f.OnLogDownstreamStart(reqHeader)
default:
api.LogErrorf("access log type %d is not supported yet", logType)
}
Expand Down
64 changes: 53 additions & 11 deletions contrib/golang/filters/http/source/golang_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ Http::FilterTrailersStatus Filter::decodeTrailers(Http::RequestTrailerMap& trail
ProcessorState& state = decoding_state_;
ENVOY_LOG(debug, "golang filter decodeTrailers, decoding state: {}", state.stateStr());

request_trailers_ = &trailers;

bool done = doTrailer(state, trailers);

return done ? Http::FilterTrailersStatus::Continue : Http::FilterTrailersStatus::StopIteration;
Expand All @@ -91,10 +93,10 @@ Http::FilterHeadersStatus Filter::encodeHeaders(Http::ResponseHeaderMap& headers
activation_response_headers_ = dynamic_cast<const Http::ResponseHeaderMap*>(&headers);

// NP: may enter encodeHeaders in any state,
// since other filters or filtermanager could call encodeHeaders or sendLocalReply in any time.
// eg. filtermanager may invoke sendLocalReply, when scheme is invalid,
// with "Sending local reply with details // http1.invalid_scheme" details.
// This means DecodeXXX & EncodeXXX may run concurrently in Golang side.
// since other filters or filtermanager could call encodeHeaders or sendLocalReply in any
// time. eg. filtermanager may invoke sendLocalReply, when scheme is invalid, with "Sending
// local reply with details // http1.invalid_scheme" details. This means DecodeXXX & EncodeXXX
// may run concurrently in Golang side.

bool done = doHeaders(encoding_state_, headers, end_stream);

Expand Down Expand Up @@ -159,21 +161,61 @@ void Filter::onDestroy() {
// access_log is executed before the log of the stream filter
void Filter::log(const Formatter::HttpFormatterContext& log_context,
const StreamInfo::StreamInfo&) {
uint64_t req_header_num = 0;
uint64_t req_header_bytes = 0;
uint64_t req_trailer_num = 0;
uint64_t req_trailer_bytes = 0;
uint64_t resp_header_num = 0;
uint64_t resp_header_bytes = 0;
uint64_t resp_trailer_num = 0;
uint64_t resp_trailer_bytes = 0;

auto decoding_state = dynamic_cast<processState*>(&decoding_state_);
auto encoding_state = dynamic_cast<processState*>(&encoding_state_);

// `log` may be called multiple times with different log type
switch (log_context.accessLogType()) {
case Envoy::AccessLog::AccessLogType::DownstreamStart:
case Envoy::AccessLog::AccessLogType::DownstreamPeriodic:
case Envoy::AccessLog::AccessLogType::DownstreamEnd:
// log called by AccessLogDownstreamStart will happen before doHeaders
if (initRequest()) {
request_headers_ = static_cast<Http::RequestOrResponseHeaderMap*>(
const_cast<Http::RequestHeaderMap*>(&log_context.requestHeaders()));
request_headers_ = const_cast<Http::RequestHeaderMap*>(&log_context.requestHeaders());
}

if (request_headers_ != nullptr) {
req_header_num = request_headers_->size();
req_header_bytes = request_headers_->byteSize();
decoding_state_.headers = request_headers_;
}

if (request_trailers_ != nullptr) {
req_trailer_num = request_trailers_->size();
req_trailer_bytes = request_trailers_->byteSize();
decoding_state_.trailers = request_trailers_;
}

activation_response_headers_ = &log_context.responseHeaders();
if (activation_response_headers_ != nullptr) {
resp_header_num = activation_response_headers_->size();
resp_header_bytes = activation_response_headers_->byteSize();
encoding_state_.headers = const_cast<Http::ResponseHeaderMap*>(activation_response_headers_);
}

activation_response_trailers_ = &log_context.responseTrailers();
if (activation_response_trailers_ != nullptr) {
resp_trailer_num = activation_response_trailers_->size();
resp_trailer_bytes = activation_response_trailers_->byteSize();
encoding_state_.trailers =
const_cast<Http::ResponseTrailerMap*>(activation_response_trailers_);
}

// This only run in the work thread, it's safe even without lock.
is_golang_processing_log_ = true;
dynamic_lib_->envoyGoFilterOnHttpLog(req_, int(log_context.accessLogType()));
is_golang_processing_log_ = false;
req_->is_golang_processing_log = 1;
dynamic_lib_->envoyGoFilterOnHttpLog(req_, int(log_context.accessLogType()), decoding_state,
encoding_state, req_header_num, req_header_bytes,
req_trailer_num, req_trailer_bytes, resp_header_num,
resp_header_bytes, resp_trailer_num, resp_trailer_bytes);
req_->is_golang_processing_log = 0;
break;
default:
// skip calling with unsupported log types
Expand Down Expand Up @@ -1127,7 +1169,7 @@ CAPIStatus Filter::getStringProperty(absl::string_view path, uint64_t* value_dat
}

// to access the headers_ and its friends we need to hold the lock
activation_request_headers_ = dynamic_cast<const Http::RequestHeaderMap*>(request_headers_);
activation_request_headers_ = request_headers_;

if (isThreadSafe()) {
return getStringPropertyCommon(path, value_data, value_len);
Expand Down
8 changes: 3 additions & 5 deletions contrib/golang/filters/http/source/golang_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -293,8 +293,7 @@ class Filter : public Http::StreamFilter,
GoInt32* rc);

bool isProcessingInGo() {
return is_golang_processing_log_ || decoding_state_.isProcessingInGo() ||
encoding_state_.isProcessingInGo();
return decoding_state_.isProcessingInGo() || encoding_state_.isProcessingInGo();
}
void deferredDeleteRequest(HttpRequestInternal* req);

Expand Down Expand Up @@ -346,7 +345,8 @@ class Filter : public Http::StreamFilter,

// save temp values for fetching request attributes in the later phase,
// like getting request size
Http::RequestOrResponseHeaderMap* request_headers_{nullptr};
Http::RequestHeaderMap* request_headers_{nullptr};
Http::RequestTrailerMap* request_trailers_{nullptr};

HttpRequestInternal* req_{nullptr};

Expand All @@ -360,8 +360,6 @@ class Filter : public Http::StreamFilter,
// back from go).
Thread::MutexBasicLockable mutex_{};
bool has_destroyed_ ABSL_GUARDED_BY(mutex_){false};

bool is_golang_processing_log_{false};
};

struct httpConfigInternal : httpConfig {
Expand Down
2 changes: 1 addition & 1 deletion contrib/golang/filters/http/source/processor_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ class ProcessorState : public processState, public Logger::Loggable<Logger::Id::
bool isProcessingInGo() {
return filterState() == FilterState::ProcessingHeader ||
filterState() == FilterState::ProcessingData ||
filterState() == FilterState::ProcessingTrailer;
filterState() == FilterState::ProcessingTrailer || req->is_golang_processing_log;
}
bool isProcessingHeader() { return filterState() == FilterState::ProcessingHeader; }

Expand Down
6 changes: 4 additions & 2 deletions contrib/golang/filters/http/test/golang_filter_fuzz_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,10 @@ DEFINE_PROTO_FUZZER(const envoy::extensions::filters::http::golang::GolangFilter
.WillByDefault(Return(static_cast<uint64_t>(GolangStatus::Continue)));
ON_CALL(*dso_lib.get(), envoyGoFilterOnHttpData(_, _, _, _))
.WillByDefault(Return(static_cast<uint64_t>(GolangStatus::Continue)));
ON_CALL(*dso_lib.get(), envoyGoFilterOnHttpLog(_, _))
.WillByDefault(Invoke([&](httpRequest*, int) -> void {}));
ON_CALL(*dso_lib.get(), envoyGoFilterOnHttpLog(_, _, _, _, _, _, _, _, _, _, _, _))
.WillByDefault(
Invoke([&](httpRequest*, int, processState*, processState*, GoUint64, GoUint64, GoUint64,
GoUint64, GoUint64, GoUint64, GoUint64, GoUint64) -> void {}));
ON_CALL(*dso_lib.get(), envoyGoFilterOnHttpDestroy(_, _))
.WillByDefault(Invoke([&](httpRequest* p0, int) -> void {
// delete the filter->req_, make LeakSanitizer happy.
Expand Down
Loading

0 comments on commit 69ee8cf

Please sign in to comment.