diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index b8d6329..ee6f562 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -52,6 +52,9 @@ jobs: name: Release needs: [build-api, build-agent] runs-on: ubuntu-latest + strategy: + matrix: + os: [ubuntu-20.04, ubuntu-22.04] steps: - name: Checkout uses: actions/checkout@v1 @@ -76,18 +79,18 @@ jobs: - name: Rename binaries with architecture run: | - mv target/release/api target/release/api-${{ env.ARCH }} - mv target/release/agent target/release/agent-${{ env.ARCH }} + mv target/release/api target/release/api-${{ env.ARCH }}-${{ matrix.os }} + mv target/release/agent target/release/agent-${{ env.ARCH }}-${{ matrix.os }} - name: Release uses: softprops/action-gh-release@v1 if: startsWith(github.ref, 'refs/tags/') with: files: | - target/release/api-${{ env.ARCH }} - target/release/agent-${{ env.ARCH }} + target/release/api-${{ env.ARCH }}-${{ matrix.os }} + target/release/agent-${{ env.ARCH }}-${{ matrix.os }} deploy/install - docs/ + docs/* README.md config-agent-example.toml config-api-example.toml diff --git a/src/bin/agent.rs b/src/bin/agent.rs index aea50bd..63fd979 100644 --- a/src/bin/agent.rs +++ b/src/bin/agent.rs @@ -171,8 +171,13 @@ async fn main() -> Result<(), Box> { let _ = { let settings = settings.clone(); debug!("----->>>>> Register node"); - if let Err(e) = - agent::register_node(state.clone(), settings.clone(), settings.node.env.clone()).await + if let Err(e) = agent::register_node( + state.clone(), + settings.api.endpoint, + settings.node.env.clone(), + settings.api.token, + ) + .await { panic!("Cannot register node {:?}", e); } @@ -219,6 +224,8 @@ async fn main() -> Result<(), Box> { let block_trial_users_by_limit_handle = tokio::spawn({ let state = state.clone(); let clients = xray_api_clients.clone(); + let endpoint = settings.api.endpoint.clone(); + let api_token = settings.api.token.clone(); async move { loop { sleep(Duration::from_secs(settings.app.trial_jobs_timeout)).await; @@ -227,6 +234,8 @@ async fn main() -> Result<(), Box> { clients.clone(), env.clone(), node_uuid.clone(), + endpoint.clone(), + api_token.clone(), ) .await; } diff --git a/src/bin/api.rs b/src/bin/api.rs index 913d0b0..e374ad5 100644 --- a/src/bin/api.rs +++ b/src/bin/api.rs @@ -151,6 +151,7 @@ async fn main() -> Result<(), Box> { publisher.clone(), settings.api.address.unwrap_or(Ipv4Addr::new(127, 0, 0, 1)), settings.api.port, + settings.api.token, )); tokio::signal::ctrl_c() .await diff --git a/src/config/settings.rs b/src/config/settings.rs index 6a9e230..9dd3e09 100644 --- a/src/config/settings.rs +++ b/src/config/settings.rs @@ -100,6 +100,10 @@ fn default_api_web_port() -> u16 { 3005 } +fn default_api_token() -> String { + "supetsecrettoken".to_string() +} + fn default_node_health_check_timeout() -> i16 { 60 } @@ -114,6 +118,8 @@ pub struct ApiConfig { pub endpoint: String, #[serde(default = "default_node_health_check_timeout")] pub node_health_check_timeout: i16, + #[serde(default = "default_api_token")] + pub token: String, } #[derive(Clone, Debug, Deserialize, Default)] diff --git a/src/http/api.rs b/src/http/api.rs index 19920c7..07f45f3 100644 --- a/src/http/api.rs +++ b/src/http/api.rs @@ -1,9 +1,11 @@ +use log::debug; use std::{net::Ipv4Addr, sync::Arc}; use tokio::sync::Mutex; use tokio_postgres::Client; use warp::Filter; use zmq::Socket; +use super::handlers::AuthError; use super::handlers::{self, NodesQueryParams, UserQueryParams}; use crate::state::{node::NodeRequest, state::State}; @@ -13,34 +15,51 @@ pub async fn run_api_server( publisher: Arc>, listen: Ipv4Addr, port: u16, + token: String, ) { + let auth = warp::header::("authorization").and_then(move |auth_header: String| { + let expected_token = format!("Bearer {}", token.clone()); + debug!("Received Token: {}", auth_header); + if auth_header == expected_token { + futures::future::ok(()) + } else { + futures::future::err(warp::reject::custom(AuthError("Unauthorized".to_string()))) + } + }); + let user_route = warp::post() .and(warp::path("user")) + .and(auth.clone()) .and(warp::body::json()) .and(with_publisher(publisher.clone())) .and(with_state(state.clone())) - .and_then(|user_req, publisher, state| handlers::user_request(user_req, publisher, state)); + .and_then(|_auth, user_req, publisher, state| { + handlers::user_request(user_req, publisher, state) + }); let nodes_route = warp::get() .and(warp::path("nodes")) + .and(auth.clone()) .and(warp::query::()) .and(with_state(state.clone())) - .and_then(handlers::get_nodes); + .and_then(|_auth, node_req, state| handlers::get_nodes(node_req, state)); let conn_route = warp::get() .and(warp::path("conn")) + .and(auth.clone()) .and(warp::query::()) .and(with_state(state.clone())) - .and_then(handlers::get_conn); + .and_then(|_auth, user_req, state| handlers::get_conn(user_req, state)); let nodes_register_route = warp::post() .and(warp::path("node")) .and(warp::path("register")) + .and(auth) .and(warp::body::json::()) .and(with_state(state)) .and(with_pg_client(client)) .and(with_publisher(publisher)) - .and_then(|node_req, state, client, publisher| { + .and_then(|_auth, node_req, state, client, publisher| { handlers::node_register(node_req, state, client, publisher) }); diff --git a/src/http/handlers.rs b/src/http/handlers.rs index f263091..77983d2 100644 --- a/src/http/handlers.rs +++ b/src/http/handlers.rs @@ -18,6 +18,7 @@ use zmq::Socket; use crate::config::xray::Inbound; use crate::state::{ node::Node, + node::NodeStatus, node::{NodeRequest, NodeResponse}, state::State, tag::Tag, @@ -29,9 +30,12 @@ pub type UserRequest = Message; #[derive(Debug)] struct JsonError(String); - impl reject::Reject for JsonError {} +#[derive(Debug)] +pub struct AuthError(pub String); +impl warp::reject::Reject for AuthError {} + #[derive(Debug)] struct MethodError; impl reject::Reject for MethodError {} @@ -49,13 +53,26 @@ pub async fn user_request( ) -> Result { match publisher::send_message(publisher, &user_req.env, user_req.clone()).await { Ok(_) => { + let trial = match user_req.trial { + Some(trial) => trial, + None => { + return Err(warp::reject::custom(JsonError( + "Missing 'trial' value".to_string(), + ))) + } + }; + + let limit = match user_req.limit { + Some(limit) => limit, + None => { + return Err(warp::reject::custom(JsonError( + "Missing 'limit' value".to_string(), + ))) + } + }; + let mut state = state.lock().await; - let user = User::new( - user_req.trial.expect("REASON"), - user_req.limit.expect("REASON"), - user_req.env, - user_req.password, - ); + let user = User::new(trial, limit, user_req.env, user_req.password); let _ = state.add_user(user_req.user_id, user).await; let response = ResponseMessage { @@ -240,6 +257,7 @@ pub async fn get_conn( if let Some(nodes) = state.get_nodes(env) { let connections: Vec = nodes .iter() + .filter(|node| node.status == NodeStatus::Online) .flat_map(|node| { debug!("TAGS {:?}", node.inbounds.keys()); node.inbounds.iter().filter_map(|(tag, inbound)| match tag { @@ -271,6 +289,14 @@ pub async fn rejection(reject: Rejection) -> Result { error_response, StatusCode::METHOD_NOT_ALLOWED, )) + } else if let Some(_) = reject.find::() { + let error_response = warp::reply::json(&serde_json::json!({ + "error": "UNAUTHORIZED" + })); + Ok(warp::reply::with_status( + error_response, + StatusCode::UNAUTHORIZED, + )) } else if let Some(err) = reject.find::() { let error_response = warp::reply::json(&serde_json::json!({ "error": err.0 diff --git a/src/jobs/agent.rs b/src/jobs/agent.rs index 37724d5..ca7e385 100644 --- a/src/jobs/agent.rs +++ b/src/jobs/agent.rs @@ -2,6 +2,7 @@ use chrono::{Duration, Utc}; use log::{debug, error}; use reqwest::Url; use reqwest::{Client, StatusCode}; + use std::{error::Error, sync::Arc}; use tokio::sync::Mutex; use uuid::Uuid; @@ -14,6 +15,8 @@ use crate::state::{ stats::StatType, user::{User, UserStatus}, }; +use crate::zmq::message::Action; +use crate::zmq::message::Message; use crate::xray_op::{ actions::create_users, actions::remove_user, client::XrayClients, stats, stats::Prefix, vless, @@ -161,15 +164,60 @@ pub async fn send_metrics_job( Ok(()) } +async fn block_user_req( + user_id: Uuid, + env: String, + endpoint: String, + token: String, +) -> Result<(), Box> { + let msg = Message { + user_id: user_id, + action: Action::Delete, + env: env, + trial: None, + limit: None, + password: None, + }; + + let mut endpoint = Url::parse(&endpoint)?; + endpoint + .path_segments_mut() + .map_err(|_| "Invalid API endpoint")? + .push("user"); + let endpoint = endpoint.to_string(); + + debug!("ENDPOINT: {}", endpoint); + + match serde_json::to_string_pretty(&msg) { + Ok(json) => debug!("Serialized Message for user '{}': {}", user_id, json), + Err(e) => error!("Error serializing Message for user_id '{}': {}", user_id, e), + } + + let res = Client::new() + .post(&endpoint) + .header("Content-Type", "application/json") + .header("Authorization", format!("Bearer {}", token)) + .json(&msg) + .send() + .await?; + + if res.status().is_success() || res.status() == StatusCode::NOT_MODIFIED { + return Ok(()); + } else { + return Err(format!("Req error: {} {:?}", res.status(), res).into()); + } +} + pub async fn register_node( state: Arc>, - settings: AgentSettings, + endpoint: String, env: String, + token: String, ) -> Result<(), Box> { let node_state = state.lock().await; let node = node_state.nodes.get(&env).clone(); - let mut endpoint = Url::parse(&settings.api.endpoint)?; + let mut endpoint = Url::parse(&endpoint)?; endpoint .path_segments_mut() .map_err(|_| "Invalid API endpoint")? @@ -189,6 +237,7 @@ pub async fn register_node( let res = Client::new() .post(&endpoint) .header("Content-Type", "application/json") + .header("Authorization", format!("Bearer {}", token)) .json(&node) .send() .await?; @@ -320,6 +369,8 @@ pub async fn block_trial_users_by_limit( clients: XrayClients, env: String, node_id: Uuid, + endpoint: String, + token: String, ) { let trial_users = { let state_guard = state.lock().await; @@ -329,8 +380,11 @@ pub async fn block_trial_users_by_limit( for (user_id, user) in trial_users { let state = state.clone(); let user_id = user_id.clone(); + let user = user.clone(); let clients = clients.clone(); let env = env.clone(); + let endpoint = endpoint.clone(); + let token = token.clone(); tokio::spawn(async move { let user_exceeds_limit = { @@ -370,6 +424,8 @@ pub async fn block_trial_users_by_limit( { error!("Failed to block user {}: {:?}", user_id, e); } else { + let _ = + block_user_req(user_id.clone(), user.env.clone(), endpoint, token).await; debug!("Successfully blocked user: {}", user_id); } diff --git a/src/state/node.rs b/src/state/node.rs index c84a21d..855b4c3 100644 --- a/src/state/node.rs +++ b/src/state/node.rs @@ -22,6 +22,17 @@ pub enum NodeStatus { Unknown, } +impl PartialEq for NodeStatus { + fn eq(&self, other: &Self) -> bool { + match (self, other) { + (NodeStatus::Online, NodeStatus::Online) => true, + (NodeStatus::Offline, NodeStatus::Offline) => true, + (NodeStatus::Unknown, NodeStatus::Unknown) => true, + _ => false, + } + } +} + impl fmt::Display for NodeStatus { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self {