Skip to content

Commit

Permalink
feature: introduce tokio and tokio proxy
Browse files Browse the repository at this point in the history
  • Loading branch information
tyrchen committed Apr 12, 2024
1 parent 098f24b commit 4cc16f9
Show file tree
Hide file tree
Showing 6 changed files with 179 additions and 3 deletions.
32 changes: 32 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 7 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,17 @@ tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
[dev-dependencies]
axum = { version = "0.7.5", features = ["http2", "query", "tracing"] }
base64 = "0.22.0"
blake3 = "1.5.1"
bytes = "1.6.0"
derive_builder = "0.20.0"
derive_more = "0.99.17"
http = "1.1.0"
serde = { version = "1.0.197", features = ["derive"] }
serde_json = "1.0.115"
strum = { version = "0.26.2", features = ["derive"] }
tokio = { version = "1.37.0", features = ["rt", "rt-multi-thread", "macros"] }
tokio = { version = "1.37.0", features = [
"fs",
"rt",
"rt-multi-thread",
"macros",
] }
62 changes: 62 additions & 0 deletions examples/minginx.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// it could be a proxy to a upstream
use anyhow::Result;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::{
io,
net::{TcpListener, TcpStream},
};
use tracing::{info, level_filters::LevelFilter, warn};
use tracing_subscriber::{fmt::Layer, layer::SubscriberExt, util::SubscriberInitExt, Layer as _};

#[derive(Serialize, Deserialize, Clone)]
struct Config {
upstream_addr: String,
listen_addr: String,
}

#[tokio::main]
async fn main() -> Result<()> {
let layer = Layer::new().with_filter(LevelFilter::INFO);
tracing_subscriber::registry().with(layer).init();
let config = resolve_config();
let config = Arc::new(config);
info!("Upstream is {}", config.upstream_addr);
info!("Listening on {}", config.listen_addr);
let listener = TcpListener::bind(&config.listen_addr).await?;
loop {
let (client, addr) = listener.accept().await?;
info!("Accepted connection from {}", addr);
let cloned_config = config.clone();
tokio::spawn(async move {
let upstream = TcpStream::connect(&cloned_config.upstream_addr).await?;
proxy(client, upstream).await?;
Ok::<(), anyhow::Error>(())
});
}

#[allow(unreachable_code)]
Ok::<(), anyhow::Error>(())
}

async fn proxy(mut client: TcpStream, mut upstream: TcpStream) -> Result<()> {
let (mut client_read, mut client_write) = client.split();
let (mut upstream_read, mut upstream_write) = upstream.split();
let client_to_upstream = io::copy(&mut client_read, &mut upstream_write);
let upstream_to_client = io::copy(&mut upstream_read, &mut client_write);
match tokio::try_join!(client_to_upstream, upstream_to_client) {
Ok((n, m)) => info!(
"proxied {} bytes from client to upstream, {} bytes from upstream to client",
n, m
),
Err(e) => warn!("error proxying: {:?}", e),
}
Ok(())
}

fn resolve_config() -> Config {
Config {
upstream_addr: "0.0.0.0:8080".to_string(),
listen_addr: "0.0.0.0:8081".to_string(),
}
}
36 changes: 36 additions & 0 deletions examples/tokio1.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use std::{thread, time::Duration};

use tokio::{
fs,
runtime::{Builder, Runtime},
time::sleep,
};

fn main() {
let handle = thread::spawn(|| {
let rt = Builder::new_current_thread().enable_all().build().unwrap();

rt.block_on(run(&rt));
});

handle.join().unwrap();
}

fn expensive_blocking_task(s: String) -> String {
thread::sleep(Duration::from_millis(800));
blake3::hash(s.as_bytes()).to_string()
}

async fn run(rt: &Runtime) {
rt.spawn(async {
println!("future 1");
let content = fs::read("Cargo.toml").await.unwrap();
println!("content: {:?}", content.len());
});
rt.spawn(async {
println!("future 2");
let result = expensive_blocking_task("hello".to_string());
println!("result: {}", result);
});
sleep(Duration::from_secs(1)).await;
}
40 changes: 40 additions & 0 deletions examples/tokio2.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
use std::{thread, time::Duration};
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
// tokio task send string to expensive_blocking_task for execution
let (tx, rx) = mpsc::channel(32);
let handle = worker(rx);

tokio::spawn(async move {
let mut i = 0;
loop {
i += 1;
println!("sending task {}", i);
tx.send(format!("task {i}")).await.unwrap();
}
});

handle.join().unwrap();
}

fn worker(mut rx: mpsc::Receiver<String>) -> thread::JoinHandle<()> {
thread::spawn(move || {
let (sender, receiver) = std::sync::mpsc::channel();
while let Some(s) = rx.blocking_recv() {
let sender_clone = sender.clone();
thread::spawn(move || {
let ret = expensive_blocking_task(s);
sender_clone.send(ret).unwrap();
});
let result = receiver.recv().unwrap();
println!("result: {}", result);
}
})
}

fn expensive_blocking_task(s: String) -> String {
thread::sleep(Duration::from_millis(800));
blake3::hash(s.as_bytes()).to_string()
}
4 changes: 2 additions & 2 deletions test.rest
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ Content-Type: application/json

### invalid request

PATCH http://localhost:8080/
PATCH http://localhost:8081/
Content-Type: application/json

{
"skills": "Rust"
"skills": ["Rust"]
}

0 comments on commit 4cc16f9

Please sign in to comment.