From d22972ad56aaaebc5185b47f308483820efd76ac Mon Sep 17 00:00:00 2001 From: Dmitry Shmulevich Date: Sat, 1 Jun 2024 19:55:15 -0700 Subject: [PATCH] implement HTTP server for task processing Signed-off-by: Dmitry Shmulevich --- Makefile | 2 +- cmd/klient/main.go | 99 +++++++++++++++++++++++++++++++++++ cmd/knavigator/main.go | 65 ++++++++++++++--------- go.mod | 1 + go.sum | 2 + pkg/config/config.go | 6 +-- pkg/config/config_test.go | 4 +- pkg/engine/engine.go | 4 +- pkg/server/server.go | 106 ++++++++++++++++++++++++++++++++++++++ pkg/utils/k8s_config.go | 2 +- 10 files changed, 257 insertions(+), 34 deletions(-) create mode 100644 cmd/klient/main.go create mode 100644 pkg/server/server.go diff --git a/Makefile b/Makefile index d4c1a8c..d7eda53 100644 --- a/Makefile +++ b/Makefile @@ -14,7 +14,7 @@ LINTER_BIN ?= golangci-lint DOCKER_BIN ?= docker -TARGETS := knavigator +TARGETS := knavigator klient CMD_DIR := ./cmd OUTPUT_DIR := ./bin diff --git a/cmd/klient/main.go b/cmd/klient/main.go new file mode 100644 index 0000000..9e212b0 --- /dev/null +++ b/cmd/klient/main.go @@ -0,0 +1,99 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "bytes" + "flag" + "fmt" + "io" + "net/http" + "net/url" + "os" + + "gopkg.in/yaml.v3" + "k8s.io/klog/v2" + + "github.com/NVIDIA/knavigator/pkg/config" +) + +func mainInternal() error { + var addr, workflow string + flag.StringVar(&addr, "address", "", "server address") + flag.StringVar(&workflow, "workflow", "", "comma-separated list of workflow config files and dirs") + flag.Parse() + + if len(addr) == 0 { + return fmt.Errorf("missing 'address' argument") + } + if len(workflow) == 0 { + return fmt.Errorf("missing 'workload' argument") + } + + workflows, err := config.NewFromPaths(workflow) + if err != nil { + return err + } + + urlPath, err := url.JoinPath(addr, "workflow") + if err != nil { + return err + } + + for _, workflow := range workflows { + fmt.Printf("Starting workflow %s\n", workflow.Name) + if err := execWorkflow(urlPath, workflow); err != nil { + return err + } + } + + return nil +} + +func execWorkflow(urlPath string, workflow *config.Workflow) error { + + data, err := yaml.Marshal(workflow) + if err != nil { + return fmt.Errorf("failed to marshal to YAML: %v", err) + } + + resp, err := http.Post(urlPath, "application/x-yaml", bytes.NewBuffer(data)) // #nosec G107 // Potential HTTP request made with variable url + if err != nil { + return fmt.Errorf("failed to create request: %v", err) + } + + defer resp.Body.Close() //nolint:errcheck // No check for the return value of Body.Close() + body, err := io.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("failed to read response: %v", err) + } + + if resp.StatusCode != http.StatusOK { + fmt.Printf("Error response: %s: %s\n", resp.Status, body) + return fmt.Errorf("%s", resp.Status) + } + + return nil +} + +func main() { + defer klog.Flush() + if err := mainInternal(); err != nil { + klog.Error(err.Error()) + os.Exit(1) + } +} diff --git a/cmd/knavigator/main.go b/cmd/knavigator/main.go index b787c9e..8cf9197 100644 --- a/cmd/knavigator/main.go +++ b/cmd/knavigator/main.go @@ -27,50 +27,53 @@ import ( "github.com/NVIDIA/knavigator/pkg/config" "github.com/NVIDIA/knavigator/pkg/engine" + "github.com/NVIDIA/knavigator/pkg/server" "github.com/NVIDIA/knavigator/pkg/utils" ) +type Args struct { + kubeCfg config.KubeConfig + workflow string + port int + cleanupInfo engine.CleanupInfo +} + func mainInternal() error { - var ( - kubeConfigPath, kubeCtx, workflow string - qps float64 - burst int - cleanupInfo engine.CleanupInfo - ) - flag.StringVar(&kubeConfigPath, "kubeconfig", "", "kubeconfig file path") - flag.StringVar(&kubeCtx, "kubectx", "", "kube context") - flag.BoolVar(&cleanupInfo.Enabled, "cleanup", false, "delete objects") - flag.DurationVar(&cleanupInfo.Timeout, "cleanup.timeout", engine.DefaultCleanupTimeout, "time limit for cleanup") - flag.StringVar(&workflow, "workflow", "", "comma-separated list of workflow config files and dirs") - flag.Float64Var(&qps, "kube-api-qps", 500, "Maximum QPS to use while talking with Kubernetes API") - flag.IntVar(&burst, "kube-api-burst", 500, "Maximum burst for throttle while talking with Kubernetes API") + var args Args + flag.StringVar(&args.kubeCfg.KubeConfigPath, "kubeconfig", "", "kubeconfig file path") + flag.StringVar(&args.kubeCfg.KubeCtx, "kubectx", "", "kube context") + flag.Float64Var(&args.kubeCfg.QPS, "kube-api-qps", 500, "Maximum QPS to use while talking with Kubernetes API") + flag.IntVar(&args.kubeCfg.Burst, "kube-api-burst", 500, "Maximum burst for throttle while talking with Kubernetes API") + flag.BoolVar(&args.cleanupInfo.Enabled, "cleanup", false, "delete objects") + flag.DurationVar(&args.cleanupInfo.Timeout, "cleanup.timeout", engine.DefaultCleanupTimeout, "time limit for cleanup") + flag.StringVar(&args.workflow, "workflow", "", "comma-separated list of workflow config files and dirs (mutually exclusive with the 'port' flag)") + flag.IntVar(&args.port, "port", 0, "listening port (mutually exclusive with the 'workflow' flag)") klog.InitFlags(nil) flag.Parse() - if len(workflow) == 0 { + if err := validate(&args); err != nil { flag.Usage() - return fmt.Errorf("missing 'workflow' argument") + return err } - workflows, err := config.NewFromPaths(workflow) + log := textlogger.NewLogger(textlogger.NewConfig(textlogger.Verbosity(utils.Flag2Verbosity(flag.Lookup("v"))))) + + restConfig, err := utils.GetK8sConfig(log, &args.kubeCfg) if err != nil { return err } - log := textlogger.NewLogger(textlogger.NewConfig(textlogger.Verbosity(utils.Flag2Verbosity(flag.Lookup("v"))))) - cfg := &config.KubeConfig{ - KubeConfigPath: kubeConfigPath, - KubeCtx: kubeCtx, - QPS: float32(qps), - Burst: burst, - } - restConfig, err := utils.GetK8sConfig(log, cfg) + eng, err := engine.New(log, restConfig, &args.cleanupInfo) if err != nil { return err } - eng, err := engine.New(log, restConfig, &cleanupInfo) + if args.port > 0 { + return server.New(&log, eng, args.port).Run() + } + + workflows, err := config.NewFromPaths(args.workflow) if err != nil { return err } @@ -87,6 +90,18 @@ func mainInternal() error { return nil } +func validate(args *Args) error { + if len(args.workflow) == 0 && args.port == 0 { + return fmt.Errorf("must specify 'workflow' or 'port'") + } + + if len(args.workflow) != 0 && args.port > 0 { + return fmt.Errorf("'workflow' and 'port' are mutually exclusive") + } + + return nil +} + func main() { defer klog.Flush() if err := mainInternal(); err != nil { diff --git a/go.mod b/go.mod index e8a606f..ea5482c 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.21 require ( github.com/go-logr/logr v1.4.1 github.com/maja42/goval v1.4.0 + github.com/oklog/run v1.1.0 github.com/stretchr/testify v1.8.4 gopkg.in/yaml.v3 v3.0.1 k8s.io/api v0.29.3 diff --git a/go.sum b/go.sum index 0461e0b..02bd04b 100644 --- a/go.sum +++ b/go.sum @@ -59,6 +59,8 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/oklog/run v1.1.0 h1:GEenZ1cK0+q0+wsJew9qUg/DyD8k3JzYsZAi5gYi2mA= +github.com/oklog/run v1.1.0/go.mod h1:sVPdnTZT1zYwAJeCMu2Th4T21pA3FPOQRfWjQlk7DVU= github.com/onsi/ginkgo/v2 v2.13.0 h1:0jY9lJquiL8fcf3M4LAXN5aMlS/b2BV86HFFPCPMgE4= github.com/onsi/ginkgo/v2 v2.13.0/go.mod h1:TE309ZR8s5FsKKpuB1YAQYBzCaAfUgatB/xlT/ETL/o= github.com/onsi/gomega v1.29.0 h1:KIA/t2t5UBzoirT4H9tsML45GEbo3ouUnBHsCfD2tVg= diff --git a/pkg/config/config.go b/pkg/config/config.go index e756fd0..2adb1bc 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -28,7 +28,7 @@ import ( type KubeConfig struct { KubeConfigPath string KubeCtx string - QPS float32 + QPS float64 Burst int } @@ -45,7 +45,7 @@ type Task struct { Params map[string]interface{} `yaml:"params,omitempty"` } -// New populates task config from raw data +// New populates workflow config from raw data func New(data []byte) (*Workflow, error) { var config Workflow @@ -60,7 +60,7 @@ func New(data []byte) (*Workflow, error) { return &config, nil } -// NewFromFile populates test config from YAML file +// NewFromFile populates workflow config from YAML file func NewFromFile(path string) (*Workflow, error) { path = filepath.Clean(path) data, err := os.ReadFile(path) diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 5271bbb..f913a3f 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -80,13 +80,13 @@ tasks: } } -func TestConfigFile(t *testing.T) { +func TestWorkflowFile(t *testing.T) { c, err := NewFromFile("../../resources/workflows/test-custom-resource.yml") require.NoError(t, err) require.NotNil(t, c) } -func TestConfigPaths(t *testing.T) { +func TestWorkflowPaths(t *testing.T) { testCases := []struct { name string paths string diff --git a/pkg/engine/engine.go b/pkg/engine/engine.go index 4fed954..be69b94 100644 --- a/pkg/engine/engine.go +++ b/pkg/engine/engine.go @@ -77,9 +77,9 @@ func New(log logr.Logger, config *rest.Config, cleanupInfo *CleanupInfo, sim ... return eng, nil } -func Run(ctx context.Context, eng Engine, testconfig *config.Workflow) error { +func Run(ctx context.Context, eng Engine, workflow *config.Workflow) error { var errExec error - for _, cfg := range testconfig.Tasks { + for _, cfg := range workflow.Tasks { if errExec = eng.RunTask(ctx, cfg); errExec != nil { break } diff --git a/pkg/server/server.go b/pkg/server/server.go new file mode 100644 index 0000000..476a791 --- /dev/null +++ b/pkg/server/server.go @@ -0,0 +1,106 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package server + +import ( + "context" + "fmt" + "io" + "net/http" + "os" + "syscall" + + "github.com/go-logr/logr" + "github.com/oklog/run" + "gopkg.in/yaml.v3" + + "github.com/NVIDIA/knavigator/pkg/config" + "github.com/NVIDIA/knavigator/pkg/engine" +) + +type Server struct { + s *http.Server + log *logr.Logger +} + +type WorkflowHandler struct { + eng *engine.Eng +} + +func New(log *logr.Logger, eng *engine.Eng, port int) *Server { + mux := http.NewServeMux() + mux.Handle("/workflow", &WorkflowHandler{eng: eng}) + + return &Server{ + log: log, + s: &http.Server{ // #nosec G112 // Potential Slowloris Attack because ReadHeaderTimeout is not configured in the http.Server + Addr: fmt.Sprintf(":%d", port), + Handler: mux, + }, + } +} + +func (srv *Server) Run() error { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var g run.Group + // Signal handler + g.Add(run.SignalHandler(ctx, os.Interrupt, syscall.SIGTERM)) + // Server + g.Add( + func() error { + srv.log.Info("Starting server", "address", srv.s.Addr) + return srv.s.ListenAndServe() + }, + func(err error) { + srv.log.Error(err, "Stopping server") + if err := srv.s.Shutdown(ctx); err != nil { + srv.log.Error(err, "Error during server shutdown") + } + srv.log.Info("Server stopped") + }) + + return g.Run() +} + +func (h *WorkflowHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "Invalid request method", http.StatusMethodNotAllowed) + return + } + + body, err := io.ReadAll(r.Body) + if err != nil { + http.Error(w, "Unable to read request body", http.StatusBadRequest) + return + } + defer r.Body.Close() //nolint:errcheck // No check for the return value of Body.Close() + + var workflow config.Workflow + if err = yaml.Unmarshal(body, &workflow); err != nil { + http.Error(w, "Invalid YAML format", http.StatusBadRequest) + return + } + + if err = engine.Run(r.Context(), h.eng, &workflow); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusOK) +} diff --git a/pkg/utils/k8s_config.go b/pkg/utils/k8s_config.go index 26dffb0..4e3d5a5 100644 --- a/pkg/utils/k8s_config.go +++ b/pkg/utils/k8s_config.go @@ -33,7 +33,7 @@ func GetK8sConfig(log logr.Logger, cfg *cfg.KubeConfig) (*rest.Config, error) { restConfig, err := rest.InClusterConfig() if err == nil { restConfig.Burst = cfg.Burst - restConfig.QPS = cfg.QPS + restConfig.QPS = float32(cfg.QPS) log.Info("Using in-cluster kubeconfig") return restConfig, err }