Skip to content

Commit

Permalink
♻️ video queue refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
Eric-Vondee committed Aug 8, 2024
1 parent 0bc8e6f commit b1c75ad
Showing 1 changed file with 20 additions and 44 deletions.
64 changes: 20 additions & 44 deletions packages/video-uploader/src/video-queue.ts
Original file line number Diff line number Diff line change
@@ -1,32 +1,21 @@
import amqp from 'amqplib';
import { downloadM3U8ToMP4 } from './utils/ffmpeg';
import { uploadToYouTube } from './utils/youtube';
import fs from 'fs';
import { logger } from './utils/logger';
import { config } from 'dotenv';
import { MongoClient, ObjectId } from 'mongodb';
import { uploadToTwitter } from './utils/twitter';
import { config } from "dotenv";
import fs from "fs";
import { MongoClient, ObjectId } from "mongodb";
import { downloadM3U8ToMP4 } from "./utils/ffmpeg";
import { logger } from "./utils/logger";
import connection from "./utils/mq";
import { uploadToYouTube } from "./utils/youtube";

config();

const client = new MongoClient(process.env.DB_HOST);
const db = client.db(process.env.DB_NAME);
const sessions = db.collection('sessions');
const sessions = db.collection("sessions");

async function videoUploader() {
try {
const queue = 'videos';
const conn = await amqp.connect({
protocol: 'amqp',
hostname: process.env.MQ_HOST,
port: Number(process.env.MQ_PORT),
username: process.env.MQ_USERNAME,
password: process.env.MQ_SECRET,
vhost: '/',
});
const channel = await conn.createChannel();
conn.on('error', (e) => {
logger.error('RabbitMQ connection error:', e);
});
const queue = "videos";
const channel = await (await connection).createChannel();
channel.assertQueue(queue, {
durable: true,
});
Expand All @@ -38,42 +27,29 @@ async function videoUploader() {
await downloadM3U8ToMP4(
data.session.videoUrl,
data.session.slug,
'./tmp'
"./tmp",
);
await uploadToYouTube(
data.session,
`./tmp/${data.session.slug}.mp4`,
data.token,
);
switch (data.type) {
case 'youtube':
await uploadToYouTube(
data.session,
`./tmp/${data.session.slug}.mp4`,
data.token.secret
);
break;
case 'twitter':
await uploadToTwitter(
data.session,
`./tmp/${data.session.slug}.mp4`,
data.token
);
break;
default:
break;
}
fs.unlinkSync(`./tmp/${data.session.slug}.mp4`);
await sessions.findOneAndUpdate(
{ _id: ObjectId.createFromHexString(data.sessionId) },
{
$addToSet: {
socials: { name: data.type, date: new Date().getTime() },
},
}
},
);
},
{
noAck: true,
}
},
);
} catch (e) {
logger.error('error', e);
logger.error("error", e);
}
}

Expand Down

0 comments on commit b1c75ad

Please sign in to comment.