Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement orb worker backend #49

Merged
merged 6 commits into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ orb:
network_discovery:
...
```
Only the `network_discovery` and `device_discovery` backends are currently supported. They do not require any special configuration.
Only the `network_discovery`, `device_discovery` and `worker` backends are currently supported. They do not require any special configuration.
- [Device Discovery](./docs/backends/device_discovery.md)
- [Network Discovery](./docs/backends/network_discovery.md)
- [Worker](./docs/backends/worker.md)

#### Common
A special `common` subsection under `backends` defines configuration settings that are shared with all backends. Currently, it supports passing [diode](https://github.com/netboxlabs/diode) server settings to all backends.
Expand Down Expand Up @@ -66,6 +67,9 @@ orb:
network_discovery:
network_policy_1:
# see docs/backends/network_discovery.md
worker:
worker_policy_1:
# see docs/backends/worker.md
```

## Running the agent
Expand Down
96 changes: 96 additions & 0 deletions agent/backend/worker/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package worker

import (
"encoding/json"
"fmt"
"io"
"net/http"
"time"

"go.uber.org/zap"

"github.com/netboxlabs/orb-agent/agent/backend"
)

func (d *workerBackend) getProcRunningStatus() (backend.RunningStatus, string, error) {
if d.proc == nil {
return backend.Unknown, "backend not started yet", nil
}
status := d.proc.Status()

if status.Error != nil {
errMsg := fmt.Sprintf("worker process error: %v", status.Error)
return backend.BackendError, errMsg, status.Error
}

if status.Complete {
err := d.proc.Stop()
return backend.Offline, "worker process ended", err
}

if status.StopTs > 0 {
return backend.Offline, "worker process ended", nil
}
return backend.Running, "", nil
}

// note this needs to be stateless because it is called for multiple go routines
func (d *workerBackend) request(url string, payload interface{}, method string, body io.Reader, contentType string, timeout int32) error {
client := http.Client{
Timeout: time.Second * time.Duration(timeout),
}

status, _, err := d.getProcRunningStatus()
if status != backend.Running {
d.logger.Warn("skipping device discovery REST API request because process is not running or is unresponsive", zap.String("url", url), zap.String("method", method), zap.Error(err))
return err
}

URL := fmt.Sprintf("%s://%s:%s/api/v1/%s", d.apiProtocol, d.apiHost, d.apiPort, url)

req, err := http.NewRequest(method, URL, body)
if err != nil {
d.logger.Error("received error from payload", zap.Error(err))
return err
}

req.Header.Add("Content-Type", contentType)
res, getErr := client.Do(req)

if getErr != nil {
d.logger.Error("received error from payload", zap.Error(getErr))
return getErr
}

defer func() {
if err := res.Body.Close(); err != nil {
d.logger.Error("failed to close response body", zap.Error(err))
}
}()

if (res.StatusCode < 200) || (res.StatusCode > 299) {
body, err := io.ReadAll(res.Body)
if err != nil {
return fmt.Errorf("non 2xx HTTP error code from worker, no or invalid body: %d", res.StatusCode)
}
if len(body) == 0 {
return fmt.Errorf("%d empty body", res.StatusCode)
} else if body[0] == '{' {
var jsonBody map[string]interface{}
err := json.Unmarshal(body, &jsonBody)
if err == nil {
if errMsg, ok := jsonBody["error"]; ok {
return fmt.Errorf("%d %s", res.StatusCode, errMsg)
}
}
}
}

if res.Body != nil {
err = json.NewDecoder(res.Body).Decode(&payload)
if err != nil {
return err
}
}
return nil
}
11 changes: 11 additions & 0 deletions agent/backend/worker/vars.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package worker

import (
"github.com/spf13/viper"
)

// RegisterBackendSpecificVariables registers the backend specific variables for the worker backend
func RegisterBackendSpecificVariables(v *viper.Viper) {
v.SetDefault("orb.backends.worker.host", defaultAPIHost)
v.SetDefault("orb.backends.worker.port", defaultAPIPort)
}
Loading
Loading