Skip to content

Commit

Permalink
Merge pull request #3626 from Thushani-Jayasekera/ws-paidorg
Browse files Browse the repository at this point in the history
Set circuitbreaker if org is paid
  • Loading branch information
renuka-fernando authored Jan 22, 2025
2 parents 450d7a1 + 33fadc0 commit 266e153
Show file tree
Hide file tree
Showing 8 changed files with 91 additions and 40 deletions.
12 changes: 6 additions & 6 deletions adapter/config/default_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,18 +143,18 @@ var defaultConfig = &Config{
{
Organizations: "*",
CircuitBreakerName: "BasicCircuitBreaker",
MaxConnections: 3,
MaxRequests: 3,
MaxConnections: 1,
MaxRequests: 1,
MaxPendingRequests: 1,
MaxRetries: 3,
MaxConnectionPools: 1,
},
{
Organizations: "*",
CircuitBreakerName: "EnhancedCircuitBreaker",
MaxConnections: 50,
MaxRequests: 50,
MaxConnections: 25,
MaxRequests: 25,
MaxPendingRequests: 1,
MaxRetries: 50,
MaxConnectionPools: 2,
},
},
},
Expand Down
11 changes: 10 additions & 1 deletion adapter/internal/api/apis_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ func ProcessMountedAPIProjects() (err error) {
continue
}

// setting false as this feature is not in use
apiProject.IsPaidOrg = false

overrideValue := false
err = validateAndUpdateXds(apiProject, &overrideValue)
if err != nil {
Expand Down Expand Up @@ -238,6 +241,7 @@ func ApplyAPIProjectFromAPIM(
vhostToEnvsMap map[string][]*synchronizer.GatewayLabel,
apiEnvs map[string]map[string]synchronizer.APIEnvProps,
xdsOptions common.XdsOptions,
isPaidOrg bool,
) (deployedRevisionList []*notifier.DeployedAPIRevision, err error) {
apiProject, err := extractAPIProject(payload)
if err != nil {
Expand All @@ -260,7 +264,8 @@ func ApplyAPIProjectFromAPIM(
if apiProject.OrganizationID == "" {
apiProject.OrganizationID = config.GetControlPlaneConnectedTenantDomain()
}
loggers.LoggerAPI.Infof("Deploying api %s:%s in Organization %s", apiYaml.Name, apiYaml.Version, apiProject.OrganizationID)
apiProject.IsPaidOrg = isPaidOrg
loggers.LoggerAPI.Infof("Deploying api %s:%s in Organization %s ( isPaid: %v )", apiYaml.Name, apiYaml.Version, apiProject.OrganizationID, isPaidOrg)

conf, _ := config.ReadConfigs()
currentEnv := conf.ControlPlane.EnvironmentLabels[0] // assumption - adapter has only one environment
Expand Down Expand Up @@ -302,6 +307,10 @@ func ApplyAPIProjectInStandaloneMode(payload []byte, override *bool) (err error)
if err != nil {
return err
}

// setting false as this feature is not in use
apiProject.IsPaidOrg = false

return validateAndUpdateXds(apiProject, override)
}

Expand Down
4 changes: 2 additions & 2 deletions adapter/internal/discovery/xds/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,14 +372,14 @@ func UpdateAPI(vHost string, apiProject mgw.ProjectAPI, deployedEnvironments []*
apiHashValue := generateHashValue(apiYaml.Name, apiYaml.Version)

if mgwSwagger.GetProdEndpoints() != nil {
mgwSwagger.GetProdEndpoints().SetEndpointsConfig(apiYaml.EndpointConfig.ProductionEndpoints, apiYaml.EndpointConfig.EndpointType, apiYaml.OrganizationID)
mgwSwagger.GetProdEndpoints().SetEndpointsConfig(apiYaml.EndpointConfig.ProductionEndpoints, apiYaml.EndpointConfig.EndpointType, apiYaml.OrganizationID, apiProject.IsPaidOrg)
if !mgwSwagger.GetProdEndpoints().SecurityConfig.Enabled && apiYaml.EndpointConfig.APIEndpointSecurity.Production.Enabled {
mgwSwagger.GetProdEndpoints().SecurityConfig = apiYaml.EndpointConfig.APIEndpointSecurity.Production
}
}

if mgwSwagger.GetSandEndpoints() != nil {
mgwSwagger.GetSandEndpoints().SetEndpointsConfig(apiYaml.EndpointConfig.SandBoxEndpoints, apiYaml.EndpointConfig.EndpointType, apiYaml.OrganizationID)
mgwSwagger.GetSandEndpoints().SetEndpointsConfig(apiYaml.EndpointConfig.SandBoxEndpoints, apiYaml.EndpointConfig.EndpointType, apiYaml.OrganizationID, apiProject.IsPaidOrg)
if !mgwSwagger.GetSandEndpoints().SecurityConfig.Enabled && apiYaml.EndpointConfig.APIEndpointSecurity.Sandbox.Enabled {
mgwSwagger.GetSandEndpoints().SecurityConfig = apiYaml.EndpointConfig.APIEndpointSecurity.Sandbox
}
Expand Down
71 changes: 55 additions & 16 deletions adapter/internal/oasparser/model/mgw_swagger.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"net/url"
"os"
"regexp"
"strconv"
"strings"
Expand All @@ -37,6 +38,19 @@ import (
"github.com/wso2/product-microgateway/adapter/pkg/synchronizer"
)

var paidOrgsFromSubscriptionServiceEnabled bool

func init() {
envIsPaidOrgsFromSubscriptionServiceEnabled := os.Getenv("ENABLE_PAID_ORGS_FROM_SUBSCRIPTION_SERVICE")

// Parse the environment variable to a boolean, defaulting to false if not set or if parsing fails
var err error
paidOrgsFromSubscriptionServiceEnabled, err = strconv.ParseBool(envIsPaidOrgsFromSubscriptionServiceEnabled)
if err != nil || envIsPaidOrgsFromSubscriptionServiceEnabled == "" {
paidOrgsFromSubscriptionServiceEnabled = false
}
}

// MgwSwagger represents the object structure holding the information related to the
// openAPI object. The values are populated from the extensions/properties mentioned at
// the root level of the openAPI definition. The pathItem level information is represented
Expand Down Expand Up @@ -181,6 +195,9 @@ const prototypedAPI = "prototyped"
// BasicCircuitBreaker is the name for free tier cluster level circuit breaker
const BasicCircuitBreaker = "BasicCircuitBreaker"

// EnhancedCircuitBreaker is the name for the circuit breaker assigned for paid orgs
const EnhancedCircuitBreaker = "EnhancedCircuitBreaker"

// GetCorsConfig returns the CorsConfiguration Object.
func (swagger *MgwSwagger) GetCorsConfig() *CorsConfig {
return swagger.xWso2Cors
Expand Down Expand Up @@ -643,7 +660,7 @@ func (swagger *MgwSwagger) setXWso2Endpoints() error {
}

// SetEndpointsConfig set configs for Endpoints sent by api.yaml
func (endpointCluster *EndpointCluster) SetEndpointsConfig(endpointInfos []EndpointInfo, apiType string, orgID string) error {
func (endpointCluster *EndpointCluster) SetEndpointsConfig(endpointInfos []EndpointInfo, apiType string, orgID string, isChoreoOrgPaid bool) error {
if endpointInfos == nil || len(endpointInfos) == 0 {
return nil
}
Expand Down Expand Up @@ -684,22 +701,31 @@ func (endpointCluster *EndpointCluster) SetEndpointsConfig(endpointInfos []Endpo
conf, _ := config.ReadConfigs()
var selectedCircuitBreaker *CircuitBreakers

for _, circuitBreaker := range conf.Envoy.Upstream.CircuitBreakers {
if utills.GetIsOrganizationInList(orgID, circuitBreaker.Organizations) {
selectedCircuitBreaker = createCircuitBreaker(
circuitBreaker.MaxConnections,
circuitBreaker.MaxPendingRequests,
circuitBreaker.MaxRequests,
circuitBreaker.MaxRetries,
circuitBreaker.MaxConnectionPools,
)
break
if paidOrgsFromSubscriptionServiceEnabled {
for _, circuitBreaker := range conf.Envoy.Upstream.CircuitBreakers {
if isChoreoOrgPaid && circuitBreaker.CircuitBreakerName == EnhancedCircuitBreaker {
selectedCircuitBreaker = createCircuitBreaker(
circuitBreaker.MaxConnections,
circuitBreaker.MaxPendingRequests,
circuitBreaker.MaxRequests,
circuitBreaker.MaxRetries,
circuitBreaker.MaxConnectionPools,
)
break
} else if !isChoreoOrgPaid && circuitBreaker.CircuitBreakerName == BasicCircuitBreaker {
selectedCircuitBreaker = createCircuitBreaker(
circuitBreaker.MaxConnections,
circuitBreaker.MaxPendingRequests,
circuitBreaker.MaxRequests,
circuitBreaker.MaxRetries,
circuitBreaker.MaxConnectionPools,
)
break
}
}
}
if selectedCircuitBreaker == nil {
} else {
for _, circuitBreaker := range conf.Envoy.Upstream.CircuitBreakers {
// breaks from the first iteration
if circuitBreaker.CircuitBreakerName == BasicCircuitBreaker {
if utills.GetIsOrganizationInList(orgID, circuitBreaker.Organizations) {
selectedCircuitBreaker = createCircuitBreaker(
circuitBreaker.MaxConnections,
circuitBreaker.MaxPendingRequests,
Expand All @@ -709,7 +735,20 @@ func (endpointCluster *EndpointCluster) SetEndpointsConfig(endpointInfos []Endpo
)
break
}

}
if selectedCircuitBreaker == nil {
for _, circuitBreaker := range conf.Envoy.Upstream.CircuitBreakers {
if circuitBreaker.CircuitBreakerName == BasicCircuitBreaker {
selectedCircuitBreaker = createCircuitBreaker(
circuitBreaker.MaxConnections,
circuitBreaker.MaxPendingRequests,
circuitBreaker.MaxRequests,
circuitBreaker.MaxRetries,
circuitBreaker.MaxConnectionPools,
)
break
}
}
}
}
endpointCluster.Config.CircuitBreakers = selectedCircuitBreaker
Expand Down
9 changes: 5 additions & 4 deletions adapter/internal/oasparser/model/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type ProjectAPI struct {
APIType string // read from api.yaml and formatted to upper case
APILifeCycleStatus string // read from api.yaml and formatted to upper case
OrganizationID string // read from api.yaml or config
IsPaidOrg bool

//UpstreamCerts cert filename -> cert bytes
UpstreamCerts map[string][]byte
Expand Down Expand Up @@ -152,10 +153,10 @@ type apiData struct {
}

type choreoComponentInfo struct {
OrganizationID string `json:"organizationId,omitempty"`
ProjectID string `json:"projectId,omitempty"`
ComponentID string `json:"componentId,omitempty"`
VersionID string `json:"versionId,omitempty"`
OrganizationID string `json:"organizationId,omitempty"`
ProjectID string `json:"projectId,omitempty"`
ComponentID string `json:"componentId,omitempty"`
VersionID string `json:"versionId,omitempty"`
}

type backendJWTConfiguration struct {
Expand Down
5 changes: 3 additions & 2 deletions adapter/internal/synchronizer/apis_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ func PushAPIProjects(payload []byte, environments []string, xdsOptions common.Xd
// Pass the byte slice for the XDS APIs to push it to the enforcer and router
// TODO: (renuka) optimize applying API project, update maps one by one and apply xds once
var deployedRevisionList []*notifier.DeployedAPIRevision
deployedRevisionList, err = apiServer.ApplyAPIProjectFromAPIM(apiFileData, vhostToEnvsMap, envProps, xdsOptions)

deployedRevisionList, err = apiServer.ApplyAPIProjectFromAPIM(apiFileData, vhostToEnvsMap, envProps, xdsOptions, deployment.IsPaidOrg)
if err != nil {
logger.LoggerSync.Errorf("Error occurred while applying project %v", err)
} else if deployedRevisionList != nil {
Expand All @@ -123,7 +124,7 @@ func PushAPIProjects(payload []byte, environments []string, xdsOptions common.Xd

// TODO: (renuka) notify the revision deployment to the control plane once all chunks are deployed.
// This is not fixed as notify the control plane chunk by chunk (even though the chunk is not really applied to the Enforcer and Router) is not a drastic issue.
// This path is only happening when Adapter is restarting and at that time the deployed time is already updated in the control plane.
// This path is only happening when Adapter is restarting and at that time the deployed time is already updated in the control plane.
notifier.SendRevisionUpdate(deploymentList)
logger.LoggerSync.Infof("Successfully deployed %d API/s", len(deploymentList))
// Error nil for successful execution
Expand Down
1 change: 1 addition & 0 deletions adapter/pkg/synchronizer/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type APIDeployment struct {
Environments []GatewayLabel `json:"environments"`
// These properties are used by global Adapter
OrganizationID string `json:"organizationId"`
IsPaidOrg bool `json:"isPaidOrg"`
APIContext string `json:"apiContext"`
Version string `json:"version"`
}
Expand Down
18 changes: 9 additions & 9 deletions resources/conf/config.toml.template
Original file line number Diff line number Diff line change
Expand Up @@ -206,18 +206,18 @@ retainKeys = ["self_validate_jwt", "issuer", "claim_mappings", "consumer_key_cla
[[router.upstream.circuitBreakers]]
organizations = "*"
circuitBreakerName = "BasicCircuitBreaker"
maxConnections = 3
maxRequests = 3
maxPendingRequests = 0
maxConnectionPools = 3
maxConnections = 1
maxRequests = 1
maxPendingRequests = 1
maxConnectionPools = 1

[[router.upstream.circuitBreakers]]
organizations = "e0682456-2ba6-4c5f-8f36-3c5b6dc46913,4b9afefb-4bcc-4e63-85d3-ddd593841012,d3a7dfea-fb10-4371-b21d-85d1bc28667b"
organizations = "e0682456-2ba6-4c5f-8f36-3c5b6dc46913,4b9afefb-4bcc-4e63-85d3-ddd593841012"
circuitBreakerName = "EnhancedCircuitBreaker"
maxConnections = 50
maxRequests = 50
maxPendingRequests = 0
maxConnectionPools = 50
maxConnections = 25
maxRequests = 25
maxPendingRequests = 1
maxConnectionPools = 2

# Configs relevant to the envoy rate-limit service
[router.ratelimit]
Expand Down

0 comments on commit 266e153

Please sign in to comment.