diff --git a/Makefile b/Makefile index d4c1a8c..d7eda53 100644 --- a/Makefile +++ b/Makefile @@ -14,7 +14,7 @@ LINTER_BIN ?= golangci-lint DOCKER_BIN ?= docker -TARGETS := knavigator +TARGETS := knavigator klient CMD_DIR := ./cmd OUTPUT_DIR := ./bin diff --git a/cmd/klient/main.go b/cmd/klient/main.go new file mode 100644 index 0000000..4e4c1cc --- /dev/null +++ b/cmd/klient/main.go @@ -0,0 +1,135 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "bytes" + "encoding/json" + "flag" + "fmt" + "io" + "net/http" + "net/url" + "os" + + "k8s.io/klog/v2" + + "github.com/NVIDIA/knavigator/pkg/config" +) + +func mainInternal() error { + var addr, taskCfg string + flag.StringVar(&addr, "addr", "", "server address") + flag.StringVar(&taskCfg, "tasks", "", "comma-separated list of task config files and dirs") + flag.Parse() + + taskconfigs, err := config.NewFromPaths(taskCfg) + if err != nil { + return err + } + if len(taskconfigs) == 0 { + return fmt.Errorf("missing 'tasks' argument") + } + + taskURL, err := url.JoinPath(addr, "task") + if err != nil { + return err + } + resetURL, err := url.JoinPath(addr, "reset") + if err != nil { + return err + } + + for _, taskconfig := range taskconfigs { + fmt.Printf("Starting test %s\n", taskconfig.Name) + if err := execTaskConfig(taskURL, resetURL, taskconfig); err != nil { + return err + } + } + + return nil +} + +func execTaskConfig(taskURL, resetURL string, taskconfig *config.TaskConfig) error { + var err error + for _, task := range taskconfig.Tasks { + if err = execTask(taskURL, task); err != nil { + break + } + } + + if errReset := execReset(resetURL); errReset != nil { + if err == nil { + err = errReset + } + } + + return err +} + +func execTask(urlPath string, task *config.Task) error { + jsonData, err := json.Marshal(task) + if err != nil { + return fmt.Errorf("failed to marshal to JSON: %v", err) + } + + resp, err := http.Post(urlPath, "application/json", bytes.NewBuffer(jsonData)) // #nosec G107 + if err != nil { + return fmt.Errorf("failed to create request: %v", err) + } + + defer resp.Body.Close() //nolint:errcheck + body, err := io.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("failed to read response: %v", err) + } + + if resp.StatusCode != http.StatusOK { + fmt.Printf("Error response: %s: %s\n", resp.Status, body) + return fmt.Errorf("%s", resp.Status) + } + + return nil +} + +func execReset(urlPath string) error { + resp, err := http.Get(urlPath) // #nosec G107 + if err != nil { + return fmt.Errorf("failed to create request: %v", err) + } + + defer func() { _ = resp.Body.Close() }() + body, err := io.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("failed to read response: %v", err) + } + + if resp.StatusCode != http.StatusOK { + fmt.Printf("Error response: %s: %s\n", resp.Status, body) + return fmt.Errorf("%s", resp.Status) + } + + return nil +} + +func main() { + defer klog.Flush() + if err := mainInternal(); err != nil { + klog.Error(err.Error()) + os.Exit(1) + } +} diff --git a/cmd/knavigator/main.go b/cmd/knavigator/main.go index 7ebaf09..91b5bcf 100644 --- a/cmd/knavigator/main.go +++ b/cmd/knavigator/main.go @@ -27,59 +27,61 @@ import ( "github.com/NVIDIA/knavigator/pkg/config" "github.com/NVIDIA/knavigator/pkg/engine" + "github.com/NVIDIA/knavigator/pkg/server" "github.com/NVIDIA/knavigator/pkg/utils" ) +type Args struct { + kubeCfg config.KubeConfig + taskCfg string + port int + cleanupInfo engine.CleanupInfo +} + func mainInternal() error { - var ( - kubeConfigPath, kubeCtx, taskConfigs string - qps float64 - burst int - cleanupInfo engine.CleanupInfo - ) - flag.StringVar(&kubeConfigPath, "kubeconfig", "", "kubeconfig file path") - flag.StringVar(&kubeCtx, "kubectx", "", "kube context") - flag.BoolVar(&cleanupInfo.Enabled, "cleanup", false, "delete objects") - flag.DurationVar(&cleanupInfo.Timeout, "cleanup.timeout", engine.DefaultCleanupTimeout, "time limit for cleanup") - flag.StringVar(&taskConfigs, "tasks", "", "comma-separated list of task config files and dirs") - flag.Float64Var(&qps, "kube-api-qps", 500, "Maximum QPS to use while talking with Kubernetes API") - flag.IntVar(&burst, "kube-api-burst", 500, "Maximum burst for throttle while talking with Kubernetes API") + var args Args + flag.StringVar(&args.kubeCfg.KubeConfigPath, "kubeconfig", "", "kubeconfig file path") + flag.StringVar(&args.kubeCfg.KubeCtx, "kubectx", "", "kube context") + flag.Float64Var(&args.kubeCfg.QPS, "kube-api-qps", 500, "Maximum QPS to use while talking with Kubernetes API") + flag.IntVar(&args.kubeCfg.Burst, "kube-api-burst", 500, "Maximum burst for throttle while talking with Kubernetes API") + flag.BoolVar(&args.cleanupInfo.Enabled, "cleanup", false, "delete objects") + flag.DurationVar(&args.cleanupInfo.Timeout, "cleanup.timeout", engine.DefaultCleanupTimeout, "time limit for cleanup") + flag.StringVar(&args.taskCfg, "tasks", "", "comma-separated list of task config files and dirs (mutually excluded with the 'port' flag)") + flag.IntVar(&args.port, "port", 0, "listening port (mutually excluded with the 'tasks' flag)") klog.InitFlags(nil) flag.Parse() - if len(taskConfigs) == 0 { + if err := validate(&args); err != nil { flag.Usage() - return fmt.Errorf("missing task config") + return err } - taskconfigs, err := config.NewFromPaths(taskConfigs) + log := textlogger.NewLogger(textlogger.NewConfig(textlogger.Verbosity(utils.Flag2Verbosity(flag.Lookup("v"))))) + + restConfig, err := utils.GetK8sConfig(log, &args.kubeCfg) if err != nil { return err } - if len(taskconfigs) == 0 { - return fmt.Errorf("missing 'tasks' argument") - } - log := textlogger.NewLogger(textlogger.NewConfig(textlogger.Verbosity(utils.Flag2Verbosity(flag.Lookup("v"))))) - cfg := &config.KubeConfig{ - KubeConfigPath: kubeConfigPath, - KubeCtx: kubeCtx, - QPS: float32(qps), - Burst: burst, - } - restConfig, err := utils.GetK8sConfig(log, cfg) + eng, err := engine.New(log, restConfig, &args.cleanupInfo) if err != nil { return err } - eng, err := engine.New(log, restConfig, &cleanupInfo) + if args.port > 0 { + return server.New(&log, eng, args.port).Run() + } + + taskconfigs, err := config.NewFromPaths(args.taskCfg) if err != nil { return err } + if len(taskconfigs) == 0 { + return fmt.Errorf("missing 'tasks' argument") + } ctx := context.Background() - for _, taskconfig := range taskconfigs { log.Info("Starting test", "name", taskconfig.Name) if err := engine.Run(ctx, eng, taskconfig); err != nil { @@ -90,6 +92,18 @@ func mainInternal() error { return nil } +func validate(args *Args) error { + if len(args.taskCfg) == 0 && args.port == 0 { + return fmt.Errorf("must specify task config or server port") + } + + if len(args.taskCfg) != 0 && args.port > 0 { + return fmt.Errorf("task config and server port are mutually excluded") + } + + return nil +} + func main() { defer klog.Flush() if err := mainInternal(); err != nil { diff --git a/go.mod b/go.mod index e8a606f..ea5482c 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.21 require ( github.com/go-logr/logr v1.4.1 github.com/maja42/goval v1.4.0 + github.com/oklog/run v1.1.0 github.com/stretchr/testify v1.8.4 gopkg.in/yaml.v3 v3.0.1 k8s.io/api v0.29.3 diff --git a/go.sum b/go.sum index 0461e0b..02bd04b 100644 --- a/go.sum +++ b/go.sum @@ -59,6 +59,8 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/oklog/run v1.1.0 h1:GEenZ1cK0+q0+wsJew9qUg/DyD8k3JzYsZAi5gYi2mA= +github.com/oklog/run v1.1.0/go.mod h1:sVPdnTZT1zYwAJeCMu2Th4T21pA3FPOQRfWjQlk7DVU= github.com/onsi/ginkgo/v2 v2.13.0 h1:0jY9lJquiL8fcf3M4LAXN5aMlS/b2BV86HFFPCPMgE4= github.com/onsi/ginkgo/v2 v2.13.0/go.mod h1:TE309ZR8s5FsKKpuB1YAQYBzCaAfUgatB/xlT/ETL/o= github.com/onsi/gomega v1.29.0 h1:KIA/t2t5UBzoirT4H9tsML45GEbo3ouUnBHsCfD2tVg= diff --git a/pkg/config/config.go b/pkg/config/config.go index d5f9480..c506abe 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -28,7 +28,7 @@ import ( type KubeConfig struct { KubeConfigPath string KubeCtx string - QPS float32 + QPS float64 Burst int } @@ -39,10 +39,10 @@ type TaskConfig struct { } type Task struct { - ID string `yaml:"id"` - Type string `yaml:"type"` - Description string `yaml:"description,omitempty"` - Params map[string]interface{} `yaml:"params,omitempty"` + ID string `yaml:"id" json:"id"` + Type string `yaml:"type" json:"type"` + Description string `yaml:"description,omitempty" json:"description,omitempty"` + Params map[string]interface{} `yaml:"params,omitempty" json:"params,omitempty"` } // New populates task config from raw data diff --git a/pkg/server/server.go b/pkg/server/server.go new file mode 100644 index 0000000..8cdc38d --- /dev/null +++ b/pkg/server/server.go @@ -0,0 +1,124 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package server + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "os" + "syscall" + + "github.com/go-logr/logr" + "github.com/oklog/run" + + "github.com/NVIDIA/knavigator/pkg/config" + "github.com/NVIDIA/knavigator/pkg/engine" +) + +type Server struct { + s *http.Server + log *logr.Logger +} + +type TaskHandler struct { + eng *engine.Eng +} + +type ResetHandler struct { + eng *engine.Eng +} + +func New(log *logr.Logger, eng *engine.Eng, port int) *Server { + mux := http.NewServeMux() + mux.Handle("/task", &TaskHandler{eng: eng}) + mux.Handle("/reset", &ResetHandler{eng: eng}) + return &Server{ + log: log, + s: &http.Server{ // #nosec G112 + Addr: fmt.Sprintf(":%d", port), + Handler: mux, + }, + } +} + +func (srv *Server) Run() error { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var g run.Group + // Signal handler + g.Add(run.SignalHandler(ctx, os.Interrupt, syscall.SIGTERM)) + // Server + g.Add( + func() error { + srv.log.Info("Starting server", "address", srv.s.Addr) + return srv.s.ListenAndServe() + }, + func(err error) { + srv.log.Error(err, "Stopping server") + if err := srv.s.Shutdown(ctx); err != nil { + srv.log.Error(err, "Error during server shutdown") + } + srv.log.Info("Server stopped") + }) + + return g.Run() +} + +func (h *TaskHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "Invalid request method", http.StatusMethodNotAllowed) + return + } + + body, err := io.ReadAll(r.Body) + if err != nil { + http.Error(w, "Unable to read request body", http.StatusBadRequest) + return + } + defer r.Body.Close() //nolint:errcheck + + var task config.Task + if err = json.Unmarshal(body, &task); err != nil { + http.Error(w, "Invalid JSON format", http.StatusBadRequest) + return + } + + if err = h.eng.RunTask(r.Context(), &task); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusOK) +} + +func (h *ResetHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "Invalid request method", http.StatusMethodNotAllowed) + return + } + + if err := h.eng.Reset(r.Context()); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusOK) +} diff --git a/pkg/server/testdata/sleep.json b/pkg/server/testdata/sleep.json new file mode 100644 index 0000000..36ce146 --- /dev/null +++ b/pkg/server/testdata/sleep.json @@ -0,0 +1,8 @@ +{ + "id": "sleep", + "type": "Sleep", + "description": "curl -X POST -H 'Content-Type: application/json' -d @sleep.json http://localhost:12345/task", + "params": { + "timeout": "5s" + } +} diff --git a/pkg/utils/k8s_config.go b/pkg/utils/k8s_config.go index 26dffb0..4e3d5a5 100644 --- a/pkg/utils/k8s_config.go +++ b/pkg/utils/k8s_config.go @@ -33,7 +33,7 @@ func GetK8sConfig(log logr.Logger, cfg *cfg.KubeConfig) (*rest.Config, error) { restConfig, err := rest.InClusterConfig() if err == nil { restConfig.Burst = cfg.Burst - restConfig.QPS = cfg.QPS + restConfig.QPS = float32(cfg.QPS) log.Info("Using in-cluster kubeconfig") return restConfig, err }