Skip to content

Commit

Permalink
restructured code
Browse files Browse the repository at this point in the history
  • Loading branch information
parmesant committed Dec 27, 2024
1 parent cc46126 commit afed517
Show file tree
Hide file tree
Showing 11 changed files with 44 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use datafusion::{
use tracing::trace;

use crate::{
handlers::http::alerts::{AlertState, ALERTS},
query::{TableScanVisitor, QUERY_SESSION},
rbac::{
map::SessionKey,
Expand All @@ -39,7 +38,9 @@ use crate::{
utils::time::TimeRange,
};

use super::{AlertConfig, AlertError, ThresholdConfig};
use super::{
Aggregate, AlertConfig, AlertError, AlertOperator, AlertState, ThresholdConfig, ALERTS,
};

async fn get_tables_from_query(query: &str) -> Result<TableScanVisitor, AlertError> {
let session_state = QUERY_SESSION.state();
Expand Down Expand Up @@ -152,48 +153,22 @@ fn get_exprs(thresholds: &Vec<ThresholdConfig>) -> (Vec<Expr>, Vec<Expr>, Expr)
let mut expr = Expr::Literal(datafusion::scalar::ScalarValue::Boolean(Some(true)));
for threshold in thresholds {
let res = match threshold.operator {
crate::handlers::http::alerts::AlertOperator::GreaterThan => {
col(&threshold.column).gt(lit(threshold.value))
}
crate::handlers::http::alerts::AlertOperator::LessThan => {
col(&threshold.column).lt(lit(threshold.value))
}
crate::handlers::http::alerts::AlertOperator::EqualTo => {
col(&threshold.column).eq(lit(threshold.value))
}
crate::handlers::http::alerts::AlertOperator::NotEqualTo => {
col(&threshold.column).not_eq(lit(threshold.value))
}
crate::handlers::http::alerts::AlertOperator::GreaterThanEqualTo => {
col(&threshold.column).gt_eq(lit(threshold.value))
}
crate::handlers::http::alerts::AlertOperator::LessThanEqualTo => {
col(&threshold.column).lt_eq(lit(threshold.value))
}
crate::handlers::http::alerts::AlertOperator::Like => {
col(&threshold.column).like(lit(threshold.value))
}
crate::handlers::http::alerts::AlertOperator::NotLike => {
col(&threshold.column).not_like(lit(threshold.value))
}
AlertOperator::GreaterThan => col(&threshold.column).gt(lit(threshold.value)),
AlertOperator::LessThan => col(&threshold.column).lt(lit(threshold.value)),
AlertOperator::EqualTo => col(&threshold.column).eq(lit(threshold.value)),
AlertOperator::NotEqualTo => col(&threshold.column).not_eq(lit(threshold.value)),
AlertOperator::GreaterThanEqualTo => col(&threshold.column).gt_eq(lit(threshold.value)),
AlertOperator::LessThanEqualTo => col(&threshold.column).lt_eq(lit(threshold.value)),
AlertOperator::Like => col(&threshold.column).like(lit(threshold.value)),
AlertOperator::NotLike => col(&threshold.column).not_like(lit(threshold.value)),
};

aggr_expr.push(match threshold.agg {
crate::handlers::http::alerts::Aggregate::Avg => {
avg(col(&threshold.column)).alias(&threshold.column)
}
crate::handlers::http::alerts::Aggregate::Count => {
count(col(&threshold.column)).alias(&threshold.column)
}
crate::handlers::http::alerts::Aggregate::Min => {
min(col(&threshold.column)).alias(&threshold.column)
}
crate::handlers::http::alerts::Aggregate::Max => {
max(col(&threshold.column)).alias(&threshold.column)
}
crate::handlers::http::alerts::Aggregate::Sum => {
sum(col(&threshold.column)).alias(&threshold.column)
}
Aggregate::Avg => avg(col(&threshold.column)).alias(&threshold.column),
Aggregate::Count => count(col(&threshold.column)).alias(&threshold.column),
Aggregate::Min => min(col(&threshold.column)).alias(&threshold.column),
Aggregate::Max => max(col(&threshold.column)).alias(&threshold.column),
Aggregate::Sum => sum(col(&threshold.column)).alias(&threshold.column),
});
expr = expr.and(res);
}
Expand Down
26 changes: 3 additions & 23 deletions src/handlers/http/alerts/mod.rs → src/alerts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,11 @@ use tracing::{trace, warn};
use ulid::Ulid;

pub mod alerts_utils;
pub mod http_handlers;
pub mod target;

use crate::option::CONFIG;
use crate::rbac::map::SessionKey;
use crate::storage;
use crate::storage::object_storage::alert_json_path;
use crate::storage::ObjectStorageError;
use crate::sync::schedule_alert_task;
use crate::utils::uid;
Expand Down Expand Up @@ -461,14 +459,12 @@ impl Alerts {
trigger_notif: bool,
) -> Result<(), AlertError> {
let store = CONFIG.storage().get_object_store();
// let alert_path = alert_json_path(alert_id);

// // read and modify alert
// let mut alert: AlertConfig = serde_json::from_slice(&store.get_object(&alert_path).await?)?;
// alert.state = new_state;
// read and modify alert
let mut alert = self.get_alert_by_id(alert_id).await?;

alert.state = new_state;

// save to disk
store.put_alert(alert_id, &alert).await?;

Expand All @@ -482,12 +478,6 @@ impl Alerts {
};
drop(writer);

// self.alerts.write().await.iter_mut().for_each(|alert| {
// if alert.id.to_string() == alert_id {
// alert.state = new_state;
// }
// });

if trigger_notif {
alert.trigger_notifications().await?;
}
Expand All @@ -497,17 +487,7 @@ impl Alerts {

/// Remove alert and scheduled task from disk and memory
pub async fn delete(&self, alert_id: &str) -> Result<(), AlertError> {
let store = CONFIG.storage().get_object_store();
let alert_path = alert_json_path(alert_id);

// delete from disk
store
.delete_object(&alert_path)
.await
.map_err(AlertError::ObjectStorage)?;
trace!("Deleted from disk");

// now delete from memory
// delete from memory
let read_access = self.alerts.read().await;

let index = read_access
Expand Down
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use humantime_serde::re::humantime;
use reqwest::ClientBuilder;
use tracing::{error, trace, warn};

use crate::handlers::http::alerts::ALERTS;
use super::ALERTS;

use super::{AlertState, CallableTarget, Context};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use actix_web::{web, HttpRequest, Responder};
use bytes::Bytes;
use relative_path::RelativePathBuf;

use super::{
use crate::alerts::{
alerts_utils::user_auth_for_query, AlertConfig, AlertError, AlertRequest, AlertState, ALERTS,
};

Expand Down Expand Up @@ -95,6 +95,15 @@ pub async fn delete(req: HttpRequest) -> Result<impl Responder, AlertError> {
// validate that the user has access to the tables mentioned
user_auth_for_query(&session_key, &alert.query).await?;

let store = CONFIG.storage().get_object_store();
let alert_path = alert_json_path(alert_id);

// delete from disk
store
.delete_object(&alert_path)
.await
.map_err(AlertError::ObjectStorage)?;

// delete from disk and memory
ALERTS.delete(alert_id).await?;

Expand All @@ -115,9 +124,11 @@ pub async fn modify(req: HttpRequest, alert: AlertRequest) -> Result<impl Respon
.ok_or(AlertError::Metadata("No alert ID Provided"))?;

// check if alert id exists in map
ALERTS.get_alert_by_id(alert_id).await?;
let old_alert = ALERTS.get_alert_by_id(alert_id).await?;

// validate that the user has access to the tables mentioned
// in the old as well as the modified alert
user_auth_for_query(&session_key, &old_alert.query).await?;
user_auth_for_query(&session_key, &alert.query).await?;

let store = CONFIG.storage().get_object_store();
Expand Down
2 changes: 1 addition & 1 deletion src/handlers/http/modal/query_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
*
*/

use crate::alerts::ALERTS;
use crate::handlers::airplane;
use crate::handlers::http::alerts::ALERTS;
use crate::handlers::http::base_path;
use crate::handlers::http::caching_removed;
use crate::handlers::http::cluster::{self, init_cluster_metrics_schedular};
Expand Down
30 changes: 7 additions & 23 deletions src/handlers/http/modal/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@
*
*/

use crate::alerts::ALERTS;
use crate::analytics;
use crate::handlers;
use crate::handlers::http::about;
use crate::handlers::http::alerts;
use crate::handlers::http::alerts::ALERTS;
use crate::handlers::http::base_path;
use crate::handlers::http::caching_removed;
use crate::handlers::http::health_check;
Expand Down Expand Up @@ -188,39 +188,23 @@ impl Server {
web::scope("/alerts")
.service(
web::resource("")
.route(
web::get()
.to(alerts::http_handlers::list)
.authorize(Action::GetAlert),
)
.route(
web::post()
.to(alerts::http_handlers::post)
.authorize(Action::PutAlert),
),
.route(web::get().to(alerts::list).authorize(Action::GetAlert))
.route(web::post().to(alerts::post).authorize(Action::PutAlert)),
)
.service(
web::resource("/{alert_id}")
.route(
web::get()
.to(alerts::http_handlers::get)
.authorize(Action::GetAlert),
)
.route(
web::put()
.to(alerts::http_handlers::modify)
.authorize(Action::PutAlert),
)
.route(web::get().to(alerts::get).authorize(Action::GetAlert))
.route(web::put().to(alerts::modify).authorize(Action::PutAlert))
.route(
web::delete()
.to(alerts::http_handlers::delete)
.to(alerts::delete)
.authorize(Action::DeleteAlert),
),
)
.service(
web::resource("/{alert_id}/update_state").route(
web::put()
.to(alerts::http_handlers::update_state)
.to(alerts::update_state)
.authorize(Action::PutAlert),
),
)
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/

mod about;
pub mod alerts;
pub mod analytics;
pub mod banner;
mod catalog;
Expand Down
2 changes: 1 addition & 1 deletion src/storage/object_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use super::{
SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY,
};

use crate::handlers::http::alerts::AlertConfig;
use crate::alerts::AlertConfig;
use crate::handlers::http::modal::ingest_server::INGESTOR_META;
use crate::handlers::http::users::{DASHBOARDS_DIR, FILTER_DIR, USERS_ROOT_DIR};
use crate::metrics::{EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_STORAGE_SIZE};
Expand Down
2 changes: 1 addition & 1 deletion src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use tokio::task;
use tokio::time::{interval, sleep, Duration};
use tracing::{error, info, warn};

use crate::handlers::http::alerts::{alerts_utils, AlertConfig, AlertError};
use crate::alerts::{alerts_utils, AlertConfig, AlertError};
use crate::option::CONFIG;
use crate::{storage, STORAGE_UPLOAD_INTERVAL};

Expand Down

0 comments on commit afed517

Please sign in to comment.