From d9845c82c565785973b99746c487af7c649ccaf9 Mon Sep 17 00:00:00 2001 From: Pablo Voorvaart Date: Fri, 3 Jan 2025 12:23:33 +0100 Subject: [PATCH] long transcribes supported --- packages/app/app/[organization]/layout.tsx | 1 + .../clips/[stageId]/ClipContext.tsx | 4 +- .../(no-side-bar)/clips/[stageId]/page.tsx | 22 +-- .../[stageId]/sidebar/Transcipts/index.tsx | 41 +++++ .../clips/[stageId]/sidebar/index.tsx | 15 +- .../components/SessionTranscriptions.tsx | 46 ++++-- .../(root)/library/[session]/page.tsx | 1 - packages/app/lib/actions/livepeer.ts | 3 + packages/server/src/databases/index.ts | 4 +- packages/server/workers/clips/index.ts | 24 ++- .../workers/session-transcriptions/index.ts | 149 +++++++++++++++++- 11 files changed, 260 insertions(+), 50 deletions(-) create mode 100644 packages/app/app/studio/[organization]/(no-side-bar)/clips/[stageId]/sidebar/Transcipts/index.tsx diff --git a/packages/app/app/[organization]/layout.tsx b/packages/app/app/[organization]/layout.tsx index 0adbf8dcd..89bbb2b1d 100644 --- a/packages/app/app/[organization]/layout.tsx +++ b/packages/app/app/[organization]/layout.tsx @@ -33,6 +33,7 @@ const Layout = async ({ const userData = await fetchUserAction(); + console.log(organization); if (!organization) { return NotFound(); } diff --git a/packages/app/app/studio/[organization]/(no-side-bar)/clips/[stageId]/ClipContext.tsx b/packages/app/app/studio/[organization]/(no-side-bar)/clips/[stageId]/ClipContext.tsx index 1e7eea736..5bbb6ab99 100644 --- a/packages/app/app/studio/[organization]/(no-side-bar)/clips/[stageId]/ClipContext.tsx +++ b/packages/app/app/studio/[organization]/(no-side-bar)/clips/[stageId]/ClipContext.tsx @@ -103,10 +103,8 @@ export const ClipProvider = ({ organizationId: string; clipUrl: string; }) => { - const { handleTermChange, searchParams } = useSearchParams(); + const { searchParams } = useSearchParams(); - const start = searchParams?.get('start'); - const end = searchParams?.get('end'); const [playbackStatus, setPlaybackStatus] = useState( null ); diff --git a/packages/app/app/studio/[organization]/(no-side-bar)/clips/[stageId]/page.tsx b/packages/app/app/studio/[organization]/(no-side-bar)/clips/[stageId]/page.tsx index 0497c70cf..0e8bb993c 100644 --- a/packages/app/app/studio/[organization]/(no-side-bar)/clips/[stageId]/page.tsx +++ b/packages/app/app/studio/[organization]/(no-side-bar)/clips/[stageId]/page.tsx @@ -1,9 +1,7 @@ import { fetchOrganization } from '@/lib/services/organizationService'; import { fetchAllSessions, - fetchAsset, - fetchSession, - sessionImport, + fetchSession } from '@/lib/services/sessionService'; import { fetchStage, fetchStageRecordings } from '@/lib/services/stageService'; import { ClipsPageParams } from '@/lib/types'; @@ -31,12 +29,11 @@ const fetchVideoDetails = async ( const stageRecordings = await fetchStageRecordings({ streamId }); if (!stageRecordings?.recordings[0]) return null; - return { - videoSrc: stageRecordings.recordings[0].recordingUrl, + videoSrc: `https://livepeercdn.studio/hls/${liveStage.streamSettings?.playbackId}/index.m3u8`, type: 'livepeer', name: liveStage.name, - words: liveStage.transcripts?.text, + words: liveStage.transcripts?.chunks, liveRecording: stageRecordings.recordings[0], }; } @@ -47,13 +44,13 @@ const fetchVideoDetails = async ( const stage = await fetchStage({ stage: session.stageId as string }); if (!stage?.streamSettings?.playbackId) return null; - + console.log('session', session.transcripts?.chunks); const videoSrc = await getVideoUrlAction(session); return { videoSrc, type: 'livepeer', name: session.name, - words: session.transcripts?.subtitleUrl, + words: session.transcripts?.chunks, }; } @@ -65,6 +62,7 @@ const fetchVideoDetails = async ( videoSrc: stage.source.m3u8Url, type: stage.source.type, name: stage.name, + words: stage.transcripts?.chunks, }; } @@ -106,13 +104,6 @@ const ClipsConfig = async ({ params, searchParams }: ClipsPageParams) => { clipUrl={videoDetails.videoSrc} >
- {/*
- {words?.split('\n').map((word) => ( -
- {word} -
- ))} -
*/}
{ stageSessions={stageSessions.sessions} organizationId={organizationId} animations={animations.sessions} + words={videoDetails.words} />
diff --git a/packages/app/app/studio/[organization]/(no-side-bar)/clips/[stageId]/sidebar/Transcipts/index.tsx b/packages/app/app/studio/[organization]/(no-side-bar)/clips/[stageId]/sidebar/Transcipts/index.tsx new file mode 100644 index 000000000..26d029afe --- /dev/null +++ b/packages/app/app/studio/[organization]/(no-side-bar)/clips/[stageId]/sidebar/Transcipts/index.tsx @@ -0,0 +1,41 @@ +"use client" +import { useClipContext } from '../../ClipContext'; + +const Transcripts = ({ + words, +}: { + words: { word: string; start: number }[]; +}) => { + const { currentTime, videoRef } = useClipContext(); + + // Helper function to determine if a word should be highlighted + const isWordActive = ( + word: { word: string; start: number }, + currentTime: number + ) => { + // You might want to adjust this logic based on your requirements + return word.start <= currentTime && word.start + 1 > currentTime; + }; + + return ( +
+ {words?.map((word, index) => ( + { + if (videoRef.current) { + videoRef.current.currentTime = word.start; + } + }} + > + {word.word} + + ))} +
+ ); +}; + +export default Transcripts; diff --git a/packages/app/app/studio/[organization]/(no-side-bar)/clips/[stageId]/sidebar/index.tsx b/packages/app/app/studio/[organization]/(no-side-bar)/clips/[stageId]/sidebar/index.tsx index 14e15c478..7105a0619 100644 --- a/packages/app/app/studio/[organization]/(no-side-bar)/clips/[stageId]/sidebar/index.tsx +++ b/packages/app/app/studio/[organization]/(no-side-bar)/clips/[stageId]/sidebar/index.tsx @@ -7,17 +7,24 @@ import CreateClipButton from '../topBar/CreateClipButton'; import AddOrEditMarkerForm from './markers/AddOrEditMarkerForm'; import { IExtendedSession } from '@/lib/types'; import ImportMarkersForm from './markers/ImportMarkersForm'; +import Transcripts from './Transcipts'; export default function Sidebar({ organizationId, stageSessions, liveRecordingId, animations, + words, }: { organizationId: string; stageSessions: IExtendedSession[]; liveRecordingId?: string; animations: IExtendedSession[]; + words?: { + word: string; + start: number; + end: number; + }[]; }) { const { isCreatingClip, isAddingOrEditingMarker, isImportingMarkers } = useClipContext(); @@ -57,9 +64,10 @@ export default function Sidebar({ ) )} - + Markers Clips + {words && Words} {} @@ -67,6 +75,11 @@ export default function Sidebar({ + {words && ( + + + + )} ); diff --git a/packages/app/app/studio/[organization]/(root)/library/[session]/components/SessionTranscriptions.tsx b/packages/app/app/studio/[organization]/(root)/library/[session]/components/SessionTranscriptions.tsx index f43d7165c..ef5e12bc6 100644 --- a/packages/app/app/studio/[organization]/(root)/library/[session]/components/SessionTranscriptions.tsx +++ b/packages/app/app/studio/[organization]/(root)/library/[session]/components/SessionTranscriptions.tsx @@ -44,27 +44,41 @@ const SessionTranscriptions = ({ }); }; - if (transcriptionState === TranscriptionStatus.processing) { - return ( -
- Processing - transcription...{' '} -

router.refresh()} - > - -

-
- ); - } + // if (transcriptionState === TranscriptionStatus.processing) { + // return ( + //
+ // Processing + // transcription...{' '} + //

router.refresh()} + // > + // + //

+ //
+ // ); + // } if ( transcriptionState === TranscriptionStatus.completed && videoTranscription ) { - return ; + return ( +
+ + +
+ ); } if (transcriptionState === TranscriptionStatus.failed) { diff --git a/packages/app/app/studio/[organization]/(root)/library/[session]/page.tsx b/packages/app/app/studio/[organization]/(root)/library/[session]/page.tsx index af872c482..1c240c830 100644 --- a/packages/app/app/studio/[organization]/(root)/library/[session]/page.tsx +++ b/packages/app/app/studio/[organization]/(root)/library/[session]/page.tsx @@ -29,7 +29,6 @@ const EditSession = async ({ params, searchParams }: studioPageParams) => { session: params.session, }); - console.log(session?.transcripts?.chunks[0]); if (!session?.playbackId || !organization) return notFound(); return ( diff --git a/packages/app/lib/actions/livepeer.ts b/packages/app/lib/actions/livepeer.ts index 68f92eb62..a5c6e0597 100644 --- a/packages/app/lib/actions/livepeer.ts +++ b/packages/app/lib/actions/livepeer.ts @@ -22,6 +22,9 @@ export const getVideoUrlAction = async ( session: IExtendedSession ): Promise => { try { + if (session.playback?.videoUrl) { + return session.playback.videoUrl; + } if (session.assetId) { const asset = await fetchAsset({ assetId: session.assetId }); if (asset?.playbackUrl) { diff --git a/packages/server/src/databases/index.ts b/packages/server/src/databases/index.ts index 35d8f895a..6a13ea3ff 100644 --- a/packages/server/src/databases/index.ts +++ b/packages/server/src/databases/index.ts @@ -8,9 +8,9 @@ console.log('Database:', name); console.log('Password length:', password?.length); export const dbConnection = { - url: `mongodb://${user}:${password}@${host}/${name}?authSource=admin&retryWrites=true&w=majority`, + //rl: `mongodb://${user}:${password}@${host}/${name}?authSource=admin&retryWrites=true&w=majority`, // For local development use this url - // url: `mongodb+srv://${user}:${password}@${host}/${name}?authSource=admin`, + url: `mongodb+srv://${user}:${password}@${host}/${name}?authSource=admin`, options: { useNewUrlParser: true, useUnifiedTopology: true, diff --git a/packages/server/workers/clips/index.ts b/packages/server/workers/clips/index.ts index e9532d97d..38e5470c5 100644 --- a/packages/server/workers/clips/index.ts +++ b/packages/server/workers/clips/index.ts @@ -50,21 +50,31 @@ const processClip = async (data: IClip) => { ); } const masterContent = await masterResponse.text(); - + console.log('masterContent', masterContent); // 2. Find the 1080p variant const linesMaster = masterContent.split('\n'); let variantUrl = ''; + let maxBandwidth = -1; for (let i = 0; i < linesMaster.length; i++) { - if (linesMaster[i].includes('1080p0')) { - variantUrl = linesMaster[i + 1].trim(); - break; + if (linesMaster[i].startsWith('#EXT-X-STREAM-INF')) { + // Parse bandwidth from the stream info + const bandwidthMatch = linesMaster[i].match(/BANDWIDTH=(\d+)/); + if (bandwidthMatch) { + const bandwidth = parseInt(bandwidthMatch[1]); + if (bandwidth > maxBandwidth) { + maxBandwidth = bandwidth; + variantUrl = linesMaster[i + 1].trim(); + } + } } } - variantUrl = clipUrl.replace('index.m3u8', variantUrl); + console.log('Selected variant URL:', variantUrl); + variantUrl = clipUrl.replace('index.m3u8', variantUrl); + console.log('Full variant URL:', variantUrl); if (!variantUrl) { - throw new Error('1080p variant not found in master playlist'); + throw new Error('No valid variant found in master playlist'); } const duration = end - start; @@ -77,7 +87,7 @@ const processClip = async (data: IClip) => { const manifestResponse = await fetch(variantUrl); if (!manifestResponse.ok) { throw new Error( - `Failed to fetch manifest: ${manifestResponse.statusText}`, + `Failed to fetch manifest ${variantUrl}: ${manifestResponse.statusText}`, ); } const manifestContent = await manifestResponse.text(); diff --git a/packages/server/workers/session-transcriptions/index.ts b/packages/server/workers/session-transcriptions/index.ts index 2f09c9094..423239539 100644 --- a/packages/server/workers/session-transcriptions/index.ts +++ b/packages/server/workers/session-transcriptions/index.ts @@ -10,6 +10,8 @@ import { ISession } from '@interfaces/session.interface'; import { TranscriptionStatus } from '@interfaces/state.interface'; import SessionService from '@services/session.service'; import Session from '@models/session.model'; +import fs from 'fs'; +import path from 'path'; interface SessionTranscriptionsJob { session: { @@ -113,6 +115,122 @@ const updateTranscriptionStatus = async ( }); }; +const splitAudioIntoChunks = async ( + inputPath: string, + maxChunkSize: number = 5 * 1024 * 1024 // Reduced to 5MB to be safe +): Promise => { + const tempDir = tmpdir(); + const chunkPaths: string[] = []; + + // Get audio duration + const duration = await new Promise((resolve, reject) => { + ffmpeg.ffprobe(inputPath, (err, metadata) => { + if (err) reject(err); + resolve(metadata.format.duration || 0); + }); + }); + + // Calculate chunk duration based on file size and total duration + const stats = await fs.promises.stat(inputPath); + console.log('stats', stats.size); + const numberOfChunks = Math.ceil(stats.size / maxChunkSize); + console.log('numberOfChunks', numberOfChunks); + const chunkDuration = duration / numberOfChunks; + console.log('chunkDuration', chunkDuration); + // Split into chunks + for (let i = 0; i < numberOfChunks; i++) { + const startTime = i * chunkDuration; + const chunkPath = join(tempDir, `chunk_${i}_${path.basename(inputPath)}`); + + await new Promise((resolve, reject) => { + ffmpeg(inputPath) + .setStartTime(startTime) + .setDuration(chunkDuration) + .audioCodec('libmp3lame') + .audioBitrate('16k') // Lower bitrate for smaller file size + .audioChannels(1) // Mono audio + .audioFrequency(8000) // 16kHz sample rate + .output(chunkPath) + .on('end', () => resolve()) + .on('error', reject) + .run(); + }); + + // Verify chunk size + const chunkStats = await fs.promises.stat(chunkPath); + console.log('chunkStats', chunkStats.size); + if (chunkStats.size > maxChunkSize) { + console.warn(`Chunk ${i} is too large (${chunkStats.size} bytes). Recreating with lower quality...`); + // If still too large, recreate with even lower quality + await new Promise((resolve, reject) => { + ffmpeg(inputPath) + .setStartTime(startTime) + .setDuration(chunkDuration) + .audioCodec('libmp3lame') + .audioBitrate('16k') // Even lower bitrate + .audioChannels(1) + .audioFrequency(8000) // Lower sample rate + .output(chunkPath) + .on('end', () => resolve()) + .on('error', reject) + .run(); + }); + } + + chunkPaths.push(chunkPath); + } + + return chunkPaths; +}; + +const mergeTranscripts = (chunks: any[]): any => { + let offset = 0; + const mergedWords = chunks.flatMap((chunk, index) => { + // Adjust timestamps for each chunk + const adjustedWords = chunk.words.map((word: any) => ({ + ...word, + start: word.start + offset, + end: word.end + offset + })); + + // Update offset for next chunk + if (chunks[index + 1]) { + const lastWord = chunk.words[chunk.words.length - 1]; + offset += lastWord.end; + } + + return adjustedWords; + }); + + return { + text: chunks.map(chunk => chunk.text).join(' '), + words: mergedWords + }; +}; + +const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms)); + +const transcribeWithRetry = async (chunkPath: string, retries = 3, delay = 1000) => { + // Add size verification before attempting transcription + const stats = await fs.promises.stat(chunkPath); + const maxSize = 5 * 1024 * 1024; // 5MB + + if (stats.size > maxSize) { + throw new Error(`File size (${stats.size} bytes) exceeds Whisper's limit of ${maxSize} bytes`); + } + + for (let i = 0; i < retries; i++) { + try { + return await WhisperAPI.transcribe(chunkPath); + } catch (error) { + if (i === retries - 1) throw error; + console.log(`Attempt ${i + 1} failed, retrying after ${delay}ms...`); + await sleep(delay); + delay *= 2; // Exponential backoff + } + } +}; + export async function transcribeAudio( streamUrl: string, session: ISession, @@ -130,7 +248,7 @@ export async function transcribeAudio( '-probesize', '20M', ]) - .audioBitrate('32k') // Very low bitrate + .audioBitrate('16k') // Very low bitrate .audioCodec('libmp3lame') // Use MP3 codec .audioFrequency(8000) // 16kHz audio frequency .audioChannels(1) // Mono audio @@ -148,16 +266,34 @@ export async function transcribeAudio( .on('end', async () => { console.log('FFmpeg processing completed'); try { - const transcript = await WhisperAPI.transcribe(outputPath); + // Split audio into chunks + const chunks = await splitAudioIntoChunks(outputPath); + console.log(`Split audio into ${chunks.length} chunks`); + + // Transcribe chunks sequentially instead of in parallel + const transcriptions = []; + for (const [index, chunkPath] of chunks.entries()) { + console.log(`Processing chunk ${index + 1}/${chunks.length}`); + const transcription = await transcribeWithRetry(chunkPath); + transcriptions.push(transcription); + + // Clean up chunk file after processing + await fs.promises.unlink(chunkPath).catch(console.error); + } + + // Merge transcriptions + const mergedTranscript = mergeTranscripts(transcriptions); + + // Update session with merged transcript await Session.findByIdAndUpdate( session._id, { $set: { 'transcripts.status': TranscriptionStatus.completed, - 'transcripts.text': transcript.text, + 'transcripts.text': mergedTranscript.text, 'transcripts.lastSegmentTimestamp': 0, - 'transcripts.chunks': transcript.words, - 'transcripts.subtitleUrl': await generateVtt(transcript.words), + 'transcripts.chunks': mergedTranscript.words, + 'transcripts.subtitleUrl': await generateVtt(mergedTranscript.words), } }, { runValidators: false } @@ -166,6 +302,9 @@ export async function transcribeAudio( } catch (err) { console.error('Transcription error:', err); reject(err); + } finally { + // Clean up the original file + fs.promises.unlink(outputPath).catch(console.error); } });