Skip to content

Commit

Permalink
[Mobile Config] Add updated_at to GatewayInfoV2 (response) (#921)
Browse files Browse the repository at this point in the history
* Add updated_at to GatewayInfoV2 responses
  • Loading branch information
kurotych authored Jan 2, 2025
1 parent bad8f0d commit 38aa4c4
Show file tree
Hide file tree
Showing 5 changed files with 410 additions and 43 deletions.
10 changes: 5 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

63 changes: 51 additions & 12 deletions mobile_config/src/gateway_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,12 @@ pub struct GatewayInfo {
pub address: PublicKeyBinary,
pub metadata: Option<GatewayMetadata>,
pub device_type: DeviceType,
// None for V1
// Optional fields are None for GatewayInfoProto (V1)
pub created_at: Option<DateTime<Utc>>,
// updated_at refers to the last time the data was actually changed.
pub updated_at: Option<DateTime<Utc>>,
// refreshed_at indicates the last time the chain was consulted, regardless of data changes.
pub refreshed_at: Option<DateTime<Utc>>,
}

impl GatewayInfo {
Expand All @@ -119,8 +123,8 @@ pub enum GatewayInfoProtoParseError {
InvalidLocation(#[from] std::num::ParseIntError),
#[error("Invalid created_at: {0}")]
InvalidCreatedAt(u64),
#[error("Invalid refreshed_at: {0}")]
InvalidRefreshedAt(u64),
#[error("Invalid updated_at: {0}")]
InvalidUpdatedAt(u64),
}

impl TryFrom<GatewayInfoProtoV2> for GatewayInfo {
Expand All @@ -134,6 +138,7 @@ impl TryFrom<GatewayInfoProtoV2> for GatewayInfo {
metadata,
device_type: _,
created_at,
updated_at,
} = info;

let metadata = if let Some(metadata) = metadata {
Expand All @@ -152,11 +157,18 @@ impl TryFrom<GatewayInfoProtoV2> for GatewayInfo {
.single()
.ok_or(GatewayInfoProtoParseError::InvalidCreatedAt(created_at))?;

let updated_at = Utc
.timestamp_opt(updated_at as i64, 0)
.single()
.ok_or(GatewayInfoProtoParseError::InvalidUpdatedAt(updated_at))?;

Ok(Self {
address: address.into(),
metadata,
device_type: device_type_,
created_at: Some(created_at),
updated_at: Some(updated_at),
refreshed_at: None,
})
}
}
Expand Down Expand Up @@ -189,6 +201,8 @@ impl TryFrom<GatewayInfoProto> for GatewayInfo {
metadata,
device_type: device_type_,
created_at: None,
updated_at: None,
refreshed_at: None,
})
}
}
Expand Down Expand Up @@ -264,8 +278,8 @@ pub enum GatewayInfoToProtoError {
InvalidLocation(#[from] hextree::Error),
#[error("created_at is None")]
CreatedAtIsNone,
#[error("refreshed_at is None")]
RefreshedAtIsNone,
#[error("updated_at is None")]
UpdatedAtIsNone,
}

impl TryFrom<GatewayInfo> for GatewayInfoProtoV2 {
Expand All @@ -289,6 +303,10 @@ impl TryFrom<GatewayInfo> for GatewayInfoProtoV2 {
.created_at
.ok_or(GatewayInfoToProtoError::CreatedAtIsNone)?
.timestamp() as u64,
updated_at: info
.updated_at
.ok_or(GatewayInfoToProtoError::UpdatedAtIsNone)?
.timestamp() as u64,
})
}
}
Expand Down Expand Up @@ -351,7 +369,7 @@ pub(crate) mod db {
};
use helium_crypto::PublicKeyBinary;
use sqlx::{types::Json, PgExecutor, Row};
use std::{collections::HashSet, str::FromStr};
use std::{collections::HashMap, str::FromStr};

const GET_METADATA_SQL: &str = r#"
select kta.entity_key, infos.location::bigint, infos.device_type,
Expand All @@ -363,7 +381,10 @@ pub(crate) mod db {
const DEVICE_TYPES_WHERE_SNIPPET: &str = " where device_type::text = any($1) ";

const GET_UPDATED_RADIOS: &str =
"SELECT entity_key FROM mobile_radio_tracker WHERE last_changed_at >= $1";
"SELECT entity_key, last_changed_at FROM mobile_radio_tracker WHERE last_changed_at >= $1";

const GET_UPDATED_AT: &str =
"SELECT last_changed_at FROM mobile_radio_tracker WHERE entity_key = $1";

lazy_static::lazy_static! {
static ref BATCH_METADATA_SQL: String = format!("{GET_METADATA_SQL} {BATCH_SQL_WHERE_SNIPPET}");
Expand All @@ -373,23 +394,36 @@ pub(crate) mod db {
pub async fn get_updated_radios(
db: impl PgExecutor<'_>,
min_updated_at: DateTime<Utc>,
) -> anyhow::Result<HashSet<PublicKeyBinary>> {
) -> anyhow::Result<HashMap<PublicKeyBinary, DateTime<Utc>>> {
sqlx::query(GET_UPDATED_RADIOS)
.bind(min_updated_at)
.fetch(db)
.map_err(anyhow::Error::from)
.try_fold(
HashSet::new(),
|mut set: HashSet<PublicKeyBinary>, row| async move {
HashMap::new(),
|mut map: HashMap<PublicKeyBinary, DateTime<Utc>>, row| async move {
let entity_key_b = row.get::<&[u8], &str>("entity_key");
let entity_key = bs58::encode(entity_key_b).into_string();
set.insert(PublicKeyBinary::from_str(&entity_key)?);
Ok(set)
let updated_at = row.get::<DateTime<Utc>, &str>("last_changed_at");
map.insert(PublicKeyBinary::from_str(&entity_key)?, updated_at);
Ok(map)
},
)
.await
}

pub async fn get_updated_at(
db: impl PgExecutor<'_>,
address: &PublicKeyBinary,
) -> anyhow::Result<Option<DateTime<Utc>>> {
let entity_key = bs58::decode(address.to_string()).into_vec()?;
sqlx::query_scalar(GET_UPDATED_AT)
.bind(entity_key)
.fetch_optional(db)
.await
.map_err(anyhow::Error::from)
}

pub async fn get_info(
db: impl PgExecutor<'_>,
address: &PublicKeyBinary,
Expand Down Expand Up @@ -471,6 +505,7 @@ pub(crate) mod db {
)
.map_err(|err| sqlx::Error::Decode(Box::new(err)))?;
let created_at = row.get::<DateTime<Utc>, &str>("created_at");
let refreshed_at = row.get::<Option<DateTime<Utc>>, &str>("refreshed_at");

Ok(Self {
address: PublicKeyBinary::from_str(
Expand All @@ -480,6 +515,10 @@ pub(crate) mod db {
metadata,
device_type,
created_at: Some(created_at),
refreshed_at,
// The updated_at field should be determined by considering the last_changed_at
// value from the mobile_radio_tracker table.
updated_at: None,
})
}
}
Expand Down
116 changes: 90 additions & 26 deletions mobile_config/src/gateway_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::{
key_cache::KeyCache,
telemetry, verify_public_key, GrpcResult, GrpcStreamResult,
};
use chrono::{TimeZone, Utc};
use chrono::{DateTime, TimeZone, Utc};
use file_store::traits::{MsgVerify, TimestampEncode};
use futures::{
future,
Expand All @@ -15,12 +15,12 @@ use helium_proto::{
services::mobile_config::{
self, GatewayInfoBatchReqV1, GatewayInfoReqV1, GatewayInfoResV1, GatewayInfoResV2,
GatewayInfoStreamReqV1, GatewayInfoStreamReqV2, GatewayInfoStreamResV1,
GatewayInfoStreamResV2,
GatewayInfoStreamResV2, GatewayInfoV2,
},
Message,
};
use sqlx::{Pool, Postgres};
use std::sync::Arc;
use std::{collections::HashMap, sync::Arc};
use tonic::{Request, Response, Status};

pub struct GatewayService {
Expand Down Expand Up @@ -129,6 +129,12 @@ impl mobile_config::Gateway for GatewayService {
let pubkey: PublicKeyBinary = request.address.into();
tracing::debug!(pubkey = pubkey.to_string(), "fetching gateway info (v2)");

let updated_at = gateway_info::db::get_updated_at(&self.mobile_config_db_pool, &pubkey)
.await
.map_err(|_| {
Status::internal("error fetching updated_at field for gateway info (v2)")
})?;

gateway_info::db::get_info(&self.metadata_pool, &pubkey)
.await
.map_err(|_| Status::internal("error fetching gateway info (v2)"))?
Expand All @@ -137,15 +143,26 @@ impl mobile_config::Gateway for GatewayService {
telemetry::count_gateway_chain_lookup("not-found");
Err(Status::not_found(pubkey.to_string()))
},
|info| {
|mut info| {
if info.metadata.is_some() {
telemetry::count_gateway_chain_lookup("asserted");
} else {
telemetry::count_gateway_chain_lookup("not-asserted");
};
let info = info

// determine updated_at
if let Some(v) = updated_at {
info.updated_at = Some(v)
} else if info.refreshed_at.is_some() {
info.updated_at = info.refreshed_at;
} else {
info.updated_at = info.created_at;
}

let info: GatewayInfoV2 = info
.try_into()
.map_err(|_| Status::internal("error serializing gateway info (v2)"))?;

let mut res = GatewayInfoResV2 {
info: Some(info),
timestamp: Utc::now().encode_timestamp(),
Expand Down Expand Up @@ -212,7 +229,8 @@ impl mobile_config::Gateway for GatewayService {
"fetching gateways' info batch"
);

let pool = self.metadata_pool.clone();
let metadata_db_pool = self.metadata_pool.clone();
let mobile_config_db_pool = self.mobile_config_db_pool.clone();
let signing_key = self.signing_key.clone();
let batch_size = request.batch_size;
let addresses = request
Expand All @@ -224,7 +242,19 @@ impl mobile_config::Gateway for GatewayService {
let (tx, rx) = tokio::sync::mpsc::channel(100);

tokio::spawn(async move {
let stream = gateway_info::db::batch_info_stream(&pool, &addresses)?;
let min_updated_at = DateTime::UNIX_EPOCH;
let updated_radios = get_updated_radios(&mobile_config_db_pool, min_updated_at).await?;

let stream = gateway_info::db::batch_info_stream(&metadata_db_pool, &addresses)?;
let stream = stream
.filter_map(|gateway_info| {
future::ready(handle_updated_at(
gateway_info,
&updated_radios,
min_updated_at,
))
})
.boxed();
stream_multi_gateways_info(stream, tx.clone(), signing_key.clone(), batch_size).await
});

Expand Down Expand Up @@ -291,32 +321,66 @@ impl mobile_config::Gateway for GatewayService {
);

tokio::spawn(async move {
let min_updated_at = Utc
.timestamp_opt(request.min_updated_at as i64, 0)
.single()
.ok_or(Status::invalid_argument(
"Invalid min_refreshed_at argument",
))?;

let updated_radios = get_updated_radios(&mobile_config_db_pool, min_updated_at).await?;
let stream = gateway_info::db::all_info_stream(&metadata_db_pool, &device_types);
if request.min_updated_at > 0 {
let min_updated_at = Utc
.timestamp_opt(request.min_updated_at as i64, 0)
.single()
.ok_or(Status::invalid_argument(
"Invalid min_refreshed_at argument",
))?;

let updated_radios =
get_updated_radios(&mobile_config_db_pool, min_updated_at).await?;
let stream = stream
.filter(|v| future::ready(updated_radios.contains(&v.address)))
.boxed();
stream_multi_gateways_info(stream, tx.clone(), signing_key.clone(), batch_size)
.await
} else {
stream_multi_gateways_info(stream, tx.clone(), signing_key.clone(), batch_size)
.await
}
let stream = stream
.filter_map(|gateway_info| {
future::ready(handle_updated_at(
gateway_info,
&updated_radios,
min_updated_at,
))
})
.boxed();
stream_multi_gateways_info(stream, tx.clone(), signing_key.clone(), batch_size).await
});

Ok(Response::new(GrpcStreamResult::new(rx)))
}
}

fn handle_updated_at(
mut gateway_info: GatewayInfo,
updated_radios: &HashMap<PublicKeyBinary, chrono::DateTime<Utc>>,
min_updated_at: chrono::DateTime<Utc>,
) -> Option<GatewayInfo> {
// Check mobile_radio_tracker HashMap
if let Some(updated_at) = updated_radios.get(&gateway_info.address) {
// It could be already filtered by min_updated_at but recheck won't hurt
if updated_at >= &min_updated_at {
gateway_info.updated_at = Some(*updated_at);
return Some(gateway_info);
}
return None;
}
// Fallback solution #1. Try to use refreshed_at as updated_at field and check
// min_updated_at
if let Some(refreshed_at) = gateway_info.refreshed_at {
if refreshed_at >= min_updated_at {
gateway_info.updated_at = Some(refreshed_at);
return Some(gateway_info);
}
return None;
}
// Fallback solution #2. Try to use created_at as updated_at field and check
// min_updated_at
if let Some(created_at) = gateway_info.created_at {
if created_at >= min_updated_at {
gateway_info.updated_at = Some(created_at);
return Some(gateway_info);
}
return None;
}
None
}

trait GatewayInfoStreamRes {
type GatewayInfoType;
fn new(gateways: Vec<Self::GatewayInfoType>, timestamp: u64, signer: Vec<u8>) -> Self;
Expand Down
Loading

0 comments on commit 38aa4c4

Please sign in to comment.