Skip to content

Commit

Permalink
[#5146] fix(core): Support to rename and delete metadata object in th…
Browse files Browse the repository at this point in the history
…e authorization plugin
  • Loading branch information
jerqi committed Oct 28, 2024
1 parent 8b7709b commit 08d412e
Show file tree
Hide file tree
Showing 21 changed files with 377 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@
import org.apache.gravitino.Configs;
import org.apache.gravitino.MetadataObject;
import org.apache.gravitino.MetadataObjects;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.auth.AuthConstants;
import org.apache.gravitino.auth.AuthenticatorType;
import org.apache.gravitino.authorization.Owner;
import org.apache.gravitino.authorization.Privileges;
import org.apache.gravitino.authorization.SecurableObject;
import org.apache.gravitino.authorization.SecurableObjects;
Expand Down Expand Up @@ -93,6 +93,8 @@ public class RangerHiveE2EIT extends BaseIT {

private static final String SQL_CREATE_SCHEMA = String.format("CREATE DATABASE %s", schemaName);

private static final String SQL_DROP_SCHEMA = String.format("DROP DATABASE %s", schemaName);

private static final String SQL_USE_SCHEMA = String.format("USE SCHEMA %s", schemaName);

private static final String SQL_CREATE_TABLE =
Expand All @@ -112,6 +114,12 @@ public class RangerHiveE2EIT extends BaseIT {
private static final String SQL_ALTER_TABLE =
String.format("ALTER TABLE %s ADD COLUMN d string", tableName);

private static final String SQL_RENAME_TABLE =
String.format("ALTER TABLE %s RENAME TO new_table", tableName);

private static final String SQL_RENAME_BACK_TABLE =
String.format("ALTER TABLE new_table RENAME TO %s", tableName);

private static final String SQL_DROP_TABLE = String.format("DROP TABLE %s", tableName);

private static String RANGER_ADMIN_URL = null;
Expand Down Expand Up @@ -289,7 +297,6 @@ void testCreateTable() throws InterruptedException {
AccessControlException.class, () -> sparkSession.sql(SQL_SELECT_TABLE).collectAsList());

// Clean up
catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, tableName));
catalog.asSchemas().dropSchema(schemaName, true);
metalake.deleteRole(createTableRole);
metalake.deleteRole(createSchemaRole);
Expand Down Expand Up @@ -339,7 +346,6 @@ void testReadWriteTable() throws InterruptedException {
Assertions.assertThrows(AccessControlException.class, () -> sparkSession.sql(SQL_DROP_TABLE));

// Clean up
catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, tableName));
catalog.asSchemas().dropSchema(schemaName, true);
metalake.deleteRole(readWriteRole);
}
Expand Down Expand Up @@ -387,7 +393,6 @@ void testReadOnlyTable() throws InterruptedException {
Assertions.assertThrows(AccessControlException.class, () -> sparkSession.sql(SQL_DROP_TABLE));

// Clean up
catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, tableName));
catalog.asSchemas().dropSchema(schemaName, true);
metalake.deleteRole(readOnlyRole);
}
Expand Down Expand Up @@ -436,7 +441,6 @@ void testWriteOnlyTable() throws InterruptedException {
Assertions.assertThrows(AccessControlException.class, () -> sparkSession.sql(SQL_DROP_TABLE));

// Clean up
catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, tableName));
catalog.asSchemas().dropSchema(schemaName, true);
metalake.deleteRole(readOnlyRole);
}
Expand All @@ -452,13 +456,10 @@ void testCreateAllPrivilegesRole() throws InterruptedException {
Privileges.UseCatalog.allow(),
Privileges.UseSchema.allow(),
Privileges.CreateSchema.allow(),
Privileges.CreateFileset.allow(),
Privileges.ReadFileset.allow(),
Privileges.WriteFileset.allow(),
Privileges.CreateTopic.allow(),
Privileges.ConsumeTopic.allow(),
Privileges.ProduceTopic.allow(),
Privileges.CreateTable.allow(),
Privileges.SelectTable.allow(),
Privileges.ModifyTable.allow(),
Privileges.ManageUsers.allow(),
Expand All @@ -472,15 +473,10 @@ void testCreateAllPrivilegesRole() throws InterruptedException {

waitForUpdatingPolicies();

// Test to create a schema
// Test to use the catalog
sparkSession.sql(SQL_CREATE_SCHEMA);

// Test to creat a table
sparkSession.sql(SQL_USE_SCHEMA);
sparkSession.sql(SQL_CREATE_TABLE);

// Clean up
catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, tableName));
catalog.asSchemas().dropSchema(schemaName, true);
metalake.deleteRole(roleName);
}
Expand Down Expand Up @@ -528,6 +524,199 @@ void testDeleteAndRecreateRole() throws InterruptedException {
metalake.deleteRole(roleName);
}

@Test
void testDeleteAndRecreateMetadataObject() throws InterruptedException {
// Create a role with CREATE_SCHEMA privilege
String roleName = "createSchemaRole";
SecurableObject securableObject =
SecurableObjects.parse(
String.format("%s", catalogName),
MetadataObject.Type.CATALOG,
Lists.newArrayList(Privileges.CreateSchema.allow()));
metalake.createRole(roleName, Collections.emptyMap(), Lists.newArrayList(securableObject));

// Granted this role to the spark execution user `HADOOP_USER_NAME`
String userName1 = System.getenv(HADOOP_USER_NAME);
metalake.grantRolesToUser(Lists.newArrayList(roleName), userName1);
waitForUpdatingPolicies();

// Create a schema
sparkSession.sql(SQL_CREATE_SCHEMA);

// Set owner
MetadataObject schemaObject =
MetadataObjects.of(catalogName, schemaName, MetadataObject.Type.SCHEMA);
metalake.setOwner(schemaObject, userName1, Owner.Type.USER);
waitForUpdatingPolicies();

// Delete a schema
sparkSession.sql(SQL_DROP_SCHEMA);

// Recreate a schema
sparkSession.sql(SQL_CREATE_SCHEMA);

// Clean up
catalog.asSchemas().dropSchema(schemaName, true);
metalake.deleteRole(roleName);
}

void testRenameMetadataObject() throws InterruptedException {
// Create a role with CREATE_SCHEMA and CREATE_TABLE privilege
String roleName = "createRole";
SecurableObject securableObject =
SecurableObjects.parse(
String.format("%s", catalogName),
MetadataObject.Type.CATALOG,
Lists.newArrayList(
Privileges.UseCatalog.allow(),
Privileges.CreateSchema.allow(),
Privileges.CreateTable.allow()));
metalake.createRole(roleName, Collections.emptyMap(), Lists.newArrayList(securableObject));
// Granted this role to the spark execution user `HADOOP_USER_NAME`
String userName1 = System.getenv(HADOOP_USER_NAME);
metalake.grantRolesToUser(Lists.newArrayList(roleName), userName1);

waitForUpdatingPolicies();

// Create a schema and a table
sparkSession.sql(SQL_CREATE_SCHEMA);
sparkSession.sql(SQL_CREATE_TABLE);

// Rename a table and rename back
sparkSession.sql(SQL_RENAME_TABLE);
sparkSession.sql(SQL_RENAME_BACK_TABLE);
sparkSession.sql(SQL_DROP_TABLE);

// Clean up
catalog.asSchemas().dropSchema(schemaName, true);
}

@Test
void testChangeOwner() throws InterruptedException {
// Create a schema and a table
String helperRole = "helperRole";
SecurableObject securableObject =
SecurableObjects.ofMetalake(
metalakeName,
Lists.newArrayList(
Privileges.UseSchema.allow(),
Privileges.CreateSchema.allow(),
Privileges.CreateTable.allow(),
Privileges.ModifyTable.allow()));
String userName1 = System.getenv(HADOOP_USER_NAME);
metalake.createRole(helperRole, Collections.emptyMap(), Lists.newArrayList(securableObject));
metalake.grantRolesToUser(Lists.newArrayList(helperRole), userName1);
waitForUpdatingPolicies();

// Create a schema and a table
sparkSession.sql(SQL_CREATE_SCHEMA);
sparkSession.sql(SQL_USE_SCHEMA);
sparkSession.sql(SQL_CREATE_TABLE);
sparkSession.sql(SQL_INSERT_TABLE);

metalake.revokeRolesFromUser(Lists.newArrayList(helperRole), userName1);
metalake.deleteRole(helperRole);
waitForUpdatingPolicies();

// case 1. Have none of privileges of the table

// - a. Fail to insert data into the table
Assertions.assertThrows(AccessControlException.class, () -> sparkSession.sql(SQL_INSERT_TABLE));

// - b. Fail to select data from the table
Assertions.assertThrows(
AccessControlException.class, () -> sparkSession.sql(SQL_SELECT_TABLE).collectAsList());

// - c: Fail to update data in the table
Assertions.assertThrows(
SparkUnsupportedOperationException.class, () -> sparkSession.sql(SQL_UPDATE_TABLE));

// - d: Fail to delete data from the table
Assertions.assertThrows(AnalysisException.class, () -> sparkSession.sql(SQL_DELETE_TABLE));

// - e: Fail to alter the table
Assertions.assertThrows(AccessControlException.class, () -> sparkSession.sql(SQL_ALTER_TABLE));

// - f: Fail to drop the table
Assertions.assertThrows(AccessControlException.class, () -> sparkSession.sql(SQL_DROP_TABLE));

// case 2. user is the table owner
MetadataObject tableObject =
MetadataObjects.of(
Lists.newArrayList(catalogName, schemaName, tableName), MetadataObject.Type.TABLE);
metalake.setOwner(tableObject, userName1, Owner.Type.USER);
waitForUpdatingPolicies();

// Owner has all the privileges except for creating table
checkTableAllPrivilegesExceptForCreating();

// Fail to create the table
// Assertions.assertThrows(AccessControlException.class, () ->
// sparkSession.sql(SQL_CREATE_TABLE));

// Remove the owner of the table
metalake.setOwner(tableObject, AuthConstants.ANONYMOUS_USER, Owner.Type.USER);

// case 3. user is the schema owner
MetadataObject schemaObject =
MetadataObjects.of(catalogName, schemaName, MetadataObject.Type.SCHEMA);
metalake.setOwner(schemaObject, userName1, Owner.Type.USER);
waitForUpdatingPolicies();

// Succeed to create a table
sparkSession.sql(SQL_CREATE_TABLE);

// Succeed to check other table privileges
checkTableAllPrivilegesExceptForCreating();

// Succeed to drop schema
sparkSession.sql(SQL_DROP_SCHEMA);

// Fail to create schema
// Assertions.assertThrows(AccessControlException.class, () ->
// sparkSession.sql(SQL_CREATE_SCHEMA));

// Remove the owner
metalake.setOwner(schemaObject, AuthConstants.ANONYMOUS_USER, Owner.Type.USER);

// case 4. user is the catalog owner
MetadataObject catalogObject =
MetadataObjects.of(null, catalogName, MetadataObject.Type.CATALOG);
metalake.setOwner(catalogObject, userName1, Owner.Type.USER);
waitForUpdatingPolicies();

// Succeed to create a schema
sparkSession.sql(SQL_CREATE_SCHEMA);

// Succeed to create a table
sparkSession.sql(SQL_CREATE_TABLE);

// Succeed to check other table privileges
checkTableAllPrivilegesExceptForCreating();

// Succeed to drop schema
sparkSession.sql(SQL_DROP_SCHEMA);

metalake.setOwner(catalogObject, AuthConstants.ANONYMOUS_USER, Owner.Type.USER);
// case 5. user is the metalake owner
MetadataObject metalakeObject =
MetadataObjects.of(null, metalakeName, MetadataObject.Type.METALAKE);
metalake.setOwner(metalakeObject, userName1, Owner.Type.USER);
waitForUpdatingPolicies();

// Succeed to create a schema
sparkSession.sql(SQL_CREATE_SCHEMA);

// Succeed to create a table
sparkSession.sql(SQL_CREATE_TABLE);

// Succeed to check other table privileges
checkTableAllPrivilegesExceptForCreating();

// Succeed to drop schema
sparkSession.sql(SQL_DROP_SCHEMA);
}

@Test
void testAllowUseSchemaPrivilege() throws InterruptedException {
// Create a role with CREATE_SCHEMA privilege
Expand Down Expand Up @@ -582,7 +771,6 @@ void testAllowUseSchemaPrivilege() throws InterruptedException {
1, rows2.stream().filter(row -> row.getString(0).equals(schemaName)).count());

// Clean up
catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, tableName));
catalog.asSchemas().dropSchema(schemaName, true);
metalake.revokeRolesFromUser(Lists.newArrayList(roleName), userName1);
metalake.deleteRole(roleName);
Expand Down Expand Up @@ -624,6 +812,27 @@ private static void createCatalog() {
LOG.info("Catalog created: {}", catalog);
}

private void checkTableAllPrivilegesExceptForCreating() {
// - a. Succeed to insert data into the table
sparkSession.sql(SQL_INSERT_TABLE);

// - b. Succeed to select data from the table
sparkSession.sql(SQL_SELECT_TABLE).collectAsList();

// - c: Fail to update data in the table. Because Hive doesn't support
Assertions.assertThrows(
SparkUnsupportedOperationException.class, () -> sparkSession.sql(SQL_UPDATE_TABLE));

// - d: Fail to delete data from the table, Because Hive doesn't support
Assertions.assertThrows(AnalysisException.class, () -> sparkSession.sql(SQL_DELETE_TABLE));

// - e: Succeed to alter the table
sparkSession.sql(SQL_ALTER_TABLE);

// - f: Succeed to drop the table
sparkSession.sql(SQL_DROP_TABLE);
}

private static void waitForUpdatingPolicies() throws InterruptedException {
// After Ranger authorization, Must wait a period of time for the Ranger Spark plugin to update
// the policy Sleep time must be greater than the policy update interval
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ enum Type {
/** Role and user relationship */
ROLE_USER_REL,
/** Role and group relationship */
ROLE_GROUP_REL
ROLE_GROUP_REL,
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ Role createRole(
* Lists the role names associated the metadata object.
*
* @param metalake The Metalake of the Role.
* @param object The object of the Roles.
* @return The role list.
* @throws NoSuchMetalakeException If the Metalake with the given name does not exist.
* @throws NoSuchMetadataObjectException If the Metadata object with the given name does not
Expand Down
Loading

0 comments on commit 08d412e

Please sign in to comment.