Skip to content

Commit

Permalink
Merge pull request #69 from UlfBj/master
Browse files Browse the repository at this point in the history
JSON schema validation on requests
  • Loading branch information
UlfBj authored Jan 7, 2025
2 parents 11617fa + 7371dca commit 96fb4f8
Show file tree
Hide file tree
Showing 10 changed files with 748 additions and 166 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ require (
github.com/apache/thrift v0.15.0 // indirect
github.com/onsi/ginkgo v1.16.5 // indirect
github.com/onsi/gomega v1.32.0 // indirect
github.com/qri-io/jsonpointer v0.1.1 // indirect
github.com/qri-io/jsonschema v0.2.1 // indirect
golang.org/x/net v0.20.0 // indirect
golang.org/x/sync v0.6.0 // indirect
golang.org/x/sys v0.16.0 // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,17 @@ github.com/petervolvowinz/viss-rl-interfaces v0.1.0 h1:R9iU0C+5nVgcjqvbzx4spjKQy
github.com/petervolvowinz/viss-rl-interfaces v0.1.0/go.mod h1:CrEuOgBagZ/frCWYx3Zr5TIN4pyEXz8joWMI9ntHAz0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/qri-io/jsonpointer v0.1.1 h1:prVZBZLL6TW5vsSB9fFHFAMBLI4b0ri5vribQlTJiBA=
github.com/qri-io/jsonpointer v0.1.1/go.mod h1:DnJPaYgiKu56EuDp8TU5wFLdZIcAnb/uH9v37ZaMV64=
github.com/qri-io/jsonschema v0.2.1 h1:NNFoKms+kut6ABPf6xiKNM5214jzxAhDBrPHCJ97Wg0=
github.com/qri-io/jsonschema v0.2.1/go.mod h1:g7DPkiOsK1xv6T/Ao5scXRkd+yTFygcANPBaaqW+VrI=
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
Expand Down
11 changes: 11 additions & 0 deletions server/vissv2server/httpMgr/httpMgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"github.com/covesa/vissr/utils"
)

var errorResponseMap = map[string]interface{}{}

// All HTTP app clients share same channel
var HttpClientChan = []chan string{
make(chan string),
Expand All @@ -24,6 +26,7 @@ func RemoveRoutingForwardResponse(response string, transportMgrChan chan string)

func HttpMgrInit(mgrId int, transportMgrChan chan string) {
utils.ReadTransportSecConfig()
utils.JsonSchemaInit()

go utils.HttpServer{}.InitClientServer(utils.MuxServer[0], HttpClientChan) // go routine needed due to listenAndServe call...
utils.Info.Println("HTTP manager data session initiated.")
Expand All @@ -33,6 +36,14 @@ func HttpMgrInit(mgrId int, transportMgrChan chan string) {
select {
case reqMessage := <-HttpClientChan[0]:
utils.Info.Printf("HTTP mgr hub: Request from client:%s\n", reqMessage)
validationError := utils.JsonSchemaValidate(reqMessage)
if len(validationError) > 0 {
var requestMap map[string]interface{}
utils.MapRequest(reqMessage, &requestMap)
utils.SetErrorResponse(requestMap, errorResponseMap, 0, validationError) //bad_request
HttpClientChan[0] <- utils.FinalizeMessage(errorResponseMap)
continue
}
utils.AddRoutingForwardRequest(reqMessage, mgrId, 0, transportMgrChan)
case respMessage := <-transportMgrChan:
utils.Info.Printf("HTTP mgr hub: Response from server core:%s\n", respMessage)
Expand Down
32 changes: 26 additions & 6 deletions server/vissv2server/mqttMgr/mqttMgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (

var mqttChannel chan string

var errorResponseMap = map[string]interface{}{}

type NodeValue struct {
topicId int
topic string
Expand All @@ -47,7 +49,7 @@ func vissV2Receiver(transportMgrChan chan string, vissv2Channel chan string) {
break
}*/
response := <-transportMgrChan
utils.Info.Printf("MQTT mgr: Response from server core:%s\n", string(response))
utils.Info.Printf("MQTT mgr: Response from server core:%s", string(response))
vissv2Channel <- string(response) // send message to hub
}
}
Expand Down Expand Up @@ -197,12 +199,17 @@ func extractVin(response string) string {
func decomposeMqttPayload(mqttPayload string) (string, string) { // {"topic":"X", "request":"{...}"}
var payloadMap = make(map[string]interface{})
utils.MapRequest(mqttPayload, &payloadMap)
topic, err := json.Marshal(payloadMap["topic"])
if err != nil {
utils.Error.Printf("decomposeMqttPayload: cannot marshal topic in: %s", mqttPayload)
return "", ""
}
payload, err := json.Marshal(payloadMap["request"])
if err != nil {
utils.Error.Printf("decomposeMqttPayload: cannot marshal request in response=%s", mqttPayload)
os.Exit(1)
utils.Error.Printf("decomposeMqttPayload: cannot marshal request in:%s", mqttPayload)
return string(topic), "corrupt request"
}
return payloadMap["topic"].(string), string(payload)
return string(topic), string(payload)
}

func AddRoutingInfoAndForward(reqMessage string, mgrId int, clientId int, transportMgrChan chan string) {
Expand All @@ -220,6 +227,8 @@ func MqttMgrInit(mgrId int, transportMgrChan chan string) {
topicId := 0
topicList.nodes = 0

utils.JsonSchemaInit()

go vissV2Receiver(transportMgrChan, vissv2Channel) //message reception from server core

utils.Info.Println("**** MQTT manager hub entering server loop... ****")
Expand All @@ -229,13 +238,24 @@ func MqttMgrInit(mgrId int, transportMgrChan chan string) {

case mqttPayload := <-mqttChannel:
topic, payload := decomposeMqttPayload(mqttPayload)
utils.Info.Printf("MQTT mgr hub: Message from broker:Topic=%s, Payload=%s\n", topic, payload)
if len(topic) == 0 {
utils.Error.Printf("MQTT: Message from broker is corrupt:%s\nNot possible to respond to client", mqttPayload)
continue
}
utils.Info.Printf("MQTT mgr hub: Message from broker:Topic=%s, Payload=%s", topic, payload)
validationError := utils.JsonSchemaValidate(payload)
if len(validationError) > 0 {
var requestMap map[string]interface{}
utils.MapRequest(payload, &requestMap)
utils.SetErrorResponse(requestMap, errorResponseMap, 0, validationError) //bad_request
publishMessage(brokerSocket, topic, utils.FinalizeMessage(errorResponseMap))
}
pushTopic(topic, topicId)
AddRoutingInfoAndForward(payload, mgrId, topicId, transportMgrChan)
topicId++

case vissv2Message := <-vissv2Channel:
utils.Info.Printf("MQTT hub: Message from VISSv2 server:%s\n", vissv2Message)
utils.Info.Printf("MQTT hub: Message from VISSv2 server:%s", vissv2Message)
// link routerId to topic, remove routerId from message, create mqtt message, send message to mqtt transport
payload, topicHandle := utils.RemoveInternalData(string(vissv2Message))
publishMessage(brokerSocket, getTopic(topicHandle), payload)
Expand Down
9 changes: 1 addition & 8 deletions server/vissv2server/serviceMgr/serviceMgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,7 @@ var feederConnected bool
//var feederConn net.Conn
//var hostIp string

var errorResponseMap = map[string]interface{}{
"RouterId": "0?0",
"action": "unknown",
"requestId": "XX",
"error": `{"number":AA, "reason": "BB", "message": "CC"}`,
"ts": "yy",
}
var errorResponseMap = map[string]interface{}{}

var dbHandle *sql.DB
var dbErr error
Expand Down Expand Up @@ -1245,7 +1239,6 @@ func feederReader(udsConn net.Conn, fromFeeder chan string) {
}
}

//func ServiceMgrInit(mgrId int, serviceMgrChan chan string, stateStorageType string, histSupport bool, dbFile string) {
func ServiceMgrInit(mgrId int, serviceMgrChan chan map[string]interface{}, stateStorageType string, histSupport bool, dbFile string) {
stateDbType = stateStorageType
historySupport = histSupport
Expand Down
147 changes: 2 additions & 145 deletions server/vissv2server/vissv2server.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,11 @@ func transportDataSession(transportMgrChannel chan string, transportDataChannel

case msg := <-transportMgrChannel:
utils.Info.Printf("request: %s", msg)
var msgMap map[string]interface{}
utils.MapRequest(msg, &msgMap)
var msgMap map[string]interface{}
utils.MapRequest(msg, &msgMap)
transportDataChannel <- msgMap // send request to server hub
// transportDataChannel <- msg // send request to server hub
case message := <-backendChannel:
// utils.Info.Printf("Transport mgr server: message= %s", message)
transportMgrChannel <- utils.FinalizeMessage(message)
// transportMgrChannel <- message
}
}
}
Expand Down Expand Up @@ -406,144 +403,10 @@ func getTokenContext(reqMap map[string]interface{}) string {
return ""
}

func validRequest(request map[string]interface{}) bool {
switch request["action"].(string) {
case "get":
return isValidGetParams(request)
case "set":
return isValidSetParams(request)
case "subscribe":
return isValidSubscribeParams(request)
case "unsubscribe":
return isValidUnsubscribeParams(request)
case "internal-killsubscriptions":
return true
case "internal-cancelsubscription":
return true
}
return false
}

func isValidGetParams(request map[string]interface{}) bool {
if request["path"] == nil {
return false
}
if request["filter"] != nil {
return isValidGetFilter(request)
}
return true
}

func isValidGetFilter(request map[string]interface{}) bool { // paths, history, metadata supported
return true // needs to be fixed
// if strings.Contains(request, "paths") == true {
if request["paths"] != nil {
// if strings.Contains(request, "parameter") == true {
if request["parameter"] != nil {
return true
}
}
// if strings.Contains(request, "history") == true {
if request["history"] != nil {
// if strings.Contains(request, "parameter") == true {
if request["parameter"] != nil {
return true
}
}
// if strings.Contains(request, "metadata") == true {
if request["metadata"] != nil {
// if strings.Contains(request, "parameter") == true {
if request["parameter"] != nil {
return true
}
}
return false
}

func isValidSetParams(request map[string]interface{}) bool {
return request["path"] != nil && request["value"] != nil
}

func isValidSubscribeParams(request map[string]interface{}) bool {
if request["path"] == nil {
return false
}
if request["filter"] != nil {
return true
// return isValidSubscribeFilter(request)
}
return true
}

func isValidSubscribeFilter(request map[string]interface{}) bool { // paths, timebased, range, change, curvelog supported
return true // needs to be fixed
// if strings.Contains(request, "paths") == true {
if request["paths"] != nil {
// if strings.Contains(request, "parameter") == true {
if request["parameter"] != nil {
return true
}
}
// if strings.Contains(request, "timebased") == true {
if request["timebased"] != nil {
// if strings.Contains(request, "parameter") == true && strings.Contains(request, "period") == true {
if request["parameter"] != nil && request["period"] != nil {
return true
}
}
// if strings.Contains(request, "range") == true {
if request["range"] != nil {
// if strings.Contains(request, "parameter") == true && strings.Contains(request, "logic-op") == true &&
// strings.Contains(request, "boundary") == true {
if request["parameter"] != nil && request["logic-op"] != nil && request["boundary"] != nil {
return true
}
}
// if strings.Contains(request, "change") == true {
if request["change"] != nil {
// if strings.Contains(request, "parameter") == true && strings.Contains(request, "logic-op") == true &&
// strings.Contains(request, "diff") == true {
if request["parameter"] != nil && request["logic-op"] != nil && request["diff"] != nil {
return true
}
}
// if strings.Contains(request, "curvelog") == true {
if request["curvelog"] != nil {
// if strings.Contains(request, "parameter") == true && strings.Contains(request, "maxerr") == true &&
// strings.Contains(request, "bufsize") == true {
if request["parameter"] != nil && request["maxerr"] != nil && request["bufsize"] != nil {
return true
}
}
return false
}

func isValidUnsubscribeParams(request map[string]interface{}) bool {
return request["subscriptionId"] != nil
}

func serveRequest(requestMap map[string]interface{}, tDChanIndex int, sDChanIndex int) {
if requestMap["action"] == nil || validRequest(requestMap) == false {
utils.Error.Printf("serveRequest():invalid action params=%s", requestMap["action"])
utils.SetErrorResponse(requestMap, errorResponseMap, 1, "") //invalid_data
backendChan[tDChanIndex] <- errorResponseMap
return
}
if requestMap["path"] != nil && strings.Contains(requestMap["path"].(string), "*") == true {
utils.Error.Printf("serveRequest():path contained wildcard=%s", requestMap["path"])
utils.SetErrorResponse(requestMap, errorResponseMap, 1, "") //invalid_data
backendChan[tDChanIndex] <- errorResponseMap
return
}
if requestMap["path"] != nil {
requestMap["path"] = utils.UrlToPath(requestMap["path"].(string)) // replace slash with dot
}
if requestMap["action"] == "set" && requestMap["filter"] != nil {
utils.Error.Printf("serveRequest():Set request combined with filtering.")
utils.SetErrorResponse(requestMap, errorResponseMap, 0, "") //bad_request
backendChan[tDChanIndex] <- errorResponseMap
return
}
if requestMap["action"] == "unsubscribe" {
serviceDataChan[sDChanIndex] <- requestMap
return
Expand All @@ -556,12 +419,6 @@ func issueServiceRequest(requestMap map[string]interface{}, tDChanIndex int, sDC
serviceDataChan[sDChanIndex] <- requestMap // internal message
return
}
if requestMap["path"] == nil {
utils.Error.Printf("Unmarshal filter path array failed.")
utils.SetErrorResponse(requestMap, errorResponseMap, 0, "") //bad_request
backendChan[tDChanIndex] <- errorResponseMap
return
}
rootPath := requestMap["path"].(string)
VSSTreeRoot := utils.SetRootNodePointer(rootPath)
if VSSTreeRoot == nil {
Expand Down
Loading

0 comments on commit 96fb4f8

Please sign in to comment.