Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement HTTP server for task processing #58

Merged
merged 1 commit into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

LINTER_BIN ?= golangci-lint
DOCKER_BIN ?= docker
TARGETS := knavigator
TARGETS := knavigator klient
CMD_DIR := ./cmd
OUTPUT_DIR := ./bin

Expand Down
99 changes: 99 additions & 0 deletions cmd/klient/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package main

import (
"bytes"
"flag"
"fmt"
"io"
"net/http"
"net/url"
"os"

"gopkg.in/yaml.v3"
"k8s.io/klog/v2"

"github.com/NVIDIA/knavigator/pkg/config"
)

func mainInternal() error {
var addr, workflow string
flag.StringVar(&addr, "address", "", "server address")
flag.StringVar(&workflow, "workflow", "", "comma-separated list of workflow config files and dirs")
flag.Parse()

if len(addr) == 0 {
return fmt.Errorf("missing 'address' argument")
}
if len(workflow) == 0 {
return fmt.Errorf("missing 'workload' argument")
}

workflows, err := config.NewFromPaths(workflow)
if err != nil {
return err
}

urlPath, err := url.JoinPath(addr, "workflow")
if err != nil {
return err
}

for _, workflow := range workflows {
fmt.Printf("Starting workflow %s\n", workflow.Name)
if err := execWorkflow(urlPath, workflow); err != nil {
return err
}
}

return nil
}

func execWorkflow(urlPath string, workflow *config.Workflow) error {

data, err := yaml.Marshal(workflow)
if err != nil {
return fmt.Errorf("failed to marshal to YAML: %v", err)
}

resp, err := http.Post(urlPath, "application/x-yaml", bytes.NewBuffer(data)) // #nosec G107 // Potential HTTP request made with variable url
if err != nil {
return fmt.Errorf("failed to create request: %v", err)
}

defer resp.Body.Close() //nolint:errcheck // No check for the return value of Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("failed to read response: %v", err)
}

if resp.StatusCode != http.StatusOK {
fmt.Printf("Error response: %s: %s\n", resp.Status, body)
return fmt.Errorf("%s", resp.Status)
}

return nil
}

func main() {
defer klog.Flush()
if err := mainInternal(); err != nil {
klog.Error(err.Error())
os.Exit(1)
}
}
65 changes: 40 additions & 25 deletions cmd/knavigator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,50 +27,53 @@ import (

"github.com/NVIDIA/knavigator/pkg/config"
"github.com/NVIDIA/knavigator/pkg/engine"
"github.com/NVIDIA/knavigator/pkg/server"
"github.com/NVIDIA/knavigator/pkg/utils"
)

type Args struct {
kubeCfg config.KubeConfig
workflow string
port int
cleanupInfo engine.CleanupInfo
}

func mainInternal() error {
var (
kubeConfigPath, kubeCtx, workflow string
qps float64
burst int
cleanupInfo engine.CleanupInfo
)
flag.StringVar(&kubeConfigPath, "kubeconfig", "", "kubeconfig file path")
flag.StringVar(&kubeCtx, "kubectx", "", "kube context")
flag.BoolVar(&cleanupInfo.Enabled, "cleanup", false, "delete objects")
flag.DurationVar(&cleanupInfo.Timeout, "cleanup.timeout", engine.DefaultCleanupTimeout, "time limit for cleanup")
flag.StringVar(&workflow, "workflow", "", "comma-separated list of workflow config files and dirs")
flag.Float64Var(&qps, "kube-api-qps", 500, "Maximum QPS to use while talking with Kubernetes API")
flag.IntVar(&burst, "kube-api-burst", 500, "Maximum burst for throttle while talking with Kubernetes API")
var args Args
flag.StringVar(&args.kubeCfg.KubeConfigPath, "kubeconfig", "", "kubeconfig file path")
flag.StringVar(&args.kubeCfg.KubeCtx, "kubectx", "", "kube context")
flag.Float64Var(&args.kubeCfg.QPS, "kube-api-qps", 500, "Maximum QPS to use while talking with Kubernetes API")
flag.IntVar(&args.kubeCfg.Burst, "kube-api-burst", 500, "Maximum burst for throttle while talking with Kubernetes API")
flag.BoolVar(&args.cleanupInfo.Enabled, "cleanup", false, "delete objects")
flag.DurationVar(&args.cleanupInfo.Timeout, "cleanup.timeout", engine.DefaultCleanupTimeout, "time limit for cleanup")
flag.StringVar(&args.workflow, "workflow", "", "comma-separated list of workflow config files and dirs (mutually exclusive with the 'port' flag)")
flag.IntVar(&args.port, "port", 0, "listening port (mutually exclusive with the 'workflow' flag)")

klog.InitFlags(nil)
flag.Parse()

if len(workflow) == 0 {
if err := validate(&args); err != nil {
flag.Usage()
return fmt.Errorf("missing 'workflow' argument")
return err
}

workflows, err := config.NewFromPaths(workflow)
log := textlogger.NewLogger(textlogger.NewConfig(textlogger.Verbosity(utils.Flag2Verbosity(flag.Lookup("v")))))

restConfig, err := utils.GetK8sConfig(log, &args.kubeCfg)
if err != nil {
return err
}

log := textlogger.NewLogger(textlogger.NewConfig(textlogger.Verbosity(utils.Flag2Verbosity(flag.Lookup("v")))))
cfg := &config.KubeConfig{
KubeConfigPath: kubeConfigPath,
KubeCtx: kubeCtx,
QPS: float32(qps),
Burst: burst,
}
restConfig, err := utils.GetK8sConfig(log, cfg)
eng, err := engine.New(log, restConfig, &args.cleanupInfo)
if err != nil {
return err
}

eng, err := engine.New(log, restConfig, &cleanupInfo)
if args.port > 0 {
return server.New(&log, eng, args.port).Run()
}

workflows, err := config.NewFromPaths(args.workflow)
if err != nil {
return err
}
Expand All @@ -87,6 +90,18 @@ func mainInternal() error {
return nil
}

func validate(args *Args) error {
if len(args.workflow) == 0 && args.port == 0 {
return fmt.Errorf("must specify 'workflow' or 'port'")
}

if len(args.workflow) != 0 && args.port > 0 {
return fmt.Errorf("'workflow' and 'port' are mutually exclusive")
}

return nil
}

func main() {
defer klog.Flush()
if err := mainInternal(); err != nil {
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.21
require (
github.com/go-logr/logr v1.4.1
github.com/maja42/goval v1.4.0
github.com/oklog/run v1.1.0
github.com/stretchr/testify v1.8.4
gopkg.in/yaml.v3 v3.0.1
k8s.io/api v0.29.3
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/oklog/run v1.1.0 h1:GEenZ1cK0+q0+wsJew9qUg/DyD8k3JzYsZAi5gYi2mA=
github.com/oklog/run v1.1.0/go.mod h1:sVPdnTZT1zYwAJeCMu2Th4T21pA3FPOQRfWjQlk7DVU=
github.com/onsi/ginkgo/v2 v2.13.0 h1:0jY9lJquiL8fcf3M4LAXN5aMlS/b2BV86HFFPCPMgE4=
github.com/onsi/ginkgo/v2 v2.13.0/go.mod h1:TE309ZR8s5FsKKpuB1YAQYBzCaAfUgatB/xlT/ETL/o=
github.com/onsi/gomega v1.29.0 h1:KIA/t2t5UBzoirT4H9tsML45GEbo3ouUnBHsCfD2tVg=
Expand Down
6 changes: 3 additions & 3 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
type KubeConfig struct {
KubeConfigPath string
KubeCtx string
QPS float32
QPS float64
Burst int
}

Expand All @@ -45,7 +45,7 @@ type Task struct {
Params map[string]interface{} `yaml:"params,omitempty"`
}

// New populates task config from raw data
// New populates workflow config from raw data
func New(data []byte) (*Workflow, error) {
var config Workflow

Expand All @@ -60,7 +60,7 @@ func New(data []byte) (*Workflow, error) {
return &config, nil
}

// NewFromFile populates test config from YAML file
// NewFromFile populates workflow config from YAML file
func NewFromFile(path string) (*Workflow, error) {
path = filepath.Clean(path)
data, err := os.ReadFile(path)
Expand Down
4 changes: 2 additions & 2 deletions pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,13 @@ tasks:
}
}

func TestConfigFile(t *testing.T) {
func TestWorkflowFile(t *testing.T) {
c, err := NewFromFile("../../resources/workflows/test-custom-resource.yml")
require.NoError(t, err)
require.NotNil(t, c)
}

func TestConfigPaths(t *testing.T) {
func TestWorkflowPaths(t *testing.T) {
testCases := []struct {
name string
paths string
Expand Down
4 changes: 2 additions & 2 deletions pkg/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@ func New(log logr.Logger, config *rest.Config, cleanupInfo *CleanupInfo, sim ...
return eng, nil
}

func Run(ctx context.Context, eng Engine, testconfig *config.Workflow) error {
func Run(ctx context.Context, eng Engine, workflow *config.Workflow) error {
var errExec error
for _, cfg := range testconfig.Tasks {
for _, cfg := range workflow.Tasks {
if errExec = eng.RunTask(ctx, cfg); errExec != nil {
break
}
Expand Down
106 changes: 106 additions & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package server

import (
"context"
"fmt"
"io"
"net/http"
"os"
"syscall"

"github.com/go-logr/logr"
"github.com/oklog/run"
"gopkg.in/yaml.v3"

"github.com/NVIDIA/knavigator/pkg/config"
"github.com/NVIDIA/knavigator/pkg/engine"
)

type Server struct {
s *http.Server
log *logr.Logger
}

type WorkflowHandler struct {
eng *engine.Eng
}

func New(log *logr.Logger, eng *engine.Eng, port int) *Server {
mux := http.NewServeMux()
mux.Handle("/workflow", &WorkflowHandler{eng: eng})

return &Server{
log: log,
s: &http.Server{ // #nosec G112 // Potential Slowloris Attack because ReadHeaderTimeout is not configured in the http.Server
Addr: fmt.Sprintf(":%d", port),
Handler: mux,
},
}
}

func (srv *Server) Run() error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var g run.Group
// Signal handler
g.Add(run.SignalHandler(ctx, os.Interrupt, syscall.SIGTERM))
// Server
g.Add(
func() error {
srv.log.Info("Starting server", "address", srv.s.Addr)
return srv.s.ListenAndServe()
dmitsh marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need some form of authn and authz? I think a user who is able to access the server will be able to privilege escalate since the user and the server use two different RBAC means. What are your thoughts on this?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't want to complicate things. This is a local kind cluster, the nodes are virtual. I think authn/authz will be an overkill.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, fair point. Then we should add documentation describing the risks for users who want to have a long running cluster, maybe for simulation testing/QA.

I was also wondering; can we leverage the JWT tokens generated for Kubernetes service accounts? Use the tokens for a service account (https://kubernetes.io/docs/tasks/configure-pod-container/configure-service-account/#manually-create-an-api-token-for-a-serviceaccount) for making calls to the Kubernetes API and let the Kubernetes API server handle auth. User will provide the token as a part of the Authorization header.

},
func(err error) {
srv.log.Error(err, "Stopping server")
if err := srv.s.Shutdown(ctx); err != nil {
srv.log.Error(err, "Error during server shutdown")
}
srv.log.Info("Server stopped")
})

return g.Run()
}

func (h *WorkflowHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "Invalid request method", http.StatusMethodNotAllowed)
return
}

body, err := io.ReadAll(r.Body)
if err != nil {
http.Error(w, "Unable to read request body", http.StatusBadRequest)
return
}
defer r.Body.Close() //nolint:errcheck // No check for the return value of Body.Close()

var workflow config.Workflow
if err = yaml.Unmarshal(body, &workflow); err != nil {
http.Error(w, "Invalid YAML format", http.StatusBadRequest)
return
}

if err = engine.Run(r.Context(), h.eng, &workflow); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

w.WriteHeader(http.StatusOK)
}
2 changes: 1 addition & 1 deletion pkg/utils/k8s_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func GetK8sConfig(log logr.Logger, cfg *cfg.KubeConfig) (*rest.Config, error) {
restConfig, err := rest.InClusterConfig()
if err == nil {
restConfig.Burst = cfg.Burst
restConfig.QPS = cfg.QPS
restConfig.QPS = float32(cfg.QPS)
log.Info("Using in-cluster kubeconfig")
return restConfig, err
}
Expand Down
Loading