Skip to content

Commit

Permalink
⚡️ Video processing fix (#852)
Browse files Browse the repository at this point in the history
# Pull Request Info

## Description
"Refactor: Livepeer Webhook and Session Model Enhancements

This pull request refines the Livepeer webhook handling system, focusing
on 'asset ready' and 'asset failed' events.

Key improvements include:
- Updating the Session model to support new propert fields.
- Modifying the state object to accurately reflect current asset status.

These enhancements boost our system's ability to process Livepeer
notifications, ensuring better user interactions.

## Type of change
- [x] Code refactor (non-breaking change which fixes an issue)
- [x] New feature (non-breaking change which adds functionality)

Fixes (#775 )
## Checklist:

- [x] My code follows the style guidelines of this project
- [ ] I have performed a self-review of my own code
- [ ] I have commented my code, particularly in hard-to-understand areas
- [ ] I have made corresponding changes to the documentation
- [x] My changes generate no new warnings
- [ ] I have added tests that prove my fix is effective or that my
feature works
- [ ] New and existing unit tests pass locally with my changes
- [ ] Any dependent changes have been merged and published
  • Loading branch information
Eric-Vondee authored Aug 10, 2024
1 parent 268713c commit 11c0363
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 50 deletions.
79 changes: 34 additions & 45 deletions packages/server/src/controllers/index.controller.ts
Original file line number Diff line number Diff line change
@@ -1,27 +1,27 @@
import { IStandardResponse, SendApiResponse } from '@utils/api.response';
import { HttpException } from '@exceptions/HttpException';
import { LivepeerEvent } from '@interfaces/livepeer.interface';
import { ClippingStatus } from '@interfaces/session.interface';
import { StateStatus, StateType } from '@interfaces/state.interface';
import SessionService from '@services/session.service';
import StageService from '@services/stage.service';
import StateService from '@services/state.service';
import { type IStandardResponse, SendApiResponse } from '@utils/api.response';
import { updateEventVideoById } from '@utils/firebase';
import { getAsset, getDownloadUrl } from '@utils/livepeer';
import StorageService from '@utils/s3';
import { validateWebhook } from '@utils/validateWebhook';
import {
Body,
Controller,
FormField,
Get,
Header,
Post,
Route,
Security,
Tags,
Body,
Post,
Header,
UploadedFile,
FormField,
Security,
} from 'tsoa';
import startAITools from '@aitools/main';
import { validateWebhook } from '@utils/validateWebhook';
import StageService from '@services/stage.service';
import { LivepeerEvent } from '@interfaces/livepeer.interface';
import SessionService from '@services/session.service';
import { getAsset, getDownloadUrl } from '@utils/livepeer';
import StateService from '@services/state.service';
import { StateStatus } from '@interfaces/state.interface';
import StorageService from '@utils/s3';
import { HttpException } from '@exceptions/HttpException';
import { updateEventVideoById } from '@utils/firebase';

@Tags('Index')
@Route('')
Expand Down Expand Up @@ -96,18 +96,12 @@ export class IndexController extends Controller {

private async assetReady(id: string) {
const asset = await getAsset(id);
const session = await this.sessionService.findOne({
assetId: asset.id,
});

if (!session) {
return SendApiResponse('No session found', null, '400');
}
// const ipfs = await uploadToIpfs(id);
const session = await this.sessionService.findOne({ assetId: asset.id });
if (!session) throw new HttpException(404, 'No session found');
await this.sessionService.update(session._id.toString(), {
// ipfsURI: ipfs,
videoUrl: asset.playbackUrl,
playbackId: asset.playbackId,
clippingStatus: ClippingStatus.completed,
} as any);

if (session.firebaseId && asset.playbackUrl) {
Expand All @@ -116,19 +110,14 @@ export class IndexController extends Controller {
mp4Url: await getDownloadUrl(asset.id),
});
}

const state = await this.stateService.getAll({
sessionId: session._id.toString(),
const state = await this.stateService.findOne({
_id: session._id.toString(),
type: StateType.video,
});

if (!state || state.length === 0) {
return SendApiResponse('No state found', null, '400');
}
await this.stateService.update(state[0]._id.toString(), {
if (!state) throw new HttpException(404, 'No state found');
await this.stateService.update(state._id.toString(), {
status: StateStatus.completed,
});

// await startAITools(payload.payload.id);
}

private async assetFailed(id: string) {
Expand All @@ -137,20 +126,20 @@ export class IndexController extends Controller {
assetId: asset.id,
});

if (!session) {
throw 'No session found';
}
if (!session) throw new HttpException(404, 'No session found');

const state = await this.stateService.getAll({
const state = await this.stateService.findOne({
sessionId: session._id.toString(),
type: StateType.video,
});
if (!state) throw new HttpException(404, 'No state found');

if (!state || state.length === 0) {
throw 'No state found';
}
await this.sessionService.update(session._id.toString(), {
clippingStatus: ClippingStatus.failed,
} as any);

await this.stateService.update(state[0]._id.toString(), {
status: StateStatus.error,
await this.stateService.update(state._id.toString(), {
status: StateStatus.failed,
});
}
}
11 changes: 9 additions & 2 deletions packages/server/src/interfaces/session.interface.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Document, Types } from 'mongoose';
import { ISpeaker } from './speaker.interface';
import type { Document, Types } from 'mongoose';
import type { ISpeaker } from './speaker.interface';

export interface ISource {
streamUrl?: string;
Expand All @@ -21,6 +21,12 @@ export enum SessionType {
video = 'video',
}

export enum ClippingStatus {
pending = 'pending',
failed = 'failed',
completed = 'completed',
}

export interface ISession {
_id?: Types.ObjectId;
name: string;
Expand Down Expand Up @@ -54,6 +60,7 @@ export interface ISession {
socials?: { name: string; date: number }[];
firebaseId?: string;
talkType?: string;
clippingStatus?: ClippingStatus;
}

export interface ISessionModel extends Omit<ISession, '_id'>, Document {}
4 changes: 2 additions & 2 deletions packages/server/src/interfaces/state.interface.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Types, Document } from 'mongoose';
import type { Document, Types } from 'mongoose';

export enum SheetType {
gsheet = 'gsheet',
Expand All @@ -10,7 +10,7 @@ export enum StateStatus {
completed = 'completed',
canceled = 'canceled',
sync = 'sync',
error = 'error',
failed = 'failed',
}

export enum StateType {
Expand Down
7 changes: 6 additions & 1 deletion packages/server/src/models/session.model.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import { ISessionModel, SessionType } from '@interfaces/session.interface';
import {
ClippingStatus,
type ISessionModel,
SessionType,
} from '@interfaces/session.interface';
import { Schema, model } from 'mongoose';

const SessionSchema = new Schema<ISessionModel>(
Expand Down Expand Up @@ -59,6 +63,7 @@ const SessionSchema = new Schema<ISessionModel>(
],
firebaseId: { type: String, default: '' },
talkType: { type: String, default: '' },
clippingStatus: { type: String, enum: Object.keys(ClippingStatus) },
},
{
timestamps: true,
Expand Down

0 comments on commit 11c0363

Please sign in to comment.