Skip to content

Commit

Permalink
✨ youtube livestream integration
Browse files Browse the repository at this point in the history
  • Loading branch information
Eric-Vondee committed Jul 30, 2024
1 parent 9ca8ba5 commit 5a5b4e7
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 146 deletions.
14 changes: 14 additions & 0 deletions packages/server/src/controllers/stage.controller.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { OrgIdDto } from '@dtos/organization/orgid.dto';
import { CreateStageDto } from '@dtos/stage/create-stage.dto';
import { CreateLiveStreamDto } from '@dtos/stage/livestream.dto';
import { UpdateStageDto } from '@dtos/stage/update-stage.dto';
import { IStage } from '@interfaces/stage.interface';
import StageService from '@services/stage.service';
Expand Down Expand Up @@ -116,4 +117,17 @@ export class StageController extends Controller {
const stage = await this.stageService.deleteOne(stageId);
return SendApiResponse('deleted', stage);
}

/**
* @summary Create Livestream on youtube & twitter
*/
@Security('jwt', ['org'])
@SuccessResponse('201')
@Post('livestream')
async youtubeStage(
@Body() body: CreateLiveStreamDto,
): Promise<IStandardResponse<{ streamKey: string; ingestUrl: string }>> {
const stream = await this.stageService.createLiveStream(body);
return SendApiResponse('livestream created', stream);
}
}
20 changes: 20 additions & 0 deletions packages/server/src/dtos/stage/livestream.dto.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { ILiveStream } from '@interfaces/stage.interface';
import { IsNotEmpty, IsString } from 'class-validator';

export class CreateLiveStreamDto implements ILiveStream {
@IsNotEmpty()
@IsString()
stageId: string;

@IsNotEmpty()
@IsString()
socialId: string;

@IsNotEmpty()
@IsString()
socialType: string;

@IsNotEmpty()
@IsString()
organizationId: string;
}
7 changes: 7 additions & 0 deletions packages/server/src/interfaces/stage.interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@ export interface IPlugin {
name: string;
}

export interface ILiveStream {
stageId: string;
socialId: string;
socialType: string;
organizationId: string;
}

export class IStage {
_id?: Types.ObjectId | string;
name: string;
Expand Down
19 changes: 18 additions & 1 deletion packages/server/src/services/stage.service.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import BaseController from '@databases/storage';
import { HttpException } from '@exceptions/HttpException';
import { IStage } from '@interfaces/stage.interface';
import { ILiveStream, IStage } from '@interfaces/stage.interface';
import Stage from '@models/stage.model';
import Events from '@models/event.model';
import { Types } from 'mongoose';
import { createStream, deleteStream, getStreamInfo } from '@utils/livepeer';
import { config } from '@config';
import Organization from '@models/organization.model';
import { refreshAccessToken } from '@utils/oauth';
import { createYoutubeLiveStream } from '@utils/youtube';

export default class StageService {
private path: string;
Expand Down Expand Up @@ -119,4 +122,18 @@ export default class StageService {
};
return metadata;
}

async createLiveStream(data: ILiveStream) {
const stage = await this.get(data.stageId);
const org = await Organization.findOne({ _id: stage.organizationId });
const token = org.socials.find(
(e) => e.type == data.socialType && e._id == data.socialId,
);
const refeshToken = await refreshAccessToken(token.refreshToken);
return await createYoutubeLiveStream({
accessToken: refeshToken,
title: stage.name,
streamDate: stage.streamDate.toString(),
});
}
}
219 changes: 74 additions & 145 deletions packages/server/src/utils/youtube.ts
Original file line number Diff line number Diff line change
@@ -1,156 +1,85 @@
import { youtube_v3 } from 'googleapis';
import { ISession } from '@interfaces/session.interface';
import { createReadStream, createWriteStream } from 'fs';
import https from 'https';
import StateService from '@services/state.service';
import EventService from '@services/event.service';
import { StateStatus, StateType } from '@interfaces/state.interface';
// Utility function to introduce a delay
function delay(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
import { getYoutubeClient } from './oauth';

// Checks the processing status of a YouTube video
async function checkVideoProcessingStatus(
videoId: string,
const createLiveBroadcast = async (
youtube: youtube_v3.Youtube,
): Promise<string> {
const response = await youtube.videos.list({
id: [videoId],
part: ['processingDetails'],
});

return response.data.items[0].processingDetails.processingStatus;
}

// Downloads an image from a URL and saves it to a file
async function downloadImage(
url: string,
filePath: string,
): Promise<{ success: boolean; message?: string }> {
return new Promise((resolve) => {
https
.get(url, (response) => {
if (response.statusCode === 200) {
const fileStream = createWriteStream(filePath);
response.pipe(fileStream);

fileStream.on('finish', () => {
fileStream.close();
console.log('Image download completed:', filePath);
resolve({ success: true });
});
} else {
console.error('Image download failed:', response.statusCode);
resolve({
success: false,
message: `Failed with status code: ${response.statusCode}`,
});
}
})
.on('error', (error) => {
console.error('Error downloading image:', error.message);
resolve({ success: false, message: error.message });
});
title: string,
streamDate: string,
): Promise<string> => {
const res: any = await youtube.liveBroadcasts.insert({
part: ['snippet', 'contentDetails', 'status'],
requestBody: {
snippet: {
title: title,
scheduledStartTime: new Date(streamDate).toISOString(),
},
contentDetails: {
monitorStream: {
enableMonitorStream: true,
},
},
status: {
privacyStatus: 'public',
},
},
});
}
return res.data.id;
};

// Sets the thumbnail for a YouTube video
async function setThumbnail(
const createLiveStream = async (
youtube: youtube_v3.Youtube,
videoId: string,
filePath: string,
stateService: StateService,
stateId: string,
): Promise<void> {
try {
const response = await youtube.thumbnails.set({
videoId: videoId,
media: {
body: createReadStream(filePath),
title: string,
): Promise<{ id: string; streamKey: string; ingestUrl: string }> => {
const res: any = await youtube.liveStreams.insert({
part: ['snippet', 'cdn', 'contentDetails', 'status'],
requestBody: {
snippet: {
title: title,
},
});

console.log('Thumbnail set successfully:', response.data);
await stateService.update(stateId, { status: StateStatus.completed });
} catch (error) {
console.error('Error setting thumbnail:', error);
await stateService.update(stateId, { status: StateStatus.canceled });
}
}

// Uploads a video to YouTube and handles thumbnail setting
export async function uploadToYouTube(
session: ISession,
youtube: youtube_v3.Youtube,
videoFilePath: string,
): Promise<void> {
const stateService = new StateService();

try {
const insertResponse = await youtube.videos.insert({
part: ['status', 'snippet'],
requestBody: {
snippet: {
title: session.name,
description: session.description,
defaultLanguage: 'en',
defaultAudioLanguage: 'en',
},
status: {
privacyStatus: session.published ? 'public' : 'unlisted',
selfDeclaredMadeForKids: false,
madeForKids: false,
},
cdn: {
frameRate: '30fps',
ingestionType: 'rtmp',
resolution: '720p',
},
media: {
body: createReadStream(videoFilePath),
status: {
streamStatus: 'active',
},
});

const state = await stateService.create({
type: StateType.video,
sessionId: session._id,
sessionSlug: session.slug,
});

let processingStatus = 'processing';
while (processingStatus === 'processing') {
processingStatus = await checkVideoProcessingStatus(
insertResponse.data.id,
youtube,
);
if (processingStatus === 'processing') {
await delay(180000); // Delay for 3 minutes
} else if (processingStatus === 'error') {
await stateService.update(state._id.toString(), {
status: StateStatus.canceled,
});
return;
}
}

if (!session.coverImage) return;
},
});
return {
id: res.data.id,
streamKey: res.data.cdn.ingestionInfo.streamName,
ingestUrl: res.data.cdn.ingestionInfo.ingestionAddress,
};
};

const filePath = `./tmp/${session.slug}.jpg`;
const imageResponse = await downloadImage(session.coverImage, filePath);
if (imageResponse.success) {
await setThumbnail(
youtube,
insertResponse.data.id,
filePath,
stateService,
state._id.toString(),
);
return;
}
const bindBroadCastToStream = async (
youtube: youtube_v3.Youtube,
broadcastId: string,
streamId: string,
) => {
await youtube.liveBroadcasts.bind({
part: ['id', 'contentDetails'],
id: broadcastId,
streamId: streamId,
});
};

await stateService.update(state._id.toString(), {
status: StateStatus.canceled,
});
return;
} catch (error) {
console.error('An error occurred:', error);
return;
}
}
export const createYoutubeLiveStream = async (data: {
accessToken: string;
title: string;
streamDate: string;
}): Promise<{ streamKey: string; ingestUrl: string }> => {
const youtube = await getYoutubeClient(data.accessToken);
const broadCastId = await createLiveBroadcast(
youtube,
data.title,
data.streamDate,
);
const stream = await createLiveStream(youtube, data.title);
await bindBroadCastToStream(youtube, broadCastId, stream.id);
return {
streamKey: stream.streamKey,
ingestUrl: stream.ingestUrl,
};
};

0 comments on commit 5a5b4e7

Please sign in to comment.