From 5a5b4e7949c8cf85bc1cb59ab73bb48c34e3365f Mon Sep 17 00:00:00 2001 From: Eric-Vondee Date: Tue, 30 Jul 2024 23:02:01 +0100 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20youtube=20livestream=20integration?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/controllers/stage.controller.ts | 14 ++ .../server/src/dtos/stage/livestream.dto.ts | 20 ++ .../server/src/interfaces/stage.interface.ts | 7 + packages/server/src/services/stage.service.ts | 19 +- packages/server/src/utils/youtube.ts | 219 ++++++------------ 5 files changed, 133 insertions(+), 146 deletions(-) create mode 100644 packages/server/src/dtos/stage/livestream.dto.ts diff --git a/packages/server/src/controllers/stage.controller.ts b/packages/server/src/controllers/stage.controller.ts index 430fcc595..241486a62 100644 --- a/packages/server/src/controllers/stage.controller.ts +++ b/packages/server/src/controllers/stage.controller.ts @@ -1,5 +1,6 @@ import { OrgIdDto } from '@dtos/organization/orgid.dto'; import { CreateStageDto } from '@dtos/stage/create-stage.dto'; +import { CreateLiveStreamDto } from '@dtos/stage/livestream.dto'; import { UpdateStageDto } from '@dtos/stage/update-stage.dto'; import { IStage } from '@interfaces/stage.interface'; import StageService from '@services/stage.service'; @@ -116,4 +117,17 @@ export class StageController extends Controller { const stage = await this.stageService.deleteOne(stageId); return SendApiResponse('deleted', stage); } + + /** + * @summary Create Livestream on youtube & twitter + */ + @Security('jwt', ['org']) + @SuccessResponse('201') + @Post('livestream') + async youtubeStage( + @Body() body: CreateLiveStreamDto, + ): Promise> { + const stream = await this.stageService.createLiveStream(body); + return SendApiResponse('livestream created', stream); + } } diff --git a/packages/server/src/dtos/stage/livestream.dto.ts b/packages/server/src/dtos/stage/livestream.dto.ts new file mode 100644 index 000000000..fec53be37 --- /dev/null +++ b/packages/server/src/dtos/stage/livestream.dto.ts @@ -0,0 +1,20 @@ +import { ILiveStream } from '@interfaces/stage.interface'; +import { IsNotEmpty, IsString } from 'class-validator'; + +export class CreateLiveStreamDto implements ILiveStream { + @IsNotEmpty() + @IsString() + stageId: string; + + @IsNotEmpty() + @IsString() + socialId: string; + + @IsNotEmpty() + @IsString() + socialType: string; + + @IsNotEmpty() + @IsString() + organizationId: string; +} diff --git a/packages/server/src/interfaces/stage.interface.ts b/packages/server/src/interfaces/stage.interface.ts index ce2a2288f..54e42781c 100644 --- a/packages/server/src/interfaces/stage.interface.ts +++ b/packages/server/src/interfaces/stage.interface.ts @@ -20,6 +20,13 @@ export interface IPlugin { name: string; } +export interface ILiveStream { + stageId: string; + socialId: string; + socialType: string; + organizationId: string; +} + export class IStage { _id?: Types.ObjectId | string; name: string; diff --git a/packages/server/src/services/stage.service.ts b/packages/server/src/services/stage.service.ts index a67124ffa..ffdaf2aa4 100644 --- a/packages/server/src/services/stage.service.ts +++ b/packages/server/src/services/stage.service.ts @@ -1,11 +1,14 @@ import BaseController from '@databases/storage'; import { HttpException } from '@exceptions/HttpException'; -import { IStage } from '@interfaces/stage.interface'; +import { ILiveStream, IStage } from '@interfaces/stage.interface'; import Stage from '@models/stage.model'; import Events from '@models/event.model'; import { Types } from 'mongoose'; import { createStream, deleteStream, getStreamInfo } from '@utils/livepeer'; import { config } from '@config'; +import Organization from '@models/organization.model'; +import { refreshAccessToken } from '@utils/oauth'; +import { createYoutubeLiveStream } from '@utils/youtube'; export default class StageService { private path: string; @@ -119,4 +122,18 @@ export default class StageService { }; return metadata; } + + async createLiveStream(data: ILiveStream) { + const stage = await this.get(data.stageId); + const org = await Organization.findOne({ _id: stage.organizationId }); + const token = org.socials.find( + (e) => e.type == data.socialType && e._id == data.socialId, + ); + const refeshToken = await refreshAccessToken(token.refreshToken); + return await createYoutubeLiveStream({ + accessToken: refeshToken, + title: stage.name, + streamDate: stage.streamDate.toString(), + }); + } } diff --git a/packages/server/src/utils/youtube.ts b/packages/server/src/utils/youtube.ts index 9cb7de722..d2ab2d688 100644 --- a/packages/server/src/utils/youtube.ts +++ b/packages/server/src/utils/youtube.ts @@ -1,156 +1,85 @@ import { youtube_v3 } from 'googleapis'; -import { ISession } from '@interfaces/session.interface'; -import { createReadStream, createWriteStream } from 'fs'; -import https from 'https'; -import StateService from '@services/state.service'; -import EventService from '@services/event.service'; -import { StateStatus, StateType } from '@interfaces/state.interface'; -// Utility function to introduce a delay -function delay(ms: number): Promise { - return new Promise((resolve) => setTimeout(resolve, ms)); -} +import { getYoutubeClient } from './oauth'; -// Checks the processing status of a YouTube video -async function checkVideoProcessingStatus( - videoId: string, +const createLiveBroadcast = async ( youtube: youtube_v3.Youtube, -): Promise { - const response = await youtube.videos.list({ - id: [videoId], - part: ['processingDetails'], - }); - - return response.data.items[0].processingDetails.processingStatus; -} - -// Downloads an image from a URL and saves it to a file -async function downloadImage( - url: string, - filePath: string, -): Promise<{ success: boolean; message?: string }> { - return new Promise((resolve) => { - https - .get(url, (response) => { - if (response.statusCode === 200) { - const fileStream = createWriteStream(filePath); - response.pipe(fileStream); - - fileStream.on('finish', () => { - fileStream.close(); - console.log('Image download completed:', filePath); - resolve({ success: true }); - }); - } else { - console.error('Image download failed:', response.statusCode); - resolve({ - success: false, - message: `Failed with status code: ${response.statusCode}`, - }); - } - }) - .on('error', (error) => { - console.error('Error downloading image:', error.message); - resolve({ success: false, message: error.message }); - }); + title: string, + streamDate: string, +): Promise => { + const res: any = await youtube.liveBroadcasts.insert({ + part: ['snippet', 'contentDetails', 'status'], + requestBody: { + snippet: { + title: title, + scheduledStartTime: new Date(streamDate).toISOString(), + }, + contentDetails: { + monitorStream: { + enableMonitorStream: true, + }, + }, + status: { + privacyStatus: 'public', + }, + }, }); -} + return res.data.id; +}; -// Sets the thumbnail for a YouTube video -async function setThumbnail( +const createLiveStream = async ( youtube: youtube_v3.Youtube, - videoId: string, - filePath: string, - stateService: StateService, - stateId: string, -): Promise { - try { - const response = await youtube.thumbnails.set({ - videoId: videoId, - media: { - body: createReadStream(filePath), + title: string, +): Promise<{ id: string; streamKey: string; ingestUrl: string }> => { + const res: any = await youtube.liveStreams.insert({ + part: ['snippet', 'cdn', 'contentDetails', 'status'], + requestBody: { + snippet: { + title: title, }, - }); - - console.log('Thumbnail set successfully:', response.data); - await stateService.update(stateId, { status: StateStatus.completed }); - } catch (error) { - console.error('Error setting thumbnail:', error); - await stateService.update(stateId, { status: StateStatus.canceled }); - } -} - -// Uploads a video to YouTube and handles thumbnail setting -export async function uploadToYouTube( - session: ISession, - youtube: youtube_v3.Youtube, - videoFilePath: string, -): Promise { - const stateService = new StateService(); - - try { - const insertResponse = await youtube.videos.insert({ - part: ['status', 'snippet'], - requestBody: { - snippet: { - title: session.name, - description: session.description, - defaultLanguage: 'en', - defaultAudioLanguage: 'en', - }, - status: { - privacyStatus: session.published ? 'public' : 'unlisted', - selfDeclaredMadeForKids: false, - madeForKids: false, - }, + cdn: { + frameRate: '30fps', + ingestionType: 'rtmp', + resolution: '720p', }, - media: { - body: createReadStream(videoFilePath), + status: { + streamStatus: 'active', }, - }); - - const state = await stateService.create({ - type: StateType.video, - sessionId: session._id, - sessionSlug: session.slug, - }); - - let processingStatus = 'processing'; - while (processingStatus === 'processing') { - processingStatus = await checkVideoProcessingStatus( - insertResponse.data.id, - youtube, - ); - if (processingStatus === 'processing') { - await delay(180000); // Delay for 3 minutes - } else if (processingStatus === 'error') { - await stateService.update(state._id.toString(), { - status: StateStatus.canceled, - }); - return; - } - } - - if (!session.coverImage) return; + }, + }); + return { + id: res.data.id, + streamKey: res.data.cdn.ingestionInfo.streamName, + ingestUrl: res.data.cdn.ingestionInfo.ingestionAddress, + }; +}; - const filePath = `./tmp/${session.slug}.jpg`; - const imageResponse = await downloadImage(session.coverImage, filePath); - if (imageResponse.success) { - await setThumbnail( - youtube, - insertResponse.data.id, - filePath, - stateService, - state._id.toString(), - ); - return; - } +const bindBroadCastToStream = async ( + youtube: youtube_v3.Youtube, + broadcastId: string, + streamId: string, +) => { + await youtube.liveBroadcasts.bind({ + part: ['id', 'contentDetails'], + id: broadcastId, + streamId: streamId, + }); +}; - await stateService.update(state._id.toString(), { - status: StateStatus.canceled, - }); - return; - } catch (error) { - console.error('An error occurred:', error); - return; - } -} +export const createYoutubeLiveStream = async (data: { + accessToken: string; + title: string; + streamDate: string; +}): Promise<{ streamKey: string; ingestUrl: string }> => { + const youtube = await getYoutubeClient(data.accessToken); + const broadCastId = await createLiveBroadcast( + youtube, + data.title, + data.streamDate, + ); + const stream = await createLiveStream(youtube, data.title); + await bindBroadCastToStream(youtube, broadCastId, stream.id); + return { + streamKey: stream.streamKey, + ingestUrl: stream.ingestUrl, + }; +};