Merge pull request #325 from rtay1188/updateshard
Update Shard status processing
rtay1188 authored Aug 20, 2024
2 parents 84e9f9d + 6d8cdea commit ee2d3d0
Showing 4 changed files with 218 additions and 31 deletions.
174 changes: 148 additions & 26 deletions admiral/pkg/clusters/shard_handler.go
@@ -3,14 +3,18 @@ package clusters
import (
"context"
"fmt"
admiralapiv1 "github.com/istio-ecosystem/admiral-api/pkg/apis/admiral/v1"
admiralapi "github.com/istio-ecosystem/admiral-api/pkg/client/clientset/versioned"
"github.com/istio-ecosystem/admiral/admiral/pkg/controller/util"
"github.com/istio-ecosystem/admiral/admiral/pkg/registry"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"slices"
"strconv"
"strings"
"sync"
"time"

admiralapiv1 "github.com/istio-ecosystem/admiral-api/pkg/apis/admiral/v1"
"github.com/istio-ecosystem/admiral/admiral/pkg/registry"

"github.com/istio-ecosystem/admiral/admiral/pkg/controller/admiral"
"github.com/istio-ecosystem/admiral/admiral/pkg/controller/common"
log "github.com/sirupsen/logrus"
@@ -23,13 +27,11 @@ type ShardHandler struct {
type ConfigWriterData struct {
IdentityConfig *registry.IdentityConfig
ClusterName string
// TODO: Could keep this result field or derive it from the passed along error, also could be Shard.Status type instead of string
Result string
Error error
Error error
}

func (sh *ShardHandler) Added(ctx context.Context, obj *admiralapiv1.Shard) error {
err := HandleEventForShard(ctx, admiral.Add, obj, sh.RemoteRegistry)
func (sh *ShardHandler) Added(ctx context.Context, obj *admiralapiv1.Shard, cc admiralapi.Interface) error {
err := HandleEventForShard(ctx, admiral.Add, obj, sh.RemoteRegistry, cc)
if err != nil {
return fmt.Errorf(LogErrFormat, common.Add, common.ShardResourceType, obj.Name, "", err)
}
@@ -48,11 +50,26 @@ type HandleEventForShardFunc func(
remoteRegistry *RemoteRegistry, clusterName string) error

// helper function to handle add and delete for ShardHandler
func HandleEventForShard(ctx context.Context, event admiral.EventType, obj *admiralapiv1.Shard,
remoteRegistry *RemoteRegistry) error {
func HandleEventForShard(ctx context.Context, event admiral.EventType, obj *admiralapiv1.Shard, remoteRegistry *RemoteRegistry, cc admiralapi.Interface) error {
var err error
ctxLogger := common.GetCtxLogger(ctx, obj.Name, "")
tmpShard := obj.DeepCopy()
ctxLogger.Infof(common.CtxLogFormat, "HandleEventForShard", obj.Name, "", "", "")
ctxLogger.Infof(common.CtxLogFormat, "HandleEventForShard", obj.Name, "", "", "beginning to handle shard event")
tmpShardStatusCondition := admiralapiv1.ShardStatusCondition{
Message: "Starting to handle shard",
Reason: admiralapiv1.Processing,
Status: admiralapiv1.FalseConditionStatus,
LastUpdatedTime: v1.Time{},
}
if len(tmpShard.Status.Conditions) > 0 {
tmpShard.Status.Conditions = slices.Insert(tmpShard.Status.Conditions, 0, tmpShardStatusCondition)
} else {
tmpShard.Status.Conditions = []admiralapiv1.ShardStatusCondition{tmpShardStatusCondition}
}
err = updateShardStatus(ctx, ctxLogger, tmpShard, obj, cc)
if err != nil {
ctxLogger.Warnf(common.CtxLogFormat, "updateShardStatus", obj.Name, obj.Namespace, "", "failed to update shard status to processing")
}
var consumerWG, producerWG, resultsWG sync.WaitGroup
configWriterData := make(chan *ConfigWriterData, 1000)
configWriterDataResults := make(chan *ConfigWriterData, 1000)
@@ -65,15 +82,15 @@ func HandleEventForShard(ctx context.Context, event admiral.EventType, obj *admi
go ProduceIdentityConfigsFromShard(ctxLogger, *obj, configWriterData, remoteRegistry, &producerWG)
// Start processing results
resultsWG.Add(1)
go UpdateShard(ctxLogger, configWriterDataResults, &resultsWG, tmpShard)
go ProcessResults(ctx, ctxLogger, configWriterDataResults, &resultsWG, tmpShard, cc)
// wait for all consumers to finish
producerWG.Wait()
consumerWG.Wait()
// all consumers done, no more values sent to results
close(configWriterDataResults)
// wait for all results to be processed
resultsWG.Wait()
//TODO: Need to write the new tmpShard with all the results to the cluster + return error for the item to be requeued
//TODO: choose carefully which errors should trigger a retry of shard processing - this error only covers writing the Shard back to the cluster; ProcessResults could instead surface failures through an error channel
return nil
}
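
The status flow above keeps the most recent condition at Conditions[0]: a Processing entry is inserted at the head before the pipeline starts, and ProcessResults later overwrites that slot with the final result. A minimal sketch of the prepend step in isolation, assuming only the admiral-api types visible in this diff (the helper name is illustrative, not part of the PR):

package clusters

import (
	"slices"

	admiralapiv1 "github.com/istio-ecosystem/admiral-api/pkg/apis/admiral/v1"
)

// prependCondition keeps the newest condition at index 0 so readers of
// Conditions[0] always see the latest state transition.
func prependCondition(conds []admiralapiv1.ShardStatusCondition, c admiralapiv1.ShardStatusCondition) []admiralapiv1.ShardStatusCondition {
	if len(conds) == 0 {
		return []admiralapiv1.ShardStatusCondition{c}
	}
	return slices.Insert(conds, 0, c)
}
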

@@ -88,6 +105,12 @@ func ProduceIdentityConfigsFromShard(ctxLogger *log.Entry, shard admiralapiv1.Sh
identityConfig, err := rr.RegistryClient.GetIdentityConfigByIdentityName(identityItem.Name, ctxLogger)
if err != nil {
ctxLogger.Warnf(common.CtxLogFormat, "ProduceIdentityConfig", identityItem.Name, shard.Namespace, clusterShard.Name, err)
configWriterData <- &ConfigWriterData{
IdentityConfig: &registry.IdentityConfig{IdentityName: identityItem.Name},
ClusterName: clusterShard.Name,
Error: err,
}
continue
}
ctxLogger.Infof(common.CtxLogFormat, "ProduceIdentityConfig", identityConfig.IdentityName, shard.Namespace, clusterShard.Name, "successfully produced IdentityConfig")
// Fill the IdentityDependencyCache
@@ -138,6 +161,11 @@ func ProduceIdentityConfigsFromShard(ctxLogger *log.Entry, shard admiralapiv1.Sh
func ConsumeIdentityConfigs(ctxLogger *log.Entry, ctx context.Context, configWriterData <-chan *ConfigWriterData, configWriterDataResults chan<- *ConfigWriterData, rr *RemoteRegistry, wg *sync.WaitGroup) {
defer wg.Done()
for data := range configWriterData {
if data.Error != nil {
ctxLogger.Warnf(common.CtxLogFormat, "ConsumeIdentityConfig", "", "", data.ClusterName, "received configWriterData with error from producer")
configWriterDataResults <- data
continue
}
identityConfig := data.IdentityConfig
assetName := identityConfig.IdentityName
clientCluster := data.ClusterName
@@ -146,8 +174,10 @@ func ConsumeIdentityConfigs(ctxLogger *log.Entry, ctx context.Context, configWri
serviceEntryBuilder := ServiceEntryBuilder{ClientCluster: clientCluster, RemoteRegistry: rr}
serviceEntries, err := serviceEntryBuilder.BuildServiceEntriesFromIdentityConfig(ctxLogger, *identityConfig)
if err != nil {
ctxLogger.Warnf(common.CtxLogFormat, "ConsumeIdentityConfig", assetName, "", clientCluster, err)
data.Result = err.Error()
ctxLogger.Warnf(common.CtxLogFormat, "ConsumeIdentityConfigBuildSEs", assetName, "", clientCluster, err)
data.Error = err
configWriterDataResults <- data
continue
}
isServiceEntryModifyCalledForSourceCluster := false
sourceClusterEnvironmentNamespaces := map[string]string{}
@@ -188,18 +218,18 @@ func ConsumeIdentityConfigs(ctxLogger *log.Entry, ctx context.Context, configWri
close(clusters)
err := <-errors
if err != nil {
ctxLogger.Warnf(common.CtxLogFormat, "ConsumeIdentityConfig", strings.ToLower(se.Hosts[0])+"-se", "", clientCluster, err)
data.Result = err.Error()
ctxLogger.Errorf(common.CtxLogFormat, "ConsumeIdentityConfigAddSEWithDRWorker", strings.ToLower(se.Hosts[0])+"-se", "", clientCluster, err)
data.Error = err
}
if isServiceEntryModifyCalledForSourceClusterAndEnv {
ctxLogger.Infof(common.CtxLogFormat, "ConsumeIdentityConfig", strings.ToLower(se.Hosts[0])+"-se", "", clientCluster, "modifying Sidecar for local cluster communication")
ctxLogger.Infof(common.CtxLogFormat, "ConsumeIdentityConfigModifySidecar", strings.ToLower(se.Hosts[0])+"-se", "", clientCluster, "modifying Sidecar for local cluster communication")
err = modifySidecarForLocalClusterCommunication(
ctxLogger,
ctx, sourceClusterEnvironmentNamespaces[env], assetName,
rr.AdmiralCache.DependencyNamespaceCache, rr.GetRemoteController(clientCluster))
if err != nil {
ctxLogger.Errorf(common.CtxLogFormat, "modifySidecarForLocalClusterCommunication",
assetName, sourceClusterEnvironmentNamespaces[env], "", err)
ctxLogger.Errorf(common.CtxLogFormat, "ConsumeIdentityConfigModifySidecarErr", assetName, sourceClusterEnvironmentNamespaces[env], "", err)
data.Error = err
}
}
}
@@ -208,12 +238,104 @@ func ConsumeIdentityConfigs(ctxLogger *log.Entry, ctx context.Context, configWri
}
}
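
With this change a failure no longer drops an identity silently: the producer emits the ConfigWriterData with Error set, the consumer forwards it untouched, and the results stage records it against the right cluster. A stripped-down sketch of that forwarding contract (function and parameter names are illustrative, not from the PR):

package clusters

// consumeSketch shows the short-circuit contract used above: items that
// arrive already failed are passed straight through to the results
// channel, so the status writer can attribute the failure per cluster.
func consumeSketch(in <-chan *ConfigWriterData, out chan<- *ConfigWriterData) {
	for data := range in {
		if data.Error != nil {
			out <- data // propagate producer-side failure downstream
			continue
		}
		// ... build and write service entries; on failure set data.Error ...
		out <- data
	}
}
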

// UpdateShard reads the job object from the results channel and updates the original shard object with the proper result.
func UpdateShard(ctxLogger *log.Entry, results <-chan *ConfigWriterData, resultswg *sync.WaitGroup, shard *admiralapiv1.Shard) {
// ProcessResults reads the data object from the results channel and updates the original shard object with the proper result.
func ProcessResults(ctx context.Context, ctxLogger *log.Entry, results <-chan *ConfigWriterData, resultswg *sync.WaitGroup, shard *admiralapiv1.Shard, cc admiralapi.Interface) error {
defer resultswg.Done()
for job := range results {
ctxLogger.Infof(common.CtxLogFormat, "UpdateShard", shard.Name, "", job.ClusterName, job.Result)
//ctxLogger.Infof(common.CtxLogFormat, "UpdateShard", shard.Name, "", job.ClusterName, shard.Status.Conditions[0].Message)
//TODO: need to get updated shard crd spec and set status here
updatedShard := shard.DeepCopy()
updatedShard.Status.ClustersMonitored = len(shard.Spec.Clusters)
updatedShardStatusCondition := admiralapiv1.ShardStatusCondition{
Message: "Shard handling complete",
Reason: admiralapiv1.Processed,
Status: admiralapiv1.TrueConditionStatus,
Type: admiralapiv1.SyncComplete,
LastUpdatedTime: v1.Time{Time: time.Now()},
}
updatedShard.Status.FailureDetails = admiralapiv1.FailureDetails{
LastUpdatedTime: v1.Time{Time: time.Now()},
FailedClusters: []admiralapiv1.FailedCluster{},
}
clusterFailedIdentitiesMap := make(map[string][]admiralapiv1.FailedIdentity)
for data := range results {
if data.Error != nil {
ctxLogger.Warnf(common.CtxLogFormat, "ProcessResults", shard.Name, common.GetOperatorSyncNamespace(), data.ClusterName, data.Error.Error())
failedIdentityList := clusterFailedIdentitiesMap[data.ClusterName]
failedIdentity := admiralapiv1.FailedIdentity{
Name: data.IdentityConfig.IdentityName,
Message: data.Error.Error(),
}
if failedIdentityList != nil {
clusterFailedIdentitiesMap[data.ClusterName] = slices.Insert(failedIdentityList, 0, failedIdentity)
} else {
clusterFailedIdentitiesMap[data.ClusterName] = []admiralapiv1.FailedIdentity{failedIdentity}
}
updatedShardStatusCondition.Reason = admiralapiv1.ErrorOccurred
updatedShardStatusCondition.Type = admiralapiv1.SyncFailed
updatedShardStatusCondition.LastUpdatedTime = v1.Time{Time: time.Now()}
updatedShard.Status.FailureDetails.LastUpdatedTime = v1.Time{Time: time.Now()}
}
}
for cluster, identities := range clusterFailedIdentitiesMap {
updatedShard.Status.FailureDetails.FailedClusters = append(updatedShard.Status.FailureDetails.FailedClusters, admiralapiv1.FailedCluster{
Name: cluster,
FailedIdentities: identities,
})
}
//Just overwrite the first one because we already created a new condition entry in HandleEventForShard
updatedShard.Status.Conditions[0] = updatedShardStatusCondition
updatedShard.Status.LastUpdatedTime = v1.Time{Time: time.Now()}
err := updateShardStatus(ctx, ctxLogger, updatedShard, shard, cc)
if err != nil {
ctxLogger.Errorf(common.CtxLogFormat, "ProcessResults", shard.Name, shard.Namespace, "", err)
}
return err
}
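
ProcessResults drains every result before writing status once: failures accumulate in a per-cluster map, which is then flattened into the API's FailureDetails shape. The roll-up step on its own, a sketch assuming the FailedCluster/FailedIdentity types shown in this diff:

package clusters

import (
	admiralapiv1 "github.com/istio-ecosystem/admiral-api/pkg/apis/admiral/v1"
	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// toFailureDetails flattens the per-cluster failure map into the
// FailureDetails status field written back to the Shard.
func toFailureDetails(failed map[string][]admiralapiv1.FailedIdentity) admiralapiv1.FailureDetails {
	fd := admiralapiv1.FailureDetails{
		LastUpdatedTime: v1.Now(),
		FailedClusters:  []admiralapiv1.FailedCluster{},
	}
	for cluster, ids := range failed {
		fd.FailedClusters = append(fd.FailedClusters, admiralapiv1.FailedCluster{
			Name:             cluster,
			FailedIdentities: ids,
		})
	}
	return fd
}
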

func updateShardStatus(ctx context.Context, ctxLogger *log.Entry, obj *admiralapiv1.Shard, exist *admiralapiv1.Shard, cc admiralapi.Interface) error {
var err error
ctxLogger.Infof(common.CtxLogFormat, "UpdateShardStatus", obj.Namespace, "", "Updating Shard="+obj.Name)
exist, err = cc.AdmiralV1().Shards(obj.Namespace).Get(ctx, obj.Name, v1.GetOptions{})
if err != nil {
exist = obj
ctxLogger.Warnf(common.CtxLogFormat, "UpdateShard", exist.Name, exist.Namespace, "", "got error on fetching shard, will retry updating")
}
exist.Labels = obj.Labels
exist.Annotations = obj.Annotations
exist.Spec = obj.Spec
// carry the new status over; Update would otherwise persist the stale status just fetched
exist.Status = obj.Status
_, err = cc.AdmiralV1().Shards(exist.Namespace).Update(ctx, exist, v1.UpdateOptions{})
if err != nil {
ctxLogger.Infof(common.CtxLogFormat, "UpdateShard", exist.Name, exist.Namespace, "", "Failed to initially update shard="+obj.Name)
err = retryUpdatingShard(ctx, ctxLogger, obj, exist, cc, err)
}
if err != nil {
ctxLogger.Errorf(LogErrFormat, "UpdateShardStatus", common.ShardResourceType, obj.Name, "", err)
return err
} else {
ctxLogger.Infof(LogFormat, "UpdateShardStatus", common.ShardResourceType, obj.Name, "", "Success")
}
return nil
}

func retryUpdatingShard(ctx context.Context, ctxLogger *log.Entry, obj *admiralapiv1.Shard, exist *admiralapiv1.Shard, cc admiralapi.Interface, err error) error {
numRetries := 5
if err != nil && k8sErrors.IsConflict(err) {
for i := 0; i < numRetries; i++ {
ctxLogger.Errorf(common.CtxLogFormat, "Update", obj.Name, obj.Namespace, "", err.Error()+". retry Shard update "+strconv.Itoa(i+1)+"/"+strconv.Itoa(numRetries))
updatedShard, err := cc.AdmiralV1().Shards(exist.Namespace).Get(ctx, exist.Name, v1.GetOptions{})
// if fetching the latest shard fails, log and retry on the next attempt
if err != nil {
ctxLogger.Infof(common.CtxLogFormat, "Update", exist.Name, exist.Namespace, "", err.Error()+". Error getting old shard")
continue
}
ctxLogger.Infof(common.CtxLogFormat, "Update", obj.Name, obj.Namespace, "", fmt.Sprintf("existingResourceVersion=%s resourceVersionUsedForUpdate=%s", updatedShard.ResourceVersion, obj.ResourceVersion))
updatedShard.Spec = obj.Spec
updatedShard.Status = obj.Status
updatedShard.Annotations = obj.Annotations
updatedShard.Labels = obj.Labels
sh, err := cc.AdmiralV1().Shards(exist.Namespace).Update(ctx, updatedShard, v1.UpdateOptions{})
// only report success when the update actually succeeded
if err == nil {
return nil
}
}
}
return err
}
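
retryUpdatingShard hand-rolls the fetch-latest-then-update loop that client-go also ships as retry.RetryOnConflict. For comparison, the stock pattern would look roughly like this; a sketch under the same clientset types, not what the PR uses:

package clusters

import (
	"context"

	admiralapiv1 "github.com/istio-ecosystem/admiral-api/pkg/apis/admiral/v1"
	admiralapi "github.com/istio-ecosystem/admiral-api/pkg/client/clientset/versioned"
	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/util/retry"
)

// updateWithConflictRetry re-fetches the shard on every attempt so each
// write carries the latest resourceVersion; RetryOnConflict only retries
// when the server reports a conflict.
func updateWithConflictRetry(ctx context.Context, cc admiralapi.Interface, desired *admiralapiv1.Shard) error {
	return retry.RetryOnConflict(retry.DefaultRetry, func() error {
		current, err := cc.AdmiralV1().Shards(desired.Namespace).Get(ctx, desired.Name, v1.GetOptions{})
		if err != nil {
			return err
		}
		current.Labels = desired.Labels
		current.Annotations = desired.Annotations
		current.Spec = desired.Spec
		current.Status = desired.Status
		_, err = cc.AdmiralV1().Shards(current.Namespace).Update(ctx, current, v1.UpdateOptions{})
		return err
	})
}
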
68 changes: 66 additions & 2 deletions admiral/pkg/clusters/shard_handler_test.go
@@ -6,11 +6,16 @@ import (
"fmt"
"github.com/golang/protobuf/ptypes/duration"
"github.com/google/go-cmp/cmp"
"github.com/istio-ecosystem/admiral/admiral/pkg/client/loader"
"github.com/istio-ecosystem/admiral/admiral/pkg/controller/admiral"
"google.golang.org/protobuf/testing/protocmp"
"google.golang.org/protobuf/types/known/wrapperspb"
"istio.io/client-go/pkg/apis/networking/v1alpha3"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime/schema"
"sync"
"testing"
"time"

admiralapiv1 "github.com/istio-ecosystem/admiral-api/pkg/apis/admiral/v1"
"github.com/istio-ecosystem/admiral/admiral/pkg/controller/common"
@@ -96,7 +101,7 @@ func createMockShard(shardName string, clusterName string, identityName string,
ClustersMonitored: 1,
Conditions: []admiralapiv1.ShardStatusCondition{shardStatusCondition},
FailureDetails: admiralapiv1.FailureDetails{},
LastUpdatedTime: v1.Time{},
LastUpdatedTime: v1.Now(),
},
}
return &shard
@@ -115,9 +120,12 @@ func TestShardHandler_Added(t *testing.T) {
rc2 := createRemoteControllerForShardTests("cluster-usw2-k8s")
rr.PutRemoteController("cluster1", rc1)
rr.PutRemoteController("cluster-usw2-k8s", rc2)

//sampleShard1 := createMockShard("shard-sample", "cluster1", "sample", "e2e")
sampleShard2 := createMockShard("blackhole-shard", "cluster-usw2-k8s", "ppdmeshtestblackhole", "ppd")
shardHandler := &ShardHandler{RemoteRegistry: rr}
sc, _ := admiral.NewShardController(make(chan struct{}), shardHandler, "../../test/resources/[email protected]", "ns", time.Duration(1000), loader.GetFakeClientLoader())
sampleShard2, _ = sc.CrdClient.AdmiralV1().Shards(sampleShard2.Namespace).Create(context.Background(), sampleShard2, v1.CreateOptions{})
defaultSidecar := &v1alpha3.Sidecar{
ObjectMeta: v1.ObjectMeta{
Name: "default",
@@ -251,7 +259,7 @@

for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
shErr := shardHandler.Added(context.Background(), tt.shard)
shErr := shardHandler.Added(context.Background(), tt.shard, sc.CrdClient)
if shErr != nil {
t.Errorf("failed to handle Shard with err: %v", shErr)
}
@@ -311,3 +319,59 @@
t.Errorf("expected nil err for delete, for %v", err)
}
}

func TestRetryUpdatingShard(t *testing.T) {
admiralParams := setupForShardTests()
rr, _ := InitAdmiralOperator(context.Background(), admiralParams)
sampleShard := createMockShard("blackhole-shard", "cluster-usw2-k8s", "ppdmeshtestblackhole", "ppd")
shardHandler := &ShardHandler{RemoteRegistry: rr}
sc, _ := admiral.NewShardController(make(chan struct{}), shardHandler, "../../test/resources/[email protected]", "ns", time.Duration(1000), loader.GetFakeClientLoader())
sc.CrdClient.AdmiralV1().Shards(sampleShard.Namespace).Create(context.Background(), sampleShard, v1.CreateOptions{})
sampleShard2 := sampleShard.DeepCopy()
newFailedCluster := admiralapiv1.FailedCluster{
Name: "cluster-usw2-k8s",
FailedIdentities: []admiralapiv1.FailedIdentity{{
Name: "ppdmeshtestblackhole",
Message: "test failure message",
}},
}
sampleShard2.Status.FailureDetails.FailedClusters = append(sampleShard2.Status.FailureDetails.FailedClusters, newFailedCluster)
ctxLogger := common.GetCtxLogger(context.Background(), sampleShard2.Name, "")
testCases := []struct {
name string
sc *admiral.ShardController
newShard *admiralapiv1.Shard
existingShard *admiralapiv1.Shard
expectedShard *admiralapiv1.Shard
err *k8sErrors.StatusError
}{
{
name: "Given the server asset we want to write resources for is deployed on a remote cluster in env A and a client cluster in env B" +
"Then an SE with only remote endpoint and istio-system in exportTo should be built for env B",
sc: sc,
newShard: sampleShard2,
existingShard: sampleShard,
expectedShard: sampleShard2,
err: k8sErrors.NewConflict(schema.GroupResource{}, "", nil),
},
}

for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
err := retryUpdatingShard(context.Background(), ctxLogger, tt.newShard, tt.existingShard, tt.sc.CrdClient, tt.err)
if err != nil {
t.Errorf("failed to retry updating shard with err: %v", err)
}
// Check that the expected Shard matches the produced Shard
actualShard, shardErr := tt.sc.CrdClient.AdmiralV1().Shards(tt.newShard.Namespace).Get(context.Background(), tt.newShard.Name, v1.GetOptions{})
if actualShard == nil {
t.Errorf("expected Shard to not be nil")
} else if !cmp.Equal(actualShard.Status, tt.expectedShard.Status, protocmp.Transform()) {
t.Errorf("got=%v, want=%v", jsonPrint(actualShard.Status), jsonPrint(tt.expectedShard.Status))
}
if shardErr != nil {
t.Errorf("failed to get Shard with err %v", shardErr)
}
})
}
}
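
One caveat in assertions like the cmp.Equal above: the status now carries v1.Now() timestamps, so exact equality can be flaky when the compared objects are built at different instants. A common way to mask the timestamps with go-cmp (cmpopts is "github.com/google/go-cmp/cmp/cmpopts"; the field and type names are assumed from this diff):

// Sketch: ignore volatile timestamp fields when comparing Shard status.
opts := []cmp.Option{
	cmpopts.IgnoreFields(admiralapiv1.ShardStatusCondition{}, "LastUpdatedTime"),
	cmpopts.IgnoreFields(admiralapiv1.FailureDetails{}, "LastUpdatedTime"),
}
if !cmp.Equal(actualShard.Status, tt.expectedShard.Status, opts...) {
	t.Errorf("got=%v, want=%v", jsonPrint(actualShard.Status), jsonPrint(tt.expectedShard.Status))
}
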
4 changes: 2 additions & 2 deletions admiral/pkg/controller/admiral/shard.go
@@ -21,7 +21,7 @@ import (
const OperatorIdentityLabelKey = "admiral.io/operatorIdentity"

type ShardHandler interface {
Added(ctx context.Context, obj *admiralapiv1.Shard) error
Added(ctx context.Context, obj *admiralapiv1.Shard, cc admiralapi.Interface) error
Deleted(ctx context.Context, obj *admiralapiv1.Shard) error
}

@@ -200,7 +200,7 @@ func HandleAddUpdateShard(ctx context.Context, obj interface{}, d *ShardControll
if len(key) > 0 {
d.Cache.UpdateShardToClusterCache(key, shard)
}
err := d.ShardHandler.Added(ctx, shard)
err := d.ShardHandler.Added(ctx, shard, d.CrdClient)
return err
}
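
The interface change above means every ShardHandler implementation, including test fakes, must now accept the CRD clientset. A minimal conforming fake, illustrative only and not part of this PR:

package admiral

import (
	"context"

	admiralapiv1 "github.com/istio-ecosystem/admiral-api/pkg/apis/admiral/v1"
	admiralapi "github.com/istio-ecosystem/admiral-api/pkg/client/clientset/versioned"
)

// noopShardHandler satisfies the widened ShardHandler interface.
type noopShardHandler struct{}

func (h *noopShardHandler) Added(ctx context.Context, obj *admiralapiv1.Shard, cc admiralapi.Interface) error {
	return nil
}

func (h *noopShardHandler) Deleted(ctx context.Context, obj *admiralapiv1.Shard) error {
	return nil
}
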

