Skip to content

Commit

Permalink
Api Auth
Browse files Browse the repository at this point in the history
  • Loading branch information
the2pizza committed Jan 16, 2025
1 parent d06c913 commit b302f4f
Show file tree
Hide file tree
Showing 8 changed files with 151 additions and 20 deletions.
13 changes: 8 additions & 5 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ jobs:
name: Release
needs: [build-api, build-agent]
runs-on: ubuntu-latest
strategy:
matrix:
os: [ubuntu-20.04, ubuntu-22.04]
steps:
- name: Checkout
uses: actions/checkout@v1
Expand All @@ -76,18 +79,18 @@ jobs:

- name: Rename binaries with architecture
run: |
mv target/release/api target/release/api-${{ env.ARCH }}
mv target/release/agent target/release/agent-${{ env.ARCH }}
mv target/release/api target/release/api-${{ env.ARCH }}-${{ matrix.os }}
mv target/release/agent target/release/agent-${{ env.ARCH }}-${{ matrix.os }}
- name: Release
uses: softprops/action-gh-release@v1
if: startsWith(github.ref, 'refs/tags/')
with:
files: |
target/release/api-${{ env.ARCH }}
target/release/agent-${{ env.ARCH }}
target/release/api-${{ env.ARCH }}-${{ matrix.os }}
target/release/agent-${{ env.ARCH }}-${{ matrix.os }}
deploy/install
docs/
docs/*
README.md
config-agent-example.toml
config-api-example.toml
Expand Down
13 changes: 11 additions & 2 deletions src/bin/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let _ = {
let settings = settings.clone();
debug!("----->>>>> Register node");
if let Err(e) =
agent::register_node(state.clone(), settings.clone(), settings.node.env.clone()).await
if let Err(e) = agent::register_node(
state.clone(),
settings.api.endpoint,
settings.node.env.clone(),
settings.api.token,
)
.await
{
panic!("Cannot register node {:?}", e);
}
Expand Down Expand Up @@ -219,6 +224,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let block_trial_users_by_limit_handle = tokio::spawn({
let state = state.clone();
let clients = xray_api_clients.clone();
let endpoint = settings.api.endpoint.clone();
let api_token = settings.api.token.clone();
async move {
loop {
sleep(Duration::from_secs(settings.app.trial_jobs_timeout)).await;
Expand All @@ -227,6 +234,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
clients.clone(),
env.clone(),
node_uuid.clone(),
endpoint.clone(),
api_token.clone(),
)
.await;
}
Expand Down
1 change: 1 addition & 0 deletions src/bin/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
publisher.clone(),
settings.api.address.unwrap_or(Ipv4Addr::new(127, 0, 0, 1)),
settings.api.port,
settings.api.token,
));
tokio::signal::ctrl_c()
.await
Expand Down
6 changes: 6 additions & 0 deletions src/config/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ fn default_api_web_port() -> u16 {
3005
}

fn default_api_token() -> String {
"supetsecrettoken".to_string()
}

fn default_node_health_check_timeout() -> i16 {
60
}
Expand All @@ -114,6 +118,8 @@ pub struct ApiConfig {
pub endpoint: String,
#[serde(default = "default_node_health_check_timeout")]
pub node_health_check_timeout: i16,
#[serde(default = "default_api_token")]
pub token: String,
}

#[derive(Clone, Debug, Deserialize, Default)]
Expand Down
27 changes: 23 additions & 4 deletions src/http/api.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use log::debug;
use std::{net::Ipv4Addr, sync::Arc};
use tokio::sync::Mutex;
use tokio_postgres::Client;
use warp::Filter;
use zmq::Socket;

use super::handlers::AuthError;
use super::handlers::{self, NodesQueryParams, UserQueryParams};
use crate::state::{node::NodeRequest, state::State};

Expand All @@ -13,34 +15,51 @@ pub async fn run_api_server(
publisher: Arc<Mutex<Socket>>,
listen: Ipv4Addr,
port: u16,
token: String,
) {
let auth = warp::header::<String>("authorization").and_then(move |auth_header: String| {
let expected_token = format!("Bearer {}", token.clone());
debug!("Received Token: {}", auth_header);
if auth_header == expected_token {
futures::future::ok(())
} else {
futures::future::err(warp::reject::custom(AuthError("Unauthorized".to_string())))
}
});

let user_route = warp::post()
.and(warp::path("user"))
.and(auth.clone())
.and(warp::body::json())
.and(with_publisher(publisher.clone()))
.and(with_state(state.clone()))
.and_then(|user_req, publisher, state| handlers::user_request(user_req, publisher, state));
.and_then(|_auth, user_req, publisher, state| {
handlers::user_request(user_req, publisher, state)
});

let nodes_route = warp::get()
.and(warp::path("nodes"))
.and(auth.clone())
.and(warp::query::<NodesQueryParams>())
.and(with_state(state.clone()))
.and_then(handlers::get_nodes);
.and_then(|_auth, node_req, state| handlers::get_nodes(node_req, state));

let conn_route = warp::get()
.and(warp::path("conn"))
.and(auth.clone())
.and(warp::query::<UserQueryParams>())
.and(with_state(state.clone()))
.and_then(handlers::get_conn);
.and_then(|_auth, user_req, state| handlers::get_conn(user_req, state));

let nodes_register_route = warp::post()
.and(warp::path("node"))
.and(warp::path("register"))
.and(auth)
.and(warp::body::json::<NodeRequest>())
.and(with_state(state))
.and(with_pg_client(client))
.and(with_publisher(publisher))
.and_then(|node_req, state, client, publisher| {
.and_then(|_auth, node_req, state, client, publisher| {
handlers::node_register(node_req, state, client, publisher)
});

Expand Down
40 changes: 33 additions & 7 deletions src/http/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use zmq::Socket;
use crate::config::xray::Inbound;
use crate::state::{
node::Node,
node::NodeStatus,
node::{NodeRequest, NodeResponse},
state::State,
tag::Tag,
Expand All @@ -29,9 +30,12 @@ pub type UserRequest = Message;

#[derive(Debug)]
struct JsonError(String);

impl reject::Reject for JsonError {}

#[derive(Debug)]
pub struct AuthError(pub String);
impl warp::reject::Reject for AuthError {}

#[derive(Debug)]
struct MethodError;
impl reject::Reject for MethodError {}
Expand All @@ -49,13 +53,26 @@ pub async fn user_request(
) -> Result<impl warp::Reply, warp::Rejection> {
match publisher::send_message(publisher, &user_req.env, user_req.clone()).await {
Ok(_) => {
let trial = match user_req.trial {
Some(trial) => trial,
None => {
return Err(warp::reject::custom(JsonError(
"Missing 'trial' value".to_string(),
)))
}
};

let limit = match user_req.limit {
Some(limit) => limit,
None => {
return Err(warp::reject::custom(JsonError(
"Missing 'limit' value".to_string(),
)))
}
};

let mut state = state.lock().await;
let user = User::new(
user_req.trial.expect("REASON"),
user_req.limit.expect("REASON"),
user_req.env,
user_req.password,
);
let user = User::new(trial, limit, user_req.env, user_req.password);
let _ = state.add_user(user_req.user_id, user).await;

let response = ResponseMessage {
Expand Down Expand Up @@ -240,6 +257,7 @@ pub async fn get_conn(
if let Some(nodes) = state.get_nodes(env) {
let connections: Vec<String> = nodes
.iter()
.filter(|node| node.status == NodeStatus::Online)
.flat_map(|node| {
debug!("TAGS {:?}", node.inbounds.keys());
node.inbounds.iter().filter_map(|(tag, inbound)| match tag {
Expand Down Expand Up @@ -271,6 +289,14 @@ pub async fn rejection(reject: Rejection) -> Result<impl Reply, Rejection> {
error_response,
StatusCode::METHOD_NOT_ALLOWED,
))
} else if let Some(_) = reject.find::<AuthError>() {
let error_response = warp::reply::json(&serde_json::json!({
"error": "UNAUTHORIZED"
}));
Ok(warp::reply::with_status(
error_response,
StatusCode::UNAUTHORIZED,
))
} else if let Some(err) = reject.find::<JsonError>() {
let error_response = warp::reply::json(&serde_json::json!({
"error": err.0
Expand Down
60 changes: 58 additions & 2 deletions src/jobs/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use chrono::{Duration, Utc};
use log::{debug, error};
use reqwest::Url;
use reqwest::{Client, StatusCode};

use std::{error::Error, sync::Arc};
use tokio::sync::Mutex;
use uuid::Uuid;
Expand All @@ -14,6 +15,8 @@ use crate::state::{
stats::StatType,
user::{User, UserStatus},
};
use crate::zmq::message::Action;
use crate::zmq::message::Message;

use crate::xray_op::{
actions::create_users, actions::remove_user, client::XrayClients, stats, stats::Prefix, vless,
Expand Down Expand Up @@ -161,15 +164,60 @@ pub async fn send_metrics_job<T>(
Ok(())
}

async fn block_user_req(
user_id: Uuid,
env: String,
endpoint: String,
token: String,
) -> Result<(), Box<dyn Error>> {
let msg = Message {
user_id: user_id,
action: Action::Delete,
env: env,
trial: None,
limit: None,
password: None,
};

let mut endpoint = Url::parse(&endpoint)?;
endpoint
.path_segments_mut()
.map_err(|_| "Invalid API endpoint")?
.push("user");
let endpoint = endpoint.to_string();

debug!("ENDPOINT: {}", endpoint);

match serde_json::to_string_pretty(&msg) {
Ok(json) => debug!("Serialized Message for user '{}': {}", user_id, json),
Err(e) => error!("Error serializing Message for user_id '{}': {}", user_id, e),
}

let res = Client::new()
.post(&endpoint)
.header("Content-Type", "application/json")
.header("Authorization", format!("Bearer {}", token))
.json(&msg)
.send()
.await?;

if res.status().is_success() || res.status() == StatusCode::NOT_MODIFIED {
return Ok(());
} else {
return Err(format!("Req error: {} {:?}", res.status(), res).into());
}
}

pub async fn register_node(
state: Arc<Mutex<State>>,
settings: AgentSettings,
endpoint: String,
env: String,
token: String,
) -> Result<(), Box<dyn Error>> {
let node_state = state.lock().await;
let node = node_state.nodes.get(&env).clone();

let mut endpoint = Url::parse(&settings.api.endpoint)?;
let mut endpoint = Url::parse(&endpoint)?;
endpoint
.path_segments_mut()
.map_err(|_| "Invalid API endpoint")?
Expand All @@ -189,6 +237,7 @@ pub async fn register_node(
let res = Client::new()
.post(&endpoint)
.header("Content-Type", "application/json")
.header("Authorization", format!("Bearer {}", token))
.json(&node)
.send()
.await?;
Expand Down Expand Up @@ -320,6 +369,8 @@ pub async fn block_trial_users_by_limit(
clients: XrayClients,
env: String,
node_id: Uuid,
endpoint: String,
token: String,
) {
let trial_users = {
let state_guard = state.lock().await;
Expand All @@ -329,8 +380,11 @@ pub async fn block_trial_users_by_limit(
for (user_id, user) in trial_users {
let state = state.clone();
let user_id = user_id.clone();
let user = user.clone();
let clients = clients.clone();
let env = env.clone();
let endpoint = endpoint.clone();
let token = token.clone();

tokio::spawn(async move {
let user_exceeds_limit = {
Expand Down Expand Up @@ -370,6 +424,8 @@ pub async fn block_trial_users_by_limit(
{
error!("Failed to block user {}: {:?}", user_id, e);
} else {
let _ =
block_user_req(user_id.clone(), user.env.clone(), endpoint, token).await;
debug!("Successfully blocked user: {}", user_id);
}

Expand Down
11 changes: 11 additions & 0 deletions src/state/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,17 @@ pub enum NodeStatus {
Unknown,
}

impl PartialEq for NodeStatus {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(NodeStatus::Online, NodeStatus::Online) => true,
(NodeStatus::Offline, NodeStatus::Offline) => true,
(NodeStatus::Unknown, NodeStatus::Unknown) => true,
_ => false,
}
}
}

impl fmt::Display for NodeStatus {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Expand Down

0 comments on commit b302f4f

Please sign in to comment.