Revert "revert some test cases"
This reverts commit fb3ab74.
jerqi committed Oct 28, 2024
1 parent e956976 commit e712257
Showing 1 changed file with 224 additions and 15 deletions.
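The restored tests all follow the same grant-then-verify pattern: build a SecurableObject, create a role from it, grant the role to the Spark execution user, wait for Ranger to sync the policy, then run SQL. A minimal sketch of that pattern, using only identifiers that appear in the diff below ("sketchRole" is a hypothetical name, and imports are as in the file's import block):

// Condensed sketch of the grant-then-verify pattern the restored tests share.
// All identifiers come from the diff below; "sketchRole" is a hypothetical name.
void sketchGrantThenVerify() throws InterruptedException {
  SecurableObject securableObject =
      SecurableObjects.parse(
          catalogName,
          MetadataObject.Type.CATALOG,
          Lists.newArrayList(Privileges.CreateSchema.allow()));
  metalake.createRole("sketchRole", Collections.emptyMap(), Lists.newArrayList(securableObject));
  metalake.grantRolesToUser(Lists.newArrayList("sketchRole"), System.getenv(HADOOP_USER_NAME));
  waitForUpdatingPolicies(); // Ranger needs time to sync the new policy
  sparkSession.sql(SQL_CREATE_SCHEMA); // now permitted by the granted role
}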
@@ -42,9 +42,9 @@
import org.apache.gravitino.Configs;
import org.apache.gravitino.MetadataObject;
import org.apache.gravitino.MetadataObjects;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.auth.AuthConstants;
import org.apache.gravitino.auth.AuthenticatorType;
import org.apache.gravitino.authorization.Owner;
import org.apache.gravitino.authorization.Privileges;
import org.apache.gravitino.authorization.SecurableObject;
import org.apache.gravitino.authorization.SecurableObjects;
@@ -93,6 +93,8 @@ public class RangerHiveE2EIT extends BaseIT {

private static final String SQL_CREATE_SCHEMA = String.format("CREATE DATABASE %s", schemaName);

private static final String SQL_DROP_SCHEMA = String.format("DROP DATABASE %s", schemaName);

private static final String SQL_USE_SCHEMA = String.format("USE SCHEMA %s", schemaName);

private static final String SQL_CREATE_TABLE =
@@ -112,6 +114,12 @@ public class RangerHiveE2EIT extends BaseIT {
private static final String SQL_ALTER_TABLE =
String.format("ALTER TABLE %s ADD COLUMN d string", tableName);

private static final String SQL_RENAME_TABLE =
String.format("ALTER TABLE %s RENAME TO new_table", tableName);

private static final String SQL_RENAME_BACK_TABLE =
String.format("ALTER TABLE new_table RENAME TO %s", tableName);

private static final String SQL_DROP_TABLE = String.format("DROP TABLE %s", tableName);

private static String RANGER_ADMIN_URL = null;
@@ -289,7 +297,6 @@ void testCreateTable() throws InterruptedException {
AccessControlException.class, () -> sparkSession.sql(SQL_SELECT_TABLE).collectAsList());

// Clean up
catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, tableName));
catalog.asSchemas().dropSchema(schemaName, true);
metalake.deleteRole(createTableRole);
metalake.deleteRole(createSchemaRole);
@@ -339,7 +346,6 @@ void testReadWriteTable() throws InterruptedException {
Assertions.assertThrows(AccessControlException.class, () -> sparkSession.sql(SQL_DROP_TABLE));

// Clean up
catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, tableName));
catalog.asSchemas().dropSchema(schemaName, true);
metalake.deleteRole(readWriteRole);
}
@@ -387,7 +393,6 @@ void testReadOnlyTable() throws InterruptedException {
Assertions.assertThrows(AccessControlException.class, () -> sparkSession.sql(SQL_DROP_TABLE));

// Clean up
catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, tableName));
catalog.asSchemas().dropSchema(schemaName, true);
metalake.deleteRole(readOnlyRole);
}
@@ -436,7 +441,6 @@ void testWriteOnlyTable() throws InterruptedException {
Assertions.assertThrows(AccessControlException.class, () -> sparkSession.sql(SQL_DROP_TABLE));

// Clean up
catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, tableName));
catalog.asSchemas().dropSchema(schemaName, true);
metalake.deleteRole(readOnlyRole);
}
@@ -452,13 +456,10 @@ void testCreateAllPrivilegesRole() throws InterruptedException {
Privileges.UseCatalog.allow(),
Privileges.UseSchema.allow(),
Privileges.CreateSchema.allow(),
Privileges.CreateFileset.allow(),
Privileges.ReadFileset.allow(),
Privileges.WriteFileset.allow(),
Privileges.CreateTopic.allow(),
Privileges.ConsumeTopic.allow(),
Privileges.ProduceTopic.allow(),
Privileges.CreateTable.allow(),
Privileges.SelectTable.allow(),
Privileges.ModifyTable.allow(),
Privileges.ManageUsers.allow(),
@@ -472,15 +473,10 @@ void testCreateAllPrivilegesRole() throws InterruptedException {

waitForUpdatingPolicies();

// Test to create a schema
// Test to use the catalog
sparkSession.sql(SQL_CREATE_SCHEMA);

// Test to create a table
sparkSession.sql(SQL_USE_SCHEMA);
sparkSession.sql(SQL_CREATE_TABLE);

// Clean up
catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, tableName));
catalog.asSchemas().dropSchema(schemaName, true);
metalake.deleteRole(roleName);
}
@@ -528,6 +524,199 @@ void testDeleteAndRecreateRole() throws InterruptedException {
metalake.deleteRole(roleName);
}

@Test
void testDeleteAndRecreateMetadataObject() throws InterruptedException {
// Create a role with CREATE_SCHEMA privilege
String roleName = "createSchemaRole";
SecurableObject securableObject =
SecurableObjects.parse(
String.format("%s", catalogName),
MetadataObject.Type.CATALOG,
Lists.newArrayList(Privileges.CreateSchema.allow()));
metalake.createRole(roleName, Collections.emptyMap(), Lists.newArrayList(securableObject));

// Grant this role to the Spark execution user `HADOOP_USER_NAME`
String userName1 = System.getenv(HADOOP_USER_NAME);
metalake.grantRolesToUser(Lists.newArrayList(roleName), userName1);
waitForUpdatingPolicies();

// Create a schema
sparkSession.sql(SQL_CREATE_SCHEMA);

// Set owner
MetadataObject schemaObject =
MetadataObjects.of(catalogName, schemaName, MetadataObject.Type.SCHEMA);
metalake.setOwner(schemaObject, userName1, Owner.Type.USER);
waitForUpdatingPolicies();

// Delete a schema
sparkSession.sql(SQL_DROP_SCHEMA);

// Recreate a schema
sparkSession.sql(SQL_CREATE_SCHEMA);

// Clean up
catalog.asSchemas().dropSchema(schemaName, true);
metalake.deleteRole(roleName);
}

void testRenameMetadataObject() throws InterruptedException {
// Create a role with CREATE_SCHEMA and CREATE_TABLE privileges
String roleName = "createRole";
SecurableObject securableObject =
SecurableObjects.parse(
String.format("%s", catalogName),
MetadataObject.Type.CATALOG,
Lists.newArrayList(
Privileges.UseCatalog.allow(),
Privileges.CreateSchema.allow(),
Privileges.CreateTable.allow()));
metalake.createRole(roleName, Collections.emptyMap(), Lists.newArrayList(securableObject));
// Grant this role to the Spark execution user `HADOOP_USER_NAME`
String userName1 = System.getenv(HADOOP_USER_NAME);
metalake.grantRolesToUser(Lists.newArrayList(roleName), userName1);

waitForUpdatingPolicies();

// Create a schema and a table
sparkSession.sql(SQL_CREATE_SCHEMA);
sparkSession.sql(SQL_CREATE_TABLE);

// Rename a table and rename back
sparkSession.sql(SQL_RENAME_TABLE);
sparkSession.sql(SQL_RENAME_BACK_TABLE);
sparkSession.sql(SQL_DROP_TABLE);

// Clean up
catalog.asSchemas().dropSchema(schemaName, true);
}

@Test
void testChangeOwner() throws InterruptedException {
// Set up a helper role so the user can create a schema and a table
String helperRole = "helperRole";
SecurableObject securableObject =
SecurableObjects.ofMetalake(
metalakeName,
Lists.newArrayList(
Privileges.UseSchema.allow(),
Privileges.CreateSchema.allow(),
Privileges.CreateTable.allow(),
Privileges.ModifyTable.allow()));
String userName1 = System.getenv(HADOOP_USER_NAME);
metalake.createRole(helperRole, Collections.emptyMap(), Lists.newArrayList(securableObject));
metalake.grantRolesToUser(Lists.newArrayList(helperRole), userName1);
waitForUpdatingPolicies();

// Create a schema and a table
sparkSession.sql(SQL_CREATE_SCHEMA);
sparkSession.sql(SQL_USE_SCHEMA);
sparkSession.sql(SQL_CREATE_TABLE);
sparkSession.sql(SQL_INSERT_TABLE);

metalake.revokeRolesFromUser(Lists.newArrayList(helperRole), userName1);
metalake.deleteRole(helperRole);
waitForUpdatingPolicies();

// case 1. The user has none of the privileges on the table

// - a. Fail to insert data into the table
Assertions.assertThrows(AccessControlException.class, () -> sparkSession.sql(SQL_INSERT_TABLE));

// - b. Fail to select data from the table
Assertions.assertThrows(
AccessControlException.class, () -> sparkSession.sql(SQL_SELECT_TABLE).collectAsList());

// - c: Fail to update data in the table
Assertions.assertThrows(
SparkUnsupportedOperationException.class, () -> sparkSession.sql(SQL_UPDATE_TABLE));

// - d: Fail to delete data from the table
Assertions.assertThrows(AnalysisException.class, () -> sparkSession.sql(SQL_DELETE_TABLE));

// - e: Fail to alter the table
Assertions.assertThrows(AccessControlException.class, () -> sparkSession.sql(SQL_ALTER_TABLE));

// - f: Fail to drop the table
Assertions.assertThrows(AccessControlException.class, () -> sparkSession.sql(SQL_DROP_TABLE));

// case 2. user is the table owner
MetadataObject tableObject =
MetadataObjects.of(
Lists.newArrayList(catalogName, schemaName, tableName), MetadataObject.Type.TABLE);
metalake.setOwner(tableObject, userName1, Owner.Type.USER);
waitForUpdatingPolicies();

// The owner has all the privileges except for creating a table
checkTableAllPrivilegesExceptForCreating();

// Fail to create the table
// Assertions.assertThrows(AccessControlException.class, () ->
// sparkSession.sql(SQL_CREATE_TABLE));

// Remove the owner of the table
metalake.setOwner(tableObject, AuthConstants.ANONYMOUS_USER, Owner.Type.USER);

// case 3. user is the schema owner
MetadataObject schemaObject =
MetadataObjects.of(catalogName, schemaName, MetadataObject.Type.SCHEMA);
metalake.setOwner(schemaObject, userName1, Owner.Type.USER);
waitForUpdatingPolicies();

// Succeed to create a table
sparkSession.sql(SQL_CREATE_TABLE);

// Succeed to check other table privileges
checkTableAllPrivilegesExceptForCreating();

// Succeed to drop schema
sparkSession.sql(SQL_DROP_SCHEMA);

// Fail to create schema
// Assertions.assertThrows(AccessControlException.class, () ->
// sparkSession.sql(SQL_CREATE_SCHEMA));

// Remove the owner
metalake.setOwner(schemaObject, AuthConstants.ANONYMOUS_USER, Owner.Type.USER);

// case 4. user is the catalog owner
MetadataObject catalogObject =
MetadataObjects.of(null, catalogName, MetadataObject.Type.CATALOG);
metalake.setOwner(catalogObject, userName1, Owner.Type.USER);
waitForUpdatingPolicies();

// Succeed to create a schema
sparkSession.sql(SQL_CREATE_SCHEMA);

// Succeed to create a table
sparkSession.sql(SQL_CREATE_TABLE);

// Succeed to check other table privileges
checkTableAllPrivilegesExceptForCreating();

// Succeed to drop schema
sparkSession.sql(SQL_DROP_SCHEMA);

metalake.setOwner(catalogObject, AuthConstants.ANONYMOUS_USER, Owner.Type.USER);
// case 5. user is the metalake owner
MetadataObject metalakeObject =
MetadataObjects.of(null, metalakeName, MetadataObject.Type.METALAKE);
metalake.setOwner(metalakeObject, userName1, Owner.Type.USER);
waitForUpdatingPolicies();

// Succeed to create a schema
sparkSession.sql(SQL_CREATE_SCHEMA);

// Succeed to create a table
sparkSession.sql(SQL_CREATE_TABLE);

// Succeed to check other table privileges
checkTableAllPrivilegesExceptForCreating();

// Succeed to drop schema
sparkSession.sql(SQL_DROP_SCHEMA);
}

@Test
void testAllowUseSchemaPrivilege() throws InterruptedException {
// Create a role with CREATE_SCHEMA privilege
@@ -582,7 +771,6 @@ void testAllowUseSchemaPrivilege() throws InterruptedException {
1, rows2.stream().filter(row -> row.getString(0).equals(schemaName)).count());

// Clean up
catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, tableName));
catalog.asSchemas().dropSchema(schemaName, true);
metalake.revokeRolesFromUser(Lists.newArrayList(roleName), userName1);
metalake.deleteRole(roleName);
@@ -624,6 +812,27 @@ private static void createCatalog() {
LOG.info("Catalog created: {}", catalog);
}

private void checkTableAllPrivilegesExceptForCreating() {
// - a. Succeed to insert data into the table
sparkSession.sql(SQL_INSERT_TABLE);

// - b. Succeed to select data from the table
sparkSession.sql(SQL_SELECT_TABLE).collectAsList();

// - c: Fail to update data in the table because Hive doesn't support UPDATE
Assertions.assertThrows(
SparkUnsupportedOperationException.class, () -> sparkSession.sql(SQL_UPDATE_TABLE));

// - d: Fail to delete data from the table because Hive doesn't support DELETE
Assertions.assertThrows(AnalysisException.class, () -> sparkSession.sql(SQL_DELETE_TABLE));

// - e: Succeed to alter the table
sparkSession.sql(SQL_ALTER_TABLE);

// - f: Succeed to drop the table
sparkSession.sql(SQL_DROP_TABLE);
}

private static void waitForUpdatingPolicies() throws InterruptedException {
// After Ranger authorization, we must wait for the Ranger Spark plugin to update
// the policy. The sleep time must be greater than the policy update interval.
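The body of waitForUpdatingPolicies is truncated in this diff. For orientation, here is a minimal sketch of the idea the comment above describes; the sleep duration is a hypothetical placeholder, not the project's actual value:

// Hypothetical sketch only: the real method body and interval are elided
// from this diff. Per the comment above, the only requirement is that the
// sleep exceeds the Ranger Spark plugin's policy refresh interval.
private static void waitForUpdatingPolicies() throws InterruptedException {
  Thread.sleep(6_000L); // placeholder; must be greater than the refresh interval
}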
