Skip to content

Commit

Permalink
[Mobile config] Add v2 endpoints to GatewayInfoV2. Change `min_refres…
Browse files Browse the repository at this point in the history
…hed_at` to `min_updated_at`

* Implement min_updated_at

* Fix updated_at filtering. Add  gateway_stream_info_v2_updated_at test

* Remove refreshed_at

* Add info_v2 and info_batch_v2
  • Loading branch information
kurotych authored Dec 17, 2024
1 parent f8e16d8 commit 6a8bb9b
Show file tree
Hide file tree
Showing 6 changed files with 367 additions and 65 deletions.
10 changes: 5 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

59 changes: 30 additions & 29 deletions mobile_config/src/gateway_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ pub struct GatewayInfo {
pub metadata: Option<GatewayMetadata>,
pub device_type: DeviceType,
// None for V1
pub refreshed_at: Option<DateTime<Utc>>,
pub created_at: Option<DateTime<Utc>>,
}

Expand Down Expand Up @@ -135,7 +134,6 @@ impl TryFrom<GatewayInfoProtoV2> for GatewayInfo {
metadata,
device_type: _,
created_at,
refreshed_at,
} = info;

let metadata = if let Some(metadata) = metadata {
Expand All @@ -154,16 +152,11 @@ impl TryFrom<GatewayInfoProtoV2> for GatewayInfo {
.single()
.ok_or(GatewayInfoProtoParseError::InvalidCreatedAt(created_at))?;

let refreshed_at = Utc.timestamp_opt(refreshed_at as i64, 0).single().ok_or(
GatewayInfoProtoParseError::InvalidRefreshedAt(info.refreshed_at),
)?;

Ok(Self {
address: address.into(),
metadata,
device_type: device_type_,
created_at: Some(created_at),
refreshed_at: Some(refreshed_at),
})
}
}
Expand Down Expand Up @@ -196,7 +189,6 @@ impl TryFrom<GatewayInfoProto> for GatewayInfo {
metadata,
device_type: device_type_,
created_at: None,
refreshed_at: None,
})
}
}
Expand Down Expand Up @@ -297,10 +289,6 @@ impl TryFrom<GatewayInfo> for GatewayInfoProtoV2 {
.created_at
.ok_or(GatewayInfoToProtoError::CreatedAtIsNone)?
.timestamp() as u64,
refreshed_at: info
.refreshed_at
.ok_or(GatewayInfoToProtoError::RefreshedAtIsNone)?
.timestamp() as u64,
})
}
}
Expand Down Expand Up @@ -357,10 +345,13 @@ pub(crate) mod db {
use super::{DeviceType, GatewayInfo, GatewayMetadata};
use crate::gateway_info::DeploymentInfo;
use chrono::{DateTime, Utc};
use futures::stream::{Stream, StreamExt};
use futures::{
stream::{Stream, StreamExt},
TryStreamExt,
};
use helium_crypto::PublicKeyBinary;
use sqlx::{types::Json, PgExecutor, Row};
use std::str::FromStr;
use std::{collections::HashSet, str::FromStr};

const GET_METADATA_SQL: &str = r#"
select kta.entity_key, infos.location::bigint, infos.device_type,
Expand All @@ -369,15 +360,34 @@ pub(crate) mod db {
join key_to_assets kta on infos.asset = kta.asset
"#;
const BATCH_SQL_WHERE_SNIPPET: &str = " where kta.entity_key = any($1::bytea[]) ";
const DEVICE_TYPES_AND_SNIPPET: &str = " and device_type::text = any($2) ";
const DEVICE_TYPES_WHERE_SNIPPET: &str = " where device_type::text = any($1) ";

const GET_UPDATED_RADIOS: &str =
"SELECT entity_key FROM mobile_radio_tracker WHERE last_changed_at >= $1";

lazy_static::lazy_static! {
static ref BATCH_METADATA_SQL: String = format!("{GET_METADATA_SQL} {BATCH_SQL_WHERE_SNIPPET}");
static ref GET_METADATA_SQL_REFRESHED_AT: String = format!(r#"{GET_METADATA_SQL}
where ( infos.refreshed_at >= $1 OR (infos.refreshed_at IS NULL AND infos.created_at >= $1) ) "#);

static ref DEVICE_TYPES_METADATA_SQL: String = format!("{} {}", *GET_METADATA_SQL_REFRESHED_AT, DEVICE_TYPES_AND_SNIPPET);
static ref DEVICE_TYPES_METADATA_SQL: String = format!("{GET_METADATA_SQL} {DEVICE_TYPES_WHERE_SNIPPET}");
}

pub async fn get_updated_radios(
db: impl PgExecutor<'_>,
min_updated_at: DateTime<Utc>,
) -> anyhow::Result<HashSet<PublicKeyBinary>> {
sqlx::query(GET_UPDATED_RADIOS)
.bind(min_updated_at)
.fetch(db)
.map_err(anyhow::Error::from)
.try_fold(
HashSet::new(),
|mut set: HashSet<PublicKeyBinary>, row| async move {
let entity_key_b = row.get::<&[u8], &str>("entity_key");
let entity_key = bs58::encode(entity_key_b).into_string();
set.insert(PublicKeyBinary::from_str(&entity_key)?);
Ok(set)
},
)
.await
}

pub async fn get_info(
Expand Down Expand Up @@ -413,16 +423,13 @@ pub(crate) mod db {
pub fn all_info_stream<'a>(
db: impl PgExecutor<'a> + 'a,
device_types: &'a [DeviceType],
min_refreshed_at: DateTime<Utc>,
) -> impl Stream<Item = GatewayInfo> + 'a {
match device_types.is_empty() {
true => sqlx::query_as::<_, GatewayInfo>(&GET_METADATA_SQL_REFRESHED_AT)
.bind(min_refreshed_at)
true => sqlx::query_as::<_, GatewayInfo>(GET_METADATA_SQL)
.fetch(db)
.filter_map(|metadata| async move { metadata.ok() })
.boxed(),
false => sqlx::query_as::<_, GatewayInfo>(&DEVICE_TYPES_METADATA_SQL)
.bind(min_refreshed_at)
.bind(
device_types
.iter()
Expand Down Expand Up @@ -464,11 +471,6 @@ pub(crate) mod db {
)
.map_err(|err| sqlx::Error::Decode(Box::new(err)))?;
let created_at = row.get::<DateTime<Utc>, &str>("created_at");
// `refreshed_at` can be NULL in the database schema.
// If so, fallback to using `created_at` as the default value of `refreshed_at`.
let refreshed_at = row
.get::<Option<DateTime<Utc>>, &str>("refreshed_at")
.unwrap_or(created_at);

Ok(Self {
address: PublicKeyBinary::from_str(
Expand All @@ -477,7 +479,6 @@ pub(crate) mod db {
.map_err(|err| sqlx::Error::Decode(Box::new(err)))?,
metadata,
device_type,
refreshed_at: Some(refreshed_at),
created_at: Some(created_at),
})
}
Expand Down
Loading

0 comments on commit 6a8bb9b

Please sign in to comment.