Skip to content

Commit

Permalink
refactor: deepsource and lint
Browse files Browse the repository at this point in the history
  • Loading branch information
parmesant committed Jan 2, 2025
1 parent d575c1f commit 4092f9d
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 108 deletions.
144 changes: 58 additions & 86 deletions src/alerts/alerts_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use crate::{
};

use super::{
Aggregate, AggregateConfig, AlertConfig, AlertError, AlertOperator, AlertState, ALERTS,
Aggregate, AlertConfig, AlertError, AlertOperator, AlertState, Conditions, ALERTS
};

async fn get_tables_from_query(query: &str) -> Result<TableScanVisitor, AlertError> {
Expand Down Expand Up @@ -95,7 +95,7 @@ pub async fn user_auth_for_query(session_key: &SessionKey, query: &str) -> Resul
/// collect the results in the end
///
/// check whether notification needs to be triggered or not
pub async fn evaluate_alert_the_second(alert: &AlertConfig) -> Result<(), AlertError> {
pub async fn evaluate_alert(alert: &AlertConfig) -> Result<(), AlertError> {
trace!("RUNNING EVAL TASK FOR- {alert:?}");

let (start_time, end_time) = match &alert.eval_type {
Expand Down Expand Up @@ -141,58 +141,8 @@ pub async fn evaluate_alert_the_second(alert: &AlertConfig) -> Result<(), AlertE
let mut aggr_expr: Vec<Expr> = vec![];

let filtered_df = if let Some(where_clause) = &agg_config.condition_config {
let filter_expr = match where_clause {
crate::alerts::Conditions::AND((expr1, expr2)) => {
let mut expr =
Expr::Literal(datafusion::scalar::ScalarValue::Boolean(Some(true)));
for e in [expr1, expr2] {
let ex = match e.operator {
AlertOperator::GreaterThan => col(&e.column).gt(lit(&e.value)),
AlertOperator::LessThan => col(&e.column).lt(lit(&e.value)),
AlertOperator::EqualTo => col(&e.column).eq(lit(&e.value)),
AlertOperator::NotEqualTo => col(&e.column).not_eq(lit(&e.value)),
AlertOperator::GreaterThanEqualTo => {
col(&e.column).gt_eq(lit(&e.value))
}
AlertOperator::LessThanEqualTo => col(&e.column).lt_eq(lit(&e.value)),
AlertOperator::Like => col(&e.column).like(lit(&e.value)),
AlertOperator::NotLike => col(&e.column).not_like(lit(&e.value)),
};
expr = expr.and(ex);
}
expr
}
crate::alerts::Conditions::OR((expr1, expr2)) => {
let mut expr =
Expr::Literal(datafusion::scalar::ScalarValue::Boolean(Some(true)));
for e in [expr1, expr2] {
let ex = match e.operator {
AlertOperator::GreaterThan => col(&e.column).gt(lit(&e.value)),
AlertOperator::LessThan => col(&e.column).lt(lit(&e.value)),
AlertOperator::EqualTo => col(&e.column).eq(lit(&e.value)),
AlertOperator::NotEqualTo => col(&e.column).not_eq(lit(&e.value)),
AlertOperator::GreaterThanEqualTo => {
col(&e.column).gt_eq(lit(&e.value))
}
AlertOperator::LessThanEqualTo => col(&e.column).lt_eq(lit(&e.value)),
AlertOperator::Like => col(&e.column).like(lit(&e.value)),
AlertOperator::NotLike => col(&e.column).not_like(lit(&e.value)),
};
expr = expr.or(ex);
}
expr
}
crate::alerts::Conditions::Condition(expr) => match expr.operator {
AlertOperator::GreaterThan => col(&expr.column).gt(lit(&expr.value)),
AlertOperator::LessThan => col(&expr.column).lt(lit(&expr.value)),
AlertOperator::EqualTo => col(&expr.column).eq(lit(&expr.value)),
AlertOperator::NotEqualTo => col(&expr.column).not_eq(lit(&expr.value)),
AlertOperator::GreaterThanEqualTo => col(&expr.column).gt_eq(lit(&expr.value)),
AlertOperator::LessThanEqualTo => col(&expr.column).lt_eq(lit(&expr.value)),
AlertOperator::Like => col(&expr.column).like(lit(&expr.value)),
AlertOperator::NotLike => col(&expr.column).not_like(lit(&expr.value)),
},
};

let filter_expr = get_filter_expr(where_clause);

trace!("filter_expr-\n{filter_expr:?}");

Expand Down Expand Up @@ -266,7 +216,7 @@ pub async fn evaluate_alert_the_second(alert: &AlertConfig) -> Result<(), AlertE
.generate_filter_message(),
)
} else {
Some(String::new())
Some(String::default())
}
} else {
None
Expand All @@ -291,7 +241,7 @@ pub async fn evaluate_alert_the_second(alert: &AlertConfig) -> Result<(), AlertE
if res {
trace!("ALERT!!!!!!");

let mut message = String::new();
let mut message = String::default();
for (_, filter_msg, agg_config, final_value) in agg_results {
if let Some(msg) = filter_msg {
message.extend([format!(
Expand Down Expand Up @@ -445,35 +395,57 @@ pub async fn evaluate_alert_the_second(alert: &AlertConfig) -> Result<(), AlertE
// Ok(())
// }

fn get_exprs(thresholds: &Vec<AggregateConfig>) -> (Vec<Expr>, Vec<Expr>, Expr) {
// for now group by is empty, we can include this later
let group_expr: Vec<Expr> = vec![];

// agg expression
let mut aggr_expr: Vec<Expr> = vec![];

let mut filter_expr = Expr::Literal(datafusion::scalar::ScalarValue::Boolean(Some(true)));
for threshold in thresholds {
let res = match threshold.operator {
AlertOperator::GreaterThan => col(&threshold.column).gt(lit(threshold.value)),
AlertOperator::LessThan => col(&threshold.column).lt(lit(threshold.value)),
AlertOperator::EqualTo => col(&threshold.column).eq(lit(threshold.value)),
AlertOperator::NotEqualTo => col(&threshold.column).not_eq(lit(threshold.value)),
AlertOperator::GreaterThanEqualTo => col(&threshold.column).gt_eq(lit(threshold.value)),
AlertOperator::LessThanEqualTo => col(&threshold.column).lt_eq(lit(threshold.value)),
AlertOperator::Like => col(&threshold.column).like(lit(threshold.value)),
AlertOperator::NotLike => col(&threshold.column).not_like(lit(threshold.value)),
};

aggr_expr.push(match threshold.agg {
Aggregate::Avg => avg(col(&threshold.column)).alias(&threshold.column),
Aggregate::Count => count(col(&threshold.column)).alias(&threshold.column),
Aggregate::Min => min(col(&threshold.column)).alias(&threshold.column),
Aggregate::Max => max(col(&threshold.column)).alias(&threshold.column),
Aggregate::Sum => sum(col(&threshold.column)).alias(&threshold.column),
});
filter_expr = filter_expr.and(res);
fn get_filter_expr(where_clause: &Conditions) -> Expr {
match where_clause {
crate::alerts::Conditions::AND((expr1, expr2)) => {
let mut expr =
Expr::Literal(datafusion::scalar::ScalarValue::Boolean(Some(true)));
for e in [expr1, expr2] {
let ex = match e.operator {
AlertOperator::GreaterThan => col(&e.column).gt(lit(&e.value)),
AlertOperator::LessThan => col(&e.column).lt(lit(&e.value)),
AlertOperator::EqualTo => col(&e.column).eq(lit(&e.value)),
AlertOperator::NotEqualTo => col(&e.column).not_eq(lit(&e.value)),
AlertOperator::GreaterThanEqualTo => {
col(&e.column).gt_eq(lit(&e.value))
}
AlertOperator::LessThanEqualTo => col(&e.column).lt_eq(lit(&e.value)),
AlertOperator::Like => col(&e.column).like(lit(&e.value)),
AlertOperator::NotLike => col(&e.column).not_like(lit(&e.value)),
};
expr = expr.and(ex);
}
expr
}
crate::alerts::Conditions::OR((expr1, expr2)) => {
let mut expr =
Expr::Literal(datafusion::scalar::ScalarValue::Boolean(Some(true)));
for e in [expr1, expr2] {
let ex = match e.operator {
AlertOperator::GreaterThan => col(&e.column).gt(lit(&e.value)),
AlertOperator::LessThan => col(&e.column).lt(lit(&e.value)),
AlertOperator::EqualTo => col(&e.column).eq(lit(&e.value)),
AlertOperator::NotEqualTo => col(&e.column).not_eq(lit(&e.value)),
AlertOperator::GreaterThanEqualTo => {
col(&e.column).gt_eq(lit(&e.value))
}
AlertOperator::LessThanEqualTo => col(&e.column).lt_eq(lit(&e.value)),
AlertOperator::Like => col(&e.column).like(lit(&e.value)),
AlertOperator::NotLike => col(&e.column).not_like(lit(&e.value)),
};
expr = expr.or(ex);
}
expr
}
crate::alerts::Conditions::Condition(expr) => match expr.operator {
AlertOperator::GreaterThan => col(&expr.column).gt(lit(&expr.value)),
AlertOperator::LessThan => col(&expr.column).lt(lit(&expr.value)),
AlertOperator::EqualTo => col(&expr.column).eq(lit(&expr.value)),
AlertOperator::NotEqualTo => col(&expr.column).not_eq(lit(&expr.value)),
AlertOperator::GreaterThanEqualTo => col(&expr.column).gt_eq(lit(&expr.value)),
AlertOperator::LessThanEqualTo => col(&expr.column).lt_eq(lit(&expr.value)),
AlertOperator::Like => col(&expr.column).like(lit(&expr.value)),
AlertOperator::NotLike => col(&expr.column).not_like(lit(&expr.value)),
},
}

(group_expr, aggr_expr, filter_expr)
}
16 changes: 7 additions & 9 deletions src/alerts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
use actix_web::http::header::ContentType;
use actix_web::web::Json;
use actix_web::{FromRequest, HttpRequest};
use alerts_utils::{evaluate_alert_the_second, user_auth_for_query};
use alerts_utils::{evaluate_alert, user_auth_for_query};
use async_trait::async_trait;
use chrono::Utc;
use http::StatusCode;
Expand Down Expand Up @@ -454,12 +454,10 @@ impl AlertConfig {
));
}

if self.aggregate_config.len() > 1 {
if self.agg_condition.is_none() {
return Err(AlertError::Metadata(
"aggCondition can't be null with multiple aggregateConfigs",
));
}
if self.aggregate_config.len() > 1 && self.agg_condition.is_none() {
return Err(AlertError::Metadata(
"aggCondition can't be null with multiple aggregateConfigs",
));
}

let session_state = QUERY_SESSION.state();
Expand Down Expand Up @@ -550,7 +548,7 @@ impl AlertConfig {
self.severity.clone().to_string(),
),
DeploymentInfo::new(deployment_instance, deployment_id, deployment_mode),
String::new(),
String::default(),
)
}

Expand Down Expand Up @@ -635,7 +633,7 @@ impl Alerts {

// run eval task once for each alert
for alert in this.iter() {
evaluate_alert_the_second(alert).await?;
evaluate_alert(alert).await?;
}

Ok(())
Expand Down
1 change: 0 additions & 1 deletion src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use std::path::PathBuf;
use url::Url;

use crate::{
kafka::SslProtocol,
kafka::SslProtocol,
oidc::{self, OpenidConfig},
option::{validation, Compression, Mode},
Expand Down
2 changes: 1 addition & 1 deletion src/handlers/http/modal/query_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
*
*/

use crate::correlation::CORRELATIONS;
use crate::alerts::ALERTS;
use crate::correlation::CORRELATIONS;
use crate::handlers::airplane;
use crate::handlers::http::base_path;
use crate::handlers::http::caching_removed;
Expand Down
7 changes: 0 additions & 7 deletions src/storage/localfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,14 +297,10 @@ impl ObjectStorage for LocalFS {

async fn list_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError> {
let ignore_dir = &[

"lost+found",

PARSEABLE_ROOT_DIRECTORY,

USERS_ROOT_DIR,
CORRELATIONS_ROOT_DIRECTORY,
,
ALERTS_ROOT_DIRECTORY,
];
let directories = ReadDirStream::new(fs::read_dir(&self.root).await?);
Expand All @@ -327,12 +323,9 @@ impl ObjectStorage for LocalFS {

async fn list_old_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError> {
let ignore_dir = &[

"lost+found",

PARSEABLE_ROOT_DIRECTORY,
CORRELATIONS_ROOT_DIRECTORY,
,
ALERTS_ROOT_DIRECTORY,
];
let directories = ReadDirStream::new(fs::read_dir(&self.root).await?);
Expand Down
7 changes: 4 additions & 3 deletions src/storage/object_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ use super::{
ObjectStoreFormat, Permisssion, StorageDir, StorageMetadata,
};
use super::{
ALERTS_ROOT_DIRECTORY, CORRELATIONS_ROOT_DIRECTORY, MANIFEST_FILE, PARSEABLE_METADATA_FILE_NAME,
PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY,
ALERTS_ROOT_DIRECTORY, CORRELATIONS_ROOT_DIRECTORY, MANIFEST_FILE,
PARSEABLE_METADATA_FILE_NAME, PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME,
STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY,
};

use crate::correlation::{CorrelationConfig, CorrelationError};
use crate::alerts::AlertConfig;
use crate::correlation::{CorrelationConfig, CorrelationError};
use crate::handlers::http::modal::ingest_server::INGESTOR_META;
use crate::handlers::http::users::{DASHBOARDS_DIR, FILTER_DIR, USERS_ROOT_DIR};
use crate::metrics::{EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_STORAGE_SIZE};
Expand Down
2 changes: 1 addition & 1 deletion src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ pub async fn schedule_alert_task(
scheduler.every((eval_frequency).minutes()).run(move || {
let alert_val = alert.clone();
async move {
match alerts_utils::evaluate_alert_the_second(&alert_val).await {
match alerts_utils::evaluate_alert(&alert_val).await {
Ok(_) => {}
Err(err) => error!("Error while evaluation- {err}"),
}
Expand Down

0 comments on commit 4092f9d

Please sign in to comment.