From c6fa53f03318dde539b47b93c9c3a508e111f253 Mon Sep 17 00:00:00 2001 From: dakota002 Date: Fri, 26 Apr 2024 14:41:02 -0400 Subject: [PATCH] Basic kafka admin stuff New routes and full form, successful creation of ACLs Cleanup, and testing acl verification method Simplify some functions, fix form Working on syncronizing between brokers and dynamo Trying a fix for slow tests and needing to wait for the circulars to actually load Log sync, rough draft of sync functionality, group check --- app.arc | 10 ++ app/lib/kafka.server.ts | 246 ++++++++++++++++++++++++++++++ app/root.tsx | 10 +- app/root/header/Header.tsx | 8 +- app/routes/admin.kafka._index.tsx | 167 ++++++++++++++++++++ app/routes/admin.kafka.edit.tsx | 95 ++++++++++++ app/routes/admin.kafka.tsx | 101 ++++++++++++ sandbox-seed.json | 14 ++ 8 files changed, 649 insertions(+), 2 deletions(-) create mode 100644 app/routes/admin.kafka._index.tsx create mode 100644 app/routes/admin.kafka.edit.tsx create mode 100644 app/routes/admin.kafka.tsx diff --git a/app.arc b/app.arc index 28dd2106b7..5cf095b137 100644 --- a/app.arc +++ b/app.arc @@ -94,6 +94,16 @@ legacy_users email *String PointInTimeRecovery true +kafka_acls + topicName *String + group **String + PointInTimeRecovery true + +kafka_acl_log + partitionKey *Number + syncedOn **Number + PointInTimeRecovery ture + @tables-indexes email_notification_subscription topic *String diff --git a/app/lib/kafka.server.ts b/app/lib/kafka.server.ts index dcaa60ba04..41cb002a4f 100644 --- a/app/lib/kafka.server.ts +++ b/app/lib/kafka.server.ts @@ -5,10 +5,21 @@ * * SPDX-License-Identifier: Apache-2.0 */ +import { tables } from '@architect/functions' +import { paginateScan } from '@aws-sdk/lib-dynamodb' +import type { DynamoDBDocument } from '@aws-sdk/lib-dynamodb' import { Kafka } from 'gcn-kafka' +import type { AclEntry } from 'kafkajs' +import { + AclOperationTypes, + AclPermissionTypes, + AclResourceTypes, + ResourcePatternTypes, +} from 'kafkajs' import memoizee from 'memoizee' import { domain, getEnvOrDie } from './env.server' +import type { User } from '~/routes/_auth/user.server' const client_id = getEnvOrDie('KAFKA_CLIENT_ID') const client_secret = getEnvOrDie('KAFKA_CLIENT_SECRET') @@ -68,3 +79,238 @@ if (process.env.ARC_SANDBOX) { await producer.send({ topic, messages: [{ value }] }) } } + +export type KafkaACL = { + topicName: string + permissionType: PermissionType + group: string + prefixed: boolean +} + +export type PermissionType = 'producer' | 'consumer' + +export const adminGroup = 'gcn.nasa.gov/gcn-admin' + +const consumerOperations = [AclOperationTypes.READ, AclOperationTypes.DESCRIBE] +const producerOperations = [ + AclOperationTypes.CREATE, + AclOperationTypes.WRITE, + AclOperationTypes.DESCRIBE, +] + +const admin_client_id = getEnvOrDie('KAFKA_ADMIN_CLIENT_ID') +const admin_client_secret = getEnvOrDie('KAFKA_ADMIN_CLIENT_SECRET') +const adminClient = new Kafka({ + client_id: admin_client_id, + client_secret: admin_client_secret, + domain: 'dev.gcn.nasa.gov', // TODO: replace w/ useDomain +}).admin() + +function validateUser(user: User) { + if (!user.groups.includes(adminGroup)) + throw new Response(null, { status: 403 }) +} + +// Not sure if this is a useful method, but may be helpful if we +// want to verify that our table matches the defined kafka acls +export async function verifyKafkaACL(acl: KafkaACL) { + const operations = + acl.permissionType == 'producer' ? producerOperations : consumerOperations + + const promises = operations.map((operation) => + adminClient.describeAcls({ + resourceName: acl.topicName, + resourceType: AclResourceTypes.TOPIC, + host: '*', + permissionType: AclPermissionTypes.ALLOW, + operation, + resourcePatternType: ResourcePatternTypes.LITERAL, + }) + ) + + const results = await Promise.all(promises) + console.log(results) +} + +export async function createKafkaACL(user: User, acl: KafkaACL) { + validateUser(user) + // Save to db + const db = await tables() + await db.kafka_acls.put(acl) + + // Add to Kafka + await adminClient.connect() + await adminClient.createTopics({ + topics: [ + { + topic: acl.topicName, + }, + ], + }) + const acls = + acl.permissionType == 'producer' + ? createProducerAcls(acl) + : createConsumerAcls(acl) + await adminClient.createAcls({ acl: acls }) + await adminClient.disconnect() +} + +export async function getKafkaACLByTopicName(user: User, topicName: string) { + validateUser(user) + const db = await tables() + return (await db.kafka_acls.get({ topicName })) as KafkaACL +} + +export async function getKafkaACLsFromDynamoDB(user: User) { + validateUser(user) + const db = await tables() + const client = db._doc as unknown as DynamoDBDocument + const TableName = db.name('kafka_acls') + const pages = paginateScan({ client }, { TableName }) + const acls: KafkaACL[] = [] + for await (const page of pages) { + const newACL = page.Items as KafkaACL[] + if (newACL) acls.push(...newACL) + } + return acls +} + +export async function getAclsFromBrokers() { + await adminClient.connect() + const acls = await adminClient.describeAcls({ + resourceType: AclResourceTypes.TOPIC, + host: '*', + permissionType: AclPermissionTypes.ALLOW, + operation: AclOperationTypes.ANY, + resourcePatternType: ResourcePatternTypes.ANY, + }) + await adminClient.disconnect() + const results: KafkaACL[] = [] + for (const item of acls.resources) { + const topicName = item.resourceName + const prefixed = item.resourcePatternType === ResourcePatternTypes.PREFIXED + const producerRules = producerOperations.every((op) => + item.acls.map((x) => x.operation).includes(op) + ) + const producerGroup = + producerRules && + [ + ...new Set( + item.acls + .filter((acl) => producerOperations.includes(acl.operation)) + .map((x) => x.principal) + ), + ][0]?.replace('User:', '') + const consumerRules = consumerOperations.every((op) => + item.acls.map((x) => x.operation).includes(op) + ) + const consumerGroup = + consumerRules && + [ + ...new Set( + item.acls + .filter((acl) => consumerOperations.includes(acl.operation)) + .map((x) => x.principal) + ), + ][0]?.replace('User:', '') + if (producerRules && producerGroup) + results.push({ + topicName, + permissionType: 'producer', + group: producerGroup, + prefixed, + }) + if (consumerRules && consumerGroup) + results.push({ + topicName, + permissionType: 'consumer', + group: consumerGroup, + prefixed, + }) + } + return results +} + +export async function deleteKafkaACL(user: User, acl: KafkaACL) { + validateUser(user) + const db = await tables() + await db.kafka_acls.delete({ topicName: acl.topicName, group: acl.group }) + + const acls = + acl.permissionType == 'producer' + ? createProducerAcls(acl) + : createConsumerAcls(acl) + + await adminClient.connect() + await adminClient.deleteAcls({ filters: acls }) + await adminClient.disconnect() +} + +function createProducerAcls(acl: KafkaACL): AclEntry[] { + // Create, Write, and Describe operations + return mapAclAndOperations(acl, producerOperations) +} + +function createConsumerAcls(acl: KafkaACL): AclEntry[] { + // Read and Describe operations + return mapAclAndOperations(acl, consumerOperations) +} + +function mapAclAndOperations(acl: KafkaACL, operations: AclOperationTypes[]) { + return operations.map((operation) => { + return { + resourceType: AclResourceTypes.TOPIC, + resourceName: acl.topicName, + resourcePatternType: acl.prefixed + ? ResourcePatternTypes.PREFIXED + : ResourcePatternTypes.LITERAL, + principal: `User:${acl.group}`, + host: '*', + operation, + permissionType: AclPermissionTypes.ALLOW, + } + }) +} + +// TODO: Write these next +export async function updateBrokersFromDb(user: User) { + const dbDefinedAcls = await getKafkaACLsFromDynamoDB(user) + const mappedAcls = dbDefinedAcls.flatMap((x) => + x.permissionType === 'producer' + ? createProducerAcls(x) + : createConsumerAcls(x) + ) + await adminClient.connect() + await adminClient.createAcls({ acl: mappedAcls }) + await adminClient.disconnect() +} +export async function updateDbFromBrokers(user: User) { + const kafkaDefinedAcls = await getAclsFromBrokers() + const db = await tables() + await Promise.all([ + ...kafkaDefinedAcls.map((acl) => db.kafka_acls.put(acl)), + db.kafka_acl_log.put({ + partitionKey: 1, + syncedOn: Date.now(), + syncedBy: user.email, + }), + ]) +} + +type KafkaAclSyncLog = { + partitionKey: number + syncedOn: number + syncedBy: string +} + +export async function getLastSyncDate(): Promise { + const db = await tables() + return ( + await db.kafka_acl_log.query({ + KeyConditionExpression: 'partitionKey = :1', + ExpressionAttributeValues: { ':1': 1 }, + ScanIndexForward: false, + Limit: 1, + }) + ).Items.pop() as KafkaAclSyncLog +} diff --git a/app/root.tsx b/app/root.tsx index 6a2d9f8a2b..e1b9c6e0b4 100644 --- a/app/root.tsx +++ b/app/root.tsx @@ -46,6 +46,7 @@ import { useSpinDelay } from 'spin-delay' import invariant from 'tiny-invariant' import { features, getEnvOrDieInProduction, origin } from './lib/env.server' +import { adminGroup } from './lib/kafka.server' import { DevBanner } from './root/DevBanner' import { Footer } from './root/Footer' import NewsBanner from './root/NewsBanner' @@ -116,6 +117,7 @@ export async function loader({ request }: LoaderFunctionArgs) { const recaptchaSiteKey = getEnvOrDieInProduction('RECAPTCHA_SITE_KEY') const userIsMod = user?.groups.includes(moderatorGroup) const userIsVerifiedSubmitter = user?.groups.includes(group) + const userIsAdmin = user?.groups.includes(adminGroup) return { origin, @@ -126,6 +128,7 @@ export async function loader({ request }: LoaderFunctionArgs) { idp, userIsMod, userIsVerifiedSubmitter, + userIsAdmin, } } @@ -165,6 +168,11 @@ export function useSubmitterStatus() { return userIsVerifiedSubmitter } +export function useAdminStatus() { + const { userIsAdmin } = useLoaderDataRoot() + return userIsAdmin +} + export function useRecaptchaSiteKey() { const { recaptchaSiteKey } = useLoaderDataRoot() return recaptchaSiteKey @@ -274,7 +282,7 @@ export function Layout({ children }: { children?: ReactNode }) { function ErrorUnexpected({ children }: { children?: ReactNode }) { return ( -

Unexpected error {children}

+

Unexpected error {children}

An unexpected error occurred.

diff --git a/app/root/header/Header.tsx b/app/root/header/Header.tsx index dde3947adc..1e55d4c8a1 100644 --- a/app/root/header/Header.tsx +++ b/app/root/header/Header.tsx @@ -17,7 +17,7 @@ import { useEffect, useState } from 'react' import { useClickAnyWhere, useWindowSize } from 'usehooks-ts' import { Meatball } from '~/components/meatball/Meatball' -import { useEmail, useUserIdp } from '~/root' +import { useAdminStatus, useEmail, useUserIdp } from '~/root' import styles from './header.module.css' @@ -74,6 +74,7 @@ export function Header() { const [expanded, setExpanded] = useState(false) const [userMenuIsOpen, setUserMenuIsOpen] = useState(false) const isMobile = useWindowSize().width < 1024 + const userIsAdmin = useAdminStatus() function toggleMobileNav() { setExpanded((expanded) => !expanded) @@ -162,6 +163,11 @@ export function Header() { Profile , + userIsAdmin && ( + + Admin + + ), Peer Endorsements , diff --git a/app/routes/admin.kafka._index.tsx b/app/routes/admin.kafka._index.tsx new file mode 100644 index 0000000000..0df4d76006 --- /dev/null +++ b/app/routes/admin.kafka._index.tsx @@ -0,0 +1,167 @@ +/*! + * Copyright © 2023 United States Government as represented by the + * Administrator of the National Aeronautics and Space Administration. + * All Rights Reserved. + * + * SPDX-License-Identifier: Apache-2.0 + */ +import type { LoaderFunctionArgs } from '@remix-run/node' +import { useFetcher, useLoaderData } from '@remix-run/react' +import type { ModalRef } from '@trussworks/react-uswds' +import { + Button, + Grid, + Icon, + Modal, + ModalFooter, + ModalHeading, + ModalToggleButton, +} from '@trussworks/react-uswds' +import { useRef } from 'react' + +import { getUser } from './_auth/user.server' +import HeadingWithAddButton from '~/components/HeadingWithAddButton' +import SegmentedCards from '~/components/SegmentedCards' +import Spinner from '~/components/Spinner' +import TimeAgo from '~/components/TimeAgo' +import type { KafkaACL } from '~/lib/kafka.server' +import { getKafkaACLsFromDynamoDB, getLastSyncDate } from '~/lib/kafka.server' + +export async function loader({ request }: LoaderFunctionArgs) { + const user = await getUser(request) + if (!user) throw new Response(null, { status: 403 }) + const dynamoDbAclData = await getKafkaACLsFromDynamoDB(user) + const latestSync = await getLastSyncDate() + return { dynamoDbAclData, latestSync } +} + +export default function Index() { + const { dynamoDbAclData, latestSync } = useLoaderData() + const aclFetcher = useFetcher() + + return ( + <> + Kafka Admin +

Kafka ACLs

+

+ Information about the Kafka ACLs listed here. Click the button to sync + the db to the kafka broker's current state. +

+ + + + {aclFetcher.state !== 'idle' && ( + + Saving... + + )} + + {latestSync && ( +

+ Last synced by {latestSync.syncedBy}{' '} + +

+ )} +

DynamoDB ACLs

+ {dynamoDbAclData && ( + <> + ({dynamoDbAclData.length}) ACLs + + {dynamoDbAclData + .sort((a, b) => a.topicName.localeCompare(b.topicName)) + .map((x, index) => ( + + ))} + + + )} + + ) +} + +function KafkaAclCard({ acl }: { acl: KafkaACL }) { + const ref = useRef(null) + const fetcher = useFetcher() + const disabled = fetcher.state !== 'idle' + + return ( + <> + +
+
+ + Topic: {acl.topicName} + +
+
+ + Permission Type: {acl.permissionType} + +
+
+ + Group: {acl.group} + +
+
+
+ + + Delete + +
+
+ + + + + + + Delete Kafka ACL + + + + + Cancel + + + + + + + ) +} diff --git a/app/routes/admin.kafka.edit.tsx b/app/routes/admin.kafka.edit.tsx new file mode 100644 index 0000000000..7d9af231a1 --- /dev/null +++ b/app/routes/admin.kafka.edit.tsx @@ -0,0 +1,95 @@ +/*! + * Copyright © 2023 United States Government as represented by the + * Administrator of the National Aeronautics and Space Administration. + * All Rights Reserved. + * + * SPDX-License-Identifier: Apache-2.0 + */ +import type { LoaderFunctionArgs } from '@remix-run/node' +import { Form, useLoaderData } from '@remix-run/react' +import { + Button, + Checkbox, + Label, + Select, + TextInput, +} from '@trussworks/react-uswds' + +import { getUser } from './_auth/user.server' +import { getGroups } from '~/lib/cognito.server' + +export async function loader({ request }: LoaderFunctionArgs) { + const user = await getUser(request) + if (!user) throw new Response(null, { status: 403 }) + const userGroups = (await getGroups()) + .filter((group) => group.GroupName?.startsWith('gcn.nasa.gov/')) + .map((group) => group.GroupName) + + return { userGroups } +} + +export default function Kafka() { + const { userGroups } = useLoaderData() + return +} + +function KafkaAclForm({ groups }: { groups: string[] }) { + return ( + <> +

Create Kafka ACLs

+
+ + console.log(e.target.value)} + /> + + + + Producer will generate ACLs for the Create, Write, and Describe + operations. Consumer will generate ACLs for the Read and Describe + operations + + + + +
+ + If yes, submission will also trigger th generation of ACLs for the + provided topic name as a PREFIXED topic with a period included at + the end. For example, if checked, a topic of `gcn.notices.icecube` + will result in ACLs for both `gcn.notices.icecube` (literal) and + `gcn.notices.icecube.` (prefixed). + +
+ + + + ) +} diff --git a/app/routes/admin.kafka.tsx b/app/routes/admin.kafka.tsx new file mode 100644 index 0000000000..f42376550a --- /dev/null +++ b/app/routes/admin.kafka.tsx @@ -0,0 +1,101 @@ +/*! + * Copyright © 2023 United States Government as represented by the + * Administrator of the National Aeronautics and Space Administration. + * All Rights Reserved. + * + * SPDX-License-Identifier: Apache-2.0 + */ +import type { ActionFunctionArgs } from '@remix-run/node' +import { NavLink, Outlet } from '@remix-run/react' +import { GridContainer, SideNav } from '@trussworks/react-uswds' + +import { getUser } from './_auth/user.server' +import type { PermissionType } from '~/lib/kafka.server' +import { + adminGroup, + createKafkaACL, + deleteKafkaACL, + updateDbFromBrokers, +} from '~/lib/kafka.server' +import { getFormDataString } from '~/lib/utils' + +export async function action({ request }: ActionFunctionArgs) { + const user = await getUser(request) + if (!user?.groups.includes(adminGroup)) + throw new Response(null, { status: 403 }) + const data = await request.formData() + const intent = getFormDataString(data, 'intent') + if (intent === 'migrateFromBroker') { + await updateDbFromBrokers(user) + return null + } + const topicName = getFormDataString(data, 'topicName') + const permissionType = getFormDataString( + data, + 'permissionType' + ) as PermissionType + const group = getFormDataString(data, 'group') + const includePrefixed = getFormDataString(data, 'includePrefixed') + if (!topicName || !permissionType || !group) + throw new Response(null, { status: 400 }) + const promises = [] + + switch (intent) { + case 'delete': + promises.push( + deleteKafkaACL(user, { + topicName, + permissionType, + group, + prefixed: false, + }) + ) + break + case 'create': + promises.push( + createKafkaACL(user, { + topicName, + permissionType, + group, + prefixed: false, + }) + ) + + if (includePrefixed) + promises.push( + createKafkaACL(user, { + topicName: `${topicName}.`, + permissionType, + group, + prefixed: true, + }) + ) + break + default: + break + } + await Promise.all(promises) + + return null +} + +export default function Kafka() { + return ( + +
+
+ + Kafka + , + ]} + /> +
+
+ +
+
+
+ ) +} diff --git a/sandbox-seed.json b/sandbox-seed.json index 8be8e4b9b0..c0a53f3f3b 100644 --- a/sandbox-seed.json +++ b/sandbox-seed.json @@ -5125,5 +5125,19 @@ "affiliation": "Example", "submit": 1 } + ], + "kafka_acls": [ + { + "topicName": "test_topic_created_from_website", + "permissionType": "consumer", + "group": "gcn.nasa.gov/kafka-gcn-test-consumer", + "prefixed": false + }, + { + "topicName": "test_topic_created_from_website", + "permissionType": "producer", + "group": "gcn.nasa.gov/kafka-gcn-test-producer", + "prefixed": false + } ] }