diff --git a/app.arc b/app.arc index f4de4c93e..bb64bab2b 100644 --- a/app.arc +++ b/app.arc @@ -95,8 +95,7 @@ legacy_users PointInTimeRecovery true kafka_acls - topicName *String - cognitoGroup **String + aclId *String PointInTimeRecovery true kafka_acl_log @@ -154,9 +153,8 @@ synonyms name synonymsByUuid kafka_acls - cognitoGroup *String - userClientType **String - name aclsByGroup + resourceName *String + name aclsByResourceName @aws runtime nodejs20.x diff --git a/app/lib/kafka.server.ts b/app/lib/kafka.server.ts index 5720ec7a6..831aa0701 100644 --- a/app/lib/kafka.server.ts +++ b/app/lib/kafka.server.ts @@ -8,6 +8,7 @@ import { tables } from '@architect/functions' import { paginateScan } from '@aws-sdk/lib-dynamodb' import type { DynamoDBDocument } from '@aws-sdk/lib-dynamodb' +import crypto from 'crypto' import { Kafka } from 'gcn-kafka' import type { AclEntry } from 'kafkajs' import { @@ -80,20 +81,30 @@ if (process.env.ARC_SANDBOX) { } } -export type KafkaACL = { - topicName: string - userClientType: UserClientType - cognitoGroup: string - prefixed: boolean - permissionType: number +/** + * AclEntry already contains definitions for the following: + * + * principal: string --> 'User:{cognito_group_name}' + * host: string --> '*' + * operation: AclOperationTypes --> Read,Write, etc from enum + * permissionType: AclPermissionTypes --> Allow, Deny, etc from enum + * resourceType: AclResourceTypes --> TOPIC, etc + * resourceName: string --> name of topic: 'gcn.notices.burstcube' + * resourcePatternType: ResourcePatternTypes --> PREFIXED or LITERAL + */ +export type KafkaACL = AclEntry & { + aclId?: string } export type UserClientType = 'producer' | 'consumer' export const adminGroup = 'gcn.nasa.gov/gcn-admin' -const consumerOperations = [AclOperationTypes.READ, AclOperationTypes.DESCRIBE] -const producerOperations = [ +export const consumerOperations = [ + AclOperationTypes.READ, + AclOperationTypes.DESCRIBE, +] +export const producerOperations = [ AclOperationTypes.CREATE, AclOperationTypes.WRITE, AclOperationTypes.DESCRIBE, @@ -112,36 +123,101 @@ function validateUser(user: User) { throw new Response(null, { status: 403 }) } -export async function createKafkaACL(user: User, acl: KafkaACL) { +export async function createKafkaACL( + user: User, + userClientType: UserClientType, + resourceName: string, + group: string, + permissionType: number, + resourceType: number, + includePrefixed: boolean +) { + const acls: KafkaACL[] = + userClientType == 'consumer' + ? consumerOperations.map((operation) => { + return { + resourceName, + principal: `User:${group}`, + host: '*', + operation, // Read, write, etc + permissionType, // Allow, deny etc + resourcePatternType: 3, // LITERAL | PREFIXED + resourceType, + } + }) + : producerOperations.map((operation) => { + return { + resourceName, + principal: `User:${group}`, + host: '*', + operation, // Read, write, etc + permissionType, + resourcePatternType: 3, // LITERAL | PREFIX + resourceType, + } + }) + + if (includePrefixed) { + const prefixedAcls = + userClientType === 'consumer' + ? consumerOperations.map((operation) => { + return { + resourceName, + principal: `User:${group}`, + host: '*', + operation, + permissionType, + resourcePatternType: 4, + resourceType, + } + }) + : producerOperations.map((operation) => { + return { + resourceName, + principal: `User:${group}`, + host: '*', + operation, + permissionType, + resourcePatternType: 4, + resourceType, + } + }) + acls.push(...prefixedAcls) + } + + await createKafkaACLInternal(user, acls) +} + +async function createKafkaACLInternal(user: User, acls: KafkaACL[]) { validateUser(user) // Save to db const db = await tables() - await db.kafka_acls.put(acl) + await Promise.all( + acls.map((acl) => db.kafka_acls.put({ ...acl, aclId: crypto.randomUUID() })) + ) // Add to Kafka const adminClient = adminKafka.admin() await adminClient.connect() - await adminClient.createTopics({ - topics: [ - { - topic: acl.topicName, - }, - ], - }) - const acls = - acl.userClientType == 'producer' - ? createProducerAcls(acl) - : createConsumerAcls(acl) + if (acls.some((acl) => acl.resourceType === AclResourceTypes.TOPIC)) + await Promise.all( + acls + .filter((acl) => acl.resourceType === AclResourceTypes.TOPIC) + .map((acl) => + adminClient.createTopics({ + topics: [ + { + topic: acl.resourceName, + }, + ], + }) + ) + ) + await adminClient.createAcls({ acl: acls }) await adminClient.disconnect() } -export async function getKafkaACLByTopicName(user: User, topicName: string) { - validateUser(user) - const db = await tables() - return (await db.kafka_acls.get({ topicName })) as KafkaACL -} - export async function getKafkaACLsFromDynamoDB(user: User, filter?: string) { validateUser(user) const db = await tables() @@ -152,7 +228,7 @@ export async function getKafkaACLsFromDynamoDB(user: User, filter?: string) { { TableName, FilterExpression: filter - ? 'contains(topicName, :filter) OR contains(cognitoGroup, :filter)' + ? 'contains(resourceName, :filter) OR contains(cognitoGroup, :filter)' : undefined, ExpressionAttributeValues: filter ? { @@ -170,126 +246,63 @@ export async function getKafkaACLsFromDynamoDB(user: User, filter?: string) { return acls } -export async function getKafkaTopicsForUser(user: User) { - validateUser(user) - const userGroups = user.groups.filter((x) => - x.startsWith('gcn.nasa.gov/kafka-') - ) - const db = await tables() - const items = ( - await Promise.all([ - ...userGroups.map((cognitoGroup) => - db.kafka_acls.query({ - IndexName: 'aclsByGroup', - KeyConditionExpression: - 'cognitoGroup = :group AND permissionType = :permission', - ProjectionExpression: 'topicName', - ExpressionAttributeValues: { - ':group': cognitoGroup, - ':permission': 'consumer', - }, - }) - ), - ]) - ) - .filter((x) => x.Count && x.Count > 0) - .flatMap((x) => x.Items) - .map((x) => x.topicName) - - return items -} export async function getAclsFromBrokers() { const adminClient = adminKafka.admin() await adminClient.connect() const acls = await adminClient.describeAcls({ - resourceType: AclResourceTypes.TOPIC, + resourceType: AclResourceTypes.ANY, host: '*', permissionType: AclPermissionTypes.ANY, operation: AclOperationTypes.ANY, resourcePatternType: ResourcePatternTypes.ANY, }) await adminClient.disconnect() + const results: KafkaACL[] = [] for (const item of acls.resources) { console.log('Item:', item) - const topicName = item.resourceName - const prefixed = item.resourcePatternType === ResourcePatternTypes.PREFIXED results.push( - ...item.acls - .filter((acl) => acl.operation !== AclOperationTypes.DESCRIBE) - .map((acl) => { - const principal = acl.principal.split('-') - return { - topicName, - prefixed, - permissionType: acl.permissionType, - cognitoGroup: acl.principal.replace('User:', ''), - userClientType: principal[principal.length - 1] as UserClientType, - } - }) + ...item.acls.map((acl) => { + return { + ...acl, + resourceName: item.resourceName, + resourceType: item.resourceType, + resourcePatternType: item.resourcePatternType, + } + }) ) } return results } -export async function deleteKafkaACL(user: User, acl: KafkaACL) { +export async function deleteKafkaACL(user: User, aclIds: string[]) { validateUser(user) const db = await tables() - await db.kafka_acls.delete({ - topicName: acl.topicName, - cognitoGroup: acl.cognitoGroup, - }) - - const acls = - acl.userClientType == 'producer' - ? createProducerAcls(acl) - : createConsumerAcls(acl) + const acls = await Promise.all( + aclIds.map((aclId) => db.kafka_acls.get({ aclId })) + ) const adminClient = adminKafka.admin() await adminClient.connect() await adminClient.deleteAcls({ filters: acls }) await adminClient.disconnect() -} - -function createProducerAcls(acl: KafkaACL): AclEntry[] { - // Create, Write, and Describe operations - return mapAclAndOperations(acl, producerOperations) -} - -function createConsumerAcls(acl: KafkaACL): AclEntry[] { - // Read and Describe operations - return mapAclAndOperations(acl, consumerOperations) -} -function mapAclAndOperations(acl: KafkaACL, operations: AclOperationTypes[]) { - return operations.map((operation) => { - return { - resourceType: AclResourceTypes.TOPIC, - resourceName: acl.topicName, - resourcePatternType: acl.prefixed - ? ResourcePatternTypes.PREFIXED - : ResourcePatternTypes.LITERAL, - principal: `User:${acl.cognitoGroup}`, - host: '*', - operation, - permissionType: acl.permissionType, - } - }) + await Promise.all( + acls.map((acl) => + db.kafka_acls.delete({ + aclId: acl.aclId, + }) + ) + ) } export async function updateBrokersFromDb(user: User) { const dbDefinedAcls = await getKafkaACLsFromDynamoDB(user) - const mappedAcls = dbDefinedAcls.flatMap((x) => - x.userClientType === 'producer' - ? createProducerAcls(x) - : createConsumerAcls(x) - ) - const adminClient = adminKafka.admin() await adminClient.connect() - await adminClient.createAcls({ acl: mappedAcls }) + await adminClient.createAcls({ acl: dbDefinedAcls }) await adminClient.disconnect() } @@ -297,7 +310,9 @@ export async function updateDbFromBrokers(user: User) { const kafkaDefinedAcls = await getAclsFromBrokers() const db = await tables() await Promise.all([ - ...kafkaDefinedAcls.map((acl) => db.kafka_acls.put(acl)), + ...kafkaDefinedAcls.map((acl) => + db.kafka_acls.put({ ...acl, aclId: crypto.randomUUID() }) + ), db.kafka_acl_log.put({ partitionKey: 1, syncedOn: Date.now(), diff --git a/app/routes/admin.kafka._index.tsx b/app/routes/admin.kafka._index.tsx index 66639e00c..b2b877679 100644 --- a/app/routes/admin.kafka._index.tsx +++ b/app/routes/admin.kafka._index.tsx @@ -64,52 +64,49 @@ export async function action({ request }: ActionFunctionArgs) { return null } - const topicName = getFormDataString(data, 'topicName') - const userClientType = getFormDataString( - data, - 'userClientType' - ) as UserClientType - const permissionTypeString = getFormDataString(data, 'permissionType') - const group = getFormDataString(data, 'group') - const includePrefixed = getFormDataString(data, 'includePrefixed') - if (!topicName || !userClientType || !group || !permissionTypeString) - throw new Response(null, { status: 400 }) - const permissionType = parseInt(permissionTypeString) + const aclId = getFormDataString(data, 'aclId') const promises = [] switch (intent) { case 'delete': - promises.push( - deleteKafkaACL(user, { - topicName, - userClientType, - cognitoGroup: group, - prefixed: topicName.endsWith('.'), - permissionType, - }) - ) + if (!aclId) throw new Response(null, { status: 400 }) + promises.push(deleteKafkaACL(user, [aclId])) break case 'create': + const resourceName = getFormDataString(data, 'resourceName') + const userClientType = getFormDataString( + data, + 'userClientType' + ) as UserClientType + const permissionTypeString = getFormDataString(data, 'permissionType') + const group = getFormDataString(data, 'group') + const includePrefixed = getFormDataString(data, 'includePrefixed') + const resourceTypeString = getFormDataString(data, 'resourceType') + + if ( + !resourceName || + !userClientType || + !group || + !permissionTypeString || + !resourceTypeString + ) + throw new Response(null, { status: 400 }) + + const permissionType = parseInt(permissionTypeString) // Allow, deny + const resourceType = parseInt(resourceTypeString) + promises.push( - createKafkaACL(user, { - topicName, + createKafkaACL( + user, userClientType, - cognitoGroup: group, - prefixed: false, + resourceName, + group, permissionType, - }) + resourceType, + Boolean(includePrefixed) + ) ) - if (includePrefixed) - promises.push( - createKafkaACL(user, { - topicName: `${topicName}.`, - userClientType, - cognitoGroup: group, - prefixed: true, - permissionType, - }) - ) break default: break @@ -208,7 +205,7 @@ export default function Index() { {aclData - .sort((a, b) => a.topicName.localeCompare(b.topicName)) + .sort((a, b) => a.resourceName.localeCompare(b.resourceName)) .map((x, index) => ( ))} @@ -234,16 +231,16 @@ function KafkaAclCard({ acl }: { acl: KafkaACL }) {
- Group: {acl.cognitoGroup} + Group: {acl.principal}
-
+ {/*
Client Type: {acl.userClientType} -
+
*/}
Permission: {permissionMap[acl.permissionType]}
- Topic: {acl.topicName} + Resource: {acl.resourceName}
@@ -270,27 +267,13 @@ function KafkaAclCard({ acl }: { acl: KafkaACL }) { renderToPortal={false} > - - - - + Delete Kafka ACL diff --git a/sandbox-seed.json b/sandbox-seed.json index 4daab0ee7..9c9a94b3e 100644 --- a/sandbox-seed.json +++ b/sandbox-seed.json @@ -5158,15 +5158,15 @@ ], "kafka_acls": [ { - "topicName": "test_topic_created_from_website", - "userClientType": "consumer", + "aclId": "12345678-abcd-1234-abcd-1234abcd1234", + "resourceName": "test_topic_created_from_website", "cognitoGroup": "gcn.nasa.gov/kafka-gcn-test-consumer", "prefixed": false, "permissionType": "3" }, { - "topicName": "test_topic_created_from_website", - "userClientType": "producer", + "aclId": "62fe8590-42e4-4917-afea-db6a0a84079a", + "resourceName": "test_topic_created_from_website", "cognitoGroup": "gcn.nasa.gov/kafka-gcn-test-producer", "prefixed": false, "permissionType": "3"