Skip to content

Commit

Permalink
New First commit
Browse files Browse the repository at this point in the history
  • Loading branch information
dakota002 committed Nov 19, 2024
1 parent 95edeb6 commit b6f5ee8
Show file tree
Hide file tree
Showing 9 changed files with 782 additions and 3 deletions.
13 changes: 13 additions & 0 deletions app.arc
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,15 @@ legacy_users
email *String
PointInTimeRecovery true

kafka_acls
aclId *String
PointInTimeRecovery true

kafka_acl_log
partitionKey *Number
syncedOn **Number
PointInTimeRecovery ture

@tables-indexes
email_notification_subscription
topic *String
Expand Down Expand Up @@ -143,6 +152,10 @@ synonyms
synonymId *String
name synonymsByUuid

kafka_acls
resourceName *String
name aclsByResourceName

@aws
runtime nodejs20.x
region us-east-1
Expand Down
260 changes: 260 additions & 0 deletions app/lib/kafka.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,23 @@
*
* SPDX-License-Identifier: Apache-2.0
*/
import { tables } from '@architect/functions'
import { paginateScan } from '@aws-sdk/lib-dynamodb'
import type { DynamoDBDocument } from '@aws-sdk/lib-dynamodb'
import crypto from 'crypto'
import type { AclFilter } from 'gcn-kafka'
import { Kafka } from 'gcn-kafka'
import type { AclEntry } from 'kafkajs'
import {
AclOperationTypes,
AclPermissionTypes,
AclResourceTypes,
ResourcePatternTypes,
} from 'kafkajs'
import memoizee from 'memoizee'

import { domain, getEnvOrDieInProduction } from './env.server'
import type { User } from '~/routes/_auth/user.server'

const client_id = getEnvOrDieInProduction('KAFKA_CLIENT_ID') ?? ''
const client_secret = getEnvOrDieInProduction('KAFKA_CLIENT_SECRET')
Expand Down Expand Up @@ -68,3 +81,250 @@ if (process.env.ARC_SANDBOX) {
await producer.send({ topic, messages: [{ value }] })
}
}

export type KafkaACL = AclEntry & {
aclId?: string
}

export type UserClientType = 'producer' | 'consumer'

export const adminGroup = 'gcn.nasa.gov/gcn-admin'

export const consumerOperations = [
AclOperationTypes.READ,
AclOperationTypes.DESCRIBE,
]
export const producerOperations = [
AclOperationTypes.CREATE,
AclOperationTypes.WRITE,
AclOperationTypes.DESCRIBE,
]

const admin_client_id = getEnvOrDieInProduction('KAFKA_ADMIN_CLIENT_ID') ?? ''
const admin_client_secret = getEnvOrDieInProduction('KAFKA_ADMIN_CLIENT_SECRET')
const adminKafka = new Kafka({
client_id: admin_client_id,
client_secret: admin_client_secret,
domain,
})

function validateUser(user: User) {
if (!user.groups.includes(adminGroup))
throw new Response(null, { status: 403 })
}

export async function createKafkaACL(
user: User,
userClientType: UserClientType,
resourceName: string,
group: string,
permissionType: number,
resourceType: number,
includePrefixed: boolean
) {
const acls: KafkaACL[] =
userClientType == 'consumer'
? consumerOperations.map((operation) => {
return {
resourceName,
principal: `User:${group}`,
host: '*',
operation,
permissionType,
resourcePatternType: 3,
resourceType,
}
})
: producerOperations.map((operation) => {
return {
resourceName,
principal: `User:${group}`,
host: '*',
operation,
permissionType,
resourcePatternType: 3,
resourceType,
}
})

if (includePrefixed) {
const prefixedAcls =
userClientType === 'consumer'
? consumerOperations.map((operation) => {
return {
resourceName,
principal: `User:${group}`,
host: '*',
operation,
permissionType,
resourcePatternType: 4,
resourceType,
}
})
: producerOperations.map((operation) => {
return {
resourceName,
principal: `User:${group}`,
host: '*',
operation,
permissionType,
resourcePatternType: 4,
resourceType,
}
})
acls.push(...prefixedAcls)
}

await createKafkaACLInternal(user, acls)
}

async function createKafkaACLInternal(user: User, acls: KafkaACL[]) {
validateUser(user)
// Save to db
const db = await tables()
await Promise.all(
acls.map((acl) => db.kafka_acls.put({ ...acl, aclId: crypto.randomUUID() }))
)

// Add to Kafka
const adminClient = adminKafka.admin()
await adminClient.connect()
if (acls.some((acl) => acl.resourceType === AclResourceTypes.TOPIC))
await Promise.all(
acls
.filter((acl) => acl.resourceType === AclResourceTypes.TOPIC)
.map((acl) =>
adminClient.createTopics({
topics: [
{
topic: acl.resourceName,
},
],
})
)
)

await adminClient.createAcls({ acl: acls })
await adminClient.disconnect()
}

export async function getKafkaACLsFromDynamoDB(user: User, filter?: string) {
validateUser(user)
const db = await tables()
const client = db._doc as unknown as DynamoDBDocument
const TableName = db.name('kafka_acls')
const pages = paginateScan(
{ client },
{
TableName,
FilterExpression: filter
? 'contains(resourceName, :filter) OR contains(cognitoGroup, :filter)'
: undefined,
ExpressionAttributeValues: filter
? {
':filter': filter,
}
: undefined,
}
)

const acls: KafkaACL[] = []
for await (const page of pages) {
const newACL = page.Items as KafkaACL[]
if (newACL) acls.push(...newACL)
}
return acls
}

export async function getAclsFromBrokers() {
const adminClient = adminKafka.admin()
await adminClient.connect()
const acls = await adminClient.describeAcls({
resourceType: AclResourceTypes.ANY,
host: '*',
permissionType: AclPermissionTypes.ANY,
operation: AclOperationTypes.ANY,
resourcePatternType: ResourcePatternTypes.ANY,
})
await adminClient.disconnect()

const results: KafkaACL[] = []
for (const item of acls.resources) {
console.log('Item:', item)

results.push(
...item.acls.map((acl) => {
return {
...acl,
resourceName: item.resourceName,
resourceType: item.resourceType,
resourcePatternType: item.resourcePatternType,
}
})
)
}

return results
}

export async function deleteKafkaACL(user: User, aclIds: string[]) {
validateUser(user)
const db = await tables()
const acls: KafkaACL[] = await Promise.all(
aclIds.map((aclId) => db.kafka_acls.get({ aclId }))
)

const adminClient = adminKafka.admin()
await adminClient.connect()
await adminClient.deleteAcls({ filters: acls as AclFilter[] })
await adminClient.disconnect()

await Promise.all(
acls.map((acl) =>
db.kafka_acls.delete({
aclId: acl.aclId,
})
)
)
}

export async function updateBrokersFromDb(user: User) {
const dbDefinedAcls = await getKafkaACLsFromDynamoDB(user)
const adminClient = adminKafka.admin()
await adminClient.connect()
await adminClient.createAcls({ acl: dbDefinedAcls })
await adminClient.disconnect()
}

export async function updateDbFromBrokers(user: User) {
const kafkaDefinedAcls = await getAclsFromBrokers()
const db = await tables()
await Promise.all([
...kafkaDefinedAcls.map((acl) =>
db.kafka_acls.put({ ...acl, aclId: crypto.randomUUID() })
),
db.kafka_acl_log.put({
partitionKey: 1,
syncedOn: Date.now(),
syncedBy: user.email,
}),
])
}

type KafkaAclSyncLog = {
partitionKey: number
syncedOn: number
syncedBy: string
}

export async function getLastSyncDate(): Promise<KafkaAclSyncLog> {
const db = await tables()
return (
await db.kafka_acl_log.query({
KeyConditionExpression: 'partitionKey = :1',
ExpressionAttributeValues: { ':1': 1 },
ScanIndexForward: false,
Limit: 1,
})
).Items.pop() as KafkaAclSyncLog
}
8 changes: 8 additions & 0 deletions app/root.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import { useSpinDelay } from 'spin-delay'
import invariant from 'tiny-invariant'

import { features, getEnvOrDieInProduction, origin } from './lib/env.server'
import { adminGroup } from './lib/kafka.server'
import { DevBanner } from './root/DevBanner'
import { Footer } from './root/Footer'
import NewsBanner from './root/NewsBanner'
Expand Down Expand Up @@ -119,6 +120,7 @@ export async function loader({ request }: LoaderFunctionArgs) {
const recaptchaSiteKey = getEnvOrDieInProduction('RECAPTCHA_SITE_KEY')
const userIsMod = user?.groups.includes(moderatorGroup)
const userIsVerifiedSubmitter = user?.groups.includes(submitterGroup)
const userIsAdmin = user?.groups.includes(adminGroup)

return {
origin,
Expand All @@ -129,6 +131,7 @@ export async function loader({ request }: LoaderFunctionArgs) {
idp,
userIsMod,
userIsVerifiedSubmitter,
userIsAdmin,
}
}

Expand Down Expand Up @@ -168,6 +171,11 @@ export function useSubmitterStatus() {
return userIsVerifiedSubmitter
}

export function useAdminStatus() {
const { userIsAdmin } = useLoaderDataRoot()
return userIsAdmin
}

export function useRecaptchaSiteKey() {
const { recaptchaSiteKey } = useLoaderDataRoot()
return recaptchaSiteKey
Expand Down
8 changes: 7 additions & 1 deletion app/root/header/Header.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import { useEffect, useState } from 'react'
import { useClickAnyWhere, useWindowSize } from 'usehooks-ts'

import { Meatball } from '~/components/meatball/Meatball'
import { useEmail, useUserIdp } from '~/root'
import { useAdminStatus, useEmail, useUserIdp } from '~/root'

import styles from './header.module.css'

Expand Down Expand Up @@ -74,6 +74,7 @@ export function Header() {
const [expanded, setExpanded] = useState(false)
const [userMenuIsOpen, setUserMenuIsOpen] = useState(false)
const isMobile = useWindowSize().width < 1024
const userIsAdmin = useAdminStatus()

function toggleMobileNav() {
setExpanded((expanded) => !expanded)
Expand Down Expand Up @@ -162,6 +163,11 @@ export function Header() {
<NavLink end key="user" to="/user">
Profile
</NavLink>,
userIsAdmin && (
<NavLink key="admin" to="/admin/kafka">
Admin
</NavLink>
),
<NavLink key="endorsements" to="/user/endorsements">
Peer Endorsements
</NavLink>,
Expand Down
Loading

0 comments on commit b6f5ee8

Please sign in to comment.