Skip to content

Commit

Permalink
implement HTTP server for task processing
Browse files Browse the repository at this point in the history
Signed-off-by: Dmitry Shmulevich <[email protected]>
  • Loading branch information
dmitsh committed Jun 2, 2024
1 parent 322d3d7 commit 6703522
Show file tree
Hide file tree
Showing 9 changed files with 320 additions and 36 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

LINTER_BIN ?= golangci-lint
DOCKER_BIN ?= docker
TARGETS := knavigator
TARGETS := knavigator klient
CMD_DIR := ./cmd
OUTPUT_DIR := ./bin

Expand Down
135 changes: 135 additions & 0 deletions cmd/klient/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package main

import (
"bytes"
"encoding/json"
"flag"
"fmt"
"io"
"net/http"
"net/url"
"os"

"k8s.io/klog/v2"

"github.com/NVIDIA/knavigator/pkg/config"
)

func mainInternal() error {
var addr, taskCfg string
flag.StringVar(&addr, "addr", "", "server address")
flag.StringVar(&taskCfg, "tasks", "", "comma-separated list of task config files and dirs")
flag.Parse()

taskconfigs, err := config.NewFromPaths(taskCfg)
if err != nil {
return err
}
if len(taskconfigs) == 0 {
return fmt.Errorf("missing 'tasks' argument")
}

taskURL, err := url.JoinPath(addr, "task")
if err != nil {
return err
}
resetURL, err := url.JoinPath(addr, "reset")
if err != nil {
return err
}

for _, taskconfig := range taskconfigs {
fmt.Printf("Starting test %s\n", taskconfig.Name)
if err := execTaskConfig(taskURL, resetURL, taskconfig); err != nil {
return err
}
}

return nil
}

func execTaskConfig(taskURL, resetURL string, taskconfig *config.TaskConfig) error {
var err error
for _, task := range taskconfig.Tasks {
if err = execTask(taskURL, task); err != nil {
break
}
}

if errReset := execReset(resetURL); errReset != nil {
if err == nil {
err = errReset
}
}

return err
}

func execTask(urlPath string, task *config.Task) error {
jsonData, err := json.Marshal(task)
if err != nil {
return fmt.Errorf("failed to marshal to JSON: %v", err)
}

resp, err := http.Post(urlPath, "application/json", bytes.NewBuffer(jsonData)) // #nosec G107
if err != nil {
return fmt.Errorf("failed to create request: %v", err)
}

defer resp.Body.Close() //nolint:errcheck
body, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("failed to read response: %v", err)
}

if resp.StatusCode != http.StatusOK {
fmt.Printf("Error response: %s: %s\n", resp.Status, body)
return fmt.Errorf("%s", resp.Status)
}

return nil
}

func execReset(urlPath string) error {
resp, err := http.Get(urlPath) // #nosec G107
if err != nil {
return fmt.Errorf("failed to create request: %v", err)
}

defer func() { _ = resp.Body.Close() }()
body, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("failed to read response: %v", err)
}

if resp.StatusCode != http.StatusOK {
fmt.Printf("Error response: %s: %s\n", resp.Status, body)
return fmt.Errorf("%s", resp.Status)
}

return nil
}

func main() {
defer klog.Flush()
if err := mainInternal(); err != nil {
klog.Error(err.Error())
os.Exit(1)
}
}
72 changes: 43 additions & 29 deletions cmd/knavigator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,59 +27,61 @@ import (

"github.com/NVIDIA/knavigator/pkg/config"
"github.com/NVIDIA/knavigator/pkg/engine"
"github.com/NVIDIA/knavigator/pkg/server"
"github.com/NVIDIA/knavigator/pkg/utils"
)

type Args struct {
kubeCfg config.KubeConfig
taskCfg string
port int
cleanupInfo engine.CleanupInfo
}

func mainInternal() error {
var (
kubeConfigPath, kubeCtx, taskConfigs string
qps float64
burst int
cleanupInfo engine.CleanupInfo
)
flag.StringVar(&kubeConfigPath, "kubeconfig", "", "kubeconfig file path")
flag.StringVar(&kubeCtx, "kubectx", "", "kube context")
flag.BoolVar(&cleanupInfo.Enabled, "cleanup", false, "delete objects")
flag.DurationVar(&cleanupInfo.Timeout, "cleanup.timeout", engine.DefaultCleanupTimeout, "time limit for cleanup")
flag.StringVar(&taskConfigs, "tasks", "", "comma-separated list of task config files and dirs")
flag.Float64Var(&qps, "kube-api-qps", 500, "Maximum QPS to use while talking with Kubernetes API")
flag.IntVar(&burst, "kube-api-burst", 500, "Maximum burst for throttle while talking with Kubernetes API")
var args Args
flag.StringVar(&args.kubeCfg.KubeConfigPath, "kubeconfig", "", "kubeconfig file path")
flag.StringVar(&args.kubeCfg.KubeCtx, "kubectx", "", "kube context")
flag.Float64Var(&args.kubeCfg.QPS, "kube-api-qps", 500, "Maximum QPS to use while talking with Kubernetes API")
flag.IntVar(&args.kubeCfg.Burst, "kube-api-burst", 500, "Maximum burst for throttle while talking with Kubernetes API")
flag.BoolVar(&args.cleanupInfo.Enabled, "cleanup", false, "delete objects")
flag.DurationVar(&args.cleanupInfo.Timeout, "cleanup.timeout", engine.DefaultCleanupTimeout, "time limit for cleanup")
flag.StringVar(&args.taskCfg, "tasks", "", "comma-separated list of task config files and dirs (mutually excluded with the 'port' flag)")
flag.IntVar(&args.port, "port", 0, "listening port (mutually excluded with the 'tasks' flag)")

klog.InitFlags(nil)
flag.Parse()

if len(taskConfigs) == 0 {
if err := validate(&args); err != nil {
flag.Usage()
return fmt.Errorf("missing task config")
return err
}

taskconfigs, err := config.NewFromPaths(taskConfigs)
log := textlogger.NewLogger(textlogger.NewConfig(textlogger.Verbosity(utils.Flag2Verbosity(flag.Lookup("v")))))

restConfig, err := utils.GetK8sConfig(log, &args.kubeCfg)
if err != nil {
return err
}
if len(taskconfigs) == 0 {
return fmt.Errorf("missing 'tasks' argument")
}

log := textlogger.NewLogger(textlogger.NewConfig(textlogger.Verbosity(utils.Flag2Verbosity(flag.Lookup("v")))))
cfg := &config.KubeConfig{
KubeConfigPath: kubeConfigPath,
KubeCtx: kubeCtx,
QPS: float32(qps),
Burst: burst,
}
restConfig, err := utils.GetK8sConfig(log, cfg)
eng, err := engine.New(log, restConfig, &args.cleanupInfo)
if err != nil {
return err
}

eng, err := engine.New(log, restConfig, &cleanupInfo)
if args.port > 0 {
return server.New(&log, eng, args.port).Run()
}

taskconfigs, err := config.NewFromPaths(args.taskCfg)
if err != nil {
return err
}
if len(taskconfigs) == 0 {
return fmt.Errorf("missing 'tasks' argument")
}

ctx := context.Background()

for _, taskconfig := range taskconfigs {
log.Info("Starting test", "name", taskconfig.Name)
if err := engine.Run(ctx, eng, taskconfig); err != nil {
Expand All @@ -90,6 +92,18 @@ func mainInternal() error {
return nil
}

func validate(args *Args) error {
if len(args.taskCfg) == 0 && args.port == 0 {
return fmt.Errorf("must specify task config or server port")
}

if len(args.taskCfg) != 0 && args.port > 0 {
return fmt.Errorf("task config and server port are mutually excluded")
}

return nil
}

func main() {
defer klog.Flush()
if err := mainInternal(); err != nil {
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.21
require (
github.com/go-logr/logr v1.4.1
github.com/maja42/goval v1.4.0
github.com/oklog/run v1.1.0
github.com/stretchr/testify v1.8.4
gopkg.in/yaml.v3 v3.0.1
k8s.io/api v0.29.3
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/oklog/run v1.1.0 h1:GEenZ1cK0+q0+wsJew9qUg/DyD8k3JzYsZAi5gYi2mA=
github.com/oklog/run v1.1.0/go.mod h1:sVPdnTZT1zYwAJeCMu2Th4T21pA3FPOQRfWjQlk7DVU=
github.com/onsi/ginkgo/v2 v2.13.0 h1:0jY9lJquiL8fcf3M4LAXN5aMlS/b2BV86HFFPCPMgE4=
github.com/onsi/ginkgo/v2 v2.13.0/go.mod h1:TE309ZR8s5FsKKpuB1YAQYBzCaAfUgatB/xlT/ETL/o=
github.com/onsi/gomega v1.29.0 h1:KIA/t2t5UBzoirT4H9tsML45GEbo3ouUnBHsCfD2tVg=
Expand Down
10 changes: 5 additions & 5 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
type KubeConfig struct {
KubeConfigPath string
KubeCtx string
QPS float32
QPS float64
Burst int
}

Expand All @@ -39,10 +39,10 @@ type TaskConfig struct {
}

type Task struct {
ID string `yaml:"id"`
Type string `yaml:"type"`
Description string `yaml:"description,omitempty"`
Params map[string]interface{} `yaml:"params,omitempty"`
ID string `yaml:"id" json:"id"`
Type string `yaml:"type" json:"type"`
Description string `yaml:"description,omitempty" json:"description,omitempty"`
Params map[string]interface{} `yaml:"params,omitempty" json:"params,omitempty"`
}

// New populates task config from raw data
Expand Down
Loading

0 comments on commit 6703522

Please sign in to comment.