-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #49 from netboxlabs/feat/OBS-688-worker-backend
feat: Implement orb worker backend
- Loading branch information
Showing
8 changed files
with
520 additions
and
4 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,96 @@ | ||
package worker | ||
|
||
import ( | ||
"encoding/json" | ||
"fmt" | ||
"io" | ||
"net/http" | ||
"time" | ||
|
||
"go.uber.org/zap" | ||
|
||
"github.com/netboxlabs/orb-agent/agent/backend" | ||
) | ||
|
||
func (d *workerBackend) getProcRunningStatus() (backend.RunningStatus, string, error) { | ||
if d.proc == nil { | ||
return backend.Unknown, "backend not started yet", nil | ||
} | ||
status := d.proc.Status() | ||
|
||
if status.Error != nil { | ||
errMsg := fmt.Sprintf("worker process error: %v", status.Error) | ||
return backend.BackendError, errMsg, status.Error | ||
} | ||
|
||
if status.Complete { | ||
err := d.proc.Stop() | ||
return backend.Offline, "worker process ended", err | ||
} | ||
|
||
if status.StopTs > 0 { | ||
return backend.Offline, "worker process ended", nil | ||
} | ||
return backend.Running, "", nil | ||
} | ||
|
||
// note this needs to be stateless because it is called for multiple go routines | ||
func (d *workerBackend) request(url string, payload interface{}, method string, body io.Reader, contentType string, timeout int32) error { | ||
client := http.Client{ | ||
Timeout: time.Second * time.Duration(timeout), | ||
} | ||
|
||
status, _, err := d.getProcRunningStatus() | ||
if status != backend.Running { | ||
d.logger.Warn("skipping device discovery REST API request because process is not running or is unresponsive", zap.String("url", url), zap.String("method", method), zap.Error(err)) | ||
return err | ||
} | ||
|
||
URL := fmt.Sprintf("%s://%s:%s/api/v1/%s", d.apiProtocol, d.apiHost, d.apiPort, url) | ||
|
||
req, err := http.NewRequest(method, URL, body) | ||
if err != nil { | ||
d.logger.Error("received error from payload", zap.Error(err)) | ||
return err | ||
} | ||
|
||
req.Header.Add("Content-Type", contentType) | ||
res, getErr := client.Do(req) | ||
|
||
if getErr != nil { | ||
d.logger.Error("received error from payload", zap.Error(getErr)) | ||
return getErr | ||
} | ||
|
||
defer func() { | ||
if err := res.Body.Close(); err != nil { | ||
d.logger.Error("failed to close response body", zap.Error(err)) | ||
} | ||
}() | ||
|
||
if (res.StatusCode < 200) || (res.StatusCode > 299) { | ||
body, err := io.ReadAll(res.Body) | ||
if err != nil { | ||
return fmt.Errorf("non 2xx HTTP error code from worker, no or invalid body: %d", res.StatusCode) | ||
} | ||
if len(body) == 0 { | ||
return fmt.Errorf("%d empty body", res.StatusCode) | ||
} else if body[0] == '{' { | ||
var jsonBody map[string]interface{} | ||
err := json.Unmarshal(body, &jsonBody) | ||
if err == nil { | ||
if errMsg, ok := jsonBody["error"]; ok { | ||
return fmt.Errorf("%d %s", res.StatusCode, errMsg) | ||
} | ||
} | ||
} | ||
} | ||
|
||
if res.Body != nil { | ||
err = json.NewDecoder(res.Body).Decode(&payload) | ||
if err != nil { | ||
return err | ||
} | ||
} | ||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
package worker | ||
|
||
import ( | ||
"github.com/spf13/viper" | ||
) | ||
|
||
// RegisterBackendSpecificVariables registers the backend specific variables for the worker backend | ||
func RegisterBackendSpecificVariables(v *viper.Viper) { | ||
v.SetDefault("orb.backends.worker.host", defaultAPIHost) | ||
v.SetDefault("orb.backends.worker.port", defaultAPIPort) | ||
} |
Oops, something went wrong.