Skip to content

Commit

Permalink
Merge pull request #196 from Dinesh2521/token
Browse files Browse the repository at this point in the history
Daemon Monitoring - Token Registration and Validation
  • Loading branch information
raamb authored Jan 22, 2019
2 parents b997926 + 78b690e commit f480a48
Show file tree
Hide file tree
Showing 12 changed files with 154 additions and 77 deletions.
19 changes: 14 additions & 5 deletions metrics/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package metrics

import (
"bytes"
"context"
"encoding/json"
"errors"
Expand All @@ -31,7 +32,6 @@ func callgRPCServiceHeartbeat(grpcAddress string) ([]byte, error) {
return nil, err
}
defer conn.Close()

// create the client instance
client := pb.NewHeartbeatClient(conn)
// connect to the server and call the required method
Expand Down Expand Up @@ -79,7 +79,8 @@ func callHTTPServiceHeartbeat(serviceURL string) ([]byte, error) {

// calls the corresponding the service to send the registration information
func callRegisterService(daemonID string, serviceURL string) (status bool) {
req, err := http.NewRequest("POST", serviceURL, nil)
//Send the Daemon ID and the Network ID to register the Daemon
req, err := http.NewRequest("POST", serviceURL, bytes.NewBuffer(buildPayLoadForServiceRegistration()))
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Access-Token", daemonID)
if err != nil {
Expand All @@ -93,7 +94,15 @@ func callRegisterService(daemonID string, serviceURL string) (status bool) {
log.WithError(err).Info("unable to reach registration service : %v", err)
return false
}
// process the response
return checkForSuccessfulResponse(response)
//todo add access token related code in next iteration
// process the response and set the Authorization token
daemonAuthorizationToken, status = getTokenFromResponse(response)
log.Debugf("daemonAuthorizationToken %v", daemonAuthorizationToken)
return
}

func buildPayLoadForServiceRegistration() []byte {
payload := &RegisterDaemonPayload{DaemonID: GetDaemonID()}
body, _ := ConvertStructToJSON(payload)
log.Debugf("buildPayLoadForServiceRegistration() %v", string(body))
return body
}
4 changes: 2 additions & 2 deletions metrics/heartbeat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestHeartbeatHandler(t *testing.T) {
assert.Equal(t, dHeartbeat.Status, Online.String(), "Invalid State")
assert.NotEqual(t, dHeartbeat.Status, Offline.String(), "Invalid State")

assert.Equal(t, dHeartbeat.DaemonID, "cc48d343313a1e06093c81830103b45496749e9ee632fd03207d042c277f3210",
assert.Equal(t, dHeartbeat.DaemonID, "10990b62daf504a0ae6094d548f25aed4928b1e991a9221a31693890c20d6916",
"Incorrect daemon ID")

assert.NotEqual(t, dHeartbeat.ServiceHeartbeat, `{}`, "Service Heartbeat must not be empty.")
Expand All @@ -66,7 +66,7 @@ func Test_GetHeartbeat(t *testing.T) {
assert.Equal(t, dHeartbeat.Status, Online.String(), "Invalid State")
assert.NotEqual(t, dHeartbeat.Status, Offline.String(), "Invalid State")

assert.Equal(t, dHeartbeat.DaemonID, "cc48d343313a1e06093c81830103b45496749e9ee632fd03207d042c277f3210",
assert.Equal(t, dHeartbeat.DaemonID, "10990b62daf504a0ae6094d548f25aed4928b1e991a9221a31693890c20d6916",
"Incorrect daemon ID")

assert.NotEqual(t, dHeartbeat.ServiceHeartbeat, `{}`, "Service Heartbeat must not be empty.")
Expand Down
15 changes: 14 additions & 1 deletion metrics/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,29 @@ Post beta, this ID will be used to enable Token based authentication for accessi

// generates DaemonID nad returns i.e. DaemonID = HASH (Org Name, Service Name, daemon endpoint)
func GetDaemonID() string {
rawID := config.GetString(config.OrganizationId) + config.GetString(config.ServiceId) + daemonGroupId + config.GetString(config.DaemonEndPoint)
rawID := config.GetString(config.OrganizationId) + config.GetString(config.ServiceId) + daemonGroupId + config.GetString(config.DaemonEndPoint) + config.GetString(config.RegistryAddressKey)
//get hash of the string id combination
hasher := sha256.New()
hasher.Write([]byte(rawID))
hash := hex.EncodeToString(hasher.Sum(nil))
return hash
}

type RegisterDaemonPayload struct {
NetworkID int `json:"netId"`
DaemonID string `json:"daemonId"`
}
type TokenGenerated struct {
Status string `json:"status"`
Data struct {
Token string `json:"token"`
} `json:"data"`
}

var daemonGroupId string

var daemonAuthorizationToken string

// setter method for daemonGroupID
func SetDaemonGrpId(grpId string) {
daemonGroupId = grpId
Expand Down
2 changes: 1 addition & 1 deletion metrics/register_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func TestGetDaemonID(t *testing.T) {

assert.NotNil(t, daemonID, "daemon ID must not be nil")
assert.NotEmpty(t, daemonID, "daemon ID must not be empty")
assert.Equal(t, "cc48d343313a1e06093c81830103b45496749e9ee632fd03207d042c277f3210", daemonID)
assert.Equal(t, "10990b62daf504a0ae6094d548f25aed4928b1e991a9221a31693890c20d6916", daemonID)
assert.NotEqual(t, "48d343313a1e06093c81830103b45496cc7c277f321049e9ee632fd03207d042", daemonID)
}

Expand Down
40 changes: 22 additions & 18 deletions metrics/request_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,17 @@ import (

//Request stats that will be captured
type RequestStats struct {
RequestID string `json:"request_id"`
InputDataSize string `json:"input_data_size"`
ContentType string `json:"content-type"`
ServiceMethod string `json:"service_method"`
UserAgent string `json:"user-agent"`
RequestReceivedTime string `json:"request_arrival_time"`
OrganizationID string `json:"organization_id"`
ServiceID string `json:"service_id"`
GroupID string `json:"Group_id"`
DaemonEndPoint string `json:"Daemon_end_point"`
Type string `json:"type"`
RegistryAddressKey string `json:"registry_address_key"`
EthereumJsonRpcEndpointKey string `json:"ethereum_json_rpc_endpoint"`
RequestID string `json:"request_id"`
InputDataSize string `json:"input_data_size"`
ServiceMethod string `json:"service_method"`
RequestReceivedTime string `json:"request_received_time"`
OrganizationID string `json:"organization_id"`
ServiceID string `json:"service_id"`
GroupID string `json:"group_id"`
DaemonEndPoint string `json:"daemon_end_point"`
}

//Create a request Object and Publish this to a service end point
Expand All @@ -31,19 +32,22 @@ func PublishRequestStats(commonStat *CommonStats, inStream grpc.ServerStream) bo
}

func (request *RequestStats) setDataFromContext(md metadata.MD) {
request.UserAgent = GetValue(md, "user-agent")
request.ContentType = GetValue(md, "content-type")
request.InputDataSize = strconv.FormatUint(GetSize(md), 10)

}

func createRequestStat(commonStat *CommonStats) *RequestStats {
request := &RequestStats{
RequestID: commonStat.ID,
GroupID: commonStat.GroupID,
DaemonEndPoint: commonStat.DaemonEndPoint,
OrganizationID: commonStat.OrganizationID,
ServiceID: commonStat.ServiceID,
RequestReceivedTime: commonStat.RequestReceivedTime,
Type: "request",
RegistryAddressKey: config.GetString(config.RegistryAddressKey),
EthereumJsonRpcEndpointKey: config.GetString(config.EthereumJsonRpcEndpointKey),
RequestID: commonStat.ID,
GroupID: commonStat.GroupID,
DaemonEndPoint: commonStat.DaemonEndPoint,
OrganizationID: commonStat.OrganizationID,
ServiceID: commonStat.ServiceID,
RequestReceivedTime: commonStat.RequestReceivedTime,
ServiceMethod: commonStat.ServiceMethod,
}
return request
}
3 changes: 1 addition & 2 deletions metrics/request_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ func TestSetDataFromContext(t *testing.T) {
md := metadata.Pairs("user-agent", "Test user agent", "time", "2018-09-93", "content-type", "application/")
request := &RequestStats{}
request.setDataFromContext(md)
assert.Equal(t, request.UserAgent, "Test user agent")
assert.Equal(t, request.ContentType, "application/")
}

func TestCreateRequestStat(t *testing.T) {
Expand All @@ -26,4 +24,5 @@ func TestCreateRequestStat(t *testing.T) {
assert.Equal(t, request.OrganizationID, config.GetString(config.OrganizationId))
assert.Equal(t, request.ServiceID, config.GetString(config.ServiceId))
assert.Equal(t, request.RequestReceivedTime, arrivalTime.String())
assert.Equal(t, request.Type, "request")
}
50 changes: 28 additions & 22 deletions metrics/response_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,20 @@ func BuildCommonStats(receivedTime time.Time, methodName string) *CommonStats {

//Response stats that will be captured and published
type ResponseStats struct {
RequestID string `json:"request_id"`
OrganizationID string `json:"organization_id"`
ServiceID string `json:"service_id"`
GroupID string `json:"group_id"`
DaemonEndPoint string `json:"daemon_end_point"`
ServiceMethod string `json:"service_method"`
ResponseSentTime string `json:"response_sent_time"`
RequestReceivedTime string `json:"request_received_time"`
ResponseTime string `json:"response_time"`
ResponseCode string `json:"response_code"`
ErrorMessage string `json:"error_message"`
Type string `json:"type"`
RegistryAddressKey string `json:"registry_address_key"`
EthereumJsonRpcEndpointKey string `json:"ethereum_json_rpc_endpoint"`
RequestID string `json:"request_id"`
OrganizationID string `json:"organization_id"`
ServiceID string `json:"service_id"`
GroupID string `json:"group_id"`
DaemonEndPoint string `json:"daemon_end_point"`
ServiceMethod string `json:"service_method"`
ResponseSentTime string `json:"response_sent_time"`
RequestReceivedTime string `json:"request_received_time"`
ResponseTime string `json:"response_time"`
ResponseCode string `json:"response_code"`
ErrorMessage string `json:"error_message"`
}

//Publish response received as a payload for reporting /metrics analysis
Expand All @@ -56,17 +59,20 @@ func PublishResponseStats(commonStats *CommonStats, duration time.Duration, err

func createResponseStats(commonStat *CommonStats, duration time.Duration, err error) *ResponseStats {
response := &ResponseStats{
RequestID: commonStat.ID,
ResponseTime: strconv.FormatFloat(duration.Seconds(), 'f', 4, 64),
GroupID: daemonGroupId,
DaemonEndPoint: commonStat.DaemonEndPoint,
OrganizationID: commonStat.OrganizationID,
ServiceID: commonStat.ServiceID,
ServiceMethod: commonStat.ServiceMethod,
RequestReceivedTime: commonStat.RequestReceivedTime,
ResponseSentTime: time.Now().String(),
ErrorMessage: getErrorMessage(err),
ResponseCode: getErrorCode(err),
Type: "response",
RegistryAddressKey: config.GetString(config.RegistryAddressKey),
EthereumJsonRpcEndpointKey: config.GetString(config.EthereumJsonRpcEndpointKey),
RequestID: commonStat.ID,
ResponseTime: strconv.FormatFloat(duration.Seconds(), 'f', 4, 64),
GroupID: daemonGroupId,
DaemonEndPoint: commonStat.DaemonEndPoint,
OrganizationID: commonStat.OrganizationID,
ServiceID: commonStat.ServiceID,
ServiceMethod: commonStat.ServiceMethod,
RequestReceivedTime: commonStat.RequestReceivedTime,
ResponseSentTime: time.Now().String(),
ErrorMessage: getErrorMessage(err),
ResponseCode: getErrorCode(err),
}
return response
}
Expand Down
1 change: 1 addition & 0 deletions metrics/response_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ func TestCreateResponseStats(t *testing.T) {
assert.Equal(t, response.RequestID, commonStat.ID)
assert.Equal(t, response.GroupID, daemonGroupId)
assert.Equal(t, response.ResponseTime, "1.2346")
assert.Equal(t, response.Type, "response")
assert2.NotEqual(t, response.ResponseSentTime, "")
}

Expand Down
74 changes: 55 additions & 19 deletions metrics/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ import (
"errors"
"github.com/OneOfOne/go-utils/memory"
"github.com/rs/xid"
"github.com/singnet/snet-daemon/config"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc/metadata"
"io/ioutil"
"net/http"
"time"
)
Expand Down Expand Up @@ -48,49 +50,83 @@ func Publish(payload interface{}, serviceUrl string) bool {
if err != nil {
return false
}
status := publishJson(jsonBytes, serviceUrl)
status := publishJson(jsonBytes, serviceUrl, true)
if !status {
log.WithField("payload", string(jsonBytes)).WithField("url", serviceUrl).Warning("Unable to publish metrics")
}
return status
}

// Publish the json on the service end point
func publishJson(json []byte, serviceURL string) bool {
//prepare the request payload
// Publish the json on the service end point, retry will be set to false when trying to re publish the payload
func publishJson(json []byte, serviceURL string, reTry bool) bool {
response, err := sendRequest(json, serviceURL)
if err != nil {
log.WithError(err)
} else {
status, reRegister := checkForSuccessfulResponse(response)
if reRegister && reTry {
//if Daemon was registered successfully , retry to publish the payload
status = publishJson(json, serviceURL, false)
}
return status
}
return false
}

//Set all the headers before publishing
func sendRequest(json []byte, serviceURL string) (*http.Response, error) {
req, err := http.NewRequest("POST", serviceURL, bytes.NewBuffer(json))
if err != nil {
log.WithField("serviceURL", serviceURL).WithError(err).Warningf("Unable to create service request to publish stats")
return false
return nil, err
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Access-Token", GetDaemonID())
// sending the post request
client := &http.Client{}
response, err := client.Do(req)
req.Header.Set("Content-Type", "application/json")
req.Header.Set("X-Daemonid", GetDaemonID())
req.Header.Set("X-Token", daemonAuthorizationToken)
return client.Do(req)

if err != nil {
log.WithError(err).Warningf("r")
} else {
return checkForSuccessfulResponse(response)
}
log.WithField("json", json).WithField("url", serviceURL).Warningf("Unable to publish the json to the service ")
return false
}

//Check if the response received was proper
func checkForSuccessfulResponse(response *http.Response) bool {
func checkForSuccessfulResponse(response *http.Response) (status bool, retry bool) {
if response == nil {
log.Warningf("Empty response received.")
return false
return false, false
}
if response.StatusCode != http.StatusOK {
log.Warningf("Service call failed with status code : %d ", response.StatusCode)
return false
//if response returned was forbidden error , then re register Daemon with fresh token and submit the request / response
//again ONLY if the Daemon was registered successfully
status = RegisterDaemon(config.GetString(config.MonitoringServiceEndpoint) + "/register")
return false, status
} //close the body
log.Debugf("Metrics posted successfully with status code : %d ", response.StatusCode)
defer response.Body.Close()
return true
return true, false
}

//Check if the response received was proper
func getTokenFromResponse(response *http.Response) (string, bool) {
if response == nil {
log.Warningf("Empty response received.")
return "", false
}
if response.StatusCode != http.StatusOK {
log.Warningf("Service call failed with status code : %d ", response.StatusCode)
return "", false
}
body, err := ioutil.ReadAll(response.Body)
if err != nil {
log.Infof("Unable to retrieve Token from Body , : %f ", err.Error())
return "", false
}
var data TokenGenerated
json.Unmarshal(body, &data)
//close the body
defer response.Body.Close()
return data.Data.Token, true
}

//Generic utility to determine the size of the srtuct passed
Expand Down
5 changes: 2 additions & 3 deletions metrics/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,10 @@ func TestPublish(t *testing.T) {
}

func TestCheckSuccessfulResponse(t *testing.T) {
status := checkForSuccessfulResponse(nil)
status, _ := checkForSuccessfulResponse(nil)
assert.Equal(t, status, false)
status = checkForSuccessfulResponse(&http.Response{StatusCode: http.StatusForbidden})
status, _ = checkForSuccessfulResponse(&http.Response{StatusCode: http.StatusForbidden})
assert.Equal(t, status, false)

}

func TestGetSize(t *testing.T) {
Expand Down
Loading

0 comments on commit f480a48

Please sign in to comment.