From d2dba4b754848f0b74e81ba994f1ece1ead7003b Mon Sep 17 00:00:00 2001 From: Hakan Uyumaz Date: Wed, 19 Oct 2022 17:32:13 +0200 Subject: [PATCH] Allocate stale primary shard functionality is added --- es.go | 56 ++++++++++++++++++++++++++++++++++++++++++++++++++++-- es_test.go | 17 +++++++++++++++++ 2 files changed, 71 insertions(+), 2 deletions(-) diff --git a/es.go b/es.go index e218189..e4b48e7 100644 --- a/es.go +++ b/es.go @@ -227,11 +227,11 @@ type ClusterSettings struct { // A setting name and value with the setting name to be a "collapsed" version of // the setting. A setting of: // -// { "indices": { "recovery" : { "max_bytes_per_sec": "10mb" } } } +// { "indices": { "recovery" : { "max_bytes_per_sec": "10mb" } } } // // would be represented by: // -// ClusterSetting{ Setting: "indices.recovery.max_bytes_per_sec", Value: "10mb" } +// ClusterSetting{ Setting: "indices.recovery.max_bytes_per_sec", Value: "10mb" } type Setting struct { Setting string Value string @@ -1606,3 +1606,55 @@ func (c *Client) ClusterAllocationExplain(req *ClusterAllocationExplainRequest, return string(body), nil } + +type RerouteRequest struct { + // The commands to perform (move, cancel, allocate, etc) + Commands []RerouteCommand `json:"commands,omitempty"` +} + +type RerouteCommand struct { + AllocateStalePrimary AllocateStalePrimary `json:"allocate_stale_primary,omitempty"` +} + +type AllocateStalePrimary struct { + // The node ID or node name of the node to assign the shard to. + Node string `json:"node,omitempty"` + + // The name of the index containing the shard to be assigned. + Index string `json:"index,omitempty"` + + // The shard ID of the shard to be assigned. + Shard *int `json:"shard,omitempty"` + + // If a node which has the good copy of the data rejoins the cluster later on, that data will be deleted or overwritten with the data of the stale copy that was forcefully allocated with this command. + AcceptDataLoss bool `json:"accept_data_loss,omitempty"` +} + +// AllocateStalePrimary allows to manually allocate a stale primary shard to a specific node +func (c *Client) AllocateStalePrimaryShard(node, index string, shard int) error { + var urlBuilder strings.Builder + urlBuilder.WriteString("_cluster/reroute") + + agent := c.buildPostRequest(urlBuilder.String()) + + req := RerouteRequest{ + Commands: []RerouteCommand{ + { + AllocateStalePrimary: AllocateStalePrimary{ + Node: node, + Index: index, + Shard: &shard, + AcceptDataLoss: true, + }, + }, + }, + } + agent.Set("Content-Type", "application/json").Send(req) + + _, err := handleErrWithBytes(agent) + if err != nil { + return err + } + + return nil +} diff --git a/es_test.go b/es_test.go index ac146fc..6e77dda 100644 --- a/es_test.go +++ b/es_test.go @@ -2179,3 +2179,20 @@ func TestClusterAllocationExplain(t *testing.T) { }) } } + +func TestAllocateStalePrimaryShard(t *testing.T) { + testSetup := &ServerSetup{ + Method: "POST", + Path: "/_cluster/reroute", + Body: `{"commands":[{"allocate_stale_primary":{"accept_data_loss":true,"index":"test-index","node":"test-node","shard":0}}]}`, + } + + host, port, ts := setupTestServers(t, []*ServerSetup{testSetup}) + defer ts.Close() + client := NewClient(host, port) + + err := client.AllocateStalePrimaryShard("test-node", "test-index", 0) + if err != nil { + t.Fatalf("Unexpected error. expected nil, got %s", err) + } +}