diff --git a/metrics/clients.go b/metrics/clients.go index 538db881..445943be 100644 --- a/metrics/clients.go +++ b/metrics/clients.go @@ -6,6 +6,7 @@ package metrics import ( + "bytes" "context" "encoding/json" "errors" @@ -31,7 +32,6 @@ func callgRPCServiceHeartbeat(grpcAddress string) ([]byte, error) { return nil, err } defer conn.Close() - // create the client instance client := pb.NewHeartbeatClient(conn) // connect to the server and call the required method @@ -79,7 +79,8 @@ func callHTTPServiceHeartbeat(serviceURL string) ([]byte, error) { // calls the corresponding the service to send the registration information func callRegisterService(daemonID string, serviceURL string) (status bool) { - req, err := http.NewRequest("POST", serviceURL, nil) + //Send the Daemon ID and the Network ID to register the Daemon + req, err := http.NewRequest("POST", serviceURL, bytes.NewBuffer(buildPayLoadForServiceRegistration())) req.Header.Set("Content-Type", "application/json") req.Header.Set("Access-Token", daemonID) if err != nil { @@ -93,7 +94,15 @@ func callRegisterService(daemonID string, serviceURL string) (status bool) { log.WithError(err).Info("unable to reach registration service : %v", err) return false } - // process the response - return checkForSuccessfulResponse(response) - //todo add access token related code in next iteration + // process the response and set the Authorization token + daemonAuthorizationToken, status = getTokenFromResponse(response) + log.Debugf("daemonAuthorizationToken %v", daemonAuthorizationToken) + return +} + +func buildPayLoadForServiceRegistration() []byte { + payload := &RegisterDaemonPayload{DaemonID: GetDaemonID()} + body, _ := ConvertStructToJSON(payload) + log.Debugf("buildPayLoadForServiceRegistration() %v", string(body)) + return body } diff --git a/metrics/heartbeat_test.go b/metrics/heartbeat_test.go index 31c072d7..a34082a4 100644 --- a/metrics/heartbeat_test.go +++ b/metrics/heartbeat_test.go @@ -47,7 +47,7 @@ func TestHeartbeatHandler(t *testing.T) { assert.Equal(t, dHeartbeat.Status, Online.String(), "Invalid State") assert.NotEqual(t, dHeartbeat.Status, Offline.String(), "Invalid State") - assert.Equal(t, dHeartbeat.DaemonID, "cc48d343313a1e06093c81830103b45496749e9ee632fd03207d042c277f3210", + assert.Equal(t, dHeartbeat.DaemonID, "10990b62daf504a0ae6094d548f25aed4928b1e991a9221a31693890c20d6916", "Incorrect daemon ID") assert.NotEqual(t, dHeartbeat.ServiceHeartbeat, `{}`, "Service Heartbeat must not be empty.") @@ -66,7 +66,7 @@ func Test_GetHeartbeat(t *testing.T) { assert.Equal(t, dHeartbeat.Status, Online.String(), "Invalid State") assert.NotEqual(t, dHeartbeat.Status, Offline.String(), "Invalid State") - assert.Equal(t, dHeartbeat.DaemonID, "cc48d343313a1e06093c81830103b45496749e9ee632fd03207d042c277f3210", + assert.Equal(t, dHeartbeat.DaemonID, "10990b62daf504a0ae6094d548f25aed4928b1e991a9221a31693890c20d6916", "Incorrect daemon ID") assert.NotEqual(t, dHeartbeat.ServiceHeartbeat, `{}`, "Service Heartbeat must not be empty.") diff --git a/metrics/register.go b/metrics/register.go index 871bead6..bb415e9d 100644 --- a/metrics/register.go +++ b/metrics/register.go @@ -20,7 +20,7 @@ Post beta, this ID will be used to enable Token based authentication for accessi // generates DaemonID nad returns i.e. DaemonID = HASH (Org Name, Service Name, daemon endpoint) func GetDaemonID() string { - rawID := config.GetString(config.OrganizationId) + config.GetString(config.ServiceId) + daemonGroupId + config.GetString(config.DaemonEndPoint) + rawID := config.GetString(config.OrganizationId) + config.GetString(config.ServiceId) + daemonGroupId + config.GetString(config.DaemonEndPoint) + config.GetString(config.RegistryAddressKey) //get hash of the string id combination hasher := sha256.New() hasher.Write([]byte(rawID)) @@ -28,8 +28,21 @@ func GetDaemonID() string { return hash } +type RegisterDaemonPayload struct { + NetworkID int `json:"netId"` + DaemonID string `json:"daemonId"` +} +type TokenGenerated struct { + Status string `json:"status"` + Data struct { + Token string `json:"token"` + } `json:"data"` +} + var daemonGroupId string +var daemonAuthorizationToken string + // setter method for daemonGroupID func SetDaemonGrpId(grpId string) { daemonGroupId = grpId diff --git a/metrics/register_test.go b/metrics/register_test.go index 04ff33f8..da32d291 100644 --- a/metrics/register_test.go +++ b/metrics/register_test.go @@ -16,7 +16,7 @@ func TestGetDaemonID(t *testing.T) { assert.NotNil(t, daemonID, "daemon ID must not be nil") assert.NotEmpty(t, daemonID, "daemon ID must not be empty") - assert.Equal(t, "cc48d343313a1e06093c81830103b45496749e9ee632fd03207d042c277f3210", daemonID) + assert.Equal(t, "10990b62daf504a0ae6094d548f25aed4928b1e991a9221a31693890c20d6916", daemonID) assert.NotEqual(t, "48d343313a1e06093c81830103b45496cc7c277f321049e9ee632fd03207d042", daemonID) } diff --git a/metrics/request_stats.go b/metrics/request_stats.go index 9857a788..ccec5e24 100644 --- a/metrics/request_stats.go +++ b/metrics/request_stats.go @@ -9,16 +9,17 @@ import ( //Request stats that will be captured type RequestStats struct { - RequestID string `json:"request_id"` - InputDataSize string `json:"input_data_size"` - ContentType string `json:"content-type"` - ServiceMethod string `json:"service_method"` - UserAgent string `json:"user-agent"` - RequestReceivedTime string `json:"request_arrival_time"` - OrganizationID string `json:"organization_id"` - ServiceID string `json:"service_id"` - GroupID string `json:"Group_id"` - DaemonEndPoint string `json:"Daemon_end_point"` + Type string `json:"type"` + RegistryAddressKey string `json:"registry_address_key"` + EthereumJsonRpcEndpointKey string `json:"ethereum_json_rpc_endpoint"` + RequestID string `json:"request_id"` + InputDataSize string `json:"input_data_size"` + ServiceMethod string `json:"service_method"` + RequestReceivedTime string `json:"request_received_time"` + OrganizationID string `json:"organization_id"` + ServiceID string `json:"service_id"` + GroupID string `json:"group_id"` + DaemonEndPoint string `json:"daemon_end_point"` } //Create a request Object and Publish this to a service end point @@ -31,19 +32,22 @@ func PublishRequestStats(commonStat *CommonStats, inStream grpc.ServerStream) bo } func (request *RequestStats) setDataFromContext(md metadata.MD) { - request.UserAgent = GetValue(md, "user-agent") - request.ContentType = GetValue(md, "content-type") request.InputDataSize = strconv.FormatUint(GetSize(md), 10) + } func createRequestStat(commonStat *CommonStats) *RequestStats { request := &RequestStats{ - RequestID: commonStat.ID, - GroupID: commonStat.GroupID, - DaemonEndPoint: commonStat.DaemonEndPoint, - OrganizationID: commonStat.OrganizationID, - ServiceID: commonStat.ServiceID, - RequestReceivedTime: commonStat.RequestReceivedTime, + Type: "request", + RegistryAddressKey: config.GetString(config.RegistryAddressKey), + EthereumJsonRpcEndpointKey: config.GetString(config.EthereumJsonRpcEndpointKey), + RequestID: commonStat.ID, + GroupID: commonStat.GroupID, + DaemonEndPoint: commonStat.DaemonEndPoint, + OrganizationID: commonStat.OrganizationID, + ServiceID: commonStat.ServiceID, + RequestReceivedTime: commonStat.RequestReceivedTime, + ServiceMethod: commonStat.ServiceMethod, } return request } diff --git a/metrics/request_stats_test.go b/metrics/request_stats_test.go index 50d600d3..bf8c85a2 100644 --- a/metrics/request_stats_test.go +++ b/metrics/request_stats_test.go @@ -12,8 +12,6 @@ func TestSetDataFromContext(t *testing.T) { md := metadata.Pairs("user-agent", "Test user agent", "time", "2018-09-93", "content-type", "application/") request := &RequestStats{} request.setDataFromContext(md) - assert.Equal(t, request.UserAgent, "Test user agent") - assert.Equal(t, request.ContentType, "application/") } func TestCreateRequestStat(t *testing.T) { @@ -26,4 +24,5 @@ func TestCreateRequestStat(t *testing.T) { assert.Equal(t, request.OrganizationID, config.GetString(config.OrganizationId)) assert.Equal(t, request.ServiceID, config.GetString(config.ServiceId)) assert.Equal(t, request.RequestReceivedTime, arrivalTime.String()) + assert.Equal(t, request.Type, "request") } diff --git a/metrics/response_stats.go b/metrics/response_stats.go index 9ff5d6ab..6c7cbd9c 100644 --- a/metrics/response_stats.go +++ b/metrics/response_stats.go @@ -34,17 +34,20 @@ func BuildCommonStats(receivedTime time.Time, methodName string) *CommonStats { //Response stats that will be captured and published type ResponseStats struct { - RequestID string `json:"request_id"` - OrganizationID string `json:"organization_id"` - ServiceID string `json:"service_id"` - GroupID string `json:"group_id"` - DaemonEndPoint string `json:"daemon_end_point"` - ServiceMethod string `json:"service_method"` - ResponseSentTime string `json:"response_sent_time"` - RequestReceivedTime string `json:"request_received_time"` - ResponseTime string `json:"response_time"` - ResponseCode string `json:"response_code"` - ErrorMessage string `json:"error_message"` + Type string `json:"type"` + RegistryAddressKey string `json:"registry_address_key"` + EthereumJsonRpcEndpointKey string `json:"ethereum_json_rpc_endpoint"` + RequestID string `json:"request_id"` + OrganizationID string `json:"organization_id"` + ServiceID string `json:"service_id"` + GroupID string `json:"group_id"` + DaemonEndPoint string `json:"daemon_end_point"` + ServiceMethod string `json:"service_method"` + ResponseSentTime string `json:"response_sent_time"` + RequestReceivedTime string `json:"request_received_time"` + ResponseTime string `json:"response_time"` + ResponseCode string `json:"response_code"` + ErrorMessage string `json:"error_message"` } //Publish response received as a payload for reporting /metrics analysis @@ -56,17 +59,20 @@ func PublishResponseStats(commonStats *CommonStats, duration time.Duration, err func createResponseStats(commonStat *CommonStats, duration time.Duration, err error) *ResponseStats { response := &ResponseStats{ - RequestID: commonStat.ID, - ResponseTime: strconv.FormatFloat(duration.Seconds(), 'f', 4, 64), - GroupID: daemonGroupId, - DaemonEndPoint: commonStat.DaemonEndPoint, - OrganizationID: commonStat.OrganizationID, - ServiceID: commonStat.ServiceID, - ServiceMethod: commonStat.ServiceMethod, - RequestReceivedTime: commonStat.RequestReceivedTime, - ResponseSentTime: time.Now().String(), - ErrorMessage: getErrorMessage(err), - ResponseCode: getErrorCode(err), + Type: "response", + RegistryAddressKey: config.GetString(config.RegistryAddressKey), + EthereumJsonRpcEndpointKey: config.GetString(config.EthereumJsonRpcEndpointKey), + RequestID: commonStat.ID, + ResponseTime: strconv.FormatFloat(duration.Seconds(), 'f', 4, 64), + GroupID: daemonGroupId, + DaemonEndPoint: commonStat.DaemonEndPoint, + OrganizationID: commonStat.OrganizationID, + ServiceID: commonStat.ServiceID, + ServiceMethod: commonStat.ServiceMethod, + RequestReceivedTime: commonStat.RequestReceivedTime, + ResponseSentTime: time.Now().String(), + ErrorMessage: getErrorMessage(err), + ResponseCode: getErrorCode(err), } return response } diff --git a/metrics/response_stats_test.go b/metrics/response_stats_test.go index 1f417231..ae3fb6c9 100644 --- a/metrics/response_stats_test.go +++ b/metrics/response_stats_test.go @@ -15,6 +15,7 @@ func TestCreateResponseStats(t *testing.T) { assert.Equal(t, response.RequestID, commonStat.ID) assert.Equal(t, response.GroupID, daemonGroupId) assert.Equal(t, response.ResponseTime, "1.2346") + assert.Equal(t, response.Type, "response") assert2.NotEqual(t, response.ResponseSentTime, "") } diff --git a/metrics/utils.go b/metrics/utils.go index 75d9316a..30eb93ae 100644 --- a/metrics/utils.go +++ b/metrics/utils.go @@ -6,8 +6,10 @@ import ( "errors" "github.com/OneOfOne/go-utils/memory" "github.com/rs/xid" + "github.com/singnet/snet-daemon/config" log "github.com/sirupsen/logrus" "google.golang.org/grpc/metadata" + "io/ioutil" "net/http" "time" ) @@ -48,49 +50,83 @@ func Publish(payload interface{}, serviceUrl string) bool { if err != nil { return false } - status := publishJson(jsonBytes, serviceUrl) + status := publishJson(jsonBytes, serviceUrl, true) if !status { log.WithField("payload", string(jsonBytes)).WithField("url", serviceUrl).Warning("Unable to publish metrics") } return status } -// Publish the json on the service end point -func publishJson(json []byte, serviceURL string) bool { - //prepare the request payload +// Publish the json on the service end point, retry will be set to false when trying to re publish the payload +func publishJson(json []byte, serviceURL string, reTry bool) bool { + response, err := sendRequest(json, serviceURL) + if err != nil { + log.WithError(err) + } else { + status, reRegister := checkForSuccessfulResponse(response) + if reRegister && reTry { + //if Daemon was registered successfully , retry to publish the payload + status = publishJson(json, serviceURL, false) + } + return status + } + return false +} + +//Set all the headers before publishing +func sendRequest(json []byte, serviceURL string) (*http.Response, error) { req, err := http.NewRequest("POST", serviceURL, bytes.NewBuffer(json)) if err != nil { log.WithField("serviceURL", serviceURL).WithError(err).Warningf("Unable to create service request to publish stats") - return false + return nil, err } - req.Header.Set("Content-Type", "application/json") - req.Header.Set("Access-Token", GetDaemonID()) // sending the post request client := &http.Client{} - response, err := client.Do(req) + req.Header.Set("Content-Type", "application/json") + req.Header.Set("X-Daemonid", GetDaemonID()) + req.Header.Set("X-Token", daemonAuthorizationToken) + return client.Do(req) - if err != nil { - log.WithError(err).Warningf("r") - } else { - return checkForSuccessfulResponse(response) - } - log.WithField("json", json).WithField("url", serviceURL).Warningf("Unable to publish the json to the service ") - return false } //Check if the response received was proper -func checkForSuccessfulResponse(response *http.Response) bool { +func checkForSuccessfulResponse(response *http.Response) (status bool, retry bool) { if response == nil { log.Warningf("Empty response received.") - return false + return false, false } if response.StatusCode != http.StatusOK { log.Warningf("Service call failed with status code : %d ", response.StatusCode) - return false + //if response returned was forbidden error , then re register Daemon with fresh token and submit the request / response + //again ONLY if the Daemon was registered successfully + status = RegisterDaemon(config.GetString(config.MonitoringServiceEndpoint) + "/register") + return false, status } //close the body log.Debugf("Metrics posted successfully with status code : %d ", response.StatusCode) defer response.Body.Close() - return true + return true, false +} + +//Check if the response received was proper +func getTokenFromResponse(response *http.Response) (string, bool) { + if response == nil { + log.Warningf("Empty response received.") + return "", false + } + if response.StatusCode != http.StatusOK { + log.Warningf("Service call failed with status code : %d ", response.StatusCode) + return "", false + } + body, err := ioutil.ReadAll(response.Body) + if err != nil { + log.Infof("Unable to retrieve Token from Body , : %f ", err.Error()) + return "", false + } + var data TokenGenerated + json.Unmarshal(body, &data) + //close the body + defer response.Body.Close() + return data.Data.Token, true } //Generic utility to determine the size of the srtuct passed diff --git a/metrics/utils_test.go b/metrics/utils_test.go index 6b0c42bb..41f89408 100644 --- a/metrics/utils_test.go +++ b/metrics/utils_test.go @@ -40,11 +40,10 @@ func TestPublish(t *testing.T) { } func TestCheckSuccessfulResponse(t *testing.T) { - status := checkForSuccessfulResponse(nil) + status, _ := checkForSuccessfulResponse(nil) assert.Equal(t, status, false) - status = checkForSuccessfulResponse(&http.Response{StatusCode: http.StatusForbidden}) + status, _ = checkForSuccessfulResponse(&http.Response{StatusCode: http.StatusForbidden}) assert.Equal(t, status, false) - } func TestGetSize(t *testing.T) { diff --git a/snetd/cmd/components.go b/snetd/cmd/components.go index acb64a1b..068d0bc8 100644 --- a/snetd/cmd/components.go +++ b/snetd/cmd/components.go @@ -2,6 +2,7 @@ package cmd import ( "github.com/grpc-ecosystem/go-grpc-middleware" + "github.com/singnet/snet-daemon/metrics" "os" log "github.com/sirupsen/logrus" @@ -199,8 +200,14 @@ func (components *Components) GrpcInterceptor() grpc.StreamServerInterceptor { if components.grpcInterceptor != nil { return components.grpcInterceptor } - if config.GetBool(config.MonitoringEnabled) && config.IsValidUrl(config.MonitoringServiceEndpoint) { - //If monitoring is enabled and the endpoint URL is valid , add this interceptor to the chain of interceptors + //If monitoring is enabled and the endpoint URL is valid and if the + // Daemon has successfully registered itself and has obtained a valid token to publish metrics + // , ONLY then add this interceptor to the chain of interceptors + metrics.SetDaemonGrpId(components.ServiceMetaData().GetDaemonGroupIDString()) + if config.GetBool(config.MonitoringEnabled) && + config.IsValidUrl(config.GetString(config.MonitoringServiceEndpoint)) && + metrics.RegisterDaemon(config.GetString(config.MonitoringServiceEndpoint)+"/register") { + components.grpcInterceptor = grpc_middleware.ChainStreamServer( handler.GrpcMonitoringInterceptor(), handler.GrpcRateLimitInterceptor(), components.GrpcPaymentValidationInterceptor()) @@ -230,12 +237,15 @@ func (components *Components) PaymentChannelStateService() (service *escrow.Paym return components.paymentChannelStateService } + //NewProviderControlService func (components *Components) ProviderControlService() (service *escrow.ProviderControlService) { if components.providerControlService != nil { return components.providerControlService } + components.providerControlService = escrow.NewProviderControlService(components.PaymentChannelService(),components.ServiceMetaData()) return components.providerControlService -} \ No newline at end of file +} + diff --git a/snetd/cmd/serve.go b/snetd/cmd/serve.go index d6b536a5..acf67ee7 100644 --- a/snetd/cmd/serve.go +++ b/snetd/cmd/serve.go @@ -245,7 +245,7 @@ func (d daemon) start() { go http.Serve(d.lis, handlers.CORS(corsOptions...)(httphandler.NewHTTPHandler(d.blockProc))) } - metrics.SetDaemonGrpId(d.components.ServiceMetaData().GetDaemonGroupIDString()) + } func (d daemon) stop() {