Skip to content

Commit

Permalink
Merge branch 'feat/websocket' of https://github.com/akitaSummer/lagon
Browse files Browse the repository at this point in the history
…into pr/887
  • Loading branch information
QuiiBz committed Jun 4, 2023
2 parents d88197b + 3931419 commit f84b28a
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 97 deletions.
24 changes: 18 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions crates/runtime/tests/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ mod utils;

async fn run_connection<S>(
connection: WebSocketStream<S>,
msg_tx: futures_channel::oneshot::Sender<Vec<Message>>,
msg_tx: tokio::sync::oneshot::Sender<Vec<Message>>,
) where
S: AsyncRead + AsyncWrite + Unpin,
{
Expand All @@ -27,8 +27,8 @@ async fn run_connection<S>(
async fn websocket_test() {
utils::setup();

let (con_tx, con_rx) = futures_channel::oneshot::channel();
let (msg_tx, msg_rx) = futures_channel::oneshot::channel();
let (con_tx, con_rx) = tokio::sync::oneshot::channel();
let (msg_tx, msg_rx) = tokio::sync::oneshot::channel();

let f = async move {
let listener = TcpListener::bind("127.0.0.1:12345").await.unwrap();
Expand Down
26 changes: 11 additions & 15 deletions crates/runtime_isolate/src/bindings/websocket.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
use std::{
collections::BTreeMap,
sync::{Arc, Mutex},
};
use std::{collections::BTreeMap, sync::Arc};

use anyhow::Result;
use lagon_runtime_v8_utils::v8_string;
use lagon_runtime_websocket::{
close_ws, get_ws_event, new_ws, send_ws_event, SendValue, Uuid, Ws, WsId,
};
use lagon_runtime_websocket::{new_ws, SendValue, Uuid, Ws, WsId};
use tokio::sync::Mutex;
use v8::{Local, ObjectTemplate};

use crate::Isolate;
Expand Down Expand Up @@ -88,7 +84,7 @@ pub async fn create_websocket_binding<'a>(

let res = new_ws(url, protocols).await;

let mut table = table.lock().unwrap();
let mut table = table.lock().await;

match res {
Ok((ws, protocols, extensions)) => {
Expand Down Expand Up @@ -121,7 +117,7 @@ pub async fn websocket_event_binding(
id: usize,
ws_id: EventArg,
) -> BindingResult {
let mut table = table.lock().unwrap();
let mut table = table.lock().await;

let uuid = match Uuid::parse_str(&ws_id) {
Ok(uuid) => uuid,
Expand All @@ -136,7 +132,7 @@ pub async fn websocket_event_binding(
let ws = table.get_mut(&uuid);

match ws {
Some(ws) => match get_ws_event(ws).await {
Some(ws) => match ws.get_ws_event().await {
Ok(res) => match res {
lagon_runtime_websocket::EventResponse::String(str) => BindingResult {
id,
Expand All @@ -147,7 +143,7 @@ pub async fn websocket_event_binding(
result: PromiseResult::ArrayBuffer(buf),
},
lagon_runtime_websocket::EventResponse::Close { code, reason } => {
match close_ws(ws, Some(code), Some(reason)).await {
match ws.close_ws(Some(code), Some(reason)).await {
Ok(_) => BindingResult {
id,
result: PromiseResult::String(
Expand Down Expand Up @@ -226,7 +222,7 @@ pub async fn websocket_send_binding(
let ws_id = arg.0;
let value = arg.1;

let mut table = table.lock().unwrap();
let mut table = table.lock().await;

let uuid = match Uuid::parse_str(&ws_id) {
Ok(uuid) => uuid,
Expand All @@ -241,7 +237,7 @@ pub async fn websocket_send_binding(
let ws = table.get_mut(&uuid);

match ws {
Some(ws) => match send_ws_event(ws, value).await {
Some(ws) => match ws.send_ws_event(value).await {
Ok(_) => BindingResult {
id,
result: PromiseResult::Undefined,
Expand Down Expand Up @@ -292,7 +288,7 @@ pub async fn websocket_close_binding(
let code = arg.1;
let reason = arg.2;

let mut table = table.lock().unwrap();
let mut table = table.lock().await;

let uuid = match Uuid::parse_str(&ws_id) {
Ok(uuid) => uuid,
Expand All @@ -305,7 +301,7 @@ pub async fn websocket_close_binding(
};

match table.remove(&uuid) {
Some(ref mut ws) => match close_ws(ws, code, reason).await {
Some(ref mut ws) => match ws.close_ws(code, reason).await {
Ok(_) => BindingResult {
id,
result: PromiseResult::Undefined,
Expand Down
3 changes: 2 additions & 1 deletion crates/runtime_isolate/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@ use std::{
collections::{BTreeMap, HashMap},
pin::Pin,
rc::Rc,
sync::{Arc, Mutex, RwLock},
sync::{Arc, RwLock},
task::{Context, Poll},
time::Instant,
};
use tokio::sync::Mutex;
use v8::MapFnTo;

use self::{
Expand Down
144 changes: 72 additions & 72 deletions crates/runtime_websocket/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,25 @@ type WsStream = WebSocketStream<MaybeTlsStream<TcpStream>>;

pub type WsId = Uuid;

#[derive(Debug)]
pub enum SendValue {
Text(String),
Binary(Vec<u8>),
Pong,
Ping,
}

#[derive(Debug)]
pub enum EventResponse {
String(String),
Binary(Vec<u8>),
Close { code: u16, reason: String },
Ping,
Pong,
Error(String),
Closed,
}

#[derive(Debug)]
pub struct Ws {
id: Uuid,
Expand All @@ -41,7 +60,7 @@ impl Ws {
self.id
}

pub async fn send(&mut self, message: Message) -> Result<()> {
async fn send(&mut self, message: Message) -> Result<()> {
let res = self.tx.send(message).await;

match res {
Expand All @@ -54,16 +73,66 @@ impl Ws {
}
}

pub async fn next_message(
async fn next_message(
&mut self,
) -> Result<Option<Result<Message, tokio_tungstenite::tungstenite::Error>>> {
let res = self.rx.next().await;
Ok(res)
}

pub async fn close(&mut self) {
async fn close(&mut self) {
self.tx.close().await;
}

pub async fn get_ws_event(&mut self) -> Result<EventResponse> {
let val = self.next_message().await?;
let res = match val {
Some(Ok(Message::Text(text))) => EventResponse::String(text),
Some(Ok(Message::Binary(data))) => EventResponse::Binary(data.into()),
Some(Ok(Message::Close(Some(frame)))) => EventResponse::Close {
code: frame.code.into(),
reason: frame.reason.to_string(),
},
Some(Ok(Message::Close(None))) => EventResponse::Close {
code: 1005,
reason: String::new(),
},
Some(Ok(Message::Ping(_))) => EventResponse::Ping,
Some(Ok(Message::Pong(_))) => EventResponse::Pong,
Some(Err(e)) => EventResponse::Error(e.to_string()),
None => EventResponse::Closed,
};

Ok(res)
}

pub async fn send_ws_event(&mut self, value: SendValue) -> Result<()> {
let msg = match value {
SendValue::Text(text) => Message::Text(text),
SendValue::Binary(buf) => Message::Binary(buf.to_vec()),
SendValue::Pong => Message::Pong(vec![]),
SendValue::Ping => Message::Ping(vec![]),
};

self.send(msg).await?;

Ok(())
}

pub async fn close_ws(&mut self, code: Option<u16>, reason: Option<String>) -> Result<()> {
let msg = Message::Close(code.map(|c| CloseFrame {
code: CloseCode::from(c),
reason: match reason {
Some(reason) => Cow::from(reason),
None => Default::default(),
},
}));

self.send(msg).await?;

self.close().await;
Ok(())
}
}

pub fn create_default_root_cert_store() -> RootCertStore {
Expand Down Expand Up @@ -155,72 +224,3 @@ pub async fn new_ws(url: String, protocols: String) -> Result<(Ws, String, Strin

Ok((ws, protocol.into(), extensions))
}

#[derive(Debug)]
pub enum EventResponse {
String(String),
Binary(Vec<u8>),
Close { code: u16, reason: String },
Ping,
Pong,
Error(String),
Closed,
}

pub async fn get_ws_event(ws: &mut Ws) -> Result<EventResponse> {
let val = ws.next_message().await?;
let res = match val {
Some(Ok(Message::Text(text))) => EventResponse::String(text),
Some(Ok(Message::Binary(data))) => EventResponse::Binary(data.into()),
Some(Ok(Message::Close(Some(frame)))) => EventResponse::Close {
code: frame.code.into(),
reason: frame.reason.to_string(),
},
Some(Ok(Message::Close(None))) => EventResponse::Close {
code: 1005,
reason: String::new(),
},
Some(Ok(Message::Ping(_))) => EventResponse::Ping,
Some(Ok(Message::Pong(_))) => EventResponse::Pong,
Some(Err(e)) => EventResponse::Error(e.to_string()),
None => EventResponse::Closed,
};

Ok(res)
}

#[derive(Debug)]
pub enum SendValue {
Text(String),
Binary(Vec<u8>),
Pong,
Ping,
}

pub async fn send_ws_event(ws: &mut Ws, value: SendValue) -> Result<()> {
let msg = match value {
SendValue::Text(text) => Message::Text(text),
SendValue::Binary(buf) => Message::Binary(buf.to_vec()),
SendValue::Pong => Message::Pong(vec![]),
SendValue::Ping => Message::Ping(vec![]),
};

ws.send(msg).await?;

Ok(())
}

pub async fn close_ws(ws: &mut Ws, code: Option<u16>, reason: Option<String>) -> Result<()> {
let msg = Message::Close(code.map(|c| CloseFrame {
code: CloseCode::from(c),
reason: match reason {
Some(reason) => Cow::from(reason),
None => Default::default(),
},
}));

ws.send(msg).await?;

ws.close().await;
Ok(())
}

0 comments on commit f84b28a

Please sign in to comment.