Skip to content

Commit

Permalink
don't put all load in the last orderer
Browse files Browse the repository at this point in the history
Signed-off-by: David VIEJO <[email protected]>
  • Loading branch information
dviejokfs committed Jan 16, 2025
1 parent 6b56f8a commit f9b6e76
Showing 1 changed file with 56 additions and 13 deletions.
69 changes: 56 additions & 13 deletions controllers/mainchannel/mainchannel_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,9 @@ func (r *FabricMainChannelReconciler) Reconcile(ctx context.Context, req ctrl.Re
return r.handleReconcileError(ctx, fabricMainChannel, err)
}

resmgmtOptions := r.setupResmgmtOptions(fabricMainChannel)
options, _ := r.setupResmgmtOptions(fabricMainChannel)

blockBytes, err := r.fetchConfigBlock(resClient, fabricMainChannel, resmgmtOptions)
blockBytes, err := r.fetchConfigBlock(resClient, fabricMainChannel, options)
if err != nil {
return r.handleReconcileError(ctx, fabricMainChannel, err)
}
Expand All @@ -127,11 +127,11 @@ func (r *FabricMainChannelReconciler) Reconcile(ctx context.Context, req ctrl.Re
return r.handleReconcileError(ctx, fabricMainChannel, err)
}

if err := r.updateChannelConfig(ctx, fabricMainChannel, resClient, resmgmtOptions, blockBytes, sdk, clientSet); err != nil {
if err := r.updateChannelConfig(ctx, fabricMainChannel, resClient, options, blockBytes, sdk, clientSet); err != nil {
return r.handleReconcileError(ctx, fabricMainChannel, err)
}
time.Sleep(3 * time.Second)
if err := r.saveChannelConfig(ctx, fabricMainChannel, resClient, resmgmtOptions); err != nil {
if err := r.saveChannelConfig(ctx, fabricMainChannel, resClient, options); err != nil {
return r.handleReconcileError(ctx, fabricMainChannel, err)
}

Expand Down Expand Up @@ -395,6 +395,43 @@ func (r *FabricMainChannelReconciler) joinInternalOrderers(ctx context.Context,
return nil
}

func (r *FabricMainChannelReconciler) queryConfigBlockFromOrdererWithRoundRobin(resClient *resmgmt.Client, channelID string, ordererEndpoints []string, resmgmtOptions []resmgmt.RequestOption) (*common.Block, error) {
if len(ordererEndpoints) == 0 {
return nil, fmt.Errorf("no orderer endpoints available")
}

// Try each orderer in sequence until one succeeds
var lastErr error
for _, endpoint := range ordererEndpoints {
// Create options for this specific orderer
ordererOpts := []resmgmt.RequestOption{
resmgmt.WithOrdererEndpoint(endpoint),
resmgmt.WithRetry(retry.Opts{
Attempts: 3,
InitialBackoff: 1 * time.Second,
MaxBackoff: 10 * time.Second,
}),
}

// Add any other options that were passed in (except orderer endpoints)
for _, opt := range resmgmtOptions {
ordererOpts = append(ordererOpts, opt)
}

log.Infof("Attempting to query config block from orderer %s", endpoint)
block, err := resClient.QueryConfigBlockFromOrderer(channelID, ordererOpts...)
if err != nil {
log.Warnf("Failed to query config block from orderer %s: %v", endpoint, err)
lastErr = err
continue
}
log.Infof("Successfully queried config block from orderer %s", endpoint)
return block, nil
}

return nil, fmt.Errorf("failed to query config block from all orderers, last error: %v", lastErr)
}

func (r *FabricMainChannelReconciler) fetchOrdererChannelBlock(resClient *resmgmt.Client, fabricMainChannel *hlfv1alpha1.FabricMainChannel, resmgmtOptions []resmgmt.RequestOption) (*common.Block, error) {
var ordererChannelBlock *common.Block
var err error
Expand All @@ -403,7 +440,9 @@ func (r *FabricMainChannelReconciler) fetchOrdererChannelBlock(resClient *resmgm
InitialBackoff: 1000 * time.Millisecond,
MaxBackoff: 10 * time.Second,
}))
ordererChannelBlock, err = resClient.QueryConfigBlockFromOrderer(fabricMainChannel.Spec.Name, resmgmtOptions...)

options, endpoints := r.setupResmgmtOptions(fabricMainChannel)
ordererChannelBlock, err = r.queryConfigBlockFromOrdererWithRoundRobin(resClient, fabricMainChannel.Spec.Name, endpoints, options)
if err != nil {
return nil, errors.Wrapf(err, "failed to get block from channel %s", fabricMainChannel.Spec.Name)
}
Expand Down Expand Up @@ -478,18 +517,25 @@ func (r *FabricMainChannelReconciler) handleReconcileError(ctx context.Context,
return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel)
}

func (r *FabricMainChannelReconciler) setupResmgmtOptions(fabricMainChannel *hlfv1alpha1.FabricMainChannel) []resmgmt.RequestOption {
func (r *FabricMainChannelReconciler) setupResmgmtOptions(fabricMainChannel *hlfv1alpha1.FabricMainChannel) ([]resmgmt.RequestOption, []string) {
resmgmtOptions := []resmgmt.RequestOption{
resmgmt.WithTimeout(fab2.ResMgmt, 30*time.Second),
resmgmt.WithRetry(retry.Opts{
Attempts: 3,
InitialBackoff: 1 * time.Second,
MaxBackoff: 10 * time.Second,
}),
}

var ordererEndpoints []string
for _, ordOrg := range fabricMainChannel.Spec.OrdererOrganizations {
for _, endpoint := range ordOrg.OrdererEndpoints {
resmgmtOptions = append(resmgmtOptions, resmgmt.WithOrdererEndpoint(endpoint))
ordererEndpoints = append(ordererEndpoints, endpoint)
// resmgmtOptions = append(resmgmtOptions)
}
}

return resmgmtOptions
return resmgmtOptions, ordererEndpoints
}

func (r *FabricMainChannelReconciler) fetchConfigBlock(resClient *resmgmt.Client, fabricMainChannel *hlfv1alpha1.FabricMainChannel, resmgmtOptions []resmgmt.RequestOption) ([]byte, error) {
Expand All @@ -501,11 +547,8 @@ func (r *FabricMainChannelReconciler) fetchConfigBlock(resClient *resmgmt.Client
MaxBackoff: 10 * time.Second,
}))

channelBlock, err = resClient.QueryConfigBlockFromOrderer(fabricMainChannel.Spec.Name, resmgmtOptions...)
if err != nil {
return nil, errors.Wrapf(err, "failed to query config block from orderer %s", fabricMainChannel.Spec.Name)
}

options, endpoints := r.setupResmgmtOptions(fabricMainChannel)
channelBlock, err = r.queryConfigBlockFromOrdererWithRoundRobin(resClient, fabricMainChannel.Spec.Name, endpoints, options)
if err != nil {
log.Infof("Channel %s does not exist, creating it: %v", fabricMainChannel.Spec.Name, err)
return r.createNewChannel(fabricMainChannel)
Expand Down

0 comments on commit f9b6e76

Please sign in to comment.