Skip to content

Commit

Permalink
Introduces resource sharing model as a feature flag
Browse files Browse the repository at this point in the history
Signed-off-by: Darshit Chanpura <[email protected]>
  • Loading branch information
DarshitChanpura committed Jan 20, 2025
1 parent d2a30d8 commit a5ded4b
Show file tree
Hide file tree
Showing 12 changed files with 124 additions and 28 deletions.
3 changes: 2 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ dependencies {
implementation "org.opensearch:opensearch:${opensearch_version}"
compileOnly "org.opensearch.plugin:opensearch-scripting-painless-spi:${opensearch_version}"
compileOnly "org.opensearch:opensearch-job-scheduler-spi:${job_scheduler_version}"
compileOnly "org.opensearch:opensearch-resource-sharing-spi:${opensearch_build}"
implementation "org.opensearch:common-utils:${common_utils_version}"
implementation "org.opensearch.client:opensearch-rest-client:${opensearch_version}"
compileOnly group: 'com.google.guava', name: 'guava', version:'32.1.3-jre'
Expand Down Expand Up @@ -205,7 +206,7 @@ opensearchplugin {
name 'opensearch-anomaly-detection'
description 'OpenSearch anomaly detector plugin'
classname 'org.opensearch.timeseries.TimeSeriesAnalyticsPlugin'
extendedPlugins = ['lang-painless', 'opensearch-job-scheduler']
extendedPlugins = ['lang-painless', 'opensearch-job-scheduler', 'opensearch-security;optional=true']
}

// Handle case where older versions of esplugin doesn't expose the joda time version it uses
Expand Down
19 changes: 19 additions & 0 deletions src/main/java/org/opensearch/ad/constant/ADResourceScope.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package org.opensearch.ad.constant;

import org.opensearch.security.spi.resources.ResourceAccessScope;

public enum ADResourceScope implements ResourceAccessScope<ADResourceScope> {
AD_READ_ACCESS("ad_read_access"),
AD_FULL_ACCESS("ad_full_access");

private final String scopeName;

ADResourceScope(String scopeName) {
this.scopeName = scopeName;
}

@Override
public String value() {
return scopeName;
}
}
6 changes: 6 additions & 0 deletions src/main/java/org/opensearch/ad/constant/ConfigConstants.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package org.opensearch.ad.constant;

public class ConfigConstants {
public static final String OPENSEARCH_RESOURCE_SHARING_ENABLED = "plugins.security.resource_sharing.enabled";
public static final Boolean OPENSEARCH_RESOURCE_SHARING_ENABLED_DEFAULT = true;
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.ad.constant.ConfigConstants;
import org.opensearch.ad.indices.ADIndexManagement;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.rest.handler.IndexAnomalyDetectorActionHandler;
Expand Down Expand Up @@ -64,6 +65,7 @@ public class IndexAnomalyDetectorTransportAction extends HandledTransportAction<
private volatile Boolean filterByEnabled;
private final SearchFeatureDao searchFeatureDao;
private final Settings settings;
private final boolean resourceSharingEnabled;

@Inject
public IndexAnomalyDetectorTransportAction(
Expand All @@ -90,6 +92,8 @@ public IndexAnomalyDetectorTransportAction(
filterByEnabled = AnomalyDetectorSettings.AD_FILTER_BY_BACKEND_ROLES.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(AD_FILTER_BY_BACKEND_ROLES, it -> filterByEnabled = it);
this.settings = settings;
this.resourceSharingEnabled = settings
.getAsBoolean(ConfigConstants.OPENSEARCH_RESOURCE_SHARING_ENABLED, ConfigConstants.OPENSEARCH_RESOURCE_SHARING_ENABLED_DEFAULT);
}

@Override
Expand All @@ -115,9 +119,10 @@ private void resolveUserAndExecute(
Consumer<AnomalyDetector> function
) {
try {
// Check if user has backend roles
// When filter by is enabled, block users creating/updating detectors who do not have backend roles.
if (filterByEnabled) {
// If resource sharing flag is enabled then access evaluation will be performed at DLS level
if (!resourceSharingEnabled && filterByEnabled) {
// Check if user has backend roles
// When filter by is enabled, block users creating/updating detectors who do not have backend roles.
String error = checkFilterByBackendRoles(requestedUser);
if (error != null) {
listener.onFailure(new TimeSeriesException(error));
Expand All @@ -140,7 +145,8 @@ private void resolveUserAndExecute(
clusterService,
xContentRegistry,
filterByBackendRole,
AnomalyDetector.class
AnomalyDetector.class,
resourceSharingEnabled
);
} else {
// Create Detector. No need to get current detector.
Expand Down Expand Up @@ -175,6 +181,8 @@ protected void adExecute(
checkIndicesAndExecute(detector.getIndices(), () -> {
// Don't replace detector's user when update detector
// Github issue: https://github.com/opensearch-project/anomaly-detection/issues/124
// TODO this and similar code should be updated to remove reference to a user

User detectorUser = currentDetector == null ? user : currentDetector.getUser();
IndexAnomalyDetectorActionHandler indexAnomalyDetectorActionHandler = new IndexAnomalyDetectorActionHandler(
clusterService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.ad.AnomalyDetectorRunner;
import org.opensearch.ad.constant.ADCommonMessages;
import org.opensearch.ad.constant.ConfigConstants;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyResult;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
Expand Down Expand Up @@ -70,6 +71,7 @@ public class PreviewAnomalyDetectorTransportAction extends
private volatile Boolean filterByEnabled;
private final CircuitBreakerService adCircuitBreakerService;
private Semaphore lock;
private final boolean resourceSharingEnabled;

@Inject
public PreviewAnomalyDetectorTransportAction(
Expand All @@ -93,6 +95,8 @@ public PreviewAnomalyDetectorTransportAction(
clusterService.getClusterSettings().addSettingsUpdateConsumer(AD_FILTER_BY_BACKEND_ROLES, it -> filterByEnabled = it);
this.adCircuitBreakerService = adCircuitBreakerService;
this.lock = new Semaphore(MAX_CONCURRENT_PREVIEW.get(settings), true);
this.resourceSharingEnabled = settings
.getAsBoolean(ConfigConstants.OPENSEARCH_RESOURCE_SHARING_ENABLED, ConfigConstants.OPENSEARCH_RESOURCE_SHARING_ENABLED_DEFAULT);
clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_CONCURRENT_PREVIEW, it -> { lock = new Semaphore(it); });
}

Expand All @@ -115,7 +119,8 @@ protected void doExecute(
client,
clusterService,
xContentRegistry,
AnomalyDetector.class
AnomalyDetector.class,
resourceSharingEnabled
);
} catch (Exception e) {
logger.error(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.ad.constant.ConfigConstants;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -97,6 +98,7 @@ public class ForecastRunOnceTransportAction extends HandledTransportAction<Forec
private final FeatureManager featureManager;
private final ForecastStats forecastStats;
private volatile Boolean filterByEnabled;
private final boolean resourceSharingEnabled;

protected volatile Integer maxSingleStreamForecasters;
protected volatile Integer maxHCForecasters;
Expand Down Expand Up @@ -147,6 +149,8 @@ public ForecastRunOnceTransportAction(
this.maxHCForecasters = MAX_HC_FORECASTERS.get(settings);
this.maxForecastFeatures = MAX_FORECAST_FEATURES;
this.maxCategoricalFields = ForecastNumericSetting.maxCategoricalFields();
this.resourceSharingEnabled = settings
.getAsBoolean(ConfigConstants.OPENSEARCH_RESOURCE_SHARING_ENABLED, ConfigConstants.OPENSEARCH_RESOURCE_SHARING_ENABLED_DEFAULT);
clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_SINGLE_STREAM_FORECASTERS, it -> maxSingleStreamForecasters = it);
clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_HC_FORECASTERS, it -> maxHCForecasters = it);
}
Expand All @@ -166,7 +170,8 @@ protected void doExecute(Task task, ForecastResultRequest request, ActionListene
client,
clusterService,
xContentRegistry,
Forecaster.class
Forecaster.class,
resourceSharingEnabled
);
} catch (Exception e) {
LOG.error(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.ad.constant.ConfigConstants;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
Expand Down Expand Up @@ -63,6 +64,7 @@ public class IndexForecasterTransportAction extends HandledTransportAction<Index
private final SearchFeatureDao searchFeatureDao;
private final ForecastTaskManager taskManager;
private final Settings settings;
private final boolean resourceSharingEnabled;

@Inject
public IndexForecasterTransportAction(
Expand All @@ -89,6 +91,8 @@ public IndexForecasterTransportAction(
this.searchFeatureDao = searchFeatureDao;
this.taskManager = taskManager;
this.settings = settings;
this.resourceSharingEnabled = settings
.getAsBoolean(ConfigConstants.OPENSEARCH_RESOURCE_SHARING_ENABLED, ConfigConstants.OPENSEARCH_RESOURCE_SHARING_ENABLED_DEFAULT);
}

@Override
Expand Down Expand Up @@ -125,9 +129,10 @@ private void resolveUserAndExecute(
// this case, so we can keep current forecaster's user data.
boolean filterByBackendRole = requestedUser == null ? false : filterByEnabled;

// Check if user has backend roles
// When filter by is enabled, block users creating/updating detectors who do not have backend roles.
if (filterByEnabled) {
// If resource sharing flag is enabled then access evaluation will be performed at DLS level
if (!resourceSharingEnabled && filterByEnabled) {
// Check if user has backend roles
// When filter by is enabled, block users creating/updating detectors who do not have backend roles.
String error = checkFilterByBackendRoles(requestedUser);
if (error != null) {
listener.onFailure(new IllegalArgumentException(error));
Expand All @@ -146,7 +151,8 @@ private void resolveUserAndExecute(
clusterService,
xContentRegistry,
filterByBackendRole,
Forecaster.class
Forecaster.class,
resourceSharingEnabled
);
} else {
// Create Detector. No need to get current detector.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@
import org.opensearch.rest.RestController;
import org.opensearch.rest.RestHandler;
import org.opensearch.script.ScriptService;
import org.opensearch.security.spi.resources.ResourceSharingExtension;
import org.opensearch.threadpool.ExecutorBuilder;
import org.opensearch.threadpool.ScalingExecutorBuilder;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -327,7 +328,13 @@
/**
* Entry point of time series analytics plugin.
*/
public class TimeSeriesAnalyticsPlugin extends Plugin implements ActionPlugin, ScriptPlugin, SystemIndexPlugin, JobSchedulerExtension {
public class TimeSeriesAnalyticsPlugin extends Plugin
implements
ActionPlugin,
ScriptPlugin,
SystemIndexPlugin,
JobSchedulerExtension,
ResourceSharingExtension {

private static final Logger LOG = LogManager.getLogger(TimeSeriesAnalyticsPlugin.class);

Expand Down Expand Up @@ -1758,4 +1765,14 @@ public void close() {
}
}
}

@Override
public String getResourceType() {
return "detectors";
}

@Override
public String getResourceIndex() {
return CommonName.CONFIG_INDEX;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.ad.constant.ConfigConstants;
import org.opensearch.ad.model.ADTask;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -69,6 +70,7 @@ public abstract class BaseDeleteConfigTransportAction<TaskCacheManagerType exten
private final String stateIndex;
private final Class<ConfigType> configTypeClass;
private final List<TaskTypeEnum> batchTaskTypes;
private final boolean resourceSharingEnabled;

public BaseDeleteConfigTransportAction(
TransportService transportService,
Expand Down Expand Up @@ -100,6 +102,8 @@ public BaseDeleteConfigTransportAction(
this.stateIndex = stateIndex;
this.configTypeClass = configTypeClass;
this.batchTaskTypes = historicalTaskTypes;
this.resourceSharingEnabled = settings
.getAsBoolean(ConfigConstants.OPENSEARCH_RESOURCE_SHARING_ENABLED, ConfigConstants.OPENSEARCH_RESOURCE_SHARING_ENABLED_DEFAULT);
}

@Override
Expand Down Expand Up @@ -142,7 +146,8 @@ protected void doExecute(Task task, DeleteConfigRequest request, ActionListener<
client,
clusterService,
xContentRegistry,
configTypeClass
configTypeClass,
resourceSharingEnabled
);
} catch (Exception e) {
LOG.error(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.opensearch.action.get.MultiGetResponse;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.ad.constant.ConfigConstants;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.CheckedConsumer;
Expand Down Expand Up @@ -101,6 +102,7 @@ public abstract class BaseGetConfigTransportAction<GetConfigResponseType extends
private final String singleStreamHistoricalTaskname;
private final String hcHistoricalTaskName;
private final TaskProfileRunnerType taskProfileRunner;
private final boolean resourceSharingEnabled;

public BaseGetConfigTransportAction(
TransportService transportService,
Expand Down Expand Up @@ -154,6 +156,8 @@ public BaseGetConfigTransportAction(
this.hcHistoricalTaskName = hcHistoricalTaskName;
this.singleStreamHistoricalTaskname = singleStreamHistoricalTaskname;
this.taskProfileRunner = taskProfileRunner;
this.resourceSharingEnabled = settings
.getAsBoolean(ConfigConstants.OPENSEARCH_RESOURCE_SHARING_ENABLED, ConfigConstants.OPENSEARCH_RESOURCE_SHARING_ENABLED_DEFAULT);
}

@Override
Expand All @@ -172,7 +176,8 @@ public void doExecute(Task task, ActionRequest request, ActionListener<GetConfig
client,
clusterService,
xContentRegistry,
configTypeClass
configTypeClass,
resourceSharingEnabled
);
} catch (Exception e) {
LOG.error(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.action.ActionType;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.ad.constant.ConfigConstants;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Setting;
Expand Down Expand Up @@ -53,6 +54,7 @@ public abstract class BaseJobTransportAction<IndexType extends Enum<IndexType> &
private final String failtoStopMsg;
private final Class<? extends Config> configClass;
private final IndexJobActionHandlerType indexJobActionHandlerType;
private final boolean resourceSharingEnabled;

public BaseJobTransportAction(
TransportService transportService,
Expand Down Expand Up @@ -82,6 +84,8 @@ public BaseJobTransportAction(
this.failtoStopMsg = failtoStopMsg;
this.configClass = configClass;
this.indexJobActionHandlerType = indexJobActionHandlerType;
this.resourceSharingEnabled = settings
.getAsBoolean(ConfigConstants.OPENSEARCH_RESOURCE_SHARING_ENABLED, ConfigConstants.OPENSEARCH_RESOURCE_SHARING_ENABLED_DEFAULT);
}

@Override
Expand All @@ -106,7 +110,8 @@ protected void doExecute(Task task, JobRequest request, ActionListener<JobRespon
client,
clusterService,
xContentRegistry,
configClass
configClass,
resourceSharingEnabled
);
} catch (Exception e) {
logger.error(e);
Expand Down
Loading

0 comments on commit a5ded4b

Please sign in to comment.