diff --git a/Cargo.lock b/Cargo.lock index a83c77228..fc459c706 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1615,7 +1615,7 @@ checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" [[package]] name = "beacon" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=master#16d6838b88a35ac88797d0c7eb14a932b214a856" +source = "git+https://github.com/helium/proto?branch=master#b4c8c8f47dfff38a2ff1b7fe14e1b2a1beea651c" dependencies = [ "base64 0.21.7", "byteorder", @@ -1625,7 +1625,7 @@ dependencies = [ "rand_chacha 0.3.0", "rust_decimal", "serde", - "sha2 0.10.8", + "sha2 0.9.9", "thiserror", ] @@ -3821,7 +3821,7 @@ dependencies = [ [[package]] name = "helium-proto" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=master#16d6838b88a35ac88797d0c7eb14a932b214a856" +source = "git+https://github.com/helium/proto?branch=master#b4c8c8f47dfff38a2ff1b7fe14e1b2a1beea651c" dependencies = [ "bytes", "prost", @@ -6063,7 +6063,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "80b776a1b2dc779f5ee0641f8ade0125bc1298dd41a9a0c16d8bd57b42d222b1" dependencies = [ "bytes", - "heck 0.5.0", + "heck 0.4.0", "itertools", "log", "multimap", @@ -9988,7 +9988,7 @@ dependencies = [ "rand 0.8.5", "serde", "serde_json", - "sha2 0.10.8", + "sha2 0.9.9", "thiserror", "twox-hash", "xorf", diff --git a/mobile_config/src/gateway_info.rs b/mobile_config/src/gateway_info.rs index 5ff690564..044cfd6c9 100644 --- a/mobile_config/src/gateway_info.rs +++ b/mobile_config/src/gateway_info.rs @@ -104,7 +104,6 @@ pub struct GatewayInfo { pub metadata: Option, pub device_type: DeviceType, // None for V1 - pub refreshed_at: Option>, pub created_at: Option>, } @@ -135,7 +134,6 @@ impl TryFrom for GatewayInfo { metadata, device_type: _, created_at, - refreshed_at, } = info; let metadata = if let Some(metadata) = metadata { @@ -154,16 +152,11 @@ impl TryFrom for GatewayInfo { .single() .ok_or(GatewayInfoProtoParseError::InvalidCreatedAt(created_at))?; - let refreshed_at = Utc.timestamp_opt(refreshed_at as i64, 0).single().ok_or( - GatewayInfoProtoParseError::InvalidRefreshedAt(info.refreshed_at), - )?; - Ok(Self { address: address.into(), metadata, device_type: device_type_, created_at: Some(created_at), - refreshed_at: Some(refreshed_at), }) } } @@ -196,7 +189,6 @@ impl TryFrom for GatewayInfo { metadata, device_type: device_type_, created_at: None, - refreshed_at: None, }) } } @@ -297,10 +289,6 @@ impl TryFrom for GatewayInfoProtoV2 { .created_at .ok_or(GatewayInfoToProtoError::CreatedAtIsNone)? .timestamp() as u64, - refreshed_at: info - .refreshed_at - .ok_or(GatewayInfoToProtoError::RefreshedAtIsNone)? - .timestamp() as u64, }) } } @@ -357,10 +345,13 @@ pub(crate) mod db { use super::{DeviceType, GatewayInfo, GatewayMetadata}; use crate::gateway_info::DeploymentInfo; use chrono::{DateTime, Utc}; - use futures::stream::{Stream, StreamExt}; + use futures::{ + stream::{Stream, StreamExt}, + TryStreamExt, + }; use helium_crypto::PublicKeyBinary; use sqlx::{types::Json, PgExecutor, Row}; - use std::str::FromStr; + use std::{collections::HashSet, str::FromStr}; const GET_METADATA_SQL: &str = r#" select kta.entity_key, infos.location::bigint, infos.device_type, @@ -369,15 +360,34 @@ pub(crate) mod db { join key_to_assets kta on infos.asset = kta.asset "#; const BATCH_SQL_WHERE_SNIPPET: &str = " where kta.entity_key = any($1::bytea[]) "; - const DEVICE_TYPES_AND_SNIPPET: &str = " and device_type::text = any($2) "; + const DEVICE_TYPES_WHERE_SNIPPET: &str = " where device_type::text = any($1) "; + + const GET_UPDATED_RADIOS: &str = + "SELECT entity_key FROM mobile_radio_tracker WHERE last_changed_at >= $1"; lazy_static::lazy_static! { static ref BATCH_METADATA_SQL: String = format!("{GET_METADATA_SQL} {BATCH_SQL_WHERE_SNIPPET}"); - static ref GET_METADATA_SQL_REFRESHED_AT: String = format!(r#"{GET_METADATA_SQL} - where ( infos.refreshed_at >= $1 OR (infos.refreshed_at IS NULL AND infos.created_at >= $1) ) "#); - - static ref DEVICE_TYPES_METADATA_SQL: String = format!("{} {}", *GET_METADATA_SQL_REFRESHED_AT, DEVICE_TYPES_AND_SNIPPET); + static ref DEVICE_TYPES_METADATA_SQL: String = format!("{GET_METADATA_SQL} {DEVICE_TYPES_WHERE_SNIPPET}"); + } + pub async fn get_updated_radios( + db: impl PgExecutor<'_>, + min_updated_at: DateTime, + ) -> anyhow::Result> { + sqlx::query(GET_UPDATED_RADIOS) + .bind(min_updated_at) + .fetch(db) + .map_err(anyhow::Error::from) + .try_fold( + HashSet::new(), + |mut set: HashSet, row| async move { + let entity_key_b = row.get::<&[u8], &str>("entity_key"); + let entity_key = bs58::encode(entity_key_b).into_string(); + set.insert(PublicKeyBinary::from_str(&entity_key)?); + Ok(set) + }, + ) + .await } pub async fn get_info( @@ -413,16 +423,13 @@ pub(crate) mod db { pub fn all_info_stream<'a>( db: impl PgExecutor<'a> + 'a, device_types: &'a [DeviceType], - min_refreshed_at: DateTime, ) -> impl Stream + 'a { match device_types.is_empty() { - true => sqlx::query_as::<_, GatewayInfo>(&GET_METADATA_SQL_REFRESHED_AT) - .bind(min_refreshed_at) + true => sqlx::query_as::<_, GatewayInfo>(GET_METADATA_SQL) .fetch(db) .filter_map(|metadata| async move { metadata.ok() }) .boxed(), false => sqlx::query_as::<_, GatewayInfo>(&DEVICE_TYPES_METADATA_SQL) - .bind(min_refreshed_at) .bind( device_types .iter() @@ -464,11 +471,6 @@ pub(crate) mod db { ) .map_err(|err| sqlx::Error::Decode(Box::new(err)))?; let created_at = row.get::, &str>("created_at"); - // `refreshed_at` can be NULL in the database schema. - // If so, fallback to using `created_at` as the default value of `refreshed_at`. - let refreshed_at = row - .get::>, &str>("refreshed_at") - .unwrap_or(created_at); Ok(Self { address: PublicKeyBinary::from_str( @@ -477,7 +479,6 @@ pub(crate) mod db { .map_err(|err| sqlx::Error::Decode(Box::new(err)))?, metadata, device_type, - refreshed_at: Some(refreshed_at), created_at: Some(created_at), }) } diff --git a/mobile_config/src/gateway_service.rs b/mobile_config/src/gateway_service.rs index 13398d287..7b66405c8 100644 --- a/mobile_config/src/gateway_service.rs +++ b/mobile_config/src/gateway_service.rs @@ -1,19 +1,21 @@ use crate::{ - gateway_info::{self, DeviceType, GatewayInfo}, + gateway_info::{self, db::get_updated_radios, DeviceType, GatewayInfo}, key_cache::KeyCache, telemetry, verify_public_key, GrpcResult, GrpcStreamResult, }; -use chrono::{DateTime, TimeZone, Utc}; +use chrono::{TimeZone, Utc}; use file_store::traits::{MsgVerify, TimestampEncode}; use futures::{ + future, stream::{Stream, StreamExt, TryStreamExt}, TryFutureExt, }; use helium_crypto::{Keypair, PublicKey, PublicKeyBinary, Sign}; use helium_proto::{ services::mobile_config::{ - self, GatewayInfoBatchReqV1, GatewayInfoReqV1, GatewayInfoResV1, GatewayInfoStreamReqV1, - GatewayInfoStreamReqV2, GatewayInfoStreamResV1, GatewayInfoStreamResV2, + self, GatewayInfoBatchReqV1, GatewayInfoReqV1, GatewayInfoResV1, GatewayInfoResV2, + GatewayInfoStreamReqV1, GatewayInfoStreamReqV2, GatewayInfoStreamResV1, + GatewayInfoStreamResV2, }, Message, }; @@ -23,16 +25,23 @@ use tonic::{Request, Response, Status}; pub struct GatewayService { key_cache: KeyCache, + mobile_config_db_pool: Pool, metadata_pool: Pool, signing_key: Arc, } impl GatewayService { - pub fn new(key_cache: KeyCache, metadata_pool: Pool, signing_key: Keypair) -> Self { + pub fn new( + key_cache: KeyCache, + metadata_pool: Pool, + signing_key: Keypair, + mobile_config_db_pool: Pool, + ) -> Self { Self { key_cache, metadata_pool, signing_key: Arc::new(signing_key), + mobile_config_db_pool, } } @@ -68,6 +77,7 @@ impl GatewayService { #[tonic::async_trait] impl mobile_config::Gateway for GatewayService { + // Deprecated async fn info(&self, request: Request) -> GrpcResult { let request = request.into_inner(); telemetry::count_request("gateway", "info"); @@ -108,6 +118,47 @@ impl mobile_config::Gateway for GatewayService { ) } + async fn info_v2(&self, request: Request) -> GrpcResult { + let request = request.into_inner(); + telemetry::count_request("gateway", "info-v2"); + custom_tracing::record_b58("pub_key", &request.address); + custom_tracing::record_b58("signer", &request.signer); + + self.verify_request_signature_for_info(&request)?; + + let pubkey: PublicKeyBinary = request.address.into(); + tracing::debug!(pubkey = pubkey.to_string(), "fetching gateway info (v2)"); + + gateway_info::db::get_info(&self.metadata_pool, &pubkey) + .await + .map_err(|_| Status::internal("error fetching gateway info (v2)"))? + .map_or_else( + || { + telemetry::count_gateway_chain_lookup("not-found"); + Err(Status::not_found(pubkey.to_string())) + }, + |info| { + if info.metadata.is_some() { + telemetry::count_gateway_chain_lookup("asserted"); + } else { + telemetry::count_gateway_chain_lookup("not-asserted"); + }; + let info = info + .try_into() + .map_err(|_| Status::internal("error serializing gateway info (v2)"))?; + let mut res = GatewayInfoResV2 { + info: Some(info), + timestamp: Utc::now().encode_timestamp(), + signer: self.signing_key.public_key().into(), + signature: vec![], + }; + res.signature = self.sign_response(&res.encode_to_vec())?; + Ok(Response::new(res)) + }, + ) + } + + // Deprecated type info_batchStream = GrpcStreamResult; async fn info_batch( &self, @@ -144,6 +195,42 @@ impl mobile_config::Gateway for GatewayService { Ok(Response::new(GrpcStreamResult::new(rx))) } + type info_batch_v2Stream = GrpcStreamResult; + async fn info_batch_v2( + &self, + request: Request, + ) -> GrpcResult { + let request = request.into_inner(); + telemetry::count_request("gateway", "info-batch-v2"); + custom_tracing::record_b58("signer", &request.signer); + + let signer = verify_public_key(&request.signer)?; + self.verify_request_signature(&signer, &request)?; + + tracing::debug!( + batch = request.addresses.len(), + "fetching gateways' info batch" + ); + + let pool = self.metadata_pool.clone(); + let signing_key = self.signing_key.clone(); + let batch_size = request.batch_size; + let addresses = request + .addresses + .into_iter() + .map(|key| key.into()) + .collect::>(); + + let (tx, rx) = tokio::sync::mpsc::channel(100); + + tokio::spawn(async move { + let stream = gateway_info::db::batch_info_stream(&pool, &addresses)?; + stream_multi_gateways_info(stream, tx.clone(), signing_key.clone(), batch_size).await + }); + + Ok(Response::new(GrpcStreamResult::new(rx))) + } + // Deprecated type info_streamStream = GrpcStreamResult; async fn info_stream( @@ -170,8 +257,7 @@ impl mobile_config::Gateway for GatewayService { ); tokio::spawn(async move { - let stream = - gateway_info::db::all_info_stream(&pool, &device_types, DateTime::UNIX_EPOCH); + let stream = gateway_info::db::all_info_stream(&pool, &device_types); stream_multi_gateways_info(stream, tx.clone(), signing_key.clone(), batch_size).await }); @@ -190,19 +276,14 @@ impl mobile_config::Gateway for GatewayService { let signer = verify_public_key(&request.signer)?; self.verify_request_signature(&signer, &request)?; - let pool = self.metadata_pool.clone(); + let metadata_db_pool = self.metadata_pool.clone(); + let mobile_config_db_pool = self.mobile_config_db_pool.clone(); let signing_key = self.signing_key.clone(); let batch_size = request.batch_size; let (tx, rx) = tokio::sync::mpsc::channel(100); let device_types: Vec = request.device_types().map(|v| v.into()).collect(); - let min_refreshed_at = Utc - .timestamp_opt(request.min_refreshed_at as i64, 0) - .single() - .ok_or(Status::invalid_argument( - "Invalid min_refreshed_at argument", - ))?; tracing::debug!( "fetching all gateways' info (v2). Device types: {:?} ", @@ -210,8 +291,26 @@ impl mobile_config::Gateway for GatewayService { ); tokio::spawn(async move { - let stream = gateway_info::db::all_info_stream(&pool, &device_types, min_refreshed_at); - stream_multi_gateways_info(stream, tx.clone(), signing_key.clone(), batch_size).await + let stream = gateway_info::db::all_info_stream(&metadata_db_pool, &device_types); + if request.min_updated_at > 0 { + let min_updated_at = Utc + .timestamp_opt(request.min_updated_at as i64, 0) + .single() + .ok_or(Status::invalid_argument( + "Invalid min_refreshed_at argument", + ))?; + + let updated_radios = + get_updated_radios(&mobile_config_db_pool, min_updated_at).await?; + let stream = stream + .filter(|v| future::ready(updated_radios.contains(&v.address))) + .boxed(); + stream_multi_gateways_info(stream, tx.clone(), signing_key.clone(), batch_size) + .await + } else { + stream_multi_gateways_info(stream, tx.clone(), signing_key.clone(), batch_size) + .await + } }); Ok(Response::new(GrpcStreamResult::new(rx))) diff --git a/mobile_config/src/main.rs b/mobile_config/src/main.rs index 2128851a3..b869bef2c 100644 --- a/mobile_config/src/main.rs +++ b/mobile_config/src/main.rs @@ -75,6 +75,7 @@ impl Daemon { key_cache.clone(), metadata_pool.clone(), settings.signing_keypair()?, + pool.clone(), ); let auth_svc = AuthorizationService::new(key_cache.clone(), settings.signing_keypair()?); let entity_svc = EntityService::new( diff --git a/mobile_config/tests/gateway_service.rs b/mobile_config/tests/gateway_service.rs index 49101ca3b..609cb43cb 100644 --- a/mobile_config/tests/gateway_service.rs +++ b/mobile_config/tests/gateway_service.rs @@ -1,4 +1,6 @@ -use chrono::{DateTime, Utc}; +use std::vec; + +use chrono::{DateTime, Duration, Utc}; use futures::stream::StreamExt; use helium_crypto::{KeyTag, Keypair, PublicKey, PublicKeyBinary, Sign}; @@ -33,7 +35,7 @@ async fn gateway_info_authorization_errors(pool: PgPool) -> anyhow::Result<()> { // Start the gateway server let keys = CacheKeys::from_iter([(admin_key.public_key().to_owned(), KeyRole::Administrator)]); let (_key_cache_tx, key_cache) = KeyCache::new(keys); - let gws = GatewayService::new(key_cache, pool.clone(), server_key); + let gws = GatewayService::new(key_cache, pool.clone(), server_key, pool.clone()); let _handle = tokio::spawn( transport::Server::builder() .add_service(proto::GatewayServer::new(gws)) @@ -98,7 +100,7 @@ async fn spawn_gateway_service( // Start the gateway server let keys = CacheKeys::from_iter([(admin_pub_key.to_owned(), KeyRole::Administrator)]); let (_key_cache_tx, key_cache) = KeyCache::new(keys); - let gws = GatewayService::new(key_cache, pool, server_key); + let gws = GatewayService::new(key_cache, pool.clone(), server_key, pool.clone()); let handle = tokio::spawn( transport::Server::builder() .add_service(proto::GatewayServer::new(gws)) @@ -217,11 +219,14 @@ async fn gateway_stream_info_v2(pool: PgPool) { } #[sqlx::test] -async fn gateway_stream_info_v2_refreshed_at_is_null(pool: PgPool) { +async fn gateway_stream_info_v2_updated_at(pool: PgPool) { let admin_key = make_keypair(); let asset1_pubkey = make_keypair().public_key().clone(); let asset1_hex_idx = 631711281837647359_i64; - let now = Utc::now(); + let asset2_hex_idx = 631711286145955327_i64; + let asset2_pubkey = make_keypair().public_key().clone(); + let created_at = Utc::now() - Duration::hours(5); + let updated_at = Utc::now() - Duration::hours(3); create_db_tables(&pool).await; add_db_record( @@ -230,28 +235,187 @@ async fn gateway_stream_info_v2_refreshed_at_is_null(pool: PgPool) { asset1_hex_idx, "\"wifiIndoor\"", asset1_pubkey.clone().into(), - now, + created_at, + Some(updated_at), + None, + ) + .await; + add_mobile_tracker_record(&pool, asset1_pubkey.clone().into(), updated_at).await; + + // Shouldn't be returned + add_db_record( + &pool, + "asset2", + asset2_hex_idx, + "\"wifiDataOnly\"", + asset2_pubkey.clone().into(), + created_at, None, None, ) .await; + add_mobile_tracker_record(&pool, asset2_pubkey.clone().into(), created_at).await; let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; let mut client = GatewayClient::connect(addr).await.unwrap(); - let req = make_gateway_stream_signed_req_v2(&admin_key, &[], now.timestamp() as u64); + let req = make_gateway_stream_signed_req_v2(&admin_key, &[], updated_at.timestamp() as u64); let mut stream = client.info_stream_v2(req).await.unwrap().into_inner(); - - // Make sure the gateway was returned let resp = stream.next().await.unwrap().unwrap(); assert_eq!(resp.gateways.len(), 1); - let req = make_gateway_stream_signed_req_v2(&admin_key, &[], (now.timestamp() + 1) as u64); - let mut stream = client.info_stream_v2(req).await.unwrap().into_inner(); - // Response is empty + let gw_info = resp.gateways.first().unwrap(); + let pub_key = PublicKey::from_bytes(gw_info.address.clone()).unwrap(); + assert_eq!(pub_key, asset1_pubkey.clone()); + assert_eq!( + DeviceType::try_from(gw_info.device_type).unwrap(), + DeviceType::WifiIndoor + ); + assert_eq!( + i64::from_str_radix(&gw_info.metadata.clone().unwrap().location, 16).unwrap(), + asset1_hex_idx + ); assert!(stream.next().await.is_none()); } +#[sqlx::test] +async fn gateway_info_batch_v2(pool: PgPool) { + let admin_key = make_keypair(); + let asset1_pubkey = make_keypair().public_key().clone(); + let asset1_hex_idx = 631711281837647359_i64; + let asset2_hex_idx = 631711286145955327_i64; + let asset2_pubkey = make_keypair().public_key().clone(); + let created_at = Utc::now() - Duration::hours(5); + let updated_at = Utc::now() - Duration::hours(3); + + create_db_tables(&pool).await; + add_db_record( + &pool, + "asset1", + asset1_hex_idx, + "\"wifiIndoor\"", + asset1_pubkey.clone().into(), + created_at, + Some(updated_at), + Some(r#"{"wifiInfoV0": {"antenna": 18, "azimuth": 161, "elevation": 2, "electricalDownTilt": 3, "mechanicalDownTilt": 4}}"#) + ) + .await; + + add_db_record( + &pool, + "asset2", + asset2_hex_idx, + "\"wifiDataOnly\"", + asset2_pubkey.clone().into(), + created_at, + None, + None, + ) + .await; + add_mobile_tracker_record(&pool, asset2_pubkey.clone().into(), created_at).await; + + let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; + let mut client = GatewayClient::connect(addr).await.unwrap(); + + let req = make_signed_info_batch_request( + &vec![asset1_pubkey.clone(), asset2_pubkey.clone()], + &admin_key, + ); + let stream = client.info_batch_v2(req).await.unwrap().into_inner(); + let resp = stream + .filter_map(|result| async { result.ok() }) + .collect::>() + .await; + + let gateways = resp.first().unwrap().gateways.clone(); + let gw1 = gateways + .iter() + .find(|v| v.address == asset1_pubkey.to_vec()) + .unwrap(); + + let deployment_info = gw1.metadata.clone().unwrap().deployment_info.unwrap(); + + match deployment_info { + DeploymentInfo::WifiDeploymentInfo(v) => { + assert_eq!(v.antenna, 18); + assert_eq!(v.azimuth, 161); + assert_eq!(v.elevation, 2); + assert_eq!(v.electrical_down_tilt, 3); + assert_eq!(v.mechanical_down_tilt, 4); + } + DeploymentInfo::CbrsDeploymentInfo(_) => panic!(), + }; + + let gw2 = gateways + .iter() + .find(|v| v.address == asset2_pubkey.to_vec()) + .unwrap(); + assert!(gw2.metadata.clone().unwrap().deployment_info.is_none()); +} + +#[sqlx::test] +async fn gateway_info_v2(pool: PgPool) { + let admin_key = make_keypair(); + let asset1_pubkey = make_keypair().public_key().clone(); + let asset1_hex_idx = 631711281837647359_i64; + let asset2_pubkey = make_keypair().public_key().clone(); + let created_at = Utc::now() - Duration::hours(5); + let updated_at = Utc::now() - Duration::hours(3); + + create_db_tables(&pool).await; + add_db_record( + &pool, + "asset1", + asset1_hex_idx, + "\"wifiIndoor\"", + asset1_pubkey.clone().into(), + created_at, + Some(updated_at), + Some(r#"{"wifiInfoV0": {"antenna": 18, "azimuth": 161, "elevation": 2, "electricalDownTilt": 3, "mechanicalDownTilt": 4}}"#) + ) + .await; + + let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; + let mut client = GatewayClient::connect(addr).await.unwrap(); + + let req = make_signed_info_request(&asset1_pubkey, &admin_key); + let resp = client.info_v2(req).await.unwrap().into_inner(); + + let gw_info = resp.info.unwrap(); + let pub_key = PublicKey::from_bytes(gw_info.address.clone()).unwrap(); + assert_eq!(pub_key, asset1_pubkey.clone()); + assert_eq!( + DeviceType::try_from(gw_info.device_type).unwrap(), + DeviceType::WifiIndoor + ); + assert_eq!( + i64::from_str_radix(&gw_info.metadata.clone().unwrap().location, 16).unwrap(), + asset1_hex_idx + ); + + let deployment_info = gw_info.metadata.clone().unwrap().deployment_info.unwrap(); + + match deployment_info { + DeploymentInfo::WifiDeploymentInfo(v) => { + assert_eq!(v.antenna, 18); + assert_eq!(v.azimuth, 161); + assert_eq!(v.elevation, 2); + assert_eq!(v.electrical_down_tilt, 3); + assert_eq!(v.mechanical_down_tilt, 4); + } + DeploymentInfo::CbrsDeploymentInfo(_) => panic!(), + }; + + // Non-existent + let req = make_signed_info_request(&asset2_pubkey, &admin_key); + let resp_err = client + .info_v2(req) + .await + .expect_err("testing expects error"); + + assert_eq!(resp_err.code(), Code::NotFound); +} + #[sqlx::test] async fn gateway_stream_info_v2_deployment_info(pool: PgPool) { let admin_key = make_keypair(); @@ -374,6 +538,30 @@ async fn gateway_stream_info_v2_deployment_info(pool: PgPool) { } } +async fn add_mobile_tracker_record( + pool: &PgPool, + key: PublicKeyBinary, + last_changed_at: DateTime, +) { + let b58 = bs58::decode(key.to_string()).into_vec().unwrap(); + + sqlx::query( + r#" + INSERT INTO +"mobile_radio_tracker" ("entity_key", "hash", "last_changed_at", "last_checked_at") + VALUES +($1, $2, $3, $4); + "#, + ) + .bind(b58) + .bind("hash") + .bind(last_changed_at) + .bind(last_changed_at + Duration::hours(1)) + .execute(pool) + .await + .unwrap(); +} + #[allow(clippy::too_many_arguments)] async fn add_db_record( pool: &PgPool, @@ -477,14 +665,14 @@ fn make_keypair() -> Keypair { fn make_gateway_stream_signed_req_v2( signer: &Keypair, device_types: &[DeviceType], - min_refreshed_at: u64, + min_updated_at: u64, ) -> proto::GatewayInfoStreamReqV2 { let mut req = GatewayInfoStreamReqV2 { batch_size: 10000, signer: signer.public_key().to_vec(), signature: vec![], device_types: device_types.iter().map(|v| DeviceType::into(*v)).collect(), - min_refreshed_at, + min_updated_at, }; req.signature = signer.sign(&req.encode_to_vec()).unwrap(); @@ -515,3 +703,17 @@ fn make_signed_info_request(address: &PublicKey, signer: &Keypair) -> proto::Gat req.signature = signer.sign(&req.encode_to_vec()).unwrap(); req } + +fn make_signed_info_batch_request( + addresses: &[PublicKey], + signer: &Keypair, +) -> proto::GatewayInfoBatchReqV1 { + let mut req = proto::GatewayInfoBatchReqV1 { + addresses: addresses.iter().map(|v| v.to_vec()).collect(), + batch_size: 42, + signer: signer.public_key().to_vec(), + signature: vec![], + }; + req.signature = signer.sign(&req.encode_to_vec()).unwrap(); + req +} diff --git a/mobile_verifier/tests/integrations/speedtests.rs b/mobile_verifier/tests/integrations/speedtests.rs index d63fd0a54..791480b45 100644 --- a/mobile_verifier/tests/integrations/speedtests.rs +++ b/mobile_verifier/tests/integrations/speedtests.rs @@ -35,7 +35,6 @@ impl GatewayInfoResolver for MockGatewayInfoResolver { metadata: None, device_type: DeviceType::Cbrs, created_at: None, - refreshed_at: None, })) }