Skip to content

Commit

Permalink
refactors the data model
Browse files Browse the repository at this point in the history
  • Loading branch information
dakota002 committed Sep 12, 2024
1 parent 80cb32a commit ec31f5f
Show file tree
Hide file tree
Showing 4 changed files with 176 additions and 180 deletions.
8 changes: 3 additions & 5 deletions app.arc
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,7 @@ legacy_users
PointInTimeRecovery true

kafka_acls
topicName *String
cognitoGroup **String
aclId *String
PointInTimeRecovery true

kafka_acl_log
Expand Down Expand Up @@ -154,9 +153,8 @@ synonyms
name synonymsByUuid

kafka_acls
cognitoGroup *String
userClientType **String
name aclsByGroup
resourceName *String
name aclsByResourceName

@aws
runtime nodejs20.x
Expand Down
243 changes: 129 additions & 114 deletions app/lib/kafka.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import { tables } from '@architect/functions'
import { paginateScan } from '@aws-sdk/lib-dynamodb'
import type { DynamoDBDocument } from '@aws-sdk/lib-dynamodb'
import crypto from 'crypto'
import { Kafka } from 'gcn-kafka'
import type { AclEntry } from 'kafkajs'
import {
Expand Down Expand Up @@ -80,20 +81,30 @@ if (process.env.ARC_SANDBOX) {
}
}

export type KafkaACL = {
topicName: string
userClientType: UserClientType
cognitoGroup: string
prefixed: boolean
permissionType: number
/**
* AclEntry already contains definitions for the following:
*
* principal: string --> 'User:{cognito_group_name}'
* host: string --> '*'
* operation: AclOperationTypes --> Read,Write, etc from enum
* permissionType: AclPermissionTypes --> Allow, Deny, etc from enum
* resourceType: AclResourceTypes --> TOPIC, etc
* resourceName: string --> name of topic: 'gcn.notices.burstcube'
* resourcePatternType: ResourcePatternTypes --> PREFIXED or LITERAL
*/
export type KafkaACL = AclEntry & {
aclId?: string
}

export type UserClientType = 'producer' | 'consumer'

export const adminGroup = 'gcn.nasa.gov/gcn-admin'

const consumerOperations = [AclOperationTypes.READ, AclOperationTypes.DESCRIBE]
const producerOperations = [
export const consumerOperations = [
AclOperationTypes.READ,
AclOperationTypes.DESCRIBE,
]
export const producerOperations = [
AclOperationTypes.CREATE,
AclOperationTypes.WRITE,
AclOperationTypes.DESCRIBE,
Expand All @@ -112,36 +123,101 @@ function validateUser(user: User) {
throw new Response(null, { status: 403 })
}

export async function createKafkaACL(user: User, acl: KafkaACL) {
export async function createKafkaACL(
user: User,
userClientType: UserClientType,
resourceName: string,
group: string,
permissionType: number,
resourceType: number,
includePrefixed: boolean
) {
const acls: KafkaACL[] =
userClientType == 'consumer'
? consumerOperations.map((operation) => {
return {
resourceName,
principal: `User:${group}`,
host: '*',
operation, // Read, write, etc
permissionType, // Allow, deny etc
resourcePatternType: 3, // LITERAL | PREFIXED
resourceType,
}
})
: producerOperations.map((operation) => {
return {
resourceName,
principal: `User:${group}`,
host: '*',
operation, // Read, write, etc
permissionType,
resourcePatternType: 3, // LITERAL | PREFIX
resourceType,
}
})

if (includePrefixed) {
const prefixedAcls =
userClientType === 'consumer'
? consumerOperations.map((operation) => {
return {
resourceName,
principal: `User:${group}`,
host: '*',
operation,
permissionType,
resourcePatternType: 4,
resourceType,
}
})
: producerOperations.map((operation) => {
return {
resourceName,
principal: `User:${group}`,
host: '*',
operation,
permissionType,
resourcePatternType: 4,
resourceType,
}
})
acls.push(...prefixedAcls)
}

await createKafkaACLInternal(user, acls)
}

async function createKafkaACLInternal(user: User, acls: KafkaACL[]) {
validateUser(user)
// Save to db
const db = await tables()
await db.kafka_acls.put(acl)
await Promise.all(
acls.map((acl) => db.kafka_acls.put({ ...acl, aclId: crypto.randomUUID() }))
)

// Add to Kafka
const adminClient = adminKafka.admin()
await adminClient.connect()
await adminClient.createTopics({
topics: [
{
topic: acl.topicName,
},
],
})
const acls =
acl.userClientType == 'producer'
? createProducerAcls(acl)
: createConsumerAcls(acl)
if (acls.some((acl) => acl.resourceType === AclResourceTypes.TOPIC))
await Promise.all(
acls
.filter((acl) => acl.resourceType === AclResourceTypes.TOPIC)
.map((acl) =>
adminClient.createTopics({
topics: [
{
topic: acl.resourceName,
},
],
})
)
)

await adminClient.createAcls({ acl: acls })
await adminClient.disconnect()
}

export async function getKafkaACLByTopicName(user: User, topicName: string) {
validateUser(user)
const db = await tables()
return (await db.kafka_acls.get({ topicName })) as KafkaACL
}

export async function getKafkaACLsFromDynamoDB(user: User, filter?: string) {
validateUser(user)
const db = await tables()
Expand All @@ -152,7 +228,7 @@ export async function getKafkaACLsFromDynamoDB(user: User, filter?: string) {
{
TableName,
FilterExpression: filter
? 'contains(topicName, :filter) OR contains(cognitoGroup, :filter)'
? 'contains(resourceName, :filter) OR contains(cognitoGroup, :filter)'
: undefined,
ExpressionAttributeValues: filter
? {
Expand All @@ -170,134 +246,73 @@ export async function getKafkaACLsFromDynamoDB(user: User, filter?: string) {
return acls
}

export async function getKafkaTopicsForUser(user: User) {
validateUser(user)
const userGroups = user.groups.filter((x) =>
x.startsWith('gcn.nasa.gov/kafka-')
)
const db = await tables()
const items = (
await Promise.all([
...userGroups.map((cognitoGroup) =>
db.kafka_acls.query({
IndexName: 'aclsByGroup',
KeyConditionExpression:
'cognitoGroup = :group AND permissionType = :permission',
ProjectionExpression: 'topicName',
ExpressionAttributeValues: {
':group': cognitoGroup,
':permission': 'consumer',
},
})
),
])
)
.filter((x) => x.Count && x.Count > 0)
.flatMap((x) => x.Items)
.map((x) => x.topicName)

return items
}
export async function getAclsFromBrokers() {
const adminClient = adminKafka.admin()
await adminClient.connect()
const acls = await adminClient.describeAcls({
resourceType: AclResourceTypes.TOPIC,
resourceType: AclResourceTypes.ANY,
host: '*',
permissionType: AclPermissionTypes.ANY,
operation: AclOperationTypes.ANY,
resourcePatternType: ResourcePatternTypes.ANY,
})
await adminClient.disconnect()

const results: KafkaACL[] = []
for (const item of acls.resources) {
console.log('Item:', item)
const topicName = item.resourceName

const prefixed = item.resourcePatternType === ResourcePatternTypes.PREFIXED
results.push(
...item.acls
.filter((acl) => acl.operation !== AclOperationTypes.DESCRIBE)
.map((acl) => {
const principal = acl.principal.split('-')
return {
topicName,
prefixed,
permissionType: acl.permissionType,
cognitoGroup: acl.principal.replace('User:', ''),
userClientType: principal[principal.length - 1] as UserClientType,
}
})
...item.acls.map((acl) => {
return {
...acl,
resourceName: item.resourceName,
resourceType: item.resourceType,
resourcePatternType: item.resourcePatternType,
}
})
)
}

return results
}

export async function deleteKafkaACL(user: User, acl: KafkaACL) {
export async function deleteKafkaACL(user: User, aclIds: string[]) {
validateUser(user)
const db = await tables()
await db.kafka_acls.delete({
topicName: acl.topicName,
cognitoGroup: acl.cognitoGroup,
})

const acls =
acl.userClientType == 'producer'
? createProducerAcls(acl)
: createConsumerAcls(acl)
const acls = await Promise.all(
aclIds.map((aclId) => db.kafka_acls.get({ aclId }))
)

const adminClient = adminKafka.admin()
await adminClient.connect()
await adminClient.deleteAcls({ filters: acls })
await adminClient.disconnect()
}

function createProducerAcls(acl: KafkaACL): AclEntry[] {
// Create, Write, and Describe operations
return mapAclAndOperations(acl, producerOperations)
}

function createConsumerAcls(acl: KafkaACL): AclEntry[] {
// Read and Describe operations
return mapAclAndOperations(acl, consumerOperations)
}

function mapAclAndOperations(acl: KafkaACL, operations: AclOperationTypes[]) {
return operations.map((operation) => {
return {
resourceType: AclResourceTypes.TOPIC,
resourceName: acl.topicName,
resourcePatternType: acl.prefixed
? ResourcePatternTypes.PREFIXED
: ResourcePatternTypes.LITERAL,
principal: `User:${acl.cognitoGroup}`,
host: '*',
operation,
permissionType: acl.permissionType,
}
})
await Promise.all(
acls.map((acl) =>
db.kafka_acls.delete({
aclId: acl.aclId,
})
)
)
}

export async function updateBrokersFromDb(user: User) {
const dbDefinedAcls = await getKafkaACLsFromDynamoDB(user)
const mappedAcls = dbDefinedAcls.flatMap((x) =>
x.userClientType === 'producer'
? createProducerAcls(x)
: createConsumerAcls(x)
)

const adminClient = adminKafka.admin()
await adminClient.connect()
await adminClient.createAcls({ acl: mappedAcls })
await adminClient.createAcls({ acl: dbDefinedAcls })
await adminClient.disconnect()
}

export async function updateDbFromBrokers(user: User) {
const kafkaDefinedAcls = await getAclsFromBrokers()
const db = await tables()
await Promise.all([
...kafkaDefinedAcls.map((acl) => db.kafka_acls.put(acl)),
...kafkaDefinedAcls.map((acl) =>
db.kafka_acls.put({ ...acl, aclId: crypto.randomUUID() })
),
db.kafka_acl_log.put({
partitionKey: 1,
syncedOn: Date.now(),
Expand Down
Loading

0 comments on commit ec31f5f

Please sign in to comment.