Skip to content

Commit

Permalink
Add fetch support (#82)
Browse files Browse the repository at this point in the history
* fetch support, add lambda export logs

* update docker-compose command

* Skip fetch on < 18

* Update test skip

* Don't repeatedly instrument

* Use func not =>

* lower threshold for skipped tests on node <18

* Skip nest on node14

* Raise nestjs test version

* Fix test for skip cases
  • Loading branch information
prodion23 authored Aug 30, 2024
1 parent e9aa1f8 commit 1a508fc
Show file tree
Hide file tree
Showing 8 changed files with 506 additions and 116 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/pr_build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ jobs:
- run: tsc

- name: Standup Test Docker Services
run: cd ./test/externalServices && docker-compose up -d
run: cd ./test/externalServices && docker compose up -d
- name: Wait for Docker
run: sleep 100
- name: Check test containers state
Expand Down
2 changes: 1 addition & 1 deletion .nycrc.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@
"check-coverage": true,
"include": "src/**/*.ts",
"exclude": "src/config/generated.*, src/instrumentation/HttpInstrumentationWrapper.ts, src/instrumentation/*.original.ts",
"lines": 79
"lines": 75
}
4 changes: 3 additions & 1 deletion src/HypertraceAgent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import {Framework} from "./instrumentation/Framework";
import {LambdaRequestHook, LambdaResponseHook} from "./instrumentation/LambdaInstrumentationWrapper";
import {patchHapi} from "./instrumentation/wrapper/HapiWrapper";
import {ChannelCredentials} from "@grpc/grpc-js";
import {patchFetch} from "./instrumentation/wrapper/FetchWrapper";

const api = require("@opentelemetry/api");
const grpc = require('@grpc/grpc-js');
Expand Down Expand Up @@ -82,6 +83,7 @@ export class HypertraceAgent {
patchClientRequest()
patchExpress()
patchSails()
patchFetch()

let instrumentations = [
new ExtendedAwsLambdaInstrumentation({
Expand All @@ -107,7 +109,7 @@ export class HypertraceAgent {
new MySQL2Instrumentation(),
new PgInstrumentation(),
new MongoDBInstrumentation(),
new MongooseInstrumentation()
new MongooseInstrumentation(),
]

if (isCompatible("12.0.0")) {
Expand Down
17 changes: 17 additions & 0 deletions src/instrumentation/BodyCapture.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,22 @@
import sizeof from 'object-sizeof'

const _RECORDABLE_CONTENT_TYPES = ['application/json', 'application/graphql', 'application/x-www-form-urlencoded']
export function shouldCapture(configField: boolean, contentTypeValue: any): boolean {
if (!configField) {
return false
}
if(!contentTypeValue){
return false
}

for (let recordableType of _RECORDABLE_CONTENT_TYPES) {
if (contentTypeValue.includes(recordableType)) {
return true
}
}
return false
}

export class BodyCapture {
private data: string;
private currentSize : number;
Expand Down
2 changes: 2 additions & 0 deletions src/instrumentation/ExtendedAwsLambdaInstrumentation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ export class ExtendedAwsLambdaInstrumentation extends AwsLambdaInstrumentation {

span.end();
try {
logger.debug("Exporting spans from node...")
await traceProvider.forceFlush()
}catch(e){
logger.error("Error exporting trace in extendedLambdaHandler original invoke attempt, continue without export")
Expand All @@ -62,6 +63,7 @@ export class ExtendedAwsLambdaInstrumentation extends AwsLambdaInstrumentation {
});
span.end();
try {
logger.debug("Exporting spans from node-agent...")
await traceProvider.forceFlush()
}catch(e){
logger.error("Error exporting trace in extendedLambdaHandler error handler, continue without export")
Expand Down
171 changes: 171 additions & 0 deletions src/instrumentation/wrapper/FetchWrapper.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
import {SemanticAttributes} from "@opentelemetry/semantic-conventions";
import {BodyCapture, shouldCapture} from "../BodyCapture";
import {Config} from "../../config/config";
import {logger} from "../../Logging";
import {SpanKind} from "@opentelemetry/api";
const {context, trace} = require('@opentelemetry/api');

let PATCHED = false;

export function patchFetch() {
if (typeof fetch !== 'undefined') {
logger.info("Adding fetch patch")
} else {
return
}
if(PATCHED){
return
} else {
PATCHED = true
}
const originalFetch = fetch;
global.fetch = async function (urlOrRequest: RequestInfo, options: RequestInit = {}) {
let response
let span = startSpanFromRequest(urlOrRequest, options)
try {
response = await originalFetch(urlOrRequest, options);
} catch(e){
if(span){
span.recordException(e);
span.end()
}
// re-raise underlying fetch error
throw e
}

if (span === null) {
// this means something errored during req cap, just return response to caller without res cap
return response;
} else {
await recordResponseData(span, response)
}
return response
};
}

function startSpanFromRequest(urlOrRequest: RequestInfo, options: RequestInit = {}) {
let span;
try {
let url: URL;
let optionsToUse = options;

// Handle if urlOrRequest is a Request object
if (urlOrRequest instanceof Request) {
url = new URL(urlOrRequest.url);
optionsToUse = {
...options,
method: urlOrRequest.method,
headers: urlOrRequest.headers,
body: urlOrRequest.body,
};
} else {
url = new URL(urlOrRequest)
}
const parsedUrl = new URL(url);
const scheme = parsedUrl.protocol.slice(0, -1);
const currentContext = context.active();
const tracer = trace.getTracer('hypertrace-fetch');
const requestMethod = getRequestMethod(options['method']);
const attributes = {
[SemanticAttributes.HTTP_METHOD]: requestMethod,
[SemanticAttributes.HTTP_SCHEME]: scheme,
[SemanticAttributes.HTTP_URL]: parsedUrl.toString(),
[SemanticAttributes.NET_PEER_NAME]: parsedUrl.hostname,
[SemanticAttributes.NET_PEER_PORT]: parsedUrl.port,
[SemanticAttributes.HTTP_TARGET]: parsedUrl.pathname,
};

let spanOptions = {
kind: SpanKind.CLIENT,
attributes: attributes
}

span = tracer.startSpan(`${requestMethod} ${parsedUrl.pathname}`, spanOptions, currentContext);

const headers = optionsToUse.headers || (urlOrRequest instanceof Request ? urlOrRequest.headers : {});
let reqContentType = ''
if (headers instanceof Headers) {
headers.forEach((value, key) => {
let lKey = key.toLowerCase()
if (lKey === "content-type") {
reqContentType = value
}
span.setAttribute(`http.request.header.${lKey}`, value);
});
} else if (typeof headers === 'object') {
for (const key in headers) {
if (headers.hasOwnProperty(key)) {
let lKey = key.toLowerCase()
if (lKey === "content-type") {
reqContentType = headers[key]
}
span.setAttribute(`http.request.header.${lKey}`, headers[key]);
}
}
}

let bodyCapture: BodyCapture = new BodyCapture(<number>Config.getInstance().config.data_capture!.body_max_size_bytes!,
<number>Config.getInstance().config.data_capture!.body_max_processing_size_bytes!)
if (shouldCapture(<boolean>Config.getInstance().config.data_capture!.http_body!.request!, reqContentType)) {
if (options.body && typeof options.body === 'string') {
bodyCapture.appendData(options.body)
span.setAttribute("http.request.body", bodyCapture.dataString())
}
}
} catch (e) {
if (span) {
span.end()
}
logger.error("Error during request capture phase of fetch instrumentation", e)
return null
}
return span
}

async function recordResponseData(span, response) {
try {
let responseBody;
const clone = response.clone();
let resContentType
if (response.headers instanceof Headers) {
response.headers.forEach((value, key) => {
let lKey = key.toLowerCase()
if (lKey === "content-type") {
resContentType = value
}
span.setAttribute(`http.response.header.${lKey}`, value);
});
} else if (typeof response.headers === 'object') {
for (const key in response.headers) {
if (response.headers.hasOwnProperty(key)) {
let lKey = key.toLowerCase()
if (lKey === "content-type") {
resContentType = response.headers[key]
}
span.setAttribute(`http.response.header.${lKey}`, response.headers[key]);
}
}
}

if (shouldCapture(<boolean>Config.getInstance().config.data_capture!.http_body!.response!, resContentType)) {
let resBodyCapture: BodyCapture = new BodyCapture(<number>Config.getInstance().config.data_capture!.body_max_size_bytes!,
<number>Config.getInstance().config.data_capture!.body_max_processing_size_bytes!)
let respData = await clone.text()
resBodyCapture.appendData(respData)
span.setAttribute("http.response.body", resBodyCapture.dataString())
}
span.setAttribute(SemanticAttributes.HTTP_STATUS_CODE, response.status)
span.end()
} catch (e) {
span.recordException(e)
span.end()
}

}

function getRequestMethod(methodStr) {
if (!methodStr) {
return 'GET'
}
return methodStr;
}
Loading

0 comments on commit 1a508fc

Please sign in to comment.