diff --git a/app.arc b/app.arc index 43e7be1fa..1d46b86d5 100644 --- a/app.arc +++ b/app.arc @@ -94,6 +94,15 @@ legacy_users email *String PointInTimeRecovery true +kafka_acls + aclId *String + PointInTimeRecovery true + +kafka_acl_log + partitionKey *Number + syncedOn **Number + PointInTimeRecovery ture + @tables-indexes email_notification_subscription topic *String @@ -143,6 +152,10 @@ synonyms synonymId *String name synonymsByUuid +kafka_acls + resourceName *String + name aclsByResourceName + @aws runtime nodejs20.x region us-east-1 diff --git a/app/lib/kafka.server.ts b/app/lib/kafka.server.ts index eb85f2fa5..da1632a01 100644 --- a/app/lib/kafka.server.ts +++ b/app/lib/kafka.server.ts @@ -5,10 +5,22 @@ * * SPDX-License-Identifier: Apache-2.0 */ +import { tables } from '@architect/functions' +import { paginateScan } from '@aws-sdk/lib-dynamodb' +import type { DynamoDBDocument } from '@aws-sdk/lib-dynamodb' +import crypto from 'crypto' import { Kafka } from 'gcn-kafka' +import type { AclEntry } from 'kafkajs' +import { + AclOperationTypes, + AclPermissionTypes, + AclResourceTypes, + ResourcePatternTypes, +} from 'kafkajs' import memoizee from 'memoizee' import { domain, getEnvOrDieInProduction } from './env.server' +import type { User } from '~/routes/_auth/user.server' const client_id = getEnvOrDieInProduction('KAFKA_CLIENT_ID') ?? '' const client_secret = getEnvOrDieInProduction('KAFKA_CLIENT_SECRET') @@ -68,3 +80,124 @@ if (process.env.ARC_SANDBOX) { await producer.send({ topic, messages: [{ value }] }) } } + +export type KafkaACL = AclEntry & { + aclId?: string +} + +export type UserClientType = 'producer' | 'consumer' + +export const adminGroup = 'gcn.nasa.gov/gcn-admin' + +export const consumerOperations = [ + AclOperationTypes.READ, + AclOperationTypes.DESCRIBE, +] +export const producerOperations = [ + AclOperationTypes.CREATE, + AclOperationTypes.WRITE, + AclOperationTypes.DESCRIBE, +] + +const admin_client_id = getEnvOrDieInProduction('KAFKA_ADMIN_CLIENT_ID') ?? '' +const admin_client_secret = getEnvOrDieInProduction('KAFKA_ADMIN_CLIENT_SECRET') +const adminKafka = new Kafka({ + client_id: admin_client_id, + client_secret: admin_client_secret, + domain, +}) + +function validateUser(user: User) { + if (!user.groups.includes(adminGroup)) + throw new Response(null, { status: 403 }) +} + +export async function getKafkaACLsFromDynamoDB(user: User, filter?: string) { + validateUser(user) + const db = await tables() + const client = db._doc as unknown as DynamoDBDocument + const TableName = db.name('kafka_acls') + const pages = paginateScan( + { client }, + { + TableName, + FilterExpression: filter + ? 'contains(resourceName, :filter) OR contains(cognitoGroup, :filter)' + : undefined, + ExpressionAttributeValues: filter + ? { + ':filter': filter, + } + : undefined, + } + ) + + const acls: KafkaACL[] = [] + for await (const page of pages) { + const newACL = page.Items as KafkaACL[] + if (newACL) acls.push(...newACL) + } + return acls +} + +export async function getAclsFromBrokers() { + const adminClient = adminKafka.admin() + await adminClient.connect() + const acls = await adminClient.describeAcls({ + resourceType: AclResourceTypes.ANY, + host: '*', + permissionType: AclPermissionTypes.ANY, + operation: AclOperationTypes.ANY, + resourcePatternType: ResourcePatternTypes.ANY, + }) + await adminClient.disconnect() + + const results: KafkaACL[] = [] + for (const item of acls.resources) { + results.push( + ...item.acls.map((acl) => { + return { + ...acl, + resourceName: item.resourceName, + resourceType: item.resourceType, + resourcePatternType: item.resourcePatternType, + } + }) + ) + } + + return results +} + +export async function updateDbFromBrokers(user: User) { + const kafkaDefinedAcls = await getAclsFromBrokers() + const db = await tables() + await Promise.all([ + ...kafkaDefinedAcls.map((acl) => + db.kafka_acls.put({ ...acl, aclId: crypto.randomUUID() }) + ), + db.kafka_acl_log.put({ + partitionKey: 1, + syncedOn: Date.now(), + syncedBy: user.email, + }), + ]) +} + +type KafkaAclSyncLog = { + partitionKey: number + syncedOn: number + syncedBy: string +} + +export async function getLastSyncDate(): Promise { + const db = await tables() + return ( + await db.kafka_acl_log.query({ + KeyConditionExpression: 'partitionKey = :1', + ExpressionAttributeValues: { ':1': 1 }, + ScanIndexForward: false, + Limit: 1, + }) + ).Items.pop() as KafkaAclSyncLog +} diff --git a/app/root.tsx b/app/root.tsx index 2622d8f4c..4ee0e070e 100644 --- a/app/root.tsx +++ b/app/root.tsx @@ -46,6 +46,7 @@ import { useSpinDelay } from 'spin-delay' import invariant from 'tiny-invariant' import { features, getEnvOrDieInProduction, origin } from './lib/env.server' +import { adminGroup } from './lib/kafka.server' import { DevBanner } from './root/DevBanner' import { Footer } from './root/Footer' import NewsBanner from './root/NewsBanner' @@ -119,6 +120,7 @@ export async function loader({ request }: LoaderFunctionArgs) { const recaptchaSiteKey = getEnvOrDieInProduction('RECAPTCHA_SITE_KEY') const userIsMod = user?.groups.includes(moderatorGroup) const userIsVerifiedSubmitter = user?.groups.includes(submitterGroup) + const userIsAdmin = user?.groups.includes(adminGroup) return { origin, @@ -129,6 +131,7 @@ export async function loader({ request }: LoaderFunctionArgs) { idp, userIsMod, userIsVerifiedSubmitter, + userIsAdmin, } } @@ -168,6 +171,11 @@ export function useSubmitterStatus() { return userIsVerifiedSubmitter } +export function useAdminStatus() { + const { userIsAdmin } = useLoaderDataRoot() + return userIsAdmin +} + export function useRecaptchaSiteKey() { const { recaptchaSiteKey } = useLoaderDataRoot() return recaptchaSiteKey diff --git a/app/root/header/Header.tsx b/app/root/header/Header.tsx index dde3947ad..61e3e9fcd 100644 --- a/app/root/header/Header.tsx +++ b/app/root/header/Header.tsx @@ -17,7 +17,7 @@ import { useEffect, useState } from 'react' import { useClickAnyWhere, useWindowSize } from 'usehooks-ts' import { Meatball } from '~/components/meatball/Meatball' -import { useEmail, useUserIdp } from '~/root' +import { useAdminStatus, useEmail, useUserIdp } from '~/root' import styles from './header.module.css' @@ -74,6 +74,7 @@ export function Header() { const [expanded, setExpanded] = useState(false) const [userMenuIsOpen, setUserMenuIsOpen] = useState(false) const isMobile = useWindowSize().width < 1024 + const userIsAdmin = useAdminStatus() function toggleMobileNav() { setExpanded((expanded) => !expanded) @@ -162,6 +163,11 @@ export function Header() { Profile , + userIsAdmin && ( + + Admin + + ), Peer Endorsements , diff --git a/app/routes/admin.kafka._index.tsx b/app/routes/admin.kafka._index.tsx new file mode 100644 index 000000000..df1d4d3e7 --- /dev/null +++ b/app/routes/admin.kafka._index.tsx @@ -0,0 +1,207 @@ +/*! + * Copyright © 2023 United States Government as represented by the + * Administrator of the National Aeronautics and Space Administration. + * All Rights Reserved. + * + * SPDX-License-Identifier: Apache-2.0 + */ +import type { ActionFunctionArgs, LoaderFunctionArgs } from '@remix-run/node' +import { useFetcher, useLoaderData } from '@remix-run/react' +import { Button, Label, Table, TextInput } from '@trussworks/react-uswds' +import { groupBy, sortBy } from 'lodash' +import { useEffect, useState } from 'react' + +import { getUser } from './_auth/user.server' +import SegmentedCards from '~/components/SegmentedCards' +import Spinner from '~/components/Spinner' +import TimeAgo from '~/components/TimeAgo' +import type { KafkaACL } from '~/lib/kafka.server' +import { + adminGroup, + getKafkaACLsFromDynamoDB, + getLastSyncDate, + updateDbFromBrokers, +} from '~/lib/kafka.server' +import { getFormDataString } from '~/lib/utils' + +export async function loader({ request }: LoaderFunctionArgs) { + const user = await getUser(request) + if (!user || !user.groups.includes(adminGroup)) + throw new Response(null, { status: 403 }) + const { aclFilter } = Object.fromEntries(new URL(request.url).searchParams) + const dynamoDbAclData = groupBy( + sortBy(await getKafkaACLsFromDynamoDB(user, aclFilter), [ + 'resourceName', + 'principal', + ]), + 'resourceName' + ) + const latestSync = await getLastSyncDate() + return { dynamoDbAclData, latestSync } +} + +export async function action({ request }: ActionFunctionArgs) { + const user = await getUser(request) + if (!user?.groups.includes(adminGroup)) + throw new Response(null, { status: 403 }) + const data = await request.formData() + const intent = getFormDataString(data, 'intent') + + if (intent === 'migrateFromBroker') { + await updateDbFromBrokers(user) + } + return null +} + +export default function Index() { + const { dynamoDbAclData, latestSync } = useLoaderData() + const [aclData, setAclData] = useState(dynamoDbAclData) + const updateFetcher = useFetcher() + const aclFetcher = useFetcher() + const brokerFromDbFetcher = useFetcher() + + useEffect(() => { + setAclData(aclFetcher.data?.dynamoDbAclData ?? aclData) + }, [aclFetcher.data, aclData]) + + return ( + <> +

Kafka

+

Kafka ACLs

+

+ Kafka Access Control Lists (ACLs) are a security mechanism used to + control access to resources within a Kafka cluster. They define which + users or client applications have permissions to perform specific + operations on Kafka resources, such as topics, consumer groups, and + broker resources. ACLs specify who can produce (write) or consume (read) + data from topics, create or delete topics, manage consumer groups, and + perform administrative tasks. +

+ + + {updateFetcher.state !== 'idle' && ( + + Updating... + + )} + + {latestSync ? ( +

+ Last synced by {latestSync.syncedBy}{' '} + +

+ ) : ( +
+ )} + {brokerFromDbFetcher.state !== 'idle' && ( + + Updating... + + )} + {aclData && ( + <> + + + + + {aclFetcher.state !== 'idle' && ( + + Loading... + + )} + + + {Object.keys(aclData) + .sort((a, b) => a.localeCompare(b)) + .flatMap((key) => ( + +
+

Resource: {key}

+
+ + + + + + + + + + + {aclData[key].map((acl, index) => ( + + ))} + +
TypeGroupPermissionOperation
+
+ ))} +
+ + )} + + ) +} + +function KafkaAclCard({ acl }: { acl: KafkaACL }) { + // TODO: These maps can probably be refactored, since they are + // just inverting the enum from kafka, but importing them + // directly here causes some errors. Same for mapping them to + // dropdowns + const permissionMap: { [key: number]: string } = { + 2: 'Deny', + 3: 'Allow', + } + + const operationMap: { [key: number]: string } = { + 0: 'Unknown', + 1: 'Any', + 2: 'All', + 3: 'Read', + 4: 'Write', + 5: 'Create', + 6: 'Delete', + 7: 'Alter', + 8: 'Describe', + 9: 'Cluster Action', + 10: 'Describe Configs', + 11: 'Alter Configs', + 12: 'Idempotent Write', + } + + const resourceTypeMap: { [key: number]: string } = { + 0: 'Unknown', + 1: 'Any', + 2: 'Topic', + 3: 'Group', + 4: 'Cluster', + 5: 'Transactional Id', + 6: 'Delegation Token', + } + + return ( + + {resourceTypeMap[acl.resourceType]} + {acl.principal} + {permissionMap[acl.permissionType]} + {operationMap[acl.operation]} + + ) +} diff --git a/app/routes/admin.tsx b/app/routes/admin.tsx index a38586307..0531f8c35 100644 --- a/app/routes/admin.tsx +++ b/app/routes/admin.tsx @@ -27,6 +27,9 @@ export default function () {
+ Kafka + , Users , diff --git a/playwright.config.ts b/playwright.config.ts index 5aa5401b4..76bf15d53 100644 --- a/playwright.config.ts +++ b/playwright.config.ts @@ -4,7 +4,6 @@ import { defineConfig, devices } from '@playwright/test' * Read environment variables from file. * https://github.com/motdotla/dotenv */ -// require('dotenv').config(); const deviceList = ['Desktop Firefox', 'Desktop Chrome', 'Desktop Safari'] @@ -69,7 +68,6 @@ export default defineConfig({ command: 'npm run dev', url: 'http://localhost:3333', reuseExistingServer: !process.env.CI, - stdout: 'pipe', timeout: 120 * 1000, // 120 Seconds timeout on webServer }, }) diff --git a/sandbox-seed.json b/sandbox-seed.json index 35534487e..a40536177 100644 --- a/sandbox-seed.json +++ b/sandbox-seed.json @@ -5155,5 +5155,29 @@ "requestorEmail": "example@example.com", "subject": "Optical Observations for GRB 971227" } + ], + "kafka_acls": [ + { + "aclId": "12345678-abcd-1234-abcd-1234abcd1234", + "resourceName": "test_topic_created_from_website", + "prefixed": false, + "permissionType": 3, + "operation": 3, + "host": "*", + "principal": "some-user-group", + "resourcePatternType": 3, + "resourceType": 2 + }, + { + "aclId": "62fe8590-42e4-4917-afea-db6a0a84079a", + "resourceName": "test_topic_created_from_website", + "prefixed": false, + "permissionType": 3, + "operation": 4, + "host": "*", + "principal": "some-user-group", + "resourcePatternType": 3, + "resourceType": 2 + } ] }