From 96d47e6c0b4efc3b9a0c78d9dfd8b5cb0495caed Mon Sep 17 00:00:00 2001 From: Francois Poinsot Date: Wed, 30 Jan 2019 10:27:42 +0100 Subject: [PATCH] Deploy multiple (#32) --- cli/cmd/create.go | 99 +++-- cli/cmd/delete.go | 7 +- cli/cmd/deploy.go | 14 +- cli/cmd/get.go | 12 +- cli/cmd/helper.go | 6 +- cli/cmd/pause.go | 7 +- cli/cmd/resume.go | 7 +- cli/cmd/root.go | 3 +- cli/cmd/update.go | 17 +- lib/Makefile | 14 +- .../{connectors.go => base_client.go} | 307 ++++++++-------- lib/connectors/client.go | 46 --- lib/connectors/connector_integration_test.go | 54 +++ lib/connectors/highlevel_client.go | 333 +++++++++++++++++ lib/connectors/highlevel_client_test.go | 192 ++++++++++ lib/connectors/mock_base_client.go | 295 +++++++++++++++ lib/connectors/mock_high_level_client.go | 344 ++++++++++++++++++ lib/connectors/tasks.go | 102 ------ lib/docker-compose.yml | 1 + 19 files changed, 1475 insertions(+), 385 deletions(-) rename lib/connectors/{connectors.go => base_client.go} (52%) delete mode 100644 lib/connectors/client.go create mode 100644 lib/connectors/highlevel_client.go create mode 100644 lib/connectors/highlevel_client_test.go create mode 100644 lib/connectors/mock_base_client.go create mode 100644 lib/connectors/mock_high_level_client.go delete mode 100644 lib/connectors/tasks.go diff --git a/cli/cmd/create.go b/cli/cmd/create.go index 734cbc4..7426ddd 100644 --- a/cli/cmd/create.go +++ b/cli/cmd/create.go @@ -16,8 +16,11 @@ package cmd import ( "encoding/json" - "errors" + "github.com/pkg/errors" + "io/ioutil" + "log" "os" + "path" "strings" "github.com/ricardo-ch/go-kafka-connect/lib/connectors" @@ -28,61 +31,99 @@ import ( var createCmd = &cobra.Command{ Use: "create", Short: "Create a new connector", - Long: `Create a connector using either a config file or a literal string - flags: - --url -u: url of the kafka-connect server - --file -f: path to the config file - --string -s: JSON configuration string - --sync -y: execute synchronously - `, - RunE: RunECreate, + RunE: RunECreate, } //RunECreate ... func RunECreate(cmd *cobra.Command, args []string) error { - config, err := getCreateCmdConfig(cmd) + configs, err := getCreateCmdConfig(cmd) if err != nil { return err } - resp, err := getClient().CreateConnector(config, sync) - if err != nil { - return err + //TODO was not expecting I would have to update CreateConnector when adding multiple file deployment feature + // will have to add properly later + for _, config := range configs { + resp, err := getClient().CreateConnector(config, sync) + printResponse(resp) + if err != nil { + return err + } } - - return printResponse(resp) + return nil } -func getCreateCmdConfig(cmd *cobra.Command) (connectors.CreateConnectorRequest, error) { - config := connectors.CreateConnectorRequest{} +func getCreateCmdConfig(cmd *cobra.Command) ([]connectors.CreateConnectorRequest, error) { + var configs []connectors.CreateConnectorRequest - if cmd.Flag("file").Changed { - fileReader, err := os.Open(file) + if cmd.Flag("path").Changed { + fileInfo, err := os.Stat(filePath) if err != nil { - return config, err + return nil, errors.Wrapf(err, "error while trying to find input or folder: %v", filePath) } - - err = json.NewDecoder(fileReader).Decode(&config) - if err != nil { - return config, err + if fileInfo.IsDir() { + configs, err = getConfigFromFolder(filePath) + if err != nil { + return nil, err + } + } else { + config, err := getConfigFromFile(filePath) + if err != nil { + return nil, err + } + configs = append(configs, config) } } else if cmd.Flag("string").Changed { + config := connectors.CreateConnectorRequest{} err := json.NewDecoder(strings.NewReader(configString)).Decode(&config) if err != nil { - return config, err + return nil, err } + configs = append(configs, config) } else { - return config, errors.New("neither file nor string was supplied") + return nil, errors.New("neither path nor string was supplied") + } + return configs, nil +} + +func getConfigFromFolder(folderPath string) ([]connectors.CreateConnectorRequest, error) { + configs := []connectors.CreateConnectorRequest{} + configFiles, err := ioutil.ReadDir(folderPath) + if err != nil { + return configs, err + } + for _, fileInfo := range configFiles { + if fileInfo.IsDir() { + log.Printf("found unexpected subfolder in folder: %s. This command will not search through it.", filePath) + continue + } + config, err := getConfigFromFile(path.Join(folderPath, fileInfo.Name())) + if err != nil { + log.Printf("found unexpected not config file in folder: %s", filePath) + } else { + configs = append(configs, config) + } } - return config, nil + return configs, nil +} + +func getConfigFromFile(filePath string) (connectors.CreateConnectorRequest, error) { + config := connectors.CreateConnectorRequest{} + fileReader, err := os.Open(filePath) + if err != nil { + return config, err + } + + err = json.NewDecoder(fileReader).Decode(&config) + return config, err } func init() { RootCmd.AddCommand(createCmd) - createCmd.PersistentFlags().StringVarP(&file, "file", "f", "", "path to the config file") - createCmd.MarkFlagFilename("file") + createCmd.PersistentFlags().StringVarP(&filePath, "path", "p", "", "path to the config file") + createCmd.MarkFlagFilename("path") createCmd.PersistentFlags().StringVarP(&configString, "string", "s", "", "JSON configuration string") createCmd.PersistentFlags().BoolVarP(&sync, "sync", "y", false, "execute synchronously") diff --git a/cli/cmd/delete.go b/cli/cmd/delete.go index 8022971..f172695 100644 --- a/cli/cmd/delete.go +++ b/cli/cmd/delete.go @@ -23,12 +23,7 @@ import ( var deleteCmd = &cobra.Command{ Use: "delete", Short: "Delete an existing connector", - Long: `Delete an existing connector - flags: - --url -u: url of the kafka-connect server - --connector -n: name of the target connector - --sync -y: execute synchronously`, - RunE: RunEDelete, + RunE: RunEDelete, } //RunEDelete ... diff --git a/cli/cmd/deploy.go b/cli/cmd/deploy.go index 4f14fc2..f51867c 100644 --- a/cli/cmd/deploy.go +++ b/cli/cmd/deploy.go @@ -23,27 +23,23 @@ var deployCmd = &cobra.Command{ Use: "deploy", Short: "Deploy a new connector", Long: `Deploy a new connector or replace the old version if it alrerady exists. - This command is executes all its steps synchronously. - flags: - --url -u: url of the kafka-connect server - --file -f: path to the config file - --string -s: literal configuration string`, + This command is executes all its steps synchronously.`, RunE: RunEDeploy, } func RunEDeploy(cmd *cobra.Command, args []string) error { - config, err := getCreateCmdConfig(cmd) + configs, err := getCreateCmdConfig(cmd) if err != nil { return err } - return getClient().DeployConnector(config) + return getClient().DeployMultipleConnector(configs) } func init() { RootCmd.AddCommand(deployCmd) - deployCmd.PersistentFlags().StringVarP(&file, "file", "f", "", "path to the config file") - deployCmd.MarkFlagFilename("file") + deployCmd.PersistentFlags().StringVarP(&filePath, "path", "p", "", "path to the config file or folder") + deployCmd.MarkFlagFilename("path") deployCmd.PersistentFlags().StringVarP(&configString, "string", "s", "", "JSON configuration string") } diff --git a/cli/cmd/get.go b/cli/cmd/get.go index aa0bc31..3dfb0cc 100644 --- a/cli/cmd/get.go +++ b/cli/cmd/get.go @@ -26,13 +26,7 @@ var getCmd = &cobra.Command{ Use: "get", Short: "Retrieve information from kafka-connect", Long: `Get reads from the kafka-connect REST API. - It can get the list of all deployed connectors, or details about a single one. - flags: - --url -u: url of the kafka-connect server - --connector -n: name of the target connector - --status -s: get the connector's status (requires -n) - --config -c: get the connector's config (requires -n) - --tasks -t: get the connector's tasks list (requires -n)`, + It can get the list of all deployed connectors, or details about a single one.`, RunE: handleCmd, } @@ -57,10 +51,10 @@ func handleCmd(cmd *cobra.Command, args []string) error { func validateArgs() error { if connector == "" { - return errors.New("Please specify the target connector's name") + return errors.New("please specify the target connector's name") } if (status && config) || (status && tasks) || (config && tasks) { - return errors.New("More than one action were provided") + return errors.New("more than one action were provided") } return nil diff --git a/cli/cmd/helper.go b/cli/cmd/helper.go index 7861225..5789dba 100644 --- a/cli/cmd/helper.go +++ b/cli/cmd/helper.go @@ -15,13 +15,13 @@ func printResponse(response interface{}) error { return nil } -func getClient() *connectors.Client { +func getClient() connectors.HighLevelClient { client := connectors.NewClient(url) if verbose { - client = client.WithDebug() + client.SetDebug() } if SSLInsecure { - client = client.WithInsecureSSL() + client.SetInsecureSSL() } return client } diff --git a/cli/cmd/pause.go b/cli/cmd/pause.go index a742d5d..37334b7 100644 --- a/cli/cmd/pause.go +++ b/cli/cmd/pause.go @@ -23,12 +23,7 @@ import ( var pauseCmd = &cobra.Command{ Use: "pause", Short: "Pause a connector", - Long: `Suspend a connector without deleting it. - flags: - --url -u: url of the kafka-connect server - --connector -n: name of the target connector - --sync -y: execute synchronously`, - RunE: RunEPause, + RunE: RunEPause, } //RunEPause ... diff --git a/cli/cmd/resume.go b/cli/cmd/resume.go index fa37856..db81858 100644 --- a/cli/cmd/resume.go +++ b/cli/cmd/resume.go @@ -23,12 +23,7 @@ import ( var resumeCmd = &cobra.Command{ Use: "resume", Short: "Resume a connector", - Long: `Resume a paused connector. - flags: - --url -u: url of the kafka-connect server - --connector -n: name of the target connector - --sync -y: execute synchronously`, - RunE: RunEResume, + RunE: RunEResume, } //RunEResume ... diff --git a/cli/cmd/root.go b/cli/cmd/root.go index 40c19df..b66a534 100644 --- a/cli/cmd/root.go +++ b/cli/cmd/root.go @@ -24,7 +24,7 @@ import ( var ( url string connector string - file string + filePath string configString string sync bool status bool @@ -32,7 +32,6 @@ var ( tasks bool verbose bool SSLInsecure bool - output string ) var RootCmd = &cobra.Command{ diff --git a/cli/cmd/update.go b/cli/cmd/update.go index 616d1eb..9aaf5ea 100644 --- a/cli/cmd/update.go +++ b/cli/cmd/update.go @@ -36,14 +36,7 @@ var update updateCmdConfig var updateCmd = &cobra.Command{ Use: "update", Short: "Updater a connector", - Long: `Update a connector's configuration - flags: - --url -u: url of the kafka-connect server - --connector -n: name of the target connector - --file -f: path to the config file - --string -s: literal configuration string - --sync -y: execute synchronously`, - RunE: RunEUpdate, + RunE: RunEUpdate, } //RunEUpdate ... @@ -68,7 +61,7 @@ func RunEUpdate(cmd *cobra.Command, args []string) error { func getUpdateCmdConfig(cmd *cobra.Command) (map[string]interface{}, error) { config := map[string]interface{}{} - if cmd.Flag("file").Changed { + if cmd.Flag("path").Changed { fileReader, err := os.Open(update.file) if err != nil { return config, err @@ -85,7 +78,7 @@ func getUpdateCmdConfig(cmd *cobra.Command) (map[string]interface{}, error) { return config, err } } else { - return config, errors.New("neither file nor string was supplied") + return config, errors.New("neither input nor string was supplied") } return config, nil } @@ -93,8 +86,8 @@ func getUpdateCmdConfig(cmd *cobra.Command) (map[string]interface{}, error) { func init() { RootCmd.AddCommand(updateCmd) - updateCmd.PersistentFlags().StringVarP(&update.file, "file", "f", "", "path to the config file") - updateCmd.MarkFlagFilename("file") + updateCmd.PersistentFlags().StringVarP(&update.file, "path", "p", "", "path to the config file") + updateCmd.MarkFlagFilename("path") updateCmd.PersistentFlags().StringVarP(&update.configString, "string", "s", "", "JSON configuration string") updateCmd.PersistentFlags().StringVarP(&update.connector, "connector", "n", "", "name of the target connector") updateCmd.MarkFlagRequired("connector") diff --git a/lib/Makefile b/lib/Makefile index 4a34991..18ac407 100644 --- a/lib/Makefile +++ b/lib/Makefile @@ -8,7 +8,7 @@ install: .PHONY: test-integration test-integration: make rundep - go test -v `go list ./... | grep -v /vendor/` -tags=integration + go test -tags=integration -count=1 ./... .PHONY: rundep rundep: @@ -19,3 +19,15 @@ rundep: sleep 2; \ done @echo "up and running" + +MOCKERY_PATH := $(shell [ -z "$${GOBIN}" ] && echo $${GOPATH}/bin/mockery || echo $${GOBIN}/mockery; ) + +.PHONY: update-mocks +update-mocks: + go get github.com/vektra/mockery/... + ${MOCKERY_PATH} -inpkg -case "underscore" -recursive -all -note "NOTE: run 'make update-mocks' from this project top folder to update this file and generate new ones." + +.PHONY: test-unit +test-unit: + go test -tags=unit ./... + diff --git a/lib/connectors/connectors.go b/lib/connectors/base_client.go similarity index 52% rename from lib/connectors/connectors.go rename to lib/connectors/base_client.go index df9e823..9e044d9 100644 --- a/lib/connectors/connectors.go +++ b/lib/connectors/base_client.go @@ -1,12 +1,72 @@ package connectors import ( - "errors" + "crypto/tls" "fmt" + "gopkg.in/resty.v1" "strconv" "time" ) +// BaseClient implement the kafka-connect contract as a client +// handle retries on 409 response +type BaseClient interface { + GetAll() (GetAllConnectorsResponse, error) + GetConnector(req ConnectorRequest) (ConnectorResponse, error) + CreateConnector(req CreateConnectorRequest) (ConnectorResponse, error) + UpdateConnector(req CreateConnectorRequest) (ConnectorResponse, error) + DeleteConnector(req ConnectorRequest) (EmptyResponse, error) + GetConnectorConfig(req ConnectorRequest) (GetConnectorConfigResponse, error) + GetConnectorStatus(req ConnectorRequest) (GetConnectorStatusResponse, error) + RestartConnector(req ConnectorRequest) (EmptyResponse, error) + PauseConnector(req ConnectorRequest) (EmptyResponse, error) + ResumeConnector(req ConnectorRequest) (EmptyResponse, error) + GetAllTasks(req ConnectorRequest) (GetAllTasksResponse, error) + GetTaskStatus(req TaskRequest) (TaskStatusResponse, error) + RestartTask(req TaskRequest) (EmptyResponse, error) + + SetInsecureSSL() + SetDebug() +} + +type baseClient struct { + restClient *resty.Client +} + +func (c *baseClient) SetInsecureSSL() { + c.restClient.SetTLSClientConfig(&tls.Config{InsecureSkipVerify: true}) +} + +func (c *baseClient) SetDebug() { + c.restClient.SetDebug(true) +} + +//ErrorResponse is generic error returned by kafka connect +type ErrorResponse struct { + ErrorCode int `json:"error_code,omitempty"` + Message string `json:"message,omitempty"` +} + +func (err ErrorResponse) Error() string { + return fmt.Sprintf("error code: %d , message: %s", err.ErrorCode, err.Message) +} + +func newBaseClient(url string) BaseClient { + restClient := resty.New(). + SetError(ErrorResponse{}). + SetHostURL(url). + SetHeader("Accept", "application/json"). + SetRetryCount(3). + SetTimeout(5 * time.Second). + AddRetryCondition(func(resp *resty.Response) (bool, error) { + return resp.StatusCode() == 409, nil + }) + + return &baseClient{restClient: restClient} +} + +// ------------- Connectors ------------ + //ConnectorRequest is generic request used when interacting with connector endpoint type ConnectorRequest struct { Name string `json:"name"` @@ -53,7 +113,7 @@ type GetConnectorStatusResponse struct { } //GetAll gets the list of all active connectors -func (c *Client) GetAll() (GetAllConnectorsResponse, error) { +func (c *baseClient) GetAll() (GetAllConnectorsResponse, error) { result := GetAllConnectorsResponse{} var connectors []string @@ -75,7 +135,7 @@ func (c *Client) GetAll() (GetAllConnectorsResponse, error) { } //GetConnector return information on specific connector -func (c Client) GetConnector(req ConnectorRequest) (ConnectorResponse, error) { +func (c *baseClient) GetConnector(req ConnectorRequest) (ConnectorResponse, error) { result := ConnectorResponse{} resp, err := c.restClient.NewRequest(). @@ -94,7 +154,7 @@ func (c Client) GetConnector(req ConnectorRequest) (ConnectorResponse, error) { } //CreateConnector create connector using specified config and name -func (c *Client) CreateConnector(req CreateConnectorRequest, sync bool) (ConnectorResponse, error) { +func (c *baseClient) CreateConnector(req CreateConnectorRequest) (ConnectorResponse, error) { result := ConnectorResponse{} resp, err := c.restClient.NewRequest(). @@ -110,23 +170,11 @@ func (c *Client) CreateConnector(req CreateConnectorRequest, sync bool) (Connect result.Code = resp.StatusCode() - if sync { - if !TryUntil( - func() bool { - resp, err := c.GetConnector(req.ConnectorRequest) - return err == nil && resp.Code == 200 - }, - 2*time.Minute, - ) { - return result, errors.New("timeout on creating connector sync") - } - } - return result, nil } //UpdateConnector update a connector config -func (c Client) UpdateConnector(req CreateConnectorRequest, sync bool) (ConnectorResponse, error) { +func (c *baseClient) UpdateConnector(req CreateConnectorRequest) (ConnectorResponse, error) { result := ConnectorResponse{} resp, err := c.restClient.NewRequest(). @@ -143,23 +191,11 @@ func (c Client) UpdateConnector(req CreateConnectorRequest, sync bool) (Connecto result.Code = resp.StatusCode() - if sync { - if !TryUntil( - func() bool { - upToDate, err := c.IsUpToDate(req.Name, req.Config) - return err == nil && upToDate - }, - 2*time.Minute, - ) { - return result, errors.New("timeout on creating connector sync") - } - } - return result, nil } //DeleteConnector delete a connector -func (c Client) DeleteConnector(req ConnectorRequest, sync bool) (EmptyResponse, error) { +func (c *baseClient) DeleteConnector(req ConnectorRequest) (EmptyResponse, error) { result := EmptyResponse{} resp, err := c.restClient.NewRequest(). @@ -175,23 +211,11 @@ func (c Client) DeleteConnector(req ConnectorRequest, sync bool) (EmptyResponse, result.Code = resp.StatusCode() - if sync { - if !TryUntil( - func() bool { - r, e := c.GetConnector(req) - return e == nil && r.Code == 404 - }, - 2*time.Minute, - ) { - return result, errors.New("timeout on deleting connector sync") - } - } - return result, nil } ////GetConnectorConfig return config of a connector -func (c Client) GetConnectorConfig(req ConnectorRequest) (GetConnectorConfigResponse, error) { +func (c *baseClient) GetConnectorConfig(req ConnectorRequest) (GetConnectorConfigResponse, error) { result := GetConnectorConfigResponse{} var config map[string]interface{} @@ -212,7 +236,7 @@ func (c Client) GetConnectorConfig(req ConnectorRequest) (GetConnectorConfigResp } //GetConnectorStatus return current status of connector -func (c Client) GetConnectorStatus(req ConnectorRequest) (GetConnectorStatusResponse, error) { +func (c *baseClient) GetConnectorStatus(req ConnectorRequest) (GetConnectorStatusResponse, error) { result := GetConnectorStatusResponse{} resp, err := c.restClient.NewRequest(). @@ -231,7 +255,7 @@ func (c Client) GetConnectorStatus(req ConnectorRequest) (GetConnectorStatusResp } //RestartConnector restart connector -func (c Client) RestartConnector(req ConnectorRequest) (EmptyResponse, error) { +func (c *baseClient) RestartConnector(req ConnectorRequest) (EmptyResponse, error) { result := EmptyResponse{} resp, err := c.restClient.NewRequest(). @@ -251,7 +275,7 @@ func (c Client) RestartConnector(req ConnectorRequest) (EmptyResponse, error) { //PauseConnector pause a running connector //asynchronous operation -func (c Client) PauseConnector(req ConnectorRequest, sync bool) (EmptyResponse, error) { +func (c *baseClient) PauseConnector(req ConnectorRequest) (EmptyResponse, error) { result := EmptyResponse{} resp, err := c.restClient.NewRequest(). @@ -267,23 +291,12 @@ func (c Client) PauseConnector(req ConnectorRequest, sync bool) (EmptyResponse, result.Code = resp.StatusCode() - if sync { - if !TryUntil( - func() bool { - resp, err := c.GetConnectorStatus(req) - return err == nil && resp.Code == 200 && resp.ConnectorStatus["state"] == "PAUSED" - }, - 2*time.Minute, - ) { - return result, errors.New("timeout on pausing connector sync") - } - } return result, nil } //ResumeConnector resume a paused connector //asynchronous operation -func (c Client) ResumeConnector(req ConnectorRequest, sync bool) (EmptyResponse, error) { +func (c *baseClient) ResumeConnector(req ConnectorRequest) (EmptyResponse, error) { result := EmptyResponse{} resp, err := c.restClient.NewRequest(). @@ -299,118 +312,104 @@ func (c Client) ResumeConnector(req ConnectorRequest, sync bool) (EmptyResponse, result.Code = resp.StatusCode() - if sync { - if !TryUntil( - func() bool { - resp, err := c.GetConnectorStatus(req) - return err == nil && resp.Code == 200 && resp.ConnectorStatus["state"] == "RUNNING" - }, - 2*time.Minute, - ) { - return result, errors.New("timeout on resuming connector sync") - } - } return result, nil } -//IsUpToDate checks if the given configuration is different from the deployed one. -//Returns true if they are the same -func (c Client) IsUpToDate(connector string, config map[string]interface{}) (bool, error) { - config["name"] = connector +// ----------- Tasks --------- + +//TaskRequest is generic request when interacting with task endpoint +type TaskRequest struct { + Connector string + TaskID int +} + +//GetAllTasksResponse is response to get all tasks of a specific endpoint +type GetAllTasksResponse struct { + Code int + Tasks []TaskDetails +} + +//TaskDetails is detail of a specific task on a specific endpoint +type TaskDetails struct { + ID TaskID `json:"id"` + Config map[string]interface{} `json:"config"` +} + +//TaskID identify a task and its connector +type TaskID struct { + Connector string `json:"connector"` + TaskID int `json:"task"` +} + +//TaskStatusResponse is response returned by get task status endpoint +type TaskStatusResponse struct { + Code int + Status TaskStatus +} - configResp, err := c.GetConnectorConfig(ConnectorRequest{Name: connector}) +//TaskStatus define task status +type TaskStatus struct { + ID int `json:"id"` + State string `json:"state"` + WorkerID string `json:"worker_id"` + Trace string `json:"trace,omitempty"` +} + +//GetAllTasks return list of running task +func (c *baseClient) GetAllTasks(req ConnectorRequest) (GetAllTasksResponse, error) { + var result GetAllTasksResponse + + resp, err := c.restClient.NewRequest(). + SetResult(&result.Tasks). + SetPathParams(map[string]string{"name": req.Name}). + Get("connectors/{name}/tasks") if err != nil { - return false, err + return GetAllTasksResponse{}, err } - if configResp.Code == 404 { - return false, nil - } - if configResp.Code >= 400 { - return false, errors.New(fmt.Sprintf("status code: %d", configResp.Code)) + if resp.Error() != nil { + return GetAllTasksResponse{}, resp.Error().(*ErrorResponse) } - if len(configResp.Config) != len(config) { - return false, nil - } - for key, value := range configResp.Config { - if convertConfigValueToString(config[key]) != convertConfigValueToString(value) { - return false, nil - } - } - return true, nil -} - -// Because trying to compare the same field on 2 different config may return false negative if one is encoded as a string and not the other -func convertConfigValueToString(value interface{}) string { - switch v := value.(type) { - case string: - return v - case int: - return strconv.Itoa(v) - default: - return "" - } + result.Code = resp.StatusCode() + return result, nil } -// TryUntil repeats exec until it return true or timeout is reached -// TryUntil itself return true if `exec` has return true (success), false if timeout (failure) -func TryUntil(exec func() bool, limit time.Duration) bool { - timeLimit := time.After(limit) - - run := true - defer func() { run = false }() - success := make(chan bool) - go func() { - for run { - if exec() { - success <- true - return - } - time.Sleep(1 * time.Second) - } - }() - - select { - case <-timeLimit: - return false - case <-success: - return true - } -} +//GetTaskStatus return current status of task +func (c *baseClient) GetTaskStatus(req TaskRequest) (TaskStatusResponse, error) { + var result TaskStatusResponse -//DeployConnector checks if the configuration changed before deploying. -//It does nothing if it is the same -func (c Client) DeployConnector(req CreateConnectorRequest) (err error) { - existingConnector, err := c.GetConnector(ConnectorRequest{Name: req.Name}) + resp, err := c.restClient.NewRequest(). + SetResult(&result). + SetPathParams(map[string]string{"name": req.Connector, "task_id": strconv.Itoa(req.TaskID)}). + Get("connectors/{name}/tasks/{task_id}/status") if err != nil { - return err + return TaskStatusResponse{}, err } - - if existingConnector.Code != 404 { - var upToDate bool - upToDate, err = c.IsUpToDate(req.Name, req.Config) - if err != nil { - return err - } - // Connector is already up to date, stop there and return ok - if upToDate { - return nil - } - - _, err = c.PauseConnector(ConnectorRequest{Name: req.Name}, true) - if err != nil { - return err - } - - defer func() { - _, err = c.ResumeConnector(ConnectorRequest{Name: req.Name}, true) - }() + if resp.Error() != nil && resp.StatusCode() != 404 { + return TaskStatusResponse{}, resp.Error().(*ErrorResponse) } - _, err = c.UpdateConnector(req, true) + result.Code = resp.StatusCode() + + return result, nil +} + +//RestartTask try to restart task +func (c *baseClient) RestartTask(req TaskRequest) (EmptyResponse, error) { + var result EmptyResponse + + resp, err := c.restClient.NewRequest(). + SetResult(&result). + SetPathParams(map[string]string{"name": req.Connector, "task_id": strconv.Itoa(req.TaskID)}). + Post("connectors/{name}/tasks/{task_id}/restart") if err != nil { - return err + return EmptyResponse{}, err + } + if resp.Error() != nil { + return EmptyResponse{}, resp.Error().(*ErrorResponse) } - return err + result.Code = resp.StatusCode() + + return result, nil } diff --git a/lib/connectors/client.go b/lib/connectors/client.go deleted file mode 100644 index 78cdfa5..0000000 --- a/lib/connectors/client.go +++ /dev/null @@ -1,46 +0,0 @@ -package connectors - -import ( - "crypto/tls" - "fmt" - "gopkg.in/resty.v1" - "time" -) - -//Client represents the kafka connect access configuration -type Client struct { - restClient *resty.Client -} - -//ErrorResponse is generic error returned by kafka connect -type ErrorResponse struct { - ErrorCode int `json:"error_code,omitempty"` - Message string `json:"message,omitempty"` -} - -func (err ErrorResponse) Error() string { - return fmt.Sprintf("error code: %d , message: %s", err.ErrorCode, err.Message) -} - -//NewClient generates a new client -func NewClient(url string) *Client { - restClient := resty.New(). - SetError(ErrorResponse{}). - SetHostURL(url). - SetHeader("Accept", "application/json"). - SetRetryCount(3). - SetTimeout(5 * time.Second). - AddRetryCondition(func(resp *resty.Response) (bool, error) { - return resp.StatusCode() == 409, nil - }) - - return &Client{restClient: restClient} -} - -func (c *Client) WithInsecureSSL() *Client { - return &Client{restClient: c.restClient.SetTLSClientConfig(&tls.Config{InsecureSkipVerify: true})} -} - -func (c *Client) WithDebug() *Client { - return &Client{restClient: c.restClient.SetDebug(true)} -} diff --git a/lib/connectors/connector_integration_test.go b/lib/connectors/connector_integration_test.go index b16af78..ddce248 100644 --- a/lib/connectors/connector_integration_test.go +++ b/lib/connectors/connector_integration_test.go @@ -405,3 +405,57 @@ func TestDeployConnector(t *testing.T) { assert.Nil(t, err) assert.True(t, isUpToDate) } + +func TestDeployMultipleConnectors(t *testing.T) { + config := map[string]interface{}{ + "connector.class": "FileStreamSource", + "file": testFile, + "topic": "connect-test", + } + + req := []CreateConnectorRequest{ + { + ConnectorRequest: ConnectorRequest{Name: "test-deploy-multiple-connectors-1"}, + Config: config, + }, + { + ConnectorRequest: ConnectorRequest{Name: "test-deploy-multiple-connectors-2"}, + Config: config, + }, + { + ConnectorRequest: ConnectorRequest{Name: "test-deploy-multiple-connectors-3"}, + Config: config, + }, + { + ConnectorRequest: ConnectorRequest{Name: "test-deploy-multiple-connectors-4"}, + Config: config, + }, + } + + client := NewClient(hostConnect) + err := client.DeployMultipleConnector(req) + + assert.Nil(t, err) + + // use IsUpToDate to check sync worked (force get actual config for server rather than what was returned on update call) + { + isUpToDate, err := client.IsUpToDate("test-deploy-multiple-connectors-1", config) + assert.Nil(t, err) + assert.True(t, isUpToDate) + } + { + isUpToDate, err := client.IsUpToDate("test-deploy-multiple-connectors-2", config) + assert.Nil(t, err) + assert.True(t, isUpToDate) + } + { + isUpToDate, err := client.IsUpToDate("test-deploy-multiple-connectors-3", config) + assert.Nil(t, err) + assert.True(t, isUpToDate) + } + { + isUpToDate, err := client.IsUpToDate("test-deploy-multiple-connectors-4", config) + assert.Nil(t, err) + assert.True(t, isUpToDate) + } +} diff --git a/lib/connectors/highlevel_client.go b/lib/connectors/highlevel_client.go new file mode 100644 index 0000000..ac9e6d1 --- /dev/null +++ b/lib/connectors/highlevel_client.go @@ -0,0 +1,333 @@ +package connectors + +import ( + "fmt" + "sync" + "time" + + "github.com/hashicorp/go-multierror" + "github.com/pkg/errors" +) + +// HighLevelClient support all function of kafka-connect API + some more features +type HighLevelClient interface { + // kafka-connect api + GetAll() (GetAllConnectorsResponse, error) + GetConnector(req ConnectorRequest) (ConnectorResponse, error) + CreateConnector(req CreateConnectorRequest, sync bool) (ConnectorResponse, error) + UpdateConnector(req CreateConnectorRequest, sync bool) (ConnectorResponse, error) + DeleteConnector(req ConnectorRequest, sync bool) (EmptyResponse, error) + GetConnectorConfig(req ConnectorRequest) (GetConnectorConfigResponse, error) + GetConnectorStatus(req ConnectorRequest) (GetConnectorStatusResponse, error) + RestartConnector(req ConnectorRequest) (EmptyResponse, error) + PauseConnector(req ConnectorRequest, sync bool) (EmptyResponse, error) + ResumeConnector(req ConnectorRequest, sync bool) (EmptyResponse, error) + GetAllTasks(req ConnectorRequest) (GetAllTasksResponse, error) + GetTaskStatus(req TaskRequest) (TaskStatusResponse, error) + RestartTask(req TaskRequest) (EmptyResponse, error) + + // custom features, mostly composition of previous ones + IsUpToDate(connector string, config map[string]interface{}) (bool, error) + DeployConnector(req CreateConnectorRequest) (err error) + DeployMultipleConnector(connectors []CreateConnectorRequest) (err error) + SetInsecureSSL() + SetDebug() +} + +type highLevelClient struct { + client BaseClient + maxParallelRequest int +} + +//NewClient generates a new client +func NewClient(url string) HighLevelClient { + return &highLevelClient{client: newBaseClient(url), maxParallelRequest: 3} +} + +func (c *highLevelClient) SetInsecureSSL() { + c.client.SetInsecureSSL() +} + +func (c *highLevelClient) SetDebug() { + c.client.SetDebug() +} + +//GetAll gets the list of all active connectors +func (c *highLevelClient) GetAll() (GetAllConnectorsResponse, error) { + return c.client.GetAll() +} + +//GetConnector return information on specific connector +func (c *highLevelClient) GetConnector(req ConnectorRequest) (ConnectorResponse, error) { + return c.client.GetConnector(req) +} + +//CreateConnector create connector using specified config and name +func (c *highLevelClient) CreateConnector(req CreateConnectorRequest, sync bool) (ConnectorResponse, error) { + result, err := c.client.CreateConnector(req) + if err != nil { + return result, err + } + + if sync { + if !tryUntil( + func() bool { + resp, err := c.GetConnector(req.ConnectorRequest) + return err == nil && resp.Code == 200 + }, + 2*time.Minute, + ) { + return result, errors.New("timeout on creating connector sync") + } + } + + return result, nil +} + +//UpdateConnector update a connector config +func (c *highLevelClient) UpdateConnector(req CreateConnectorRequest, sync bool) (ConnectorResponse, error) { + result, err := c.client.UpdateConnector(req) + if err != nil { + return result, err + } + + if sync { + if !tryUntil( + func() bool { + upToDate, err := c.IsUpToDate(req.Name, req.Config) + return err == nil && upToDate + }, + 2*time.Minute, + ) { + return result, errors.New("timeout on creating connector sync") + } + } + + return result, nil +} + +//DeleteConnector delete a connector +func (c *highLevelClient) DeleteConnector(req ConnectorRequest, sync bool) (EmptyResponse, error) { + result, err := c.client.DeleteConnector(req) + if err != nil { + return result, err + } + + if sync { + if !tryUntil( + func() bool { + r, e := c.GetConnector(req) + return e == nil && r.Code == 404 + }, + 2*time.Minute, + ) { + return result, errors.New("timeout on deleting connector sync") + } + } + + return result, nil +} + +////GetConnectorConfig return config of a connector +func (c *highLevelClient) GetConnectorConfig(req ConnectorRequest) (GetConnectorConfigResponse, error) { + return c.client.GetConnectorConfig(req) +} + +//GetConnectorStatus return current status of connector +func (c *highLevelClient) GetConnectorStatus(req ConnectorRequest) (GetConnectorStatusResponse, error) { + return c.client.GetConnectorStatus(req) +} + +//RestartConnector restart connector +func (c *highLevelClient) RestartConnector(req ConnectorRequest) (EmptyResponse, error) { + return c.client.RestartConnector(req) +} + +//PauseConnector pause a running connector +//asynchronous operation +func (c *highLevelClient) PauseConnector(req ConnectorRequest, sync bool) (EmptyResponse, error) { + result, err := c.client.PauseConnector(req) + if err != nil { + return result, err + } + + if sync { + if !tryUntil( + func() bool { + resp, err := c.GetConnectorStatus(req) + return err == nil && resp.Code == 200 && resp.ConnectorStatus["state"] == "PAUSED" + }, + 2*time.Minute, + ) { + return result, errors.New("timeout on pausing connector sync") + } + } + return result, nil +} + +//ResumeConnector resume a paused connector +//asynchronous operation +func (c *highLevelClient) ResumeConnector(req ConnectorRequest, sync bool) (EmptyResponse, error) { + result, err := c.client.ResumeConnector(req) + if err != nil { + return result, err + } + + if sync { + if !tryUntil( + func() bool { + resp, err := c.GetConnectorStatus(req) + return err == nil && resp.Code == 200 && resp.ConnectorStatus["state"] == "RUNNING" + }, + 2*time.Minute, + ) { + return result, errors.New("timeout on resuming connector sync") + } + } + return result, nil +} + +//IsUpToDate checks if the given configuration is different from the deployed one. +//Returns true if they are the same +func (c *highLevelClient) IsUpToDate(connector string, config map[string]interface{}) (bool, error) { + // copy the map to safely interact with it + // we are going to need to add connector name to be able to exact match + copyConfig := make(map[string]interface{}, len(config)) + for key, value := range config { + copyConfig[key] = value + } + + copyConfig["name"] = connector + + configResp, err := c.GetConnectorConfig(ConnectorRequest{Name: connector}) + if err != nil { + return false, err + } + if configResp.Code == 404 { + return false, nil + } + if configResp.Code >= 400 { + return false, errors.New(fmt.Sprintf("status code: %d", configResp.Code)) + } + + if len(configResp.Config) != len(copyConfig) { + return false, nil + } + for key, value := range configResp.Config { + if convertConfigValueToString(copyConfig[key]) != convertConfigValueToString(value) { + return false, nil + } + } + return true, nil +} + +// Because trying to compare the same field on 2 different config may return false negative if one is encoded as a string and not the other +func convertConfigValueToString(value interface{}) string { + return fmt.Sprintf("%v", value) +} + +// tryUntil repeats exec until it return true or timeout is reached +// tryUntil itself return true if `exec` has return true (success), false if timeout (failure) +func tryUntil(exec func() bool, limit time.Duration) bool { + timeLimit := time.After(limit) + + run := true + defer func() { run = false }() + success := make(chan bool) + go func() { + for run { + if exec() { + success <- true + return + } + time.Sleep(1 * time.Second) + } + }() + + select { + case <-timeLimit: + return false + case <-success: + return true + } +} + +//DeployConnector checks if the configuration changed before deploying. +//It does nothing if it is the same +func (c *highLevelClient) DeployConnector(req CreateConnectorRequest) (err error) { + existingConnector, err := c.GetConnector(ConnectorRequest{Name: req.Name}) + if err != nil { + return err + } + + if existingConnector.Code != 404 { + var upToDate bool + upToDate, err = c.IsUpToDate(req.Name, req.Config) + if err != nil { + return err + } + // Connector is already up to date, stop there and return ok + if upToDate { + return nil + } + + _, err = c.PauseConnector(ConnectorRequest{Name: req.Name}, true) + if err != nil { + return err + } + + defer func() { + _, err = c.ResumeConnector(ConnectorRequest{Name: req.Name}, true) + }() + } + + _, err = c.UpdateConnector(req, true) + if err != nil { + return err + } + + return err +} + +func (c *highLevelClient) DeployMultipleConnector(connectors []CreateConnectorRequest) (err error) { + errSync := new(sync.Mutex) + // Channel is used only to limit number of parallel request + throttleCh := make(chan interface{}, c.maxParallelRequest) + + for _, connector := range connectors { + throttleCh <- struct{}{} + go func(req CreateConnectorRequest) { + defer func() { <-throttleCh }() + newErr := c.DeployConnector(req) + if newErr != nil { + errSync.Lock() + defer errSync.Unlock() + multierror.Append(err, newErr) + } + }(connector) + } + + // wait for the end + for i := 0; i < c.maxParallelRequest; i++ { + throttleCh <- struct{}{} + } + + return err +} + +// --------------- tasks --------------------- + +//GetAllTasks return list of running task +func (c *highLevelClient) GetAllTasks(req ConnectorRequest) (GetAllTasksResponse, error) { + return c.client.GetAllTasks(req) +} + +//GetTaskStatus return current status of task +func (c *highLevelClient) GetTaskStatus(req TaskRequest) (TaskStatusResponse, error) { + return c.client.GetTaskStatus(req) +} + +//RestartTask try to restart task +func (c *highLevelClient) RestartTask(req TaskRequest) (EmptyResponse, error) { + return c.client.RestartTask(req) +} diff --git a/lib/connectors/highlevel_client_test.go b/lib/connectors/highlevel_client_test.go new file mode 100644 index 0000000..a09fdb0 --- /dev/null +++ b/lib/connectors/highlevel_client_test.go @@ -0,0 +1,192 @@ +//+build !integration + +package connectors + +import ( + "reflect" + "sync" + "testing" + "time" + + "bou.ke/monkey" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +func Test_IsUpToDate_Should_Be_True(t *testing.T) { + configOnline := map[string]interface{}{ + "name": "test1", + "param1": 2, + "param2": "abc", + "param3": "3", + } + + configLocal := map[string]interface{}{ + "param1": 2, + "param2": "abc", + "param3": 3, + } + + mockBaseClient := &MockBaseClient{} + mockBaseClient.On("GetConnectorConfig", mock.Anything). + Return(GetConnectorConfigResponse{Config: configOnline}, nil) + + client := &highLevelClient{client: mockBaseClient} + + isUpToDate, err := client.IsUpToDate("test1", configLocal) + + assert.NoError(t, err) + assert.True(t, isUpToDate) +} + +func Test_IsUpToDate_Should_Be_False(t *testing.T) { + configOnline := map[string]interface{}{ + "name": "test1", + "param1": 3, + "param2": "abc", + "param3": "3", + } + + configLocal := map[string]interface{}{ + "param1": 2, + "param2": "abc", + "param3": 3, + } + + mockBaseClient := &MockBaseClient{} + mockBaseClient.On("GetConnectorConfig", mock.Anything). + Return(GetConnectorConfigResponse{Config: configOnline}, nil) + + client := &highLevelClient{client: mockBaseClient} + + isUpToDate, err := client.IsUpToDate("test1", configLocal) + + assert.NoError(t, err) + assert.False(t, isUpToDate) +} + +func Test_tryUntil_When_Success(t *testing.T) { + result := tryUntil( + func() bool { + return true + }, + 100*time.Millisecond, + ) + assert.True(t, result) +} + +func Test_tryUntil_When_Timeout(t *testing.T) { + result := tryUntil( + func() bool { + time.Sleep(200 * time.Millisecond) + return true + }, + 100*time.Millisecond, + ) + assert.False(t, result) +} + +func Test_DeployConnector_When_Already_Up_To_Date(t *testing.T) { + configOnline := map[string]interface{}{ + "name": "test1", + "param1": 2, + "param2": "abc", + "param3": "3", + } + configLocal := map[string]interface{}{ + "param1": 2, + "param2": "abc", + "param3": 3, + } + + mockBaseClient := &MockBaseClient{} + mockBaseClient.On("GetConnector", mock.Anything). + Return(ConnectorResponse{Name: "test1", Config: configOnline}, nil) + //TODO there shouldn't be a need to make both these call + mockBaseClient.On("GetConnectorConfig", mock.Anything). + Return(GetConnectorConfigResponse{Config: configOnline}, nil) + // note we don't mock the update part because it should not be called + + client := &highLevelClient{client: mockBaseClient} + err := client.DeployConnector(CreateConnectorRequest{ + ConnectorRequest: ConnectorRequest{"test1"}, + Config: configLocal, + }) + + assert.NoError(t, err) + mockBaseClient.AssertExpectations(t) +} + +func Test_DeployConnector_Ok(t *testing.T) { + configOnline := map[string]interface{}{ + "name": "test1", + "param1": 2, + } + configLocal := map[string]interface{}{ + "param1": 3, + } + + // expected steps: + // - get connector info + // - compare online config and local config + // - pause online connector + // - loop get connector status until it is paused + // - update connector + // - loop get connector config until it match deployed + // - resume connector + // - loop get connector status until it is running + + mockBaseClient := &MockBaseClient{} + mockBaseClient.On("GetConnector", mock.Anything). + Return(ConnectorResponse{Name: "test1", Config: configOnline}, nil) + mockBaseClient.On("GetConnectorConfig", mock.Anything). + Return(GetConnectorConfigResponse{Config: configOnline}, nil).Once() + mockBaseClient.On("PauseConnector", mock.Anything, mock.Anything). + Return(EmptyResponse{}, nil) + mockBaseClient.On("GetConnectorStatus", mock.Anything). + Return(GetConnectorStatusResponse{EmptyResponse: EmptyResponse{Code: 200}, ConnectorStatus: map[string]string{"state": "PAUSED"}}, nil).Once() + mockBaseClient.On("ResumeConnector", mock.Anything, mock.Anything). + Return(EmptyResponse{}, nil) + mockBaseClient.On("GetConnectorStatus", mock.Anything). + Return(GetConnectorStatusResponse{EmptyResponse: EmptyResponse{Code: 200}, ConnectorStatus: map[string]string{"state": "RUNNING"}}, nil).Once() + mockBaseClient.On("UpdateConnector", mock.Anything). + Return(ConnectorResponse{}, nil) + mockBaseClient.On("GetConnectorConfig", mock.Anything). + Return(GetConnectorConfigResponse{Config: map[string]interface{}{"name": "test1", "param1": 3}}, nil).Once() + + client := &highLevelClient{client: mockBaseClient} + err := client.DeployConnector(CreateConnectorRequest{ + ConnectorRequest: ConnectorRequest{"test1"}, + Config: configLocal, + }) + + assert.NoError(t, err) + mockBaseClient.AssertExpectations(t) +} + +func Test_DeployMultipleConnector_Ok(t *testing.T) { + client := &highLevelClient{client: &MockBaseClient{}, maxParallelRequest: 2} + + lock := &sync.Mutex{} + received := map[string]interface{}{} + + // Don't want to mock every baseClient call, so I am going the lazy way. + patch := monkey.PatchInstanceMethod(reflect.TypeOf(client), "DeployConnector", func(_ *highLevelClient, req CreateConnectorRequest) (err error) { + lock.Lock() + defer lock.Unlock() + received[req.Name] = true + return nil + }) + defer patch.Restore() + + err := client.DeployMultipleConnector([]CreateConnectorRequest{ + {ConnectorRequest: ConnectorRequest{Name: "test1"}}, + {ConnectorRequest: ConnectorRequest{Name: "test2"}}, + {ConnectorRequest: ConnectorRequest{Name: "test3"}}, + {ConnectorRequest: ConnectorRequest{Name: "test4"}}, + {ConnectorRequest: ConnectorRequest{Name: "test5"}}, + }) + + assert.Equal(t, map[string]interface{}{"test1": true, "test2": true, "test3": true, "test4": true, "test5": true}, received) + assert.NoError(t, err) +} diff --git a/lib/connectors/mock_base_client.go b/lib/connectors/mock_base_client.go new file mode 100644 index 0000000..139f81f --- /dev/null +++ b/lib/connectors/mock_base_client.go @@ -0,0 +1,295 @@ +// Code generated by mockery v1.0.0. DO NOT EDIT. + +// NOTE: run 'make update-mocks' from this project top folder to update this file and generate new ones. + +package connectors + +import mock "github.com/stretchr/testify/mock" + +// MockBaseClient is an autogenerated mock type for the BaseClient type +type MockBaseClient struct { + mock.Mock +} + +// CreateConnector provides a mock function with given fields: req +func (_m *MockBaseClient) CreateConnector(req CreateConnectorRequest) (ConnectorResponse, error) { + ret := _m.Called(req) + + var r0 ConnectorResponse + if rf, ok := ret.Get(0).(func(CreateConnectorRequest) ConnectorResponse); ok { + r0 = rf(req) + } else { + r0 = ret.Get(0).(ConnectorResponse) + } + + var r1 error + if rf, ok := ret.Get(1).(func(CreateConnectorRequest) error); ok { + r1 = rf(req) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// DeleteConnector provides a mock function with given fields: req +func (_m *MockBaseClient) DeleteConnector(req ConnectorRequest) (EmptyResponse, error) { + ret := _m.Called(req) + + var r0 EmptyResponse + if rf, ok := ret.Get(0).(func(ConnectorRequest) EmptyResponse); ok { + r0 = rf(req) + } else { + r0 = ret.Get(0).(EmptyResponse) + } + + var r1 error + if rf, ok := ret.Get(1).(func(ConnectorRequest) error); ok { + r1 = rf(req) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetAll provides a mock function with given fields: +func (_m *MockBaseClient) GetAll() (GetAllConnectorsResponse, error) { + ret := _m.Called() + + var r0 GetAllConnectorsResponse + if rf, ok := ret.Get(0).(func() GetAllConnectorsResponse); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(GetAllConnectorsResponse) + } + + var r1 error + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetAllTasks provides a mock function with given fields: req +func (_m *MockBaseClient) GetAllTasks(req ConnectorRequest) (GetAllTasksResponse, error) { + ret := _m.Called(req) + + var r0 GetAllTasksResponse + if rf, ok := ret.Get(0).(func(ConnectorRequest) GetAllTasksResponse); ok { + r0 = rf(req) + } else { + r0 = ret.Get(0).(GetAllTasksResponse) + } + + var r1 error + if rf, ok := ret.Get(1).(func(ConnectorRequest) error); ok { + r1 = rf(req) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetConnector provides a mock function with given fields: req +func (_m *MockBaseClient) GetConnector(req ConnectorRequest) (ConnectorResponse, error) { + ret := _m.Called(req) + + var r0 ConnectorResponse + if rf, ok := ret.Get(0).(func(ConnectorRequest) ConnectorResponse); ok { + r0 = rf(req) + } else { + r0 = ret.Get(0).(ConnectorResponse) + } + + var r1 error + if rf, ok := ret.Get(1).(func(ConnectorRequest) error); ok { + r1 = rf(req) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetConnectorConfig provides a mock function with given fields: req +func (_m *MockBaseClient) GetConnectorConfig(req ConnectorRequest) (GetConnectorConfigResponse, error) { + ret := _m.Called(req) + + var r0 GetConnectorConfigResponse + if rf, ok := ret.Get(0).(func(ConnectorRequest) GetConnectorConfigResponse); ok { + r0 = rf(req) + } else { + r0 = ret.Get(0).(GetConnectorConfigResponse) + } + + var r1 error + if rf, ok := ret.Get(1).(func(ConnectorRequest) error); ok { + r1 = rf(req) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetConnectorStatus provides a mock function with given fields: req +func (_m *MockBaseClient) GetConnectorStatus(req ConnectorRequest) (GetConnectorStatusResponse, error) { + ret := _m.Called(req) + + var r0 GetConnectorStatusResponse + if rf, ok := ret.Get(0).(func(ConnectorRequest) GetConnectorStatusResponse); ok { + r0 = rf(req) + } else { + r0 = ret.Get(0).(GetConnectorStatusResponse) + } + + var r1 error + if rf, ok := ret.Get(1).(func(ConnectorRequest) error); ok { + r1 = rf(req) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetTaskStatus provides a mock function with given fields: req +func (_m *MockBaseClient) GetTaskStatus(req TaskRequest) (TaskStatusResponse, error) { + ret := _m.Called(req) + + var r0 TaskStatusResponse + if rf, ok := ret.Get(0).(func(TaskRequest) TaskStatusResponse); ok { + r0 = rf(req) + } else { + r0 = ret.Get(0).(TaskStatusResponse) + } + + var r1 error + if rf, ok := ret.Get(1).(func(TaskRequest) error); ok { + r1 = rf(req) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// PauseConnector provides a mock function with given fields: req +func (_m *MockBaseClient) PauseConnector(req ConnectorRequest) (EmptyResponse, error) { + ret := _m.Called(req) + + var r0 EmptyResponse + if rf, ok := ret.Get(0).(func(ConnectorRequest) EmptyResponse); ok { + r0 = rf(req) + } else { + r0 = ret.Get(0).(EmptyResponse) + } + + var r1 error + if rf, ok := ret.Get(1).(func(ConnectorRequest) error); ok { + r1 = rf(req) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// RestartConnector provides a mock function with given fields: req +func (_m *MockBaseClient) RestartConnector(req ConnectorRequest) (EmptyResponse, error) { + ret := _m.Called(req) + + var r0 EmptyResponse + if rf, ok := ret.Get(0).(func(ConnectorRequest) EmptyResponse); ok { + r0 = rf(req) + } else { + r0 = ret.Get(0).(EmptyResponse) + } + + var r1 error + if rf, ok := ret.Get(1).(func(ConnectorRequest) error); ok { + r1 = rf(req) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// RestartTask provides a mock function with given fields: req +func (_m *MockBaseClient) RestartTask(req TaskRequest) (EmptyResponse, error) { + ret := _m.Called(req) + + var r0 EmptyResponse + if rf, ok := ret.Get(0).(func(TaskRequest) EmptyResponse); ok { + r0 = rf(req) + } else { + r0 = ret.Get(0).(EmptyResponse) + } + + var r1 error + if rf, ok := ret.Get(1).(func(TaskRequest) error); ok { + r1 = rf(req) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// ResumeConnector provides a mock function with given fields: req +func (_m *MockBaseClient) ResumeConnector(req ConnectorRequest) (EmptyResponse, error) { + ret := _m.Called(req) + + var r0 EmptyResponse + if rf, ok := ret.Get(0).(func(ConnectorRequest) EmptyResponse); ok { + r0 = rf(req) + } else { + r0 = ret.Get(0).(EmptyResponse) + } + + var r1 error + if rf, ok := ret.Get(1).(func(ConnectorRequest) error); ok { + r1 = rf(req) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// SetDebug provides a mock function with given fields: +func (_m *MockBaseClient) SetDebug() { + _m.Called() +} + +// SetInsecureSSL provides a mock function with given fields: +func (_m *MockBaseClient) SetInsecureSSL() { + _m.Called() +} + +// UpdateConnector provides a mock function with given fields: req +func (_m *MockBaseClient) UpdateConnector(req CreateConnectorRequest) (ConnectorResponse, error) { + ret := _m.Called(req) + + var r0 ConnectorResponse + if rf, ok := ret.Get(0).(func(CreateConnectorRequest) ConnectorResponse); ok { + r0 = rf(req) + } else { + r0 = ret.Get(0).(ConnectorResponse) + } + + var r1 error + if rf, ok := ret.Get(1).(func(CreateConnectorRequest) error); ok { + r1 = rf(req) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} diff --git a/lib/connectors/mock_high_level_client.go b/lib/connectors/mock_high_level_client.go new file mode 100644 index 0000000..412d5ad --- /dev/null +++ b/lib/connectors/mock_high_level_client.go @@ -0,0 +1,344 @@ +// Code generated by mockery v1.0.0. DO NOT EDIT. + +// NOTE: run 'make update-mocks' from this project top folder to update this file and generate new ones. + +package connectors + +import mock "github.com/stretchr/testify/mock" + +// MockHighLevelClient is an autogenerated mock type for the HighLevelClient type +type MockHighLevelClient struct { + mock.Mock +} + +// CreateConnector provides a mock function with given fields: req, sync +func (_m *MockHighLevelClient) CreateConnector(req CreateConnectorRequest, sync bool) (ConnectorResponse, error) { + ret := _m.Called(req, sync) + + var r0 ConnectorResponse + if rf, ok := ret.Get(0).(func(CreateConnectorRequest, bool) ConnectorResponse); ok { + r0 = rf(req, sync) + } else { + r0 = ret.Get(0).(ConnectorResponse) + } + + var r1 error + if rf, ok := ret.Get(1).(func(CreateConnectorRequest, bool) error); ok { + r1 = rf(req, sync) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// DeleteConnector provides a mock function with given fields: req, sync +func (_m *MockHighLevelClient) DeleteConnector(req ConnectorRequest, sync bool) (EmptyResponse, error) { + ret := _m.Called(req, sync) + + var r0 EmptyResponse + if rf, ok := ret.Get(0).(func(ConnectorRequest, bool) EmptyResponse); ok { + r0 = rf(req, sync) + } else { + r0 = ret.Get(0).(EmptyResponse) + } + + var r1 error + if rf, ok := ret.Get(1).(func(ConnectorRequest, bool) error); ok { + r1 = rf(req, sync) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// DeployConnector provides a mock function with given fields: req +func (_m *MockHighLevelClient) DeployConnector(req CreateConnectorRequest) error { + ret := _m.Called(req) + + var r0 error + if rf, ok := ret.Get(0).(func(CreateConnectorRequest) error); ok { + r0 = rf(req) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// DeployMultipleConnector provides a mock function with given fields: connectors +func (_m *MockHighLevelClient) DeployMultipleConnector(connectors []CreateConnectorRequest) error { + ret := _m.Called(connectors) + + var r0 error + if rf, ok := ret.Get(0).(func([]CreateConnectorRequest) error); ok { + r0 = rf(connectors) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// GetAll provides a mock function with given fields: +func (_m *MockHighLevelClient) GetAll() (GetAllConnectorsResponse, error) { + ret := _m.Called() + + var r0 GetAllConnectorsResponse + if rf, ok := ret.Get(0).(func() GetAllConnectorsResponse); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(GetAllConnectorsResponse) + } + + var r1 error + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetAllTasks provides a mock function with given fields: req +func (_m *MockHighLevelClient) GetAllTasks(req ConnectorRequest) (GetAllTasksResponse, error) { + ret := _m.Called(req) + + var r0 GetAllTasksResponse + if rf, ok := ret.Get(0).(func(ConnectorRequest) GetAllTasksResponse); ok { + r0 = rf(req) + } else { + r0 = ret.Get(0).(GetAllTasksResponse) + } + + var r1 error + if rf, ok := ret.Get(1).(func(ConnectorRequest) error); ok { + r1 = rf(req) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetConnector provides a mock function with given fields: req +func (_m *MockHighLevelClient) GetConnector(req ConnectorRequest) (ConnectorResponse, error) { + ret := _m.Called(req) + + var r0 ConnectorResponse + if rf, ok := ret.Get(0).(func(ConnectorRequest) ConnectorResponse); ok { + r0 = rf(req) + } else { + r0 = ret.Get(0).(ConnectorResponse) + } + + var r1 error + if rf, ok := ret.Get(1).(func(ConnectorRequest) error); ok { + r1 = rf(req) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetConnectorConfig provides a mock function with given fields: req +func (_m *MockHighLevelClient) GetConnectorConfig(req ConnectorRequest) (GetConnectorConfigResponse, error) { + ret := _m.Called(req) + + var r0 GetConnectorConfigResponse + if rf, ok := ret.Get(0).(func(ConnectorRequest) GetConnectorConfigResponse); ok { + r0 = rf(req) + } else { + r0 = ret.Get(0).(GetConnectorConfigResponse) + } + + var r1 error + if rf, ok := ret.Get(1).(func(ConnectorRequest) error); ok { + r1 = rf(req) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetConnectorStatus provides a mock function with given fields: req +func (_m *MockHighLevelClient) GetConnectorStatus(req ConnectorRequest) (GetConnectorStatusResponse, error) { + ret := _m.Called(req) + + var r0 GetConnectorStatusResponse + if rf, ok := ret.Get(0).(func(ConnectorRequest) GetConnectorStatusResponse); ok { + r0 = rf(req) + } else { + r0 = ret.Get(0).(GetConnectorStatusResponse) + } + + var r1 error + if rf, ok := ret.Get(1).(func(ConnectorRequest) error); ok { + r1 = rf(req) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetTaskStatus provides a mock function with given fields: req +func (_m *MockHighLevelClient) GetTaskStatus(req TaskRequest) (TaskStatusResponse, error) { + ret := _m.Called(req) + + var r0 TaskStatusResponse + if rf, ok := ret.Get(0).(func(TaskRequest) TaskStatusResponse); ok { + r0 = rf(req) + } else { + r0 = ret.Get(0).(TaskStatusResponse) + } + + var r1 error + if rf, ok := ret.Get(1).(func(TaskRequest) error); ok { + r1 = rf(req) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// IsUpToDate provides a mock function with given fields: connector, config +func (_m *MockHighLevelClient) IsUpToDate(connector string, config map[string]interface{}) (bool, error) { + ret := _m.Called(connector, config) + + var r0 bool + if rf, ok := ret.Get(0).(func(string, map[string]interface{}) bool); ok { + r0 = rf(connector, config) + } else { + r0 = ret.Get(0).(bool) + } + + var r1 error + if rf, ok := ret.Get(1).(func(string, map[string]interface{}) error); ok { + r1 = rf(connector, config) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// PauseConnector provides a mock function with given fields: req, sync +func (_m *MockHighLevelClient) PauseConnector(req ConnectorRequest, sync bool) (EmptyResponse, error) { + ret := _m.Called(req, sync) + + var r0 EmptyResponse + if rf, ok := ret.Get(0).(func(ConnectorRequest, bool) EmptyResponse); ok { + r0 = rf(req, sync) + } else { + r0 = ret.Get(0).(EmptyResponse) + } + + var r1 error + if rf, ok := ret.Get(1).(func(ConnectorRequest, bool) error); ok { + r1 = rf(req, sync) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// RestartConnector provides a mock function with given fields: req +func (_m *MockHighLevelClient) RestartConnector(req ConnectorRequest) (EmptyResponse, error) { + ret := _m.Called(req) + + var r0 EmptyResponse + if rf, ok := ret.Get(0).(func(ConnectorRequest) EmptyResponse); ok { + r0 = rf(req) + } else { + r0 = ret.Get(0).(EmptyResponse) + } + + var r1 error + if rf, ok := ret.Get(1).(func(ConnectorRequest) error); ok { + r1 = rf(req) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// RestartTask provides a mock function with given fields: req +func (_m *MockHighLevelClient) RestartTask(req TaskRequest) (EmptyResponse, error) { + ret := _m.Called(req) + + var r0 EmptyResponse + if rf, ok := ret.Get(0).(func(TaskRequest) EmptyResponse); ok { + r0 = rf(req) + } else { + r0 = ret.Get(0).(EmptyResponse) + } + + var r1 error + if rf, ok := ret.Get(1).(func(TaskRequest) error); ok { + r1 = rf(req) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// ResumeConnector provides a mock function with given fields: req, sync +func (_m *MockHighLevelClient) ResumeConnector(req ConnectorRequest, sync bool) (EmptyResponse, error) { + ret := _m.Called(req, sync) + + var r0 EmptyResponse + if rf, ok := ret.Get(0).(func(ConnectorRequest, bool) EmptyResponse); ok { + r0 = rf(req, sync) + } else { + r0 = ret.Get(0).(EmptyResponse) + } + + var r1 error + if rf, ok := ret.Get(1).(func(ConnectorRequest, bool) error); ok { + r1 = rf(req, sync) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// SetDebug provides a mock function with given fields: +func (_m *MockHighLevelClient) SetDebug() { + _m.Called() +} + +// SetInsecureSSL provides a mock function with given fields: +func (_m *MockHighLevelClient) SetInsecureSSL() { + _m.Called() +} + +// UpdateConnector provides a mock function with given fields: req, sync +func (_m *MockHighLevelClient) UpdateConnector(req CreateConnectorRequest, sync bool) (ConnectorResponse, error) { + ret := _m.Called(req, sync) + + var r0 ConnectorResponse + if rf, ok := ret.Get(0).(func(CreateConnectorRequest, bool) ConnectorResponse); ok { + r0 = rf(req, sync) + } else { + r0 = ret.Get(0).(ConnectorResponse) + } + + var r1 error + if rf, ok := ret.Get(1).(func(CreateConnectorRequest, bool) error); ok { + r1 = rf(req, sync) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} diff --git a/lib/connectors/tasks.go b/lib/connectors/tasks.go deleted file mode 100644 index 8643a82..0000000 --- a/lib/connectors/tasks.go +++ /dev/null @@ -1,102 +0,0 @@ -package connectors - -import ( - "strconv" -) - -//TaskRequest is generic request when interacting with task endpoint -type TaskRequest struct { - Connector string - TaskID int -} - -//GetAllTasksResponse is response to get all tasks of a specific endpoint -type GetAllTasksResponse struct { - Code int - Tasks []TaskDetails -} - -//TaskDetails is detail of a specific task on a specific endpoint -type TaskDetails struct { - ID TaskID `json:"id"` - Config map[string]interface{} `json:"config"` -} - -//TaskID identify a task and its connector -type TaskID struct { - Connector string `json:"connector"` - TaskID int `json:"task"` -} - -//TaskStatusResponse is response returned by get task status endpoint -type TaskStatusResponse struct { - Code int - Status TaskStatus -} - -//TaskStatus define task status -type TaskStatus struct { - ID int `json:"id"` - State string `json:"state"` - WorkerID string `json:"worker_id"` - Trace string `json:"trace,omitempty"` -} - -//GetAllTasks return list of running task -func (c Client) GetAllTasks(req ConnectorRequest) (GetAllTasksResponse, error) { - var result GetAllTasksResponse - - resp, err := c.restClient.NewRequest(). - SetResult(&result.Tasks). - SetPathParams(map[string]string{"name": req.Name}). - Get("connectors/{name}/tasks") - if err != nil { - return GetAllTasksResponse{}, err - } - if resp.Error() != nil { - return GetAllTasksResponse{}, resp.Error().(*ErrorResponse) - } - - result.Code = resp.StatusCode() - return result, nil -} - -//GetTaskStatus return current status of task -func (c Client) GetTaskStatus(req TaskRequest) (TaskStatusResponse, error) { - var result TaskStatusResponse - - resp, err := c.restClient.NewRequest(). - SetResult(&result). - SetPathParams(map[string]string{"name": req.Connector, "task_id": strconv.Itoa(req.TaskID)}). - Get("connectors/{name}/tasks/{task_id}/status") - if err != nil { - return TaskStatusResponse{}, err - } - if resp.Error() != nil && resp.StatusCode() != 404 { - return TaskStatusResponse{}, resp.Error().(*ErrorResponse) - } - - result.Code = resp.StatusCode() - - return result, nil -} - -//RestartTask try to restart task -func (c Client) RestartTask(req TaskRequest) (EmptyResponse, error) { - var result EmptyResponse - - resp, err := c.restClient.NewRequest(). - SetResult(&result). - SetPathParams(map[string]string{"name": req.Connector, "task_id": strconv.Itoa(req.TaskID)}). - Post("connectors/{name}/tasks/{task_id}/restart") - if err != nil { - return EmptyResponse{}, err - } - if resp.Error() != nil { - return EmptyResponse{}, resp.Error().(*ErrorResponse) - } - - result.Code = resp.StatusCode() - - return result, nil -} diff --git a/lib/docker-compose.yml b/lib/docker-compose.yml index 57ab1b5..098e16c 100644 --- a/lib/docker-compose.yml +++ b/lib/docker-compose.yml @@ -7,6 +7,7 @@ services: - "8081-8083:8081-8083" - "9092:9092" - "2181:2181" + - "3030:3030" environment: - ADV_HOST=localhost - SAMPLEDATA=0