Skip to content

Commit

Permalink
Updates final methods to be async calss
Browse files Browse the repository at this point in the history
Signed-off-by: Darshit Chanpura <[email protected]>
  • Loading branch information
DarshitChanpura committed Jan 17, 2025
1 parent b5f2961 commit f862223
Show file tree
Hide file tree
Showing 3 changed files with 187 additions and 135 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ public void initializeRecipientTypes() {
/**
* Returns a set of accessible resource IDs for the current user within the specified resource index.
* @param resourceIndex The resource index to check for accessible resources.
* @return A set of accessible resource IDs.
* @param listener The listener to be notified with the set of accessible resource IDs.
*
*/
public void getAccessibleResourceIdsForCurrentUser(String resourceIndex, ActionListener<Set<String>> listener) {
final UserSubjectImpl userSubject = (UserSubjectImpl) threadContext.getPersistent(
Expand Down Expand Up @@ -134,19 +135,37 @@ public void onFailure(Exception e) {
loadOwnResources(resourceIndex, user.getName(), ownResourcesListener);

// Load resources shared with the user by its name.
ownResourcesListener.whenComplete(ownResources -> {
loadSharedWithResources(resourceIndex, Set.of(user.getName()), Recipient.USERS.toString(), userNameResourcesListener);
}, listener::onFailure);
ownResourcesListener.whenComplete(
ownResources -> loadSharedWithResources(
resourceIndex,
Set.of(user.getName()),
Recipient.USERS.toString(),
userNameResourcesListener
),
listener::onFailure
);

// Load resources shared with the user’s roles.
userNameResourcesListener.whenComplete(userNameResources -> {
loadSharedWithResources(resourceIndex, user.getSecurityRoles(), Recipient.ROLES.toString(), rolesResourcesListener);
}, listener::onFailure);
userNameResourcesListener.whenComplete(
userNameResources -> loadSharedWithResources(
resourceIndex,
user.getSecurityRoles(),
Recipient.ROLES.toString(),
rolesResourcesListener
),
listener::onFailure
);

// Load resources shared with the user’s backend roles.
rolesResourcesListener.whenComplete(rolesResources -> {
loadSharedWithResources(resourceIndex, user.getRoles(), Recipient.BACKEND_ROLES.toString(), backendRolesResourcesListener);
}, listener::onFailure);
rolesResourcesListener.whenComplete(
rolesResources -> loadSharedWithResources(
resourceIndex,
user.getRoles(),
Recipient.BACKEND_ROLES.toString(),
backendRolesResourcesListener
),
listener::onFailure
);

// Combine all results and pass them back to the original listener.
backendRolesResourcesListener.whenComplete(backendRolesResources -> {
Expand All @@ -167,29 +186,36 @@ public void onFailure(Exception e) {
* Returns a set of accessible resources for the current user within the specified resource index.
*
* @param resourceIndex The resource index to check for accessible resources.
* @param listener The listener to be notified with the set of accessible resources.
*/
@SuppressWarnings("unchecked")
public <T extends Resource> void getAccessibleResourcesForCurrentUser(String resourceIndex, ActionListener<Set<T>> listener) {
try {
validateArguments(resourceIndex);

ResourceParser<T> parser = OpenSearchSecurityPlugin.getResourceProviders().get(resourceIndex).getResourceParser();
Set<String> resourceIds = getAccessibleResourceIdsForCurrentUser(resourceIndex);

if (resourceIds.isEmpty()) {
listener.onResponse(Set.of());
return;
}
StepListener<Set<String>> resourceIdsListener = new StepListener<>();
StepListener<Set<T>> resourcesListener = new StepListener<>();

this.resourceSharingIndexHandler.getResourceDocumentsFromIds(
resourceIds,
resourceIndex,
parser,
ActionListener.wrap(
listener::onResponse,
exception -> listener.onFailure(
new ResourceSharingException("Failed to get accessible resources: " + exception.getMessage(), exception)
)
)
// Fetch resource IDs
getAccessibleResourceIdsForCurrentUser(resourceIndex, resourceIdsListener);

// Fetch docs
resourceIdsListener.whenComplete(resourceIds -> {
if (resourceIds.isEmpty()) {
// No accessible resources => immediately respond with empty set
listener.onResponse(Collections.emptySet());
} else {
// Fetch the resource documents asynchronously
this.resourceSharingIndexHandler.getResourceDocumentsFromIds(resourceIds, resourceIndex, parser, resourcesListener);
}
}, listener::onFailure);

// Send final response
resourcesListener.whenComplete(
listener::onResponse,
ex -> listener.onFailure(new ResourceSharingException("Failed to get accessible resources: " + ex.getMessage(), ex))
);
} catch (Exception e) {
listener.onFailure(new ResourceSharingException("Failed to process accessible resources request: " + e.getMessage(), e));
Expand All @@ -202,6 +228,7 @@ public <T extends Resource> void getAccessibleResourcesForCurrentUser(String res
* @param resourceId The resource ID to check access for.
* @param resourceIndex The resource index containing the resource.
* @param scope The permission scope to check.
* @param listener The listener to be notified with the permission check result.
*/
public void hasPermission(String resourceId, String resourceIndex, String scope, ActionListener<Boolean> listener) {
validateArguments(resourceId, resourceIndex, scope);
Expand Down Expand Up @@ -263,6 +290,7 @@ public void hasPermission(String resourceId, String resourceIndex, String scope,
* @param resourceId The resource ID to share.
* @param resourceIndex The index where resource is store
* @param shareWith The users, roles, and backend roles as well as scope to share the resource with.
* @param listener The listener to be notified with the updated ResourceSharing document.
*/
public void shareWith(String resourceId, String resourceIndex, ShareWith shareWith, ActionListener<ResourceSharing> listener) {
validateArguments(resourceId, resourceIndex, shareWith);
Expand All @@ -274,7 +302,7 @@ public void shareWith(String resourceId, String resourceIndex, ShareWith shareWi

if (user == null) {
LOGGER.warn("No authenticated user found in the ThreadContext.");
listener.onFailure(new OpenSearchException("No authenticated user found."));
listener.onFailure(new ResourceSharingException("No authenticated user found."));
return;
}

Expand Down Expand Up @@ -309,7 +337,7 @@ public void shareWith(String resourceId, String resourceIndex, ShareWith shareWi
* @param resourceIndex The index where resource is store
* @param revokeAccess The users, roles, and backend roles to revoke access for.
* @param scopes The permission scopes to revoke access for.
* @return The updated ResourceSharing document.
* @param listener The listener to be notified with the updated ResourceSharing document.
*/
public void revokeAccess(
String resourceId,
Expand Down Expand Up @@ -360,6 +388,7 @@ public void revokeAccess(
* Deletes a resource sharing record by its ID and the resource index it belongs to.
* @param resourceId The resource ID to delete.
* @param resourceIndex The resource index containing the resource.
* @param listener The listener to be notified with the deletion result.
*/
public void deleteResourceSharingRecord(String resourceId, String resourceIndex, ActionListener<Boolean> listener) {
try {
Expand All @@ -378,17 +407,21 @@ public void deleteResourceSharingRecord(String resourceId, String resourceIndex,
user.getName()
);
} else {
LOGGER.info("Deleting resource sharing record for resource {} in {} with no authenticated user", resourceId, resourceIndex);
listener.onFailure(new ResourceSharingException("No authenticated user available."));
}

resourceSharingIndexHandler.fetchDocumentById(resourceIndex, resourceId, ActionListener.wrap(document -> {
StepListener<ResourceSharing> fetchDocListener = new StepListener<>();
StepListener<Boolean> deleteDocListener = new StepListener<>();

resourceSharingIndexHandler.fetchDocumentById(resourceIndex, resourceId, fetchDocListener);

fetchDocListener.whenComplete(document -> {
if (document == null) {
LOGGER.info("Document {} does not exist in index {}", resourceId, resourceIndex);
listener.onResponse(false);
return;
}

// Check if the user is allowed to delete
boolean isAdmin = (user != null && adminDNs.isAdmin(user));
boolean isOwner = (user != null && isOwnerOfResource(document, user.getName()));

Expand All @@ -398,23 +431,14 @@ public void deleteResourceSharingRecord(String resourceId, String resourceIndex,
(user == null ? "UNKNOWN" : user.getName()),
resourceId
);
// Not allowed => no deletion
listener.onResponse(false);
return;
} else {
resourceSharingIndexHandler.deleteResourceSharingRecord(resourceId, resourceIndex, deleteDocListener);
}
}, listener::onFailure);

// Finally, perform the actual record deletion (assuming it's a synchronous call)
boolean result = resourceSharingIndexHandler.deleteResourceSharingRecord(resourceId, resourceIndex);
listener.onResponse(result);
}, exception -> {
// If an error happens while fetching
LOGGER.error(
"Failed to fetch resource sharing document for resource {} in index {}. Error: {}",
resourceId,
resourceIndex,
exception.getMessage()
);
listener.onFailure(exception);
}));
deleteDocListener.whenComplete(listener::onResponse, listener::onFailure);
} catch (Exception e) {
LOGGER.error("Failed to delete resource sharing record for resource {}", resourceId, e);
listener.onFailure(e);
Expand All @@ -423,24 +447,37 @@ public void deleteResourceSharingRecord(String resourceId, String resourceIndex,

/**
* Deletes all resource sharing records for the current user.
* @return True if all records were successfully deleted, false otherwise.
* @param listener The listener to be notified with the deletion result.
*/
public boolean deleteAllResourceSharingRecordsForCurrentUser() {

public void deleteAllResourceSharingRecordsForCurrentUser(ActionListener<Boolean> listener) {
final UserSubjectImpl userSubject = (UserSubjectImpl) threadContext.getPersistent(
ConfigConstants.OPENDISTRO_SECURITY_AUTHENTICATED_USER
);
User user = userSubject == null ? null : userSubject.getUser();
LOGGER.info("Deleting all resource sharing records for resource {}", user.getName());
final User user = (userSubject == null) ? null : userSubject.getUser();

return this.resourceSharingIndexHandler.deleteAllRecordsForUser(user.getName());
if (user == null) {
listener.onFailure(new OpenSearchException("No authenticated user available."));
return;
}

LOGGER.info("Deleting all resource sharing records for user {}", user.getName());

resourceSharingIndexHandler.deleteAllRecordsForUser(user.getName(), ActionListener.wrap(listener::onResponse, exception -> {
LOGGER.error(
"Failed to delete all resource sharing records for user {}: {}",
user.getName(),
exception.getMessage(),
exception
);
listener.onFailure(exception);
}));
}

/**
* Loads all resources within the specified resource index.
*
* @param resourceIndex The resource index to load resources from.
* @return A set of resource IDs.
* @param listener The listener to be notified with the set of resource IDs.
*/
private void loadAllResources(String resourceIndex, ActionListener<Set<String>> listener) {
this.resourceSharingIndexHandler.fetchAllDocuments(resourceIndex, listener);
Expand All @@ -451,7 +488,7 @@ private void loadAllResources(String resourceIndex, ActionListener<Set<String>>
*
* @param resourceIndex The resource index to load resources from.
* @param userName The username of the owner.
* @return A set of resource IDs owned by the user.
* @param listener The listener to be notified with the set of resource IDs.
*/
private void loadOwnResources(String resourceIndex, String userName, ActionListener<Set<String>> listener) {
this.resourceSharingIndexHandler.fetchDocumentsByField(resourceIndex, "created_by.user", userName, listener);
Expand All @@ -463,7 +500,7 @@ private void loadOwnResources(String resourceIndex, String userName, ActionListe
* @param resourceIndex The resource index to load resources from.
* @param entities The set of entities to check for shared resources.
* @param recipientType The type of entity (e.g., users, roles, backend_roles).
* @return A set of resource IDs shared with the specified entities.
* @param listener The listener to be notified with the set of resource IDs.
*/
private void loadSharedWithResources(
String resourceIndex,
Expand Down
Loading

0 comments on commit f862223

Please sign in to comment.