Skip to content

Commit

Permalink
Merge pull request #5 from rerost/rerost/treat-copy-in-executer
Browse files Browse the repository at this point in the history
treat copy in executer
  • Loading branch information
Hazumi Ichijo authored Nov 11, 2018
2 parents 66081eb + 39ab753 commit b0828b3
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 42 deletions.
68 changes: 56 additions & 12 deletions executer/executer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package executer

import (
"context"
"fmt"
"os"
"time"

"github.com/rerost/es-cli/infra/es"
"github.com/srvc/fail"
Expand Down Expand Up @@ -37,27 +40,67 @@ func (e *executerImp) Run(ctx context.Context, operation string, target string,
return e.esBaseClient.ListIndex(ctx)
case "create":
if len(args) != 2 {
return Empty{}, fail.New("Invalid arguments")
return Empty{}, fail.New(fmt.Sprintf("Invalid arguments expected: %d, %v", 2, args))
}
return Empty{}, e.esBaseClient.CreateIndex(ctx, args[0], args[1])
case "delete":
if len(args) != 1 {
return Empty{}, fail.New("Invalid arguments")
return Empty{}, fail.New(fmt.Sprintf("Invalid arguments expected: %d, %v", 1, args))
}
return Empty{}, e.esBaseClient.DeleteIndex(ctx, args[0])
case "copy":
if len(args) != 2 {
return Empty{}, fail.New("Invalid arguments")
return Empty{}, fail.New(fmt.Sprintf("Invalid arguments expected: %d, %v", 2, args))
}
return Empty{}, e.esBaseClient.CopyIndex(ctx, args[0], args[1])

task, err := e.esBaseClient.CopyIndex(ctx, args[0], args[1])
if err != nil {
return Empty{}, fail.Wrap(err)
}

fmt.Fprintf(os.Stdout, "TaskID is %s\n", task.ID)

for i := 1; ; i++ {
// Back off
time.Sleep(time.Second * time.Duration(i*i))
fmt.Fprintf(os.Stdout, "Waiting for complete copy...\n")
task, err := e.esBaseClient.GetTask(ctx, task.ID)

if err != nil {
return Empty{}, fail.Wrap(err)
}

if task.Complete == true {
break
}
}
srcIndexCount, err := e.esBaseClient.CountIndex(ctx, args[0])
if err != nil {
return Empty{}, fail.Wrap(err)
}
dstIndexCount, err := e.esBaseClient.CountIndex(ctx, args[0])
if err != nil {
return Empty{}, fail.Wrap(err)
}
if srcIndexCount.Num != dstIndexCount.Num {
return Empty{}, fail.New(fmt.Sprintf("Copy is faild. Prease delete index %s Not match document count src: %d, dst: %d", args[1], srcIndexCount.Num, dstIndexCount.Num))
}
fmt.Fprintf(os.Stdout, "Done copy")

return Empty{}, nil
case "count":
if len(args) != 1 {
return Empty{}, fail.New(fmt.Sprintf("Invalid arguments expected: %d, %v", 1, args))
}
return e.esBaseClient.CountIndex(ctx, args[0])
}
}

if target == "mapping" {
switch operation {
case "get":
if len(args) != 1 {
return Empty{}, fail.New("Invalid arguments")
return Empty{}, fail.New(fmt.Sprintf("Invalid arguments expected: %d, %v", 1, args))
}
return e.esBaseClient.GetMapping(ctx, args[0])
}
Expand All @@ -67,24 +110,24 @@ func (e *executerImp) Run(ctx context.Context, operation string, target string,
switch operation {
case "create":
if len(args) != 2 {
return Empty{}, fail.New("Invalid arguments")
return Empty{}, fail.New(fmt.Sprintf("Invalid arguments expected: %d, %v", 2, args))
}
return Empty{}, e.esBaseClient.CreateAlias(ctx, args[0], args[1])
case "drop":
if len(args) != 2 {
return Empty{}, fail.New("Invalid arguments")
return Empty{}, fail.New(fmt.Sprintf("Invalid arguments expected: %d, %v", 2, args))
}
// TODO implement
return Empty{}, nil
// return Empty{}, e.esBaseClient.DropAlias(ctx, args[0], args[1])
case "add":
if len(args) != 2 {
return Empty{}, fail.New("Invalid arguments")
return Empty{}, fail.New(fmt.Sprintf("Invalid arguments expected: %d, %v", 2, args))
}
return Empty{}, e.esBaseClient.AddAlias(ctx, args[0], args[1])
case "remove":
if len(args) >= 2 {
return Empty{}, fail.New("Invalid arguments")
return Empty{}, fail.New(fmt.Sprintf("Invalid arguments expected: >= %d, %v", 2, args))
}
return Empty{}, e.esBaseClient.RemoveAlias(ctx, args[0], args[1:]...)
}
Expand All @@ -94,15 +137,16 @@ func (e *executerImp) Run(ctx context.Context, operation string, target string,
switch operation {
case "list":
if len(args) != 0 {
return Empty{}, fail.New("Invalid arguments")
return Empty{}, fail.New(fmt.Sprintf("Invalid arguments expected: %d, %v", 0, args))
}
return e.esBaseClient.ListTask(ctx)
case "get":
if len(args) != 1 {
return e.esBaseClient.GetTask(ctx, args[0])
return Empty{}, fail.New(fmt.Sprintf("Invalid arguments expected: %d, %v", 1, args))
}
return e.esBaseClient.GetTask(ctx, args[0])
}
}

return Empty{}, fail.New("Invalid arguments")
return Empty{}, fail.New(fmt.Sprintf("Invalid arguments %v", args))
}
90 changes: 60 additions & 30 deletions infra/es/base_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@ import (
"fmt"
"io/ioutil"
"net/http"
"os"
"strings"
"time"

"github.com/rerost/es-cli/setting"
"github.com/srvc/fail"
Expand Down Expand Up @@ -43,11 +41,12 @@ func (m Mapping) String() string {
type Opt struct{}
type Alias struct{}
type Task struct {
ID string
Complete bool
}

func (t Task) String() string {
return fmt.Sprintf("%v", t.Complete)
return fmt.Sprintf("ID: %s, Complete: %v", t.ID, t.Complete)
}

type Tasks []Task
Expand All @@ -61,15 +60,24 @@ func (ts Tasks) String() string {
return strings.Join(result, "\n")
}

type Count struct {
Num int64
}

func (c Count) String() string {
return fmt.Sprintf("%d", c.Num)
}

type Version struct{}

// Client is http wrapper
type BaseClient interface {
// Index
ListIndex(ctx context.Context) (Indices, error)
CreateIndex(ctx context.Context, indexName string, mappingJSON string) error
CopyIndex(ctx context.Context, srcIndexName string, dstIndexName string) error
CopyIndex(ctx context.Context, srcIndexName string, dstIndexName string) (Task, error)
DeleteIndex(ctx context.Context, indexName string) error
CountIndex(ctx context.Context, indexName string) (Count, error)

// Mapping
GetMapping(ctx context.Context, indexOrAliasName string) (Mapping, error)
Expand Down Expand Up @@ -201,7 +209,7 @@ func (client baseClientImp) CreateIndex(ctx context.Context, indexName string, m

return nil
}
func (client baseClientImp) CopyIndex(ctx context.Context, srcIndexName string, dstIndexName string) error {
func (client baseClientImp) CopyIndex(ctx context.Context, srcIndexName string, dstIndexName string) (Task, error) {
reindexJSON := fmt.Sprintf(`
{
"source": {
Expand All @@ -214,7 +222,7 @@ func (client baseClientImp) CopyIndex(ctx context.Context, srcIndexName string,
`, srcIndexName, dstIndexName)
request, err := http.NewRequest(http.MethodPost, client.reindexURL(), bytes.NewBufferString(reindexJSON))
if err != nil {
return fail.Wrap(err)
return Task{}, fail.Wrap(err)
}

if client.User.Valid && client.Pass.Valid {
Expand All @@ -225,7 +233,7 @@ func (client baseClientImp) CopyIndex(ctx context.Context, srcIndexName string,

response, err := client.HttpClient.Do(request)
if err != nil {
return fail.Wrap(err)
return Task{}, fail.Wrap(err)
}
defer response.Body.Close()

Expand All @@ -234,41 +242,23 @@ func (client baseClientImp) CopyIndex(ctx context.Context, srcIndexName string,
responseBody, err := ioutil.ReadAll(response.Body)
err = json.Unmarshal(responseBody, &responseMap)
if err != nil {
return fail.Wrap(err)
return Task{}, fail.Wrap(err)
}

if errMsg, ok := responseMap["error"]; ok {
return fail.New(fmt.Sprintf("%v", errMsg))
return Task{}, fail.New(fmt.Sprintf("%v", errMsg))
}

if _, ok := responseMap["task"].(string); !ok {
return fail.New(fmt.Sprintf("Not found task: %v", string(responseBody)))
return Task{}, fail.New(fmt.Sprintf("Not found task: %v", string(responseBody)))
}

taskID := responseMap["task"].(string)
fmt.Fprintf(os.Stdout, "TaskID is %s\n", taskID)

for i := 1; ; i++ {
// Back off
time.Sleep(time.Second * time.Duration(i*i))
fmt.Fprintf(os.Stdout, "Waiting for complete copy...\n")
task, err := client.GetTask(ctx, taskID)

if err != nil {
return fail.Wrap(err)
}

if task.Complete == true {
break
}
}

// TODO Check count

return nil
return Task{ID: taskID}, nil
}
func (client baseClientImp) DeleteIndex(ctx context.Context, indexName string) error {
request, err := http.NewRequest(http.MethodDelete, client.indexURL(indexName), bytes.NewBufferString(""))
request, err := http.NewRequest(http.MethodDelete, client.rawIndexURL(indexName), bytes.NewBufferString(""))
if err != nil {
return fail.Wrap(err)
}
Expand Down Expand Up @@ -297,6 +287,40 @@ func (client baseClientImp) DeleteIndex(ctx context.Context, indexName string) e

return nil
}
func (client baseClientImp) CountIndex(ctx context.Context, indexName string) (Count, error) {
request, err := http.NewRequest(http.MethodGet, client.countURL(indexName), bytes.NewBufferString(""))
if err != nil {
return Count{}, fail.Wrap(err)
}

if client.User.Valid && client.Pass.Valid {
request.SetBasicAuth(client.User.String, client.Pass.String)
}

response, err := client.HttpClient.Do(request)
if err != nil {
return Count{}, fail.Wrap(err)
}
defer response.Body.Close()

responseMap := map[string]interface{}{}

responseBody, err := ioutil.ReadAll(response.Body)
err = json.Unmarshal(responseBody, &responseMap)
if err != nil {
return Count{}, fail.Wrap(err)
}

if errMsg, ok := responseMap["error"]; ok {
return Count{}, fail.New(fmt.Sprintf("%v", errMsg))
}

if _, ok := responseMap["count"].(float64); !ok {
return Count{}, fail.New(fmt.Sprintf("Failed to extract count from json: %s", responseBody))
}

return Count{Num: int64(responseMap["count"].(float64))}, nil
}

// Mapping
func (client baseClientImp) GetMapping(ctx context.Context, indexOrAliasName string) (Mapping, error) {
Expand Down Expand Up @@ -586,6 +610,9 @@ func (client baseClientImp) listIndexURL() string {
func (client baseClientImp) indexURL(indexName string) string {
return client.baseURL() + "/" + indexName + "/" + client.Type
}
func (client baseClientImp) rawIndexURL(indexName string) string {
return client.baseURL() + "/" + indexName
}
func (client baseClientImp) reindexURL() string {
return client.baseURL() + "/_reindex"
}
Expand All @@ -601,6 +628,9 @@ func (client baseClientImp) mappingURL(indexOrAliasName string) string {
func (client baseClientImp) aliasURL() string {
return client.baseURL() + "/_aliases"
}
func (client baseClientImp) countURL(indexName string) string {
return client.indexURL(indexName) + "/_count"
}

func addParams(req *http.Request, params map[string]string) *http.Request {
q := req.URL.Query()
Expand Down

0 comments on commit b0828b3

Please sign in to comment.