diff --git a/ingestors/golang/gin/go.sum b/ingestors/golang/gin/go.sum index ee49d247..a5fac2d9 100644 --- a/ingestors/golang/gin/go.sum +++ b/ingestors/golang/gin/go.sum @@ -746,8 +746,6 @@ github.com/mattn/go-isatty v0.0.17/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/ github.com/mattn/go-isatty v0.0.18 h1:DOKFKCQ7FNG2L1rbrmstDN4QVRdS89Nkh85u68Uwp98= github.com/mattn/go-isatty v0.0.18/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-sqlite3 v1.14.14/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= -github.com/metlo-labs/metlo/ingestors/golang/metlo v1.0.0 h1:ePTrnizCXI5HzKe4qpEfYlcxGOEyySOAi9EAmUfWoRU= -github.com/metlo-labs/metlo/ingestors/golang/metlo v1.0.0/go.mod h1:IrHJdeANVNi9deZW3NmZfCp8nk0I4ERfA+x+4DC1OnU= github.com/metlo-labs/metlo/ingestors/golang/metlo v1.0.1 h1:r+B3lFyM+bl1GqBg9gNmDNZ1eQuSaRZuwujarTLczr8= github.com/metlo-labs/metlo/ingestors/golang/metlo v1.0.1/go.mod h1:ZPUPd2/cfz4DKH/xJfN4F0xxSyAcPHVApeQSGd1aYW0= github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY= diff --git a/ingestors/golang/gin/metlo_gin.go b/ingestors/golang/gin/metlo_gin.go index 21d0e09f..feca59ca 100644 --- a/ingestors/golang/gin/metlo_gin.go +++ b/ingestors/golang/gin/metlo_gin.go @@ -16,6 +16,7 @@ const MAX_BODY int = 10 * 1024 type metloApp interface { Send(data metlo.MetloTrace) + Block(req metlo.TraceReq, meta metlo.TraceMeta) bool Allow() bool } @@ -29,6 +30,14 @@ type metloInstrumentation struct { app metloApp serverHost string serverPort int + rejectFn func(*gin.Context) +} + +type CustomInitParams struct { + app metloApp + serverHost string + serverPort int + rejectFn func(*gin.Context) } func Init(app metloApp) metloInstrumentation { @@ -40,6 +49,7 @@ func CustomInit(app metloApp, serverHost string, serverPort int) metloInstrument app: app, serverHost: serverHost, serverPort: serverPort, + rejectFn: nil, } } @@ -59,6 +69,10 @@ func (w bodyLogWriter) Write(b []byte) (int, error) { return w.ResponseWriter.Write(b) } +func (m *metloInstrumentation) SetRejectFn(rejectFn func(*gin.Context)) { + m.rejectFn = rejectFn +} + func (m *metloInstrumentation) Middleware(c *gin.Context) { var bytesWritten *int = new(int) *bytesWritten = 0 @@ -88,8 +102,37 @@ func (m *metloInstrumentation) Middleware(c *gin.Context) { log.Println("Metlo couldn't find source port for incoming request") } - c.Next() + req := metlo.TraceReq{ + Url: metlo.TraceUrl{ + Host: c.Request.Host, + Path: c.Request.URL.Path, + Parameters: queryParams, + }, + Headers: reqHeaders, + Body: string(body), + Method: c.Request.Method, + } + + meta := metlo.TraceMeta{ + Environment: "production", + Incoming: true, + Source: c.ClientIP(), + SourcePort: sourcePort, + Destination: m.serverHost, + DestinationPort: m.serverPort, + MetloSource: "go/gin", + } + if m.app.Block(req, meta) { + if m.rejectFn != nil { + m.rejectFn(c) + } else { + c.String(403, "Forbidden") + } + c.Abort() + } else { + c.Next() + } resHeaderMap := c.Writer.Header() resHeaders := make([]metlo.NV, 0) for k := range resHeaderMap { @@ -97,30 +140,13 @@ func (m *metloInstrumentation) Middleware(c *gin.Context) { } tr := metlo.MetloTrace{ - Request: metlo.TraceReq{ - Url: metlo.TraceUrl{ - Host: c.Request.Host, - Path: c.Request.URL.Path, - Parameters: queryParams, - }, - Headers: reqHeaders, - Body: string(body), - Method: c.Request.Method, - }, + Request: req, Response: metlo.TraceRes{ Status: blw.Status(), Body: blw.body.String(), Headers: resHeaders, }, - Meta: metlo.TraceMeta{ - Environment: "production", - Incoming: true, - Source: c.ClientIP(), - SourcePort: sourcePort, - Destination: m.serverHost, - DestinationPort: m.serverPort, - MetloSource: "go/gin", - }, + Meta: meta, } go m.app.Send(tr) diff --git a/ingestors/golang/gorilla/go.mod b/ingestors/golang/gorilla/go.mod index a2e26839..b1f6fa15 100644 --- a/ingestors/golang/gorilla/go.mod +++ b/ingestors/golang/gorilla/go.mod @@ -2,7 +2,4 @@ module github.com/metlo-labs/metlo/ingestors/golang/gorilla go 1.11 -require ( - github.com/metlo-labs/metlo/ingestors/golang/metlo v1.0.1 - golang.org/x/net v0.10.0 // indirect -) +require github.com/metlo-labs/metlo/ingestors/golang/metlo v1.0.1 diff --git a/ingestors/golang/gorilla/go.sum b/ingestors/golang/gorilla/go.sum index 0d79d621..7a83a52c 100644 --- a/ingestors/golang/gorilla/go.sum +++ b/ingestors/golang/gorilla/go.sum @@ -710,8 +710,6 @@ github.com/lyft/protoc-gen-star/v2 v2.0.1/go.mod h1:RcCdONR2ScXaYnQC5tUzxzlpA3WV github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-sqlite3 v1.14.14/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= -github.com/metlo-labs/metlo/ingestors/golang/metlo v1.0.0 h1:ePTrnizCXI5HzKe4qpEfYlcxGOEyySOAi9EAmUfWoRU= -github.com/metlo-labs/metlo/ingestors/golang/metlo v1.0.0/go.mod h1:IrHJdeANVNi9deZW3NmZfCp8nk0I4ERfA+x+4DC1OnU= github.com/metlo-labs/metlo/ingestors/golang/metlo v1.0.1 h1:r+B3lFyM+bl1GqBg9gNmDNZ1eQuSaRZuwujarTLczr8= github.com/metlo-labs/metlo/ingestors/golang/metlo v1.0.1/go.mod h1:ZPUPd2/cfz4DKH/xJfN4F0xxSyAcPHVApeQSGd1aYW0= github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY= diff --git a/ingestors/golang/gorilla/metlo_gorilla.go b/ingestors/golang/gorilla/metlo_gorilla.go index a8033ee3..de564d50 100644 --- a/ingestors/golang/gorilla/metlo_gorilla.go +++ b/ingestors/golang/gorilla/metlo_gorilla.go @@ -16,6 +16,7 @@ const MAX_BODY int = 10 * 1024 type metloApp interface { Send(data metlo.MetloTrace) + Block(req metlo.TraceReq, meta metlo.TraceMeta) bool Allow() bool } @@ -30,6 +31,7 @@ type metloInstrumentation struct { app metloApp serverHost string serverPort int + rejectFn func(http.ResponseWriter, http.Handler) } func Init(app metloApp) metloInstrumentation { @@ -79,7 +81,6 @@ func (m *metloInstrumentation) Middleware(next http.Handler) http.Handler { r.Body = ioutil.NopCloser(bytes.NewReader(body)) if m.app.Allow() { - next.ServeHTTP(logRespWriter, r) reqHeaders := make([]metlo.NV, 0) for k := range r.Header { reqHeaders = append(reqHeaders, metlo.NV{Name: k, Value: strings.Join(r.Header[k], ",")}) @@ -105,36 +106,51 @@ func (m *metloInstrumentation) Middleware(next http.Handler) http.Handler { log.Println("Metlo couldn't find source port for incoming request") } + request := metlo.TraceReq{ + Url: metlo.TraceUrl{ + Host: r.Host, + Path: r.URL.Path, + Parameters: queryParams, + }, + Headers: reqHeaders, + Body: string(body), + Method: r.Method, + } + + meta := metlo.TraceMeta{ + Environment: "production", + Incoming: true, + Source: sourceIp, + SourcePort: sourcePort, + Destination: m.serverHost, + DestinationPort: m.serverPort, + MetloSource: "go/gorilla", + } + + if m.app.Block(request, meta) { + if m.rejectFn != nil { + m.rejectFn(logRespWriter, next) + } else { + logRespWriter.Write([]byte("Forbidden")) + logRespWriter.statusCode = 403 + } + } else { + next.ServeHTTP(logRespWriter, r) + } + statusCode := logRespWriter.statusCode if statusCode == 0 { statusCode = 200 } tr := metlo.MetloTrace{ - Request: metlo.TraceReq{ - Url: metlo.TraceUrl{ - Host: r.Host, - Path: r.URL.Path, - Parameters: queryParams, - }, - Headers: reqHeaders, - Body: string(body), - Method: r.Method, - }, + Request: request, Response: metlo.TraceRes{ Status: statusCode, Body: logRespWriter.buf.String(), Headers: resHeaders, }, - Meta: metlo.TraceMeta{ - Environment: "production", - Incoming: true, - Source: sourceIp, - SourcePort: sourcePort, - Destination: m.serverHost, - DestinationPort: m.serverPort, - MetloSource: "go/gorilla", - }, + Meta: meta, } go m.app.Send(tr) diff --git a/ingestors/golang/metlo/go_interface.h b/ingestors/golang/metlo/go_interface.h new file mode 100644 index 00000000..72a9b6d9 --- /dev/null +++ b/ingestors/golang/metlo/go_interface.h @@ -0,0 +1,11 @@ +#include "structs.h" + +unsigned char Metlo_block_trace(Metlo_ExchangeStruct data); +void Metlo_ingest_trace(Metlo_ApiTrace trace); +unsigned char Metlo_startup( + char *metlo_url, + char *api_key, + unsigned short backend_port, + unsigned short collector_port, + char *log_level, + char *encryption_key); diff --git a/ingestors/golang/metlo/mappers.go b/ingestors/golang/metlo/mappers.go index 9bb01064..240c7c97 100644 --- a/ingestors/golang/metlo/mappers.go +++ b/ingestors/golang/metlo/mappers.go @@ -1,45 +1,119 @@ package metlo -import ( - mi "github.com/metlo-labs/metlo/ingestors/golang/metlo/proto" -) - -func MapMetloTraceToMetloIngestRPC(trace MetloTrace) mi.ApiTrace { - reqURLParams := make([]*mi.KeyVal, 0) - for _, k := range trace.Request.Url.Parameters { - reqURLParams = append(reqURLParams, &mi.KeyVal{Name: k.Name, Value: k.Value}) - } - reqHeaders := make([]*mi.KeyVal, 0) - for _, k := range trace.Request.Headers { - reqHeaders = append(reqHeaders, &mi.KeyVal{Name: k.Name, Value: k.Value}) - } - respHeaders := make([]*mi.KeyVal, 0) - for _, k := range trace.Response.Headers { - respHeaders = append(respHeaders, &mi.KeyVal{Name: k.Name, Value: k.Value}) - } - return mi.ApiTrace{ - Response: &mi.ApiResponse{ - Status: int32(trace.Response.Status), - Headers: respHeaders, - Body: trace.Response.Body, - }, - Request: &mi.ApiRequest{ - Method: trace.Request.Method, - Url: &mi.ApiUrl{ - Host: trace.Request.Url.Host, - Path: trace.Request.Url.Path, - Parameters: reqURLParams, - }, - Headers: reqHeaders, - Body: trace.Request.Body, - }, - Meta: &mi.ApiMeta{ - Environment: trace.Meta.Environment, - Incoming: trace.Meta.Incoming, - Source: trace.Meta.Source, - SourcePort: int32(trace.Meta.SourcePort), - Destination: trace.Meta.Destination, - DestinationPort: int32(trace.Meta.DestinationPort), +/* + #include "stdlib.h" + #include "go_interface.h" +*/ +import "C" +import "unsafe" + +func MapMetloTraceToCStruct(trace MetloTrace) C.Metlo_ApiTrace { + return C.Metlo_ApiTrace{ + Req: MapMetloRequestToCStruct(trace.Request), + Res: MapMetloResponseToCStruct(trace.Response), + Meta: MapMetloMetadataToCStruct(trace.Meta), + } +} + +func MapMetloRequestToCStruct(request TraceReq) C.Metlo_Request { + reqURLParams := make([]C.Metlo_NVPair, 0) + for _, k := range request.Url.Parameters { + reqURLParams = append(reqURLParams, C.Metlo_NVPair{Name: C.CString(k.Name), Value: C.CString(k.Value)}) + } + reqHeaders := make([]C.Metlo_NVPair, 0) + for _, k := range request.Headers { + reqHeaders = append(reqHeaders, C.Metlo_NVPair{Name: C.CString(k.Name), Value: C.CString(k.Value)}) + } + + var reqURLParamsPtr *C.Metlo_NVPair = nil + if len(request.Url.Parameters) > 0 { + reqURLParamsPtr = &reqURLParams[0] + } + + var reqHeadersPtr *C.Metlo_NVPair = nil + if len(request.Headers) > 0 { + reqHeadersPtr = &reqHeaders[0] + } + + return C.Metlo_Request{ + Method: C.CString(request.Method), + Url: C.Metlo_ApiUrl{ + Host: C.CString(request.Url.Host), + Path: C.CString(request.Url.Path), + Parameters: reqURLParamsPtr, + Parameters_size: C.uint(len(reqURLParams)), }, + Headers: reqHeadersPtr, + Headers_size: C.uint(len(reqHeaders)), + Body: C.CString(request.Body), + } +} + +func MapMetloResponseToCStruct(response TraceRes) C.Metlo_Response { + respHeaders := make([]C.Metlo_NVPair, 0) + for _, k := range response.Headers { + respHeaders = append(respHeaders, C.Metlo_NVPair{Name: C.CString(k.Name), Value: C.CString(k.Value)}) + } + + var resHeadersPtr *C.Metlo_NVPair = nil + if len(response.Headers) > 0 { + resHeadersPtr = &respHeaders[0] + } + + return C.Metlo_Response{ + Status: C.ushort(response.Status), + Headers: resHeadersPtr, + Headers_size: C.uint(len(respHeaders)), + Body: C.CString(response.Body), } } + +func MapMetloMetadataToCStruct(metadata TraceMeta) C.Metlo_Metadata { + return C.Metlo_Metadata{ + Environment: C.CString(metadata.Environment), + Incoming: C.int(1), + Source: C.CString(metadata.Source), + Source_port: C.ushort(metadata.SourcePort), + Destination: C.CString(metadata.Destination), + Destination_port: C.ushort(metadata.DestinationPort), + } +} + +func FreeMetloTrace(trace C.Metlo_ApiTrace) { + FreeMetloRequest(trace.Req) + FreeMetloResponse(trace.Res) + FreeMetloMetadata(trace.Meta) +} + +func FreeMetloRequest(request C.Metlo_Request) { + body := unsafe.Pointer(request.Body) + C.free(body) + C.free(unsafe.Pointer(request.Method)) + C.free(unsafe.Pointer(request.Url.Host)) + C.free(unsafe.Pointer(request.Url.Path)) + params := (*[1 << 16]C.Metlo_NVPair)(unsafe.Pointer(request.Url.Parameters)) + for i := 0; i < int(request.Url.Parameters_size); i++ { + C.free(unsafe.Pointer(params[i].Name)) + C.free(unsafe.Pointer(params[i].Value)) + } + + headers := (*[1 << 16]C.Metlo_NVPair)(unsafe.Pointer(request.Headers)) + for i := 0; i < int(request.Headers_size); i++ { + C.free(unsafe.Pointer(headers[i].Name)) + C.free(unsafe.Pointer(headers[i].Value)) + } +} +func FreeMetloResponse(response C.Metlo_Response) { + C.free(unsafe.Pointer(response.Body)) + headers := (*[1 << 16]C.Metlo_NVPair)(unsafe.Pointer(response.Headers)) + for i := 0; i < int(response.Headers_size); i++ { + C.free(unsafe.Pointer(headers[i].Name)) + C.free(unsafe.Pointer(headers[i].Value)) + } +} + +func FreeMetloMetadata(meta C.Metlo_Metadata) { + C.free(unsafe.Pointer(meta.Environment)) + C.free(unsafe.Pointer(meta.Source)) + C.free(unsafe.Pointer(meta.Destination)) +} diff --git a/ingestors/golang/metlo/metlo.c b/ingestors/golang/metlo/metlo.c new file mode 100644 index 00000000..aa341293 --- /dev/null +++ b/ingestors/golang/metlo/metlo.c @@ -0,0 +1,129 @@ +#include +#include +#include + +#ifdef _WIN32 +#include +#endif + +#include "go_interface.h" + +char hasInit = 0; +void *handle; + +void handle_cleanup(void) +{ + if (handle != 0) + { + dlclose(handle); + } +} + +unsigned char (*metlo_startup)(char *metlo_url, + char *api_key, + unsigned short backend_port, + unsigned short collector_port, + char *log_level, + char *encryption_key); +unsigned char (*metlo_block_trace)(Metlo_ExchangeStruct data); +void (*metlo_ingest_trace)(Metlo_ApiTrace trace); + +unsigned char Metlo_startup( + char *metlo_url, + char *api_key, + unsigned short backend_port, + unsigned short collector_port, + char *log_level, + char *encryption_key) +{ + + char *metlo_dl_path = getenv("LIBMETLO_PATH"); + if (metlo_dl_path != 0) + { + handle = dlopen(metlo_dl_path, RTLD_LAZY); + } + else + { + + // appdata_path +#ifdef __linux__ + // On linux + handle = dlopen("/opt/metlo/libmetlo.so", RTLD_LAZY); +#elif __APPLE__ + // On Mac + handle = dlopen("/opt/metlo/libmetlo.so", RTLD_LAZY); +#elif _WIN32 + // On Windows + char *appdata_path = getenv("APPDATA"); + char *appdata_length = strlen(appdata_path); + int supplemental_length = 5 + 1 + 5 + 1 + 11 + 1; // local + / + metlo + / + libmetlo.so + 0 + char *total_path = (char *)calloc(appdata_length + supplemental_length, sizeof(char)); + int total_length = appdata_length + supplemental_length; + strncpy(total_path, appdata_path, appdata_length); + strncat(total_path, "local/metlo/libmetlo.so", total_length); + total_path[total_length - 1] = 0; + handle = dlopen(total_path, RTLD_LAZY); +#else + printf("Metlo currently only supports Mac, Linux, and Windows platforms.\n"); + printf("Metlo could not locate the symbols for any of these platforms.\n"); + printf("Please ensure that the correct headers are present.\n"); + return 0; +#endif + } + if (handle != 0) + { + atexit(handle_cleanup); + metlo_startup = (unsigned char (*)(char *metlo_url, + char *api_key, + unsigned short backend_port, + unsigned short collector_port, + char *log_level, + char *encryption_key))dlsym(handle, "metlo_startup"); + if (metlo_startup == 0) + { + printf("Metlo: Error setting up metlo_startup: %s\n", dlerror()); + hasInit = 0; + return 0; + } + metlo_block_trace = (unsigned char (*)(Metlo_ExchangeStruct data))dlsym(handle, "metlo_block_trace"); + if (metlo_startup == 0) + { + printf("Metlo: Error setting up metlo_block_trace: %s\n", dlerror()); + return 0; + } + metlo_ingest_trace = (void (*)(Metlo_ApiTrace data))dlsym(handle, "metlo_ingest_trace"); + if (metlo_startup == 0) + { + printf("Metlo: Error setting up metlo_ingest_trace: %s\n", dlerror()); + return 0; + } + unsigned char resp = metlo_startup(metlo_url, api_key, backend_port, collector_port, log_level, encryption_key); + hasInit = resp; + return resp; + } + else + { + printf("Metlo: Error loading dynamic library: %s\n", dlerror()); + return 0; + } +} + +unsigned char Metlo_block_trace(Metlo_ExchangeStruct data) +{ + if (hasInit == 1) + { + return metlo_block_trace(data); + } + else + { + return 0; + } +} + +void Metlo_ingest_trace(Metlo_ApiTrace trace) +{ + if (hasInit == 1) + { + metlo_ingest_trace(trace); + } +} diff --git a/ingestors/golang/metlo/metlo.go b/ingestors/golang/metlo/metlo.go index 1706c4e5..77c5e24c 100644 --- a/ingestors/golang/metlo/metlo.go +++ b/ingestors/golang/metlo/metlo.go @@ -1,73 +1,27 @@ package metlo +/* + #include + #include "go_interface.h" +*/ +import "C" import ( - "context" - "os" - "os/exec" - "strconv" "strings" - "sync" - "time" - - pb "github.com/metlo-labs/metlo/ingestors/golang/metlo/proto" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" + "syscall" + "unsafe" ) -type metlo struct { - disable bool - processStream pb.MetloIngest_ProcessTraceAsyncClient - rps int - metloHost string - metloKey string - backendPort int - collectorPort int - encryptionKey *string - logLevel LogLevel - reconnectMutex sync.Mutex - restartCount int - connectionRetryCount int - spawnedTask bool +type Metlo struct { + disable bool + metloHost string + metloKey string + backendPort int + collectorPort int + encryptionKey *string + logLevel LogLevel } -const MetloDefaultRPS int = 100 -const MaxRestartTries int = 10 -const MaxConnectTries int = 10 -const MaxConnectionRetries int = 1000 - -func (m *metlo) ConnectLocalProcessAgent() (pb.MetloIngest_ProcessTraceAsyncClient, error) { - var connectErr error = nil - for i := 0; i < MaxConnectTries; i++ { - if m.logLevel <= Trace { - logger.Println("Socket Dial Attempt ", i) - } - conn, err := grpc.Dial("unix:///tmp/metlo.sock", grpc.WithTransportCredentials(insecure.NewCredentials())) - if err == nil { - metloConn := pb.NewMetloIngestClient(conn) - stream_process_trace, err := metloConn.ProcessTraceAsync(context.Background()) - if err == nil { - if m.logLevel <= Trace { - logger.Println("Established connection") - } - return stream_process_trace, err - } else { - connectErr = err - } - } else { - if m.logLevel <= Debug { - logger.Println("Couldn't connect to metlo agent over socket for attempt ", i, err) - } - connectErr = err - } - time.Sleep(time.Second) - } - if m.logLevel <= Trace { - logger.Println("Couldn't establish connection") - } - return nil, connectErr -} - -func InitMetlo(metloHost string, metloKey string) *metlo { +func InitMetlo(metloHost string, metloKey string) *Metlo { var collector_port *int = nil var backend_port *int = nil if strings.Contains(metloHost, "app.metlo.com") { @@ -79,147 +33,83 @@ func InitMetlo(metloHost string, metloKey string) *metlo { } default_collector_port := 8081 collector_port = &default_collector_port - return InitMetloCustom(metloHost, metloKey, MetloDefaultRPS, *backend_port, *collector_port, nil, Info, false) + return InitMetloCustom(metloHost, metloKey, *backend_port, *collector_port, nil, Info, false) } -func InitMetloCustom(metloHost string, metloKey string, rps int, backendPort int, collectorPort int, encryptionKey *string, logLevel LogLevel, disable bool) *metlo { - inst := &metlo{ - rps: rps, - metloHost: metloHost, - metloKey: metloKey, - disable: disable, - backendPort: backendPort, - collectorPort: collectorPort, - encryptionKey: encryptionKey, - logLevel: logLevel, - reconnectMutex: sync.Mutex{}, - restartCount: 0, - connectionRetryCount: 0, - spawnedTask: false, - processStream: nil, +func InitMetloCustom(metloHost string, metloKey string, backendPort int, collectorPort int, encryptionKey *string, logLevel LogLevel, disable bool) *Metlo { + inst := &Metlo{ + metloHost: metloHost, + metloKey: metloKey, + disable: disable, + backendPort: backendPort, + collectorPort: collectorPort, + encryptionKey: encryptionKey, + logLevel: logLevel, } go inst.BootstrapInstance() return inst } -func (m *metlo) BootstrapInstance() { - agentStartErr := m.StartLocalAgent() - if agentStartErr != nil { - if m.logLevel <= Error { - logger.Println("Couldn't start metlo agent", agentStartErr) - } - m.disable = true - } else { - m.spawnedTask = true - } - conn, err := m.ConnectLocalProcessAgent() - if err != nil { - if m.logLevel <= Error { - logger.Println("Couldn't connect to metlo agent", err) - } - m.disable = true - } else { - m.processStream = conn - } -} +func (m *Metlo) BootstrapInstance() { -func (m *metlo) StartLocalAgent() error { - args := make([]string, 0) - args = append(args, "-m", m.metloHost, "-a", m.metloKey, "--enable-grpc", "true", "--log-level", MapLogLevelToString(m.logLevel)) - if m.backendPort != 0 { - args = append(args, "-b", strconv.Itoa(m.backendPort)) - } - if m.collectorPort != 0 { - args = append(args, "-c", strconv.Itoa(m.collectorPort)) - } + var metloHost = C.CString(m.metloHost) + defer C.free(unsafe.Pointer(metloHost)) + var metloKey = C.CString(m.metloKey) + defer C.free(unsafe.Pointer(metloKey)) + var metloBackendPort = C.ushort(m.backendPort) + var metloCollectorPort = C.ushort(m.collectorPort) + var metloLogLevel = C.CString(MapLogLevelToString(m.logLevel)) + defer C.free(unsafe.Pointer(metloLogLevel)) if m.encryptionKey != nil { - args = append(args, "-e", *m.encryptionKey) - } - cmd := exec.Command("metlo-agent", args...) - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - err := cmd.Start() - if err != nil { - return err - } - if m.logLevel <= Debug { - logger.Println("Spawned Metlo agent") - } - go func() { - err := cmd.Wait() - logger.Println("Spawned task crashed") - m.spawnedTask = false - m.processStream = nil - if m.logLevel <= Error { - logger.Println("Metlo Agent Exited", err) - } - m.restartMetlo(true) - }() - return nil -} - -func (m *metlo) Send(data MetloTrace) { - if m.processStream == nil { - if m.logLevel <= Trace { - logger.Println("Metlo GRPC stream not setup") + var metloEncryptionKey = C.CString(*m.encryptionKey) + defer C.free(unsafe.Pointer(&metloEncryptionKey)) + _, err := C.Metlo_startup( + metloHost, + metloKey, + metloBackendPort, + metloCollectorPort, + metloLogLevel, + metloEncryptionKey, + ) + if err != nil && err != syscall.EINPROGRESS { + logger.Print(err) } - m.restartMetlo(!m.spawnedTask) - } - miTrace := MapMetloTraceToMetloIngestRPC(data) - if m.processStream != nil { - err := m.processStream.Send(&miTrace) - if err != nil { - if m.logLevel <= Error { - logger.Println("Encountered an error while sending message to GRPC for Process Trace", err) - } - m.restartMetlo(false) + } else { + _, err := C.Metlo_startup( + metloHost, + metloKey, + metloBackendPort, + metloCollectorPort, + metloLogLevel, + nil, + ) + if err != nil && err != syscall.EINPROGRESS { + logger.Print(err) } } } -func (m *metlo) Allow() bool { - return !m.disable -} - -func (m *metlo) restartMetlo(shouldSpawnTask bool) { - if m.reconnectMutex.TryLock() { - defer m.reconnectMutex.Unlock() - if shouldSpawnTask { - if m.restartCount < MaxRestartTries { - m.restartCount++ - m.restartMetloSubprocess() - } else { - m.disable = true - return - } - } - if m.connectionRetryCount < MaxConnectionRetries { - m.renewMetloConnection() - } else { - m.disable = true - } - } - return +func (m *Metlo) Send(data MetloTrace) { + mapped_data := MapMetloTraceToCStruct(data) + C.Metlo_ingest_trace(mapped_data) + FreeMetloTrace(mapped_data) } -func (m *metlo) renewMetloConnection() { - conn, err := m.ConnectLocalProcessAgent() - if err != nil { - if m.logLevel <= Error { - logger.Println("Couldn't connect to metlo agent when restarting") - } - } else { - m.processStream = conn - } +func (m *Metlo) Allow() bool { + return !m.disable } -func (m *metlo) restartMetloSubprocess() { - err := m.StartLocalAgent() - if err != nil { - if m.logLevel <= Error { - logger.Println("Couldn't spawn local Metlo Agent") - } +func (m *Metlo) Block(req TraceReq, meta TraceMeta) bool { + block_struct := C.Metlo_ExchangeStruct{ + Req: MapMetloRequestToCStruct(req), + Meta: MapMetloMetadataToCStruct(meta), + } + resp := C.Metlo_block_trace(block_struct) + FreeMetloRequest(block_struct.Req) + FreeMetloMetadata(block_struct.Meta) + if resp == 1 { + return true } else { - m.spawnedTask = true + return false } } diff --git a/ingestors/golang/metlo/structs.h b/ingestors/golang/metlo/structs.h new file mode 100644 index 00000000..174161f6 --- /dev/null +++ b/ingestors/golang/metlo/structs.h @@ -0,0 +1,79 @@ +typedef struct +{ + char *Name; + char *Value; +} Metlo_NVPair; + +typedef struct +{ + /* + * pub host: String, + * pub path: String, + * pub parameters: Vec, + **/ + char *Host; + char *Path; + Metlo_NVPair *Parameters; + unsigned int Parameters_size; +} Metlo_ApiUrl; + +typedef struct +{ + /** + * pub method: String, + * pub url: ApiUrl, + * pub headers: Vec, + * pub body: String, + * pub user: Option, + **/ + char *Method; + Metlo_ApiUrl Url; + Metlo_NVPair *Headers; + unsigned int Headers_size; + char *Body; +} Metlo_Request; + +typedef struct +{ + /** + * pub status: u16, + * pub headers: Vec, + * pub body: String, + **/ + unsigned short Status; + Metlo_NVPair *Headers; + unsigned int Headers_size; + char *Body; +} Metlo_Response; + +typedef struct +{ + /* + * pub environment: String, + * pub incoming: bool, + * pub source: String, + * pub source_port: u16, + * pub destination: String, + * pub destination_port: u16, + * pub original_source: Option, + **/ + char *Environment; // Always production + int Incoming; + char *Source; + unsigned short Source_port; + char *Destination; + unsigned short Destination_port; +} Metlo_Metadata; + +typedef struct +{ + Metlo_Request Req; + Metlo_Metadata Meta; +} Metlo_ExchangeStruct; + +typedef struct +{ + Metlo_Request Req; + Metlo_Response Res; + Metlo_Metadata Meta; +} Metlo_ApiTrace;