From 59c77e6efc222e46db014b997914007925631602 Mon Sep 17 00:00:00 2001 From: Viren Baraiya Date: Wed, 22 Nov 2023 12:34:16 -0800 Subject: [PATCH] add rate limit config to workflow def also clean up equality checks --- .../metadata/workflow/RateLimitConfig.java | 41 +++++++++++++ .../common/metadata/workflow/WorkflowDef.java | 14 ++--- .../conductor/common/run/Workflow.java | 52 +---------------- .../conductor/model/WorkflowModel.java | 57 +------------------ 4 files changed, 52 insertions(+), 112 deletions(-) create mode 100644 common/src/main/java/com/netflix/conductor/common/metadata/workflow/RateLimitConfig.java diff --git a/common/src/main/java/com/netflix/conductor/common/metadata/workflow/RateLimitConfig.java b/common/src/main/java/com/netflix/conductor/common/metadata/workflow/RateLimitConfig.java new file mode 100644 index 0000000000..09c9ed7579 --- /dev/null +++ b/common/src/main/java/com/netflix/conductor/common/metadata/workflow/RateLimitConfig.java @@ -0,0 +1,41 @@ +/* + * Copyright 2023 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.common.metadata.workflow; + +/** Rate limit configuration for workflows */ +public class RateLimitConfig { + /** + * Key that defines the rate limit. Rate limit key is a combination of workflow payload such as + * name, or correlationId etc. + */ + private String rateLimitKey; + + /** Number of concurrently running workflows that are allowed per key */ + private int concurrentExecLimit; + + public String getRateLimitKey() { + return rateLimitKey; + } + + public void setRateLimitKey(String rateLimitKey) { + this.rateLimitKey = rateLimitKey; + } + + public int getConcurrentExecLimit() { + return concurrentExecLimit; + } + + public void setConcurrentExecLimit(int concurrentExecLimit) { + this.concurrentExecLimit = concurrentExecLimit; + } +} diff --git a/common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowDef.java b/common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowDef.java index 2e2b1f2eb3..1cd9a8fe9d 100644 --- a/common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowDef.java +++ b/common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowDef.java @@ -99,7 +99,7 @@ public enum TimeoutPolicy { @ProtoField(id = 16) private String workflowStatusListenerSink; - private @Valid Map> onStateChange = new LinkedHashMap<>(); + private RateLimitConfig rateLimitConfig; /** * @return the name @@ -321,10 +321,6 @@ public static String getKey(String name, int version) { return name + "." + version; } - public Map> getOnStateChange() { - return onStateChange; - } - public String getWorkflowStatusListenerSink() { return workflowStatusListenerSink; } @@ -333,8 +329,12 @@ public void setWorkflowStatusListenerSink(String workflowStatusListenerSink) { this.workflowStatusListenerSink = workflowStatusListenerSink; } - public void setOnStateChange(Map> onStateChange) { - this.onStateChange = onStateChange; + public RateLimitConfig getRateLimitConfig() { + return rateLimitConfig; + } + + public void setRateLimitConfig(RateLimitConfig rateLimitConfig) { + this.rateLimitConfig = rateLimitConfig; } public boolean containsType(String taskType) { diff --git a/common/src/main/java/com/netflix/conductor/common/run/Workflow.java b/common/src/main/java/com/netflix/conductor/common/run/Workflow.java index dc2455f0dd..fa56d43e43 100644 --- a/common/src/main/java/com/netflix/conductor/common/run/Workflow.java +++ b/common/src/main/java/com/netflix/conductor/common/run/Workflow.java @@ -526,59 +526,11 @@ public boolean equals(Object o) { return false; } Workflow workflow = (Workflow) o; - return getEndTime() == workflow.getEndTime() - && getWorkflowVersion() == workflow.getWorkflowVersion() - && getStatus() == workflow.getStatus() - && Objects.equals(getWorkflowId(), workflow.getWorkflowId()) - && Objects.equals(getParentWorkflowId(), workflow.getParentWorkflowId()) - && Objects.equals(getParentWorkflowTaskId(), workflow.getParentWorkflowTaskId()) - && Objects.equals(getTasks(), workflow.getTasks()) - && Objects.equals(getInput(), workflow.getInput()) - && Objects.equals(getOutput(), workflow.getOutput()) - && Objects.equals(getWorkflowName(), workflow.getWorkflowName()) - && Objects.equals(getCorrelationId(), workflow.getCorrelationId()) - && Objects.equals(getReRunFromWorkflowId(), workflow.getReRunFromWorkflowId()) - && Objects.equals(getReasonForIncompletion(), workflow.getReasonForIncompletion()) - && Objects.equals(getEvent(), workflow.getEvent()) - && Objects.equals(getTaskToDomain(), workflow.getTaskToDomain()) - && Objects.equals( - getFailedReferenceTaskNames(), workflow.getFailedReferenceTaskNames()) - && Objects.equals( - getExternalInputPayloadStoragePath(), - workflow.getExternalInputPayloadStoragePath()) - && Objects.equals( - getExternalOutputPayloadStoragePath(), - workflow.getExternalOutputPayloadStoragePath()) - && Objects.equals(getPriority(), workflow.getPriority()) - && Objects.equals(getWorkflowDefinition(), workflow.getWorkflowDefinition()) - && Objects.equals(getVariables(), workflow.getVariables()) - && Objects.equals(getLastRetriedTime(), workflow.getLastRetriedTime()); + return Objects.equals(getWorkflowId(), workflow.getWorkflowId()); } @Override public int hashCode() { - return Objects.hash( - getStatus(), - getEndTime(), - getWorkflowId(), - getParentWorkflowId(), - getParentWorkflowTaskId(), - getTasks(), - getInput(), - getOutput(), - getWorkflowName(), - getWorkflowVersion(), - getCorrelationId(), - getReRunFromWorkflowId(), - getReasonForIncompletion(), - getEvent(), - getTaskToDomain(), - getFailedReferenceTaskNames(), - getWorkflowDefinition(), - getExternalInputPayloadStoragePath(), - getExternalOutputPayloadStoragePath(), - getPriority(), - getVariables(), - getLastRetriedTime()); + return Objects.hash(getWorkflowId()); } } diff --git a/core/src/main/java/com/netflix/conductor/model/WorkflowModel.java b/core/src/main/java/com/netflix/conductor/model/WorkflowModel.java index 7e35e7b380..613a01870a 100644 --- a/core/src/main/java/com/netflix/conductor/model/WorkflowModel.java +++ b/core/src/main/java/com/netflix/conductor/model/WorkflowModel.java @@ -483,65 +483,12 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; WorkflowModel that = (WorkflowModel) o; - return getEndTime() == that.getEndTime() - && getPriority() == that.getPriority() - && getLastRetriedTime() == that.getLastRetriedTime() - && getStatus() == that.getStatus() - && Objects.equals(getWorkflowId(), that.getWorkflowId()) - && Objects.equals(getParentWorkflowId(), that.getParentWorkflowId()) - && Objects.equals(getParentWorkflowTaskId(), that.getParentWorkflowTaskId()) - && Objects.equals(getTasks(), that.getTasks()) - && Objects.equals(getInput(), that.getInput()) - && Objects.equals(getOutput(), that.getOutput()) - && Objects.equals(getCorrelationId(), that.getCorrelationId()) - && Objects.equals(getReRunFromWorkflowId(), that.getReRunFromWorkflowId()) - && Objects.equals(getReasonForIncompletion(), that.getReasonForIncompletion()) - && Objects.equals(getEvent(), that.getEvent()) - && Objects.equals(getTaskToDomain(), that.getTaskToDomain()) - && Objects.equals(getFailedReferenceTaskNames(), that.getFailedReferenceTaskNames()) - && Objects.equals(getWorkflowDefinition(), that.getWorkflowDefinition()) - && Objects.equals( - getExternalInputPayloadStoragePath(), - that.getExternalInputPayloadStoragePath()) - && Objects.equals( - getExternalOutputPayloadStoragePath(), - that.getExternalOutputPayloadStoragePath()) - && Objects.equals(getVariables(), that.getVariables()) - && Objects.equals(getOwnerApp(), that.getOwnerApp()) - && Objects.equals(getCreateTime(), that.getCreateTime()) - && Objects.equals(getUpdatedTime(), that.getUpdatedTime()) - && Objects.equals(getCreatedBy(), that.getCreatedBy()) - && Objects.equals(getUpdatedBy(), that.getUpdatedBy()); + return Objects.equals(getWorkflowId(), that.getWorkflowId()); } @Override public int hashCode() { - return Objects.hash( - getStatus(), - getEndTime(), - getWorkflowId(), - getParentWorkflowId(), - getParentWorkflowTaskId(), - getTasks(), - getInput(), - getOutput(), - getCorrelationId(), - getReRunFromWorkflowId(), - getReasonForIncompletion(), - getEvent(), - getTaskToDomain(), - getFailedReferenceTaskNames(), - getWorkflowDefinition(), - getExternalInputPayloadStoragePath(), - getExternalOutputPayloadStoragePath(), - getPriority(), - getVariables(), - getLastRetriedTime(), - getOwnerApp(), - getCreateTime(), - getUpdatedTime(), - getCreatedBy(), - getUpdatedBy()); + return Objects.hash(getWorkflowId()); } public Workflow toWorkflow() {