Skip to content

Commit

Permalink
testing
Browse files Browse the repository at this point in the history
  • Loading branch information
ShadowLp174 committed Nov 11, 2023
1 parent 556da35 commit 5fa57ec
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 24 deletions.
25 changes: 21 additions & 4 deletions src/Media.js
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ class MediaPlayer extends Media {
this.ffmpegKilled = false;
this.ready = true;
this.volCache = null;
this.preproc = true;

this.volumeTransformer = new prism.VolumeTransformer({ type: "s16le", volume: 1 });
this.volumeTransformer.pipe(this.ffmpeg.stdin);
Expand All @@ -177,6 +178,22 @@ class MediaPlayer extends Media {
setReadNative(bool=true) {
this.readAtNative = bool;
}
ffmpegArgs(port) {
return (this.readAtNative) ? super.ffmpegArgs(port).splice(0, 1) : super.ffmpegArgs(port);
}

/**
* enablePreProcessing
* @description
* Enable/Disable preprocessing of incoming audio. Revoice will most likely break when wrong audio is input if this is set to false.
* The audio has to match the output of the following ffmpeg arguments: `-re -i - -reconnect 1 -reconnect_streamed 1 -reconnect_delay_max 4 -analyzeduration 0 -loglevel 0 -f s16le -ar 48000 -ac 2 -`
*
* @param {boolean} bool=true Enabled/disable pre-processing
* @return {void}
*/
enablePreProcessing(bool=true) {
this.preproc = bool;
}

static timestampToSeconds(timestamp="00:00:00", ceilMinutes=false) {
timestamp = timestamp.split(":").map((el, index) => {
Expand Down Expand Up @@ -406,7 +423,7 @@ class MediaPlayer extends Media {
this.ready = true;
}

const fpcm = require("child_process").spawn(ffmpeg, [
const fpcm = (this.preproc) ? require("child_process").spawn(ffmpeg, [
((this.readAtNative) ? "-re" : ""), "-i", "-",
"-reconnect", "1",
"-reconnect_streamed", "1",
Expand All @@ -417,8 +434,8 @@ class MediaPlayer extends Media {
"-ar", "48000",
"-ac", "2",
"-"
]);
const pcm = fpcm.stdout;
]) : this.originStream;
const pcm = fpcm.stdout || fpcm;
this.fpcm = fpcm;
this.pcm = pcm;
this.ffmpegKilled = false;
Expand All @@ -428,7 +445,7 @@ class MediaPlayer extends Media {
// ffmpeg stuff
this.#setupFmpeg();

this.originStream.pipe(fpcm.stdin); // start playing
if (this.preproc) this.originStream.pipe(fpcm.stdin); // start playing
}
async #ffmpegFinished() {
await this.sleep(1000); // prevent bug with no music after 3rd song
Expand Down
43 changes: 43 additions & 0 deletions src/Revoice.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ class VoiceConnection extends EventEmitter {

// user events
signaling.on("roomfetched", () => {
signaling.users.forEach(u => {
if (u.muted) return;
this.consumeUser(u.id);
})
this.initLeave();
signaling.users.forEach((user) => {
this.voice.users.set(user.id, user);
Expand Down Expand Up @@ -112,13 +116,52 @@ class VoiceConnection extends EventEmitter {
});
});

this.recvTransport = this.device.createRecvTransport({...data.data.recvTransport});
this.recvTransport.on("connect", ({ dtlsParameters }, callback) => {
this.signaling.connectTransport(this.recvTransport.id, dtlsParameters).then(callback);
});
this.signaling.on("UserStartProduce", async (data) => {
console.log("produce");
this.consumeUser(data.data.id);
});
this.recvTransport.on("connectionstatechange", async (state) => {
switch (state) {
case "connecting":
console.log("connecting");
break;
case "connected":
console.log("connected");
break;
case "failed":
console.log("failed");
this.recvTransport.close();
break;
default:
break;
}
});

this.updateState(Revoice.State.IDLE);
this.emit("join");
}
resetUser(user) {
this.emit("userLeave", user);
}

async consumeUser(id) {
console.log("consume", id);
const d = await this.signaling.startConsume("audio", id, true);
console.log(d);
const consumer = await this.recvTransport.consume(d);
console.log(consumer.track.onReceiveRtp.toString());
consumer.track.onReceiveRtp.subscribe(rtp => {
console.log("execute");
}, (...args) => console.log(args, "complete"), (...args) => console.log("error", args));
consumer.track.onReceiveRtp.execute("test");

};


/**
* @description Attach a Media object to this connection
*
Expand Down
83 changes: 63 additions & 20 deletions src/Signaling.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ const EventEmitter = require("events");
const { WebSocket } = require("ws");
const User = require("./User.js");

class Signaling {
class Signaling extends EventEmitter {
constructor(apiClient, channelId, reconnectTimeout=3000) {
super();
this.client = apiClient;
this.channelId = channelId;
this.reconnectTimeout = reconnectTimeout;

this.eventemitter = new EventEmitter();
this.currId = -1;
this.reconnecting = false;

Expand All @@ -17,15 +17,6 @@ class Signaling {

return this;
}
emit(event, cb) {
return this.eventemitter.emit(event, cb);
}
on(event, cb) {
return this.eventemitter.on(event, cb);
}
once(event, cb) {
return this.eventemitter.once(event, cb);
}

authenticate() { // start the authentication and join flow
this.client.post("/channels/" + this.channelId + "/join_call").then(data => {
Expand Down Expand Up @@ -76,15 +67,15 @@ class Signaling {
processWS(data) { // data == parsed websocket message
switch(data.type) {
case "InitializeTransports":
if (!this.reconnecting) this.eventemitter.emit("initTransports", data);
this.fetchRoomInfo().then(() => {
if (!this.reconnecting) this.emit("initTransports", data);
this.fetchRoomInfo().then((d) => {
this.roomEmpty = (this.users.length == 1);
this.emit("roomfetched");
this.emit("roomfetched", d);
});
break;
case "Authenticate":
// continue in signaling process
if (!this.reconnecting) this.eventemitter.emit("authenticate", data);
if (!this.reconnecting) this.emit("authenticate", data);
const request = {
id: ++this.currId,
type: "InitializeTransports",
Expand All @@ -96,13 +87,25 @@ class Signaling {
this.ws.send(JSON.stringify(request));
break;
case "ConnectTransport":
if (!this.reconnecting) this.eventemitter.emit("ConnectTransport", data);
if (!this.reconnecting) this.emit("ConnectTransport", data);
break;
case "StartProduce":
this.eventemitter.emit("StartProduce", data);
this.emit("StartProduce", data);
break;
case "StopProduce":
this.eventemitter.emit("StopProduce", data);
this.emit("StopProduce", data);
break;
case "UserStartProduce":
this.emit("UserStartProduce", data);
break;
case "UserStopProduce":
this.emit("UserStopProduce", data);
break;
case "StartConsume":
this.emit("StartConsume", data);
break;
case "StopConsume":
this.emit("StopConsume", data);
break;
case "UserJoined":
const user = new User(data.data.id, this.client);
Expand All @@ -123,7 +126,7 @@ class Signaling {
this.emit("userleave", removed);
default:
// events like startProduce or UserJoined; will be implemented later
this.eventemitter.emit("data", data);
this.emit("data", data);
// console.log("(yet) Unimplemented case: ", data);
break;
}
Expand Down Expand Up @@ -163,7 +166,7 @@ class Signaling {
user.muted = 1 - users[userId].audio;
this.addUser(user);
}
Promise.all(promises).then(res);
Promise.all(promises).then(() => res(data));
});
});
}
Expand Down Expand Up @@ -191,6 +194,46 @@ class Signaling {
})
});
}
startConsume(type, user, autostop=false) {
return new Promise((res, rej) => {
const request = {
id: ++this.currId,
type: "StartConsume",
data: {
type: type,
userId: user
}
};
console.log(user);
this.ws.send(JSON.stringify(request));
this.on("StartConsume", (data) => {
if (data.id !== request.id) return;
if (autostop) {
this.on("UserStopProduce", (d) => {
if (d.data.id != user) return;
this.stopConsume(data.data.id);
});
}
res(data.data);
});
});
}
stopConsume(id) {
return new Promise((res, rej) => {
const request = {
id: ++this.currId,
type: "StopConsume",
data: {
id: id
}
};
this.ws.send(JSON.stringify(request));
this.on("StopConsume", (data) => {
if (data.id !== request.id) return;
res(data.data);
});
});
}
startProduce(type, params) {
return new Promise((res, rej) => {
const request = {
Expand Down

0 comments on commit 5fa57ec

Please sign in to comment.