Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Kafka ACL admin to website #2301

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions app.arc
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,15 @@ legacy_users
email *String
PointInTimeRecovery true

kafka_acls
aclId *String
PointInTimeRecovery true

kafka_acl_log
partitionKey *Number
syncedOn **Number
PointInTimeRecovery ture

@tables-indexes
email_notification_subscription
topic *String
Expand Down Expand Up @@ -143,6 +152,10 @@ synonyms
synonymId *String
name synonymsByUuid

kafka_acls
resourceName *String
name aclsByResourceName

@aws
runtime nodejs20.x
region us-east-1
Expand Down
133 changes: 133 additions & 0 deletions app/lib/kafka.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,22 @@
*
* SPDX-License-Identifier: Apache-2.0
*/
import { tables } from '@architect/functions'
import { paginateScan } from '@aws-sdk/lib-dynamodb'

Check warning on line 9 in app/lib/kafka.server.ts

View check run for this annotation

Codecov / codecov/patch

app/lib/kafka.server.ts#L8-L9

Added lines #L8 - L9 were not covered by tests
import type { DynamoDBDocument } from '@aws-sdk/lib-dynamodb'
import crypto from 'crypto'

Check warning on line 11 in app/lib/kafka.server.ts

View check run for this annotation

Codecov / codecov/patch

app/lib/kafka.server.ts#L11

Added line #L11 was not covered by tests
import { Kafka } from 'gcn-kafka'
import type { AclEntry } from 'kafkajs'
import {

Check warning on line 14 in app/lib/kafka.server.ts

View check run for this annotation

Codecov / codecov/patch

app/lib/kafka.server.ts#L14

Added line #L14 was not covered by tests
AclOperationTypes,
AclPermissionTypes,
AclResourceTypes,
ResourcePatternTypes,
} from 'kafkajs'
import memoizee from 'memoizee'

import { domain, getEnvOrDieInProduction } from './env.server'
import type { User } from '~/routes/_auth/user.server'

const client_id = getEnvOrDieInProduction('KAFKA_CLIENT_ID') ?? ''
const client_secret = getEnvOrDieInProduction('KAFKA_CLIENT_SECRET')
Expand Down Expand Up @@ -68,3 +80,124 @@
await producer.send({ topic, messages: [{ value }] })
}
}

export type KafkaACL = AclEntry & {
aclId?: string
}

export type UserClientType = 'producer' | 'consumer'

export const adminGroup = 'gcn.nasa.gov/gcn-admin'

Check warning on line 90 in app/lib/kafka.server.ts

View check run for this annotation

Codecov / codecov/patch

app/lib/kafka.server.ts#L90

Added line #L90 was not covered by tests

export const consumerOperations = [

Check warning on line 92 in app/lib/kafka.server.ts

View check run for this annotation

Codecov / codecov/patch

app/lib/kafka.server.ts#L92

Added line #L92 was not covered by tests
AclOperationTypes.READ,
AclOperationTypes.DESCRIBE,
]
export const producerOperations = [

Check warning on line 96 in app/lib/kafka.server.ts

View check run for this annotation

Codecov / codecov/patch

app/lib/kafka.server.ts#L96

Added line #L96 was not covered by tests
AclOperationTypes.CREATE,
AclOperationTypes.WRITE,
AclOperationTypes.DESCRIBE,
]

const admin_client_id = getEnvOrDieInProduction('KAFKA_ADMIN_CLIENT_ID') ?? ''
const admin_client_secret = getEnvOrDieInProduction('KAFKA_ADMIN_CLIENT_SECRET')
const adminKafka = new Kafka({

Check warning on line 104 in app/lib/kafka.server.ts

View check run for this annotation

Codecov / codecov/patch

app/lib/kafka.server.ts#L103-L104

Added lines #L103 - L104 were not covered by tests
client_id: admin_client_id,
client_secret: admin_client_secret,
domain,
})

function validateUser(user: User) {

Check warning on line 110 in app/lib/kafka.server.ts

View check run for this annotation

Codecov / codecov/patch

app/lib/kafka.server.ts#L110

Added line #L110 was not covered by tests
if (!user.groups.includes(adminGroup))
throw new Response(null, { status: 403 })

Check warning on line 112 in app/lib/kafka.server.ts

View check run for this annotation

Codecov / codecov/patch

app/lib/kafka.server.ts#L112

Added line #L112 was not covered by tests
}

export async function getKafkaACLsFromDynamoDB(user: User, filter?: string) {
validateUser(user)
const db = await tables()
const client = db._doc as unknown as DynamoDBDocument
const TableName = db.name('kafka_acls')
const pages = paginateScan(

Check warning on line 120 in app/lib/kafka.server.ts

View check run for this annotation

Codecov / codecov/patch

app/lib/kafka.server.ts#L115-L120

Added lines #L115 - L120 were not covered by tests
{ client },
{
TableName,
FilterExpression: filter
? 'contains(resourceName, :filter) OR contains(cognitoGroup, :filter)'
: undefined,
ExpressionAttributeValues: filter
? {
':filter': filter,
}
: undefined,
}
)

const acls: KafkaACL[] = []
for await (const page of pages) {
const newACL = page.Items as KafkaACL[]

Check warning on line 137 in app/lib/kafka.server.ts

View check run for this annotation

Codecov / codecov/patch

app/lib/kafka.server.ts#L135-L137

Added lines #L135 - L137 were not covered by tests
if (newACL) acls.push(...newACL)
}
return acls

Check warning on line 140 in app/lib/kafka.server.ts

View check run for this annotation

Codecov / codecov/patch

app/lib/kafka.server.ts#L140

Added line #L140 was not covered by tests
}

export async function getAclsFromBrokers() {
const adminClient = adminKafka.admin()
await adminClient.connect()
const acls = await adminClient.describeAcls({

Check warning on line 146 in app/lib/kafka.server.ts

View check run for this annotation

Codecov / codecov/patch

app/lib/kafka.server.ts#L143-L146

Added lines #L143 - L146 were not covered by tests
resourceType: AclResourceTypes.ANY,
host: '*',
permissionType: AclPermissionTypes.ANY,
operation: AclOperationTypes.ANY,
resourcePatternType: ResourcePatternTypes.ANY,
})
await adminClient.disconnect()

Check warning on line 153 in app/lib/kafka.server.ts

View check run for this annotation

Codecov / codecov/patch

app/lib/kafka.server.ts#L153

Added line #L153 was not covered by tests

const results: KafkaACL[] = []
for (const item of acls.resources) {
results.push(
...item.acls.map((acl) => {
return {

Check warning on line 159 in app/lib/kafka.server.ts

View check run for this annotation

Codecov / codecov/patch

app/lib/kafka.server.ts#L155-L159

Added lines #L155 - L159 were not covered by tests
...acl,
resourceName: item.resourceName,
resourceType: item.resourceType,
resourcePatternType: item.resourcePatternType,
}
})
)
}

return results

Check warning on line 169 in app/lib/kafka.server.ts

View check run for this annotation

Codecov / codecov/patch

app/lib/kafka.server.ts#L169

Added line #L169 was not covered by tests
}

export async function updateDbFromBrokers(user: User) {
const kafkaDefinedAcls = await getAclsFromBrokers()
const db = await tables()
await Promise.all([
...kafkaDefinedAcls.map((acl) =>
db.kafka_acls.put({ ...acl, aclId: crypto.randomUUID() })

Check warning on line 177 in app/lib/kafka.server.ts

View check run for this annotation

Codecov / codecov/patch

app/lib/kafka.server.ts#L172-L177

Added lines #L172 - L177 were not covered by tests
),
db.kafka_acl_log.put({
partitionKey: 1,
syncedOn: Date.now(),
syncedBy: user.email,
}),
])
}

type KafkaAclSyncLog = {
partitionKey: number
syncedOn: number
syncedBy: string
}

export async function getLastSyncDate(): Promise<KafkaAclSyncLog> {
const db = await tables()
return (

Check warning on line 195 in app/lib/kafka.server.ts

View check run for this annotation

Codecov / codecov/patch

app/lib/kafka.server.ts#L193-L195

Added lines #L193 - L195 were not covered by tests
await db.kafka_acl_log.query({
KeyConditionExpression: 'partitionKey = :1',
ExpressionAttributeValues: { ':1': 1 },
ScanIndexForward: false,
Limit: 1,
})
).Items.pop() as KafkaAclSyncLog
}
8 changes: 8 additions & 0 deletions app/root.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import invariant from 'tiny-invariant'

import { features, getEnvOrDieInProduction, origin } from './lib/env.server'
import { adminGroup } from './lib/kafka.server'

Check warning on line 49 in app/root.tsx

View check run for this annotation

Codecov / codecov/patch

app/root.tsx#L49

Added line #L49 was not covered by tests
import { DevBanner } from './root/DevBanner'
import { Footer } from './root/Footer'
import NewsBanner from './root/NewsBanner'
Expand Down Expand Up @@ -119,6 +120,7 @@
const recaptchaSiteKey = getEnvOrDieInProduction('RECAPTCHA_SITE_KEY')
const userIsMod = user?.groups.includes(moderatorGroup)
const userIsVerifiedSubmitter = user?.groups.includes(submitterGroup)
const userIsAdmin = user?.groups.includes(adminGroup)

Check warning on line 123 in app/root.tsx

View check run for this annotation

Codecov / codecov/patch

app/root.tsx#L123

Added line #L123 was not covered by tests

return {
origin,
Expand All @@ -129,6 +131,7 @@
idp,
userIsMod,
userIsVerifiedSubmitter,
userIsAdmin,
}
}

Expand Down Expand Up @@ -168,6 +171,11 @@
return userIsVerifiedSubmitter
}

export function useAdminStatus() {
const { userIsAdmin } = useLoaderDataRoot()
return userIsAdmin

Check warning on line 176 in app/root.tsx

View check run for this annotation

Codecov / codecov/patch

app/root.tsx#L174-L176

Added lines #L174 - L176 were not covered by tests
}

export function useRecaptchaSiteKey() {
const { recaptchaSiteKey } = useLoaderDataRoot()
return recaptchaSiteKey
Expand Down
8 changes: 7 additions & 1 deletion app/root/header/Header.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import { useClickAnyWhere, useWindowSize } from 'usehooks-ts'

import { Meatball } from '~/components/meatball/Meatball'
import { useEmail, useUserIdp } from '~/root'
import { useAdminStatus, useEmail, useUserIdp } from '~/root'

Check warning on line 20 in app/root/header/Header.tsx

View check run for this annotation

Codecov / codecov/patch

app/root/header/Header.tsx#L20

Added line #L20 was not covered by tests

import styles from './header.module.css'

Expand Down Expand Up @@ -74,6 +74,7 @@
const [expanded, setExpanded] = useState(false)
const [userMenuIsOpen, setUserMenuIsOpen] = useState(false)
const isMobile = useWindowSize().width < 1024
const userIsAdmin = useAdminStatus()

Check warning on line 77 in app/root/header/Header.tsx

View check run for this annotation

Codecov / codecov/patch

app/root/header/Header.tsx#L77

Added line #L77 was not covered by tests

function toggleMobileNav() {
setExpanded((expanded) => !expanded)
Expand Down Expand Up @@ -162,6 +163,11 @@
<NavLink end key="user" to="/user">
Profile
</NavLink>,
userIsAdmin && (
<NavLink key="admin" to="/admin/kafka">
Admin
</NavLink>
),
<NavLink key="endorsements" to="/user/endorsements">
Peer Endorsements
</NavLink>,
Expand Down
Loading