Skip to content

Commit

Permalink
Merge pull request #245 from orkes-io/rate_limiter_update
Browse files Browse the repository at this point in the history
add rate limit config to workflow def
  • Loading branch information
v1r3n authored Nov 22, 2023
2 parents 2a8adad + 59c77e6 commit 1bfb797
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 112 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2023 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.common.metadata.workflow;

/** Rate limit configuration for workflows */
public class RateLimitConfig {
/**
* Key that defines the rate limit. Rate limit key is a combination of workflow payload such as
* name, or correlationId etc.
*/
private String rateLimitKey;

/** Number of concurrently running workflows that are allowed per key */
private int concurrentExecLimit;

public String getRateLimitKey() {
return rateLimitKey;
}

public void setRateLimitKey(String rateLimitKey) {
this.rateLimitKey = rateLimitKey;
}

public int getConcurrentExecLimit() {
return concurrentExecLimit;
}

public void setConcurrentExecLimit(int concurrentExecLimit) {
this.concurrentExecLimit = concurrentExecLimit;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public enum TimeoutPolicy {
@ProtoField(id = 16)
private String workflowStatusListenerSink;

private @Valid Map<String, List<StateChangeEvent>> onStateChange = new LinkedHashMap<>();
private RateLimitConfig rateLimitConfig;

/**
* @return the name
Expand Down Expand Up @@ -321,10 +321,6 @@ public static String getKey(String name, int version) {
return name + "." + version;
}

public Map<String, List<StateChangeEvent>> getOnStateChange() {
return onStateChange;
}

public String getWorkflowStatusListenerSink() {
return workflowStatusListenerSink;
}
Expand All @@ -333,8 +329,12 @@ public void setWorkflowStatusListenerSink(String workflowStatusListenerSink) {
this.workflowStatusListenerSink = workflowStatusListenerSink;
}

public void setOnStateChange(Map<String, List<StateChangeEvent>> onStateChange) {
this.onStateChange = onStateChange;
public RateLimitConfig getRateLimitConfig() {
return rateLimitConfig;
}

public void setRateLimitConfig(RateLimitConfig rateLimitConfig) {
this.rateLimitConfig = rateLimitConfig;
}

public boolean containsType(String taskType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -526,59 +526,11 @@ public boolean equals(Object o) {
return false;
}
Workflow workflow = (Workflow) o;
return getEndTime() == workflow.getEndTime()
&& getWorkflowVersion() == workflow.getWorkflowVersion()
&& getStatus() == workflow.getStatus()
&& Objects.equals(getWorkflowId(), workflow.getWorkflowId())
&& Objects.equals(getParentWorkflowId(), workflow.getParentWorkflowId())
&& Objects.equals(getParentWorkflowTaskId(), workflow.getParentWorkflowTaskId())
&& Objects.equals(getTasks(), workflow.getTasks())
&& Objects.equals(getInput(), workflow.getInput())
&& Objects.equals(getOutput(), workflow.getOutput())
&& Objects.equals(getWorkflowName(), workflow.getWorkflowName())
&& Objects.equals(getCorrelationId(), workflow.getCorrelationId())
&& Objects.equals(getReRunFromWorkflowId(), workflow.getReRunFromWorkflowId())
&& Objects.equals(getReasonForIncompletion(), workflow.getReasonForIncompletion())
&& Objects.equals(getEvent(), workflow.getEvent())
&& Objects.equals(getTaskToDomain(), workflow.getTaskToDomain())
&& Objects.equals(
getFailedReferenceTaskNames(), workflow.getFailedReferenceTaskNames())
&& Objects.equals(
getExternalInputPayloadStoragePath(),
workflow.getExternalInputPayloadStoragePath())
&& Objects.equals(
getExternalOutputPayloadStoragePath(),
workflow.getExternalOutputPayloadStoragePath())
&& Objects.equals(getPriority(), workflow.getPriority())
&& Objects.equals(getWorkflowDefinition(), workflow.getWorkflowDefinition())
&& Objects.equals(getVariables(), workflow.getVariables())
&& Objects.equals(getLastRetriedTime(), workflow.getLastRetriedTime());
return Objects.equals(getWorkflowId(), workflow.getWorkflowId());
}

@Override
public int hashCode() {
return Objects.hash(
getStatus(),
getEndTime(),
getWorkflowId(),
getParentWorkflowId(),
getParentWorkflowTaskId(),
getTasks(),
getInput(),
getOutput(),
getWorkflowName(),
getWorkflowVersion(),
getCorrelationId(),
getReRunFromWorkflowId(),
getReasonForIncompletion(),
getEvent(),
getTaskToDomain(),
getFailedReferenceTaskNames(),
getWorkflowDefinition(),
getExternalInputPayloadStoragePath(),
getExternalOutputPayloadStoragePath(),
getPriority(),
getVariables(),
getLastRetriedTime());
return Objects.hash(getWorkflowId());
}
}
57 changes: 2 additions & 55 deletions core/src/main/java/com/netflix/conductor/model/WorkflowModel.java
Original file line number Diff line number Diff line change
Expand Up @@ -483,65 +483,12 @@ public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
WorkflowModel that = (WorkflowModel) o;
return getEndTime() == that.getEndTime()
&& getPriority() == that.getPriority()
&& getLastRetriedTime() == that.getLastRetriedTime()
&& getStatus() == that.getStatus()
&& Objects.equals(getWorkflowId(), that.getWorkflowId())
&& Objects.equals(getParentWorkflowId(), that.getParentWorkflowId())
&& Objects.equals(getParentWorkflowTaskId(), that.getParentWorkflowTaskId())
&& Objects.equals(getTasks(), that.getTasks())
&& Objects.equals(getInput(), that.getInput())
&& Objects.equals(getOutput(), that.getOutput())
&& Objects.equals(getCorrelationId(), that.getCorrelationId())
&& Objects.equals(getReRunFromWorkflowId(), that.getReRunFromWorkflowId())
&& Objects.equals(getReasonForIncompletion(), that.getReasonForIncompletion())
&& Objects.equals(getEvent(), that.getEvent())
&& Objects.equals(getTaskToDomain(), that.getTaskToDomain())
&& Objects.equals(getFailedReferenceTaskNames(), that.getFailedReferenceTaskNames())
&& Objects.equals(getWorkflowDefinition(), that.getWorkflowDefinition())
&& Objects.equals(
getExternalInputPayloadStoragePath(),
that.getExternalInputPayloadStoragePath())
&& Objects.equals(
getExternalOutputPayloadStoragePath(),
that.getExternalOutputPayloadStoragePath())
&& Objects.equals(getVariables(), that.getVariables())
&& Objects.equals(getOwnerApp(), that.getOwnerApp())
&& Objects.equals(getCreateTime(), that.getCreateTime())
&& Objects.equals(getUpdatedTime(), that.getUpdatedTime())
&& Objects.equals(getCreatedBy(), that.getCreatedBy())
&& Objects.equals(getUpdatedBy(), that.getUpdatedBy());
return Objects.equals(getWorkflowId(), that.getWorkflowId());
}

@Override
public int hashCode() {
return Objects.hash(
getStatus(),
getEndTime(),
getWorkflowId(),
getParentWorkflowId(),
getParentWorkflowTaskId(),
getTasks(),
getInput(),
getOutput(),
getCorrelationId(),
getReRunFromWorkflowId(),
getReasonForIncompletion(),
getEvent(),
getTaskToDomain(),
getFailedReferenceTaskNames(),
getWorkflowDefinition(),
getExternalInputPayloadStoragePath(),
getExternalOutputPayloadStoragePath(),
getPriority(),
getVariables(),
getLastRetriedTime(),
getOwnerApp(),
getCreateTime(),
getUpdatedTime(),
getCreatedBy(),
getUpdatedBy());
return Objects.hash(getWorkflowId());
}

public Workflow toWorkflow() {
Expand Down

0 comments on commit 1bfb797

Please sign in to comment.