diff --git a/Cargo.lock b/Cargo.lock index 596a95200..a83c77228 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "Inflector" @@ -1615,7 +1615,7 @@ checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" [[package]] name = "beacon" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=master#8e3edc2053a16ec98421d83211399338836f91e4" +source = "git+https://github.com/helium/proto?branch=master#16d6838b88a35ac88797d0c7eb14a932b214a856" dependencies = [ "base64 0.21.7", "byteorder", @@ -1625,7 +1625,7 @@ dependencies = [ "rand_chacha 0.3.0", "rust_decimal", "serde", - "sha2 0.9.9", + "sha2 0.10.8", "thiserror", ] @@ -3821,7 +3821,7 @@ dependencies = [ [[package]] name = "helium-proto" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=master#8e3edc2053a16ec98421d83211399338836f91e4" +source = "git+https://github.com/helium/proto?branch=master#16d6838b88a35ac88797d0c7eb14a932b214a856" dependencies = [ "bytes", "prost", @@ -6063,7 +6063,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "80b776a1b2dc779f5ee0641f8ade0125bc1298dd41a9a0c16d8bd57b42d222b1" dependencies = [ "bytes", - "heck 0.4.0", + "heck 0.5.0", "itertools", "log", "multimap", @@ -6122,7 +6122,7 @@ dependencies = [ [[package]] name = "pyth-solana-receiver-sdk" version = "0.3.1" -source = "git+https://github.com/madninja/pyth-crosschain.git?branch=madninja/cap_solana_dep#6576247294bde3ab7b62f7a2dfb4d4d48c401b35" +source = "git+https://github.com/madninja/pyth-crosschain.git?branch=madninja%2Fcap_solana_dep#6576247294bde3ab7b62f7a2dfb4d4d48c401b35" dependencies = [ "anchor-lang 0.29.0", "hex", @@ -6133,7 +6133,7 @@ dependencies = [ [[package]] name = "pythnet-sdk" version = "2.3.0" -source = "git+https://github.com/madninja/pyth-crosschain.git?branch=madninja/cap_solana_dep#6576247294bde3ab7b62f7a2dfb4d4d48c401b35" +source = "git+https://github.com/madninja/pyth-crosschain.git?branch=madninja%2Fcap_solana_dep#6576247294bde3ab7b62f7a2dfb4d4d48c401b35" dependencies = [ "anchor-lang 0.30.1", "bincode", @@ -9988,7 +9988,7 @@ dependencies = [ "rand 0.8.5", "serde", "serde_json", - "sha2 0.9.9", + "sha2 0.10.8", "thiserror", "twox-hash", "xorf", diff --git a/coverage_point_calculator/src/hexes.rs b/coverage_point_calculator/src/hexes.rs index 912369a85..4901c77db 100644 --- a/coverage_point_calculator/src/hexes.rs +++ b/coverage_point_calculator/src/hexes.rs @@ -77,13 +77,18 @@ pub(crate) fn clean_covered_hexes( // hip-131: if the radio is banned, it automatically gets an assignment_multiplier of 0.0 // hip-103: if a hex is boosted by a service provider >=1x, the oracle - // multiplier will automatically be 1x, regardless of boosted_hex_status. - let assignment_multiplier = if oracle_boosting_status == OracleBoostingStatus::Banned { - dec!(0) - } else if ranked.boosted.is_some() { - dec!(1) - } else { - ranked.assignments.boosting_multiplier() + // multiplier will automatically be 1x, regardless of boosted_hex_status. + // hip-134: qualified radios earn full Oracle Boosting rewards + let assignment_multiplier = match oracle_boosting_status { + OracleBoostingStatus::Qualified if radio_type.is_wifi() => dec!(1), + OracleBoostingStatus::Banned => dec!(0), + OracleBoostingStatus::Qualified | OracleBoostingStatus::Eligible => { + if ranked.boosted.is_some() { + dec!(1) + } else { + ranked.assignments.boosting_multiplier() + } + } }; let base_coverage_points = @@ -209,9 +214,54 @@ mod tests { ) .unwrap(); - dbg!(&covered_hexes); - assert_eq!(dec!(0), covered_hexes[0].assignment_multiplier); assert_eq!(dec!(0), covered_hexes[1].assignment_multiplier); } + + #[rstest] + fn hip134_qualified_radio( + #[values( + OracleBoostingStatus::Qualified, + OracleBoostingStatus::Eligible, + OracleBoostingStatus::Banned + )] + boost_status: OracleBoostingStatus, + #[values( + RadioType::IndoorCbrs, + RadioType::OutdoorCbrs, + RadioType::IndoorWifi, + RadioType::OutdoorWifi + )] + radio_type: RadioType, + ) { + let coverage = RankedCoverage { + hotspot_key: vec![1], + cbsd_id: None, + hex: hextree::Cell::from_raw(0x8c2681a3064edff).unwrap(), + rank: 1, + signal_level: SignalLevel::High, + assignments: HexAssignments { + footfall: Assignment::C, + landtype: Assignment::C, + urbanized: Assignment::C, + }, + boosted: NonZeroU32::new(0), + }; + + let covered_hexes = clean_covered_hexes( + radio_type, + SpBoostedHexStatus::Eligible, + vec![coverage], + boost_status, + ) + .unwrap(); + + // Only Qualified WIFI radios should bypass bad assignment multiplier + let expected_multiplier = match boost_status { + OracleBoostingStatus::Qualified if radio_type.is_wifi() => dec!(1), + _ => dec!(0), + }; + + assert_eq!(expected_multiplier, covered_hexes[0].assignment_multiplier); + } } diff --git a/coverage_point_calculator/src/lib.rs b/coverage_point_calculator/src/lib.rs index 35f708105..9dcdcbcf9 100644 --- a/coverage_point_calculator/src/lib.rs +++ b/coverage_point_calculator/src/lib.rs @@ -12,6 +12,8 @@ //! - [CoveredHex::assignment_multiplier] //! - [HIP-103][oracle-boosting] //! - provider boosted hexes increase oracle boosting to 1x +//! - [HIP-134][carrier-offload] +//! - serving >25 unique connection increase oracle boosting to 1x //! //! - [CoveredHex::rank] //! - [HIP-105][hex-limits] @@ -44,13 +46,14 @@ //! - If a Radio is not [BoostedHexStatus::Eligible], boost values are removed before calculations. //! - If a Hex is boosted by a Provider, the Oracle Assignment multiplier is automatically 1x. //! -//! - [ServiceProviderBoostedRewardEligibility] +//! - [SPBoostedRewardEligibility] //! - Radio must pass at least 1mb of data from 3 unique phones [HIP-84][provider-boosting] //! - Service Provider can invalidate boosted rewards of a hotspot [HIP-125][provider-banning] //! //! - [OracleBoostingStatus] //! - Eligible: Radio is eligible for normal oracle boosting multipliers //! - Banned: Radio is banned according to hip-131 rules and all assignment_multipliers are 0.0 +//! - Qualified: Radio serves >25 unique connections, automatic oracle boosting multiplier of 1x //! //! [modeled-coverage]: https://github.com/helium/HIP/blob/main/0074-mobile-poc-modeled-coverage-rewards.md#outdoor-radios //! [provider-boosting]: https://github.com/helium/HIP/blob/main/0084-service-provider-hex-boosting.md @@ -65,6 +68,7 @@ //! [location-gaming]: https://github.com/helium/HIP/blob/main/0119-closing-gaming-loopholes-within-the-mobile-network.md //! [provider-banning]: https://github.com/helium/HIP/blob/main/0125-temporary-anti-gaming-measures-for-boosted-hexes.md //! [anti-gaming]: https://github.com/helium/HIP/blob/main/0131-bridging-gap-between-verification-mappers-and-anti-gaming-measures.md +//! [carrier-offload]: https://github.com/helium/HIP/blob/main/0134-reward-mobile-carrier-offload-hotspots.md //! pub use crate::{ hexes::{CoveredHex, HexPoints}, @@ -136,10 +140,12 @@ pub struct CoveragePoints { pub speedtest_multiplier: Decimal, /// Input Radio Type pub radio_type: RadioType, - /// Input ServiceProviderBoostedRewardEligibility + /// Input SPBoostedRewardEligibility pub service_provider_boosted_reward_eligibility: SPBoostedRewardEligibility, - /// Derived Eligibility for Boosted Hex Rewards - pub boosted_hex_eligibility: SpBoostedHexStatus, + /// Derived Eligibility for Service Provider Boosted Hex Rewards + pub sp_boosted_hex_eligibility: SpBoostedHexStatus, + /// Derived Eligibility for Oracle Boosted Hex Rewards + pub oracle_boosted_hex_eligibility: OracleBoostingStatus, /// Speedtests used in calculation pub speedtests: Vec, /// Location Trust Scores used in calculation @@ -157,11 +163,11 @@ impl CoveragePoints { speedtests: Vec, location_trust_scores: Vec, ranked_coverage: Vec, - oracle_boosting_status: OracleBoostingStatus, + oracle_boost_status: OracleBoostingStatus, ) -> Result { let location_trust_multiplier = location::multiplier(radio_type, &location_trust_scores); - let boost_eligibility = SpBoostedHexStatus::new( + let sp_boost_eligibility = SpBoostedHexStatus::new( radio_type, location_trust_multiplier, &location_trust_scores, @@ -170,9 +176,9 @@ impl CoveragePoints { let covered_hexes = hexes::clean_covered_hexes( radio_type, - boost_eligibility, + sp_boost_eligibility, ranked_coverage, - oracle_boosting_status, + oracle_boost_status, )?; let hex_coverage_points = hexes::calculated_coverage_points(&covered_hexes); @@ -187,7 +193,8 @@ impl CoveragePoints { speedtest_avg, radio_type, service_provider_boosted_reward_eligibility, - boosted_hex_eligibility: boost_eligibility, + sp_boosted_hex_eligibility: sp_boost_eligibility, + oracle_boosted_hex_eligibility: oracle_boost_status, speedtests, location_trust_scores, covered_hexes, @@ -230,7 +237,7 @@ impl CoveragePoints { } fn boosted_points(&self) -> Decimal { - match self.boosted_hex_eligibility { + match self.sp_boosted_hex_eligibility { SpBoostedHexStatus::Eligible => self.coverage_points.boosted, SpBoostedHexStatus::WifiLocationScoreBelowThreshold(_) => dec!(0), SpBoostedHexStatus::AverageAssertedDistanceOverLimit(_) => dec!(0), @@ -244,6 +251,7 @@ impl CoveragePoints { pub enum OracleBoostingStatus { Eligible, Banned, + Qualified, } #[derive(Debug, Clone, Copy, PartialEq, Eq)] diff --git a/file_store/src/file_info.rs b/file_store/src/file_info.rs index be926a53a..4371c276e 100644 --- a/file_store/src/file_info.rs +++ b/file_store/src/file_info.rs @@ -173,6 +173,8 @@ pub const VERIFIED_SUBSCRIBER_VERIFIED_MAPPING_INGEST_REPORT: &str = pub const PROMOTION_REWARD_INGEST_REPORT: &str = "promotion_reward_ingest_report"; pub const VERIFIED_PROMOTION_REWARD: &str = "verified_promotion_reward"; pub const SERVICE_PROVIDER_PROMOTION_FUND: &str = "service_provider_promotion_fund"; +pub const UNIQUE_CONNECTIONS_REPORT: &str = "unique_connections_report"; +pub const VERIFIED_UNIQUE_CONNECTIONS_REPORT: &str = "verified_unique_connections_report"; #[derive(Debug, PartialEq, Eq, Clone, Serialize, Copy, strum::EnumCount)] #[serde(rename_all = "snake_case")] @@ -237,87 +239,13 @@ pub enum FileType { RadioUsageStatsIngestReport, HexUsageStatsReq, RadioUsageStatsReq, + UniqueConnectionsReport, + VerifiedUniqueConnectionsReport, } impl fmt::Display for FileType { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let s = match self { - Self::InvalidatedRadioThresholdReq => INVALIDATED_RADIO_THRESHOLD_REQ, - Self::InvalidatedRadioThresholdIngestReport => { - INVALIDATED_RADIO_THRESHOLD_INGEST_REPORT - } - Self::VerifiedInvalidatedRadioThresholdIngestReport => { - VERIFIED_INVALIDATED_RADIO_THRESHOLD_INGEST_REPORT - } - Self::RadioThresholdReq => RADIO_THRESHOLD_REQ, - Self::RadioThresholdIngestReport => RADIO_THRESHOLD_INGEST_REPORT, - Self::VerifiedRadioThresholdIngestReport => VERIFIED_RADIO_THRESHOLD_INGEST_REPORT, - Self::SubscriberLocationReq => SUBSCRIBER_LOCATION_REQ, - Self::SubscriberLocationIngestReport => SUBSCRIBER_LOCATION_INGEST_REPORT, - Self::VerifiedSubscriberLocationIngestReport => { - VERIFIED_SUBSCRIBER_LOCATION_INGEST_REPORT - } - Self::CbrsHeartbeat => CBRS_HEARTBEAT, - Self::WifiHeartbeat => WIFI_HEARTBEAT, - Self::CellSpeedtest => CELL_SPEEDTEST, - Self::VerifiedSpeedtest => VERIFIED_SPEEDTEST, - Self::CbrsHeartbeatIngestReport => CELL_HEARTBEAT_INGEST_REPORT, - Self::WifiHeartbeatIngestReport => WIFI_HEARTBEAT_INGEST_REPORT, - Self::CellSpeedtestIngestReport => CELL_SPEEDTEST_INGEST_REPORT, - Self::Entropy => ENTROPY, - Self::SubnetworkRewards => SUBNETWORK_REWARDS, - Self::EntropyReport => ENTROPY_REPORT, - Self::IotBeaconIngestReport => IOT_BEACON_INGEST_REPORT, - Self::IotWitnessIngestReport => IOT_WITNESS_INGEST_REPORT, - Self::IotPoc => IOT_POC, - Self::IotInvalidBeaconReport => IOT_INVALID_BEACON_REPORT, - Self::IotInvalidWitnessReport => IOT_INVALID_WITNESS_REPORT, - Self::SpeedtestAvg => SPEEDTEST_AVG, - Self::ValidatedHeartbeat => VALIDATED_HEARTBEAT, - Self::SignedPocReceiptTxn => SIGNED_POC_RECEIPT_TXN, - Self::RadioRewardShare => RADIO_REWARD_SHARE, - Self::RewardManifest => REWARD_MANIFEST, - Self::IotPacketReport => IOT_PACKET_REPORT, - Self::IotValidPacket => IOT_VALID_PACKET, - Self::InvalidPacket => INVALID_PACKET, - Self::NonRewardablePacket => NON_REWARDABLE_PACKET, - Self::IotRewardShare => IOT_REWARD_SHARE, - Self::DataTransferSessionIngestReport => DATA_TRANSFER_SESSION_INGEST_REPORT, - Self::InvalidDataTransferSessionIngestReport => { - INVALID_DATA_TRANSFER_SESSION_INGEST_REPORT - } - Self::ValidDataTransferSession => VALID_DATA_TRANSFER_SESSION, - Self::VerifiedDataTransferSession => VERIFIED_DATA_TRANSFER_SESSION, - Self::PriceReport => PRICE_REPORT, - Self::MobileRewardShare => MOBILE_REWARD_SHARE, - Self::MapperMsg => MAPPER_MSG, - Self::CoverageObject => COVERAGE_OBJECT, - Self::CoverageObjectIngestReport => COVERAGE_OBJECT_INGEST_REPORT, - Self::SeniorityUpdate => SENIORITY_UPDATE, - Self::BoostedHexUpdate => BOOSTED_HEX_UPDATE, - Self::OracleBoostingReport => ORACLE_BOOSTING_REPORT, - Self::UrbanizationDataSet => URBANIZATION_DATA_SET, - Self::FootfallDataSet => FOOTFALL_DATA_SET, - Self::LandtypeDataSet => LANDTYPE_DATA_SET, - Self::SPBoostedRewardsBannedRadioIngestReport => SP_BOOSTED_REWARDS_BANNED_RADIO, - Self::VerifiedSPBoostedRewardsBannedRadioIngestReport => { - VERIFIED_SP_BOOSTED_REWARDS_BANNED_RADIO - } - Self::SubscriberVerifiedMappingEventIngestReport => { - SUBSCRIBER_VERIFIED_MAPPING_INGEST_REPORT - } - Self::VerifiedSubscriberVerifiedMappingEventIngestReport => { - VERIFIED_SUBSCRIBER_VERIFIED_MAPPING_INGEST_REPORT - } - Self::PromotionRewardIngestReport => PROMOTION_REWARD_INGEST_REPORT, - Self::VerifiedPromotionReward => VERIFIED_PROMOTION_REWARD, - Self::ServiceProviderPromotionFund => SERVICE_PROVIDER_PROMOTION_FUND, - Self::HexUsageStatsIngestReport => HEX_USAGE_STATS_INGEST_REPORT, - Self::RadioUsageStatsIngestReport => RADIO_USAGE_STATS_INGEST_REPORT, - Self::HexUsageStatsReq => HEX_USAGE_STATS_REQ, - Self::RadioUsageStatsReq => RADIO_USAGE_STATS_REQ, - }; - f.write_str(s) + f.write_str(self.to_str()) } } @@ -398,6 +326,8 @@ impl FileType { Self::RadioUsageStatsIngestReport => RADIO_USAGE_STATS_INGEST_REPORT, Self::HexUsageStatsReq => HEX_USAGE_STATS_REQ, Self::RadioUsageStatsReq => RADIO_USAGE_STATS_REQ, + Self::UniqueConnectionsReport => UNIQUE_CONNECTIONS_REPORT, + Self::VerifiedUniqueConnectionsReport => VERIFIED_UNIQUE_CONNECTIONS_REPORT, } } } @@ -479,6 +409,8 @@ impl FromStr for FileType { RADIO_USAGE_STATS_INGEST_REPORT => Self::RadioUsageStatsIngestReport, HEX_USAGE_STATS_REQ => Self::HexUsageStatsReq, RADIO_USAGE_STATS_REQ => Self::RadioUsageStatsReq, + UNIQUE_CONNECTIONS_REPORT => Self::UniqueConnectionsReport, + VERIFIED_UNIQUE_CONNECTIONS_REPORT => Self::VerifiedUniqueConnectionsReport, _ => return Err(Error::from(io::Error::from(io::ErrorKind::InvalidInput))), }; Ok(result) diff --git a/file_store/src/lib.rs b/file_store/src/lib.rs index b067c7072..37ed8fb85 100644 --- a/file_store/src/lib.rs +++ b/file_store/src/lib.rs @@ -26,6 +26,7 @@ pub mod speedtest; pub mod subscriber_verified_mapping_event; pub mod subscriber_verified_mapping_event_ingest_report; pub mod traits; +pub mod unique_connections; pub mod usage_counts; pub mod verified_subscriber_verified_mapping_event_ingest_report; pub mod wifi_heartbeat; diff --git a/file_store/src/traits/file_sink_write.rs b/file_store/src/traits/file_sink_write.rs index 4ffeaeca6..bc6a6d468 100644 --- a/file_store/src/traits/file_sink_write.rs +++ b/file_store/src/traits/file_sink_write.rs @@ -263,6 +263,16 @@ impl_file_sink!( FileType::RadioUsageStatsIngestReport.to_str(), "hotspot_usage_counts_ingest_report" ); +impl_file_sink!( + poc_mobile::UniqueConnectionsIngestReportV1, + FileType::UniqueConnectionsReport.to_str(), + "unique_connections_report" +); +impl_file_sink!( + poc_mobile::VerifiedUniqueConnectionsIngestReportV1, + FileType::VerifiedUniqueConnectionsReport.to_str(), + "verified_unique_connections_report" +); impl_file_sink!( proto::BoostedHexUpdateV1, FileType::BoostedHexUpdate.to_str(), diff --git a/file_store/src/traits/msg_verify.rs b/file_store/src/traits/msg_verify.rs index 4cbdbcc03..6f88c0ab6 100644 --- a/file_store/src/traits/msg_verify.rs +++ b/file_store/src/traits/msg_verify.rs @@ -101,6 +101,7 @@ impl_msg_verify!(mobile_config::BoostedHexInfoStreamResV1, signature); impl_msg_verify!(poc_mobile::SubscriberVerifiedMappingEventReqV1, signature); impl_msg_verify!(poc_mobile::HexUsageStatsReqV1, signature); impl_msg_verify!(poc_mobile::RadioUsageStatsReqV1, signature); +impl_msg_verify!(poc_mobile::UniqueConnectionsReqV1, signature); #[cfg(test)] mod test { diff --git a/file_store/src/unique_connections.rs b/file_store/src/unique_connections.rs new file mode 100644 index 000000000..ae3332ccb --- /dev/null +++ b/file_store/src/unique_connections.rs @@ -0,0 +1,108 @@ +use chrono::{DateTime, Utc}; +use helium_crypto::PublicKeyBinary; +use serde::{Deserialize, Serialize}; + +use crate::{ + traits::{MsgDecode, TimestampDecode}, + Error, +}; + +pub mod proto { + pub use helium_proto::services::poc_mobile::{ + UniqueConnectionsIngestReportV1, UniqueConnectionsReqV1, + VerifiedUniqueConnectionsIngestReportStatus, VerifiedUniqueConnectionsIngestReportV1, + }; +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct UniqueConnectionsIngestReport { + pub received_timestamp: DateTime, + pub report: UniqueConnectionReq, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct VerifiedUniqueConnectionsIngestReport { + pub timestamp: DateTime, + pub report: UniqueConnectionsIngestReport, + pub status: proto::VerifiedUniqueConnectionsIngestReportStatus, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct UniqueConnectionReq { + pub pubkey: PublicKeyBinary, + pub start_timestamp: DateTime, + pub end_timestamp: DateTime, + pub unique_connections: u64, + pub timestamp: DateTime, + pub carrier_key: PublicKeyBinary, + pub signature: Vec, +} + +impl MsgDecode for UniqueConnectionsIngestReport { + type Msg = proto::UniqueConnectionsIngestReportV1; +} + +impl TryFrom for UniqueConnectionsIngestReport { + type Error = Error; + + fn try_from(value: proto::UniqueConnectionsIngestReportV1) -> Result { + Ok(Self { + received_timestamp: value.received_timestamp.to_timestamp()?, + report: value + .report + .ok_or_else(|| Error::not_found("ingest unique connections"))? + .try_into()?, + }) + } +} + +impl From for proto::UniqueConnectionsIngestReportV1 { + fn from(value: UniqueConnectionsIngestReport) -> Self { + Self { + received_timestamp: value.received_timestamp.timestamp_millis() as u64, + report: Some(value.report.into()), + } + } +} + +impl TryFrom for UniqueConnectionReq { + type Error = Error; + + fn try_from(value: proto::UniqueConnectionsReqV1) -> Result { + Ok(Self { + pubkey: value.pubkey.into(), + start_timestamp: value.start_timestamp.to_timestamp_millis()?, + end_timestamp: value.end_timestamp.to_timestamp_millis()?, + unique_connections: value.unique_connections, + timestamp: value.timestamp.to_timestamp_millis()?, + carrier_key: value.carrier_key.into(), + signature: value.signature, + }) + } +} + +impl From for proto::UniqueConnectionsReqV1 { + fn from(value: UniqueConnectionReq) -> Self { + Self { + pubkey: value.pubkey.into(), + start_timestamp: value.start_timestamp.timestamp_millis() as u64, + end_timestamp: value.end_timestamp.timestamp_millis() as u64, + unique_connections: value.unique_connections, + timestamp: value.timestamp.timestamp_millis() as u64, + carrier_key: value.carrier_key.into(), + signature: value.signature, + } + } +} + +impl From + for proto::VerifiedUniqueConnectionsIngestReportV1 +{ + fn from(value: VerifiedUniqueConnectionsIngestReport) -> Self { + Self { + timestamp: value.timestamp.timestamp_millis() as u64, + report: Some(value.report.into()), + status: value.status.into(), + } + } +} diff --git a/ingest/src/server_mobile.rs b/ingest/src/server_mobile.rs index 7096c0572..f2434c150 100644 --- a/ingest/src/server_mobile.rs +++ b/ingest/src/server_mobile.rs @@ -9,7 +9,6 @@ use file_store::{ use futures::future::LocalBoxFuture; use futures_util::TryFutureExt; use helium_crypto::{Network, PublicKey, PublicKeyBinary}; -use helium_proto::services::mobile_config::NetworkKeyRole; use helium_proto::services::poc_mobile::{ self, CellHeartbeatIngestReportV1, CellHeartbeatReqV1, CellHeartbeatRespV1, CoverageObjectIngestReportV1, CoverageObjectReqV1, CoverageObjectRespV1, @@ -23,8 +22,12 @@ use helium_proto::services::poc_mobile::{ SpeedtestIngestReportV1, SpeedtestReqV1, SpeedtestRespV1, SubscriberLocationIngestReportV1, SubscriberLocationReqV1, SubscriberLocationRespV1, SubscriberVerifiedMappingEventIngestReportV1, SubscriberVerifiedMappingEventReqV1, - SubscriberVerifiedMappingEventResV1, WifiHeartbeatIngestReportV1, WifiHeartbeatReqV1, - WifiHeartbeatRespV1, + SubscriberVerifiedMappingEventResV1, UniqueConnectionsIngestReportV1, + WifiHeartbeatIngestReportV1, WifiHeartbeatReqV1, WifiHeartbeatRespV1, +}; +use helium_proto::services::{ + mobile_config::NetworkKeyRole, + poc_mobile::{UniqueConnectionsReqV1, UniqueConnectionsRespV1}, }; use mobile_config::client::{authorization_client::AuthorizationVerifier, AuthorizationClient}; use std::{net::SocketAddr, path::Path}; @@ -52,6 +55,7 @@ pub struct GrpcServer { subscriber_mapping_event_sink: FileSinkClient, hex_usage_stats_event_sink: FileSinkClient, radio_usage_stats_event_sink: FileSinkClient, + unique_connections_sink: FileSinkClient, required_network: Network, address: SocketAddr, api_token: MetadataValue, @@ -100,6 +104,7 @@ where subscriber_mapping_event_sink: FileSinkClient, hex_usage_stats_event_sink: FileSinkClient, radio_usage_stats_event_sink: FileSinkClient, + unique_connections_sink: FileSinkClient, required_network: Network, address: SocketAddr, api_token: MetadataValue, @@ -118,6 +123,7 @@ where subscriber_mapping_event_sink, hex_usage_stats_event_sink, radio_usage_stats_event_sink, + unique_connections_sink, required_network, address, api_token, @@ -523,6 +529,31 @@ where let id = timestamp.to_string(); Ok(Response::new(RadioUsageStatsResV1 { id })) } + + async fn submit_unique_connections( + &self, + request: Request, + ) -> GrpcResult { + let received_timestamp = Utc::now().timestamp_millis() as u64; + let event = request.into_inner(); + + let timestamp = event.timestamp; + + custom_tracing::record_b58("pub_key", &event.pubkey); + + let report = self + .verify_public_key(event.carrier_key.as_ref()) + .and_then(|public_key| self.verify_network(public_key)) + .and_then(|public_key| self.verify_signature(public_key, event)) + .map(|(_, event)| UniqueConnectionsIngestReportV1 { + received_timestamp, + report: Some(event), + })?; + + _ = self.unique_connections_sink.write(report, []).await; + + Ok(Response::new(UniqueConnectionsRespV1 { timestamp })) + } } pub async fn grpc_server(settings: &Settings) -> Result<()> { @@ -652,6 +683,16 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { ) .await?; + let (unique_connections_sink, unique_connections_server) = + UniqueConnectionsIngestReportV1::file_sink( + store_base_path, + file_upload.clone(), + FileSinkCommitStrategy::Automatic, + FileSinkRollTime::Duration(settings.roll_time), + env!("CARGO_PKG_NAME"), + ) + .await?; + let Some(api_token) = settings .token .as_ref() @@ -677,6 +718,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { subscriber_mapping_event_sink, hex_usage_stats_event_sink, radio_usage_stats_event_sink, + unique_connections_sink, settings.network, settings.listen_addr, api_token, @@ -703,6 +745,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { .add_task(subscriber_mapping_event_server) .add_task(hex_usage_stats_event_server) .add_task(radio_usage_stats_event_server) + .add_task(unique_connections_server) .add_task(grpc_server) .build() .start() diff --git a/ingest/tests/common/mod.rs b/ingest/tests/common/mod.rs index 79061ff1a..bb28c4065 100644 --- a/ingest/tests/common/mod.rs +++ b/ingest/tests/common/mod.rs @@ -1,10 +1,12 @@ use anyhow::bail; use backon::{ExponentialBuilder, Retryable}; +use chrono::{DateTime, Utc}; use file_store::file_sink::FileSinkClient; use helium_crypto::{KeyTag, Keypair, Network, PublicKeyBinary, Sign}; use helium_proto::services::poc_mobile::{ HexUsageStatsIngestReportV1, HexUsageStatsReqV1, HexUsageStatsResV1, RadioUsageStatsIngestReportV1, RadioUsageStatsReqV1, RadioUsageStatsResV1, + UniqueConnectionsIngestReportV1, UniqueConnectionsReqV1, UniqueConnectionsRespV1, }; use helium_proto::services::{ mobile_config::NetworkKeyRole, @@ -75,6 +77,7 @@ pub async fn setup_mobile() -> anyhow::Result<(TestClient, Trigger)> { let (subscriber_mapping_tx, subscriber_mapping_rx) = tokio::sync::mpsc::channel(10); let (hex_usage_stat_tx, hex_usage_stat_rx) = tokio::sync::mpsc::channel(10); let (radio_usage_stat_tx, radio_usage_stat_rx) = tokio::sync::mpsc::channel(10); + let (unique_connections_tx, unique_connections_rx) = tokio::sync::mpsc::channel(10); let auth_client = MockAuthorizationClient::new(); @@ -92,6 +95,7 @@ pub async fn setup_mobile() -> anyhow::Result<(TestClient, Trigger)> { FileSinkClient::new(subscriber_mapping_tx, "test_file_sink"), FileSinkClient::new(hex_usage_stat_tx, "hex_usage_test_file_sink"), FileSinkClient::new(radio_usage_stat_tx, "radio_usage_test_file_sink"), + FileSinkClient::new(unique_connections_tx, "noop"), Network::MainNet, socket_addr, api_token, @@ -108,6 +112,7 @@ pub async fn setup_mobile() -> anyhow::Result<(TestClient, Trigger)> { subscriber_mapping_rx, hex_usage_stat_rx, radio_usage_stat_rx, + unique_connections_rx, ) .await; @@ -124,6 +129,8 @@ pub struct TestClient { Receiver>, radio_usage_stats_file_sink_rx: Receiver>, + unique_connections_file_sink_rx: + Receiver>, } impl TestClient { @@ -140,6 +147,9 @@ impl TestClient { radio_usage_stats_file_sink_rx: Receiver< file_store::file_sink::Message, >, + unique_connections_file_sink_rx: Receiver< + file_store::file_sink::Message, + >, ) -> TestClient { let client = (|| PocMobileClient::connect(format!("http://{socket_addr}"))) .retry(&ExponentialBuilder::default()) @@ -153,6 +163,26 @@ impl TestClient { subscriber_mapping_file_sink_rx, hex_usage_stats_file_sink_rx, radio_usage_stats_file_sink_rx, + unique_connections_file_sink_rx, + } + } + + pub async fn unique_connection_recv( + mut self, + ) -> anyhow::Result { + match timeout( + Duration::from_secs(2), + self.unique_connections_file_sink_rx.recv(), + ) + .await + { + Ok(Some(msg)) => match msg { + file_store::file_sink::Message::Data(_, data) => Ok(data), + file_store::file_sink::Message::Commit(_) => bail!("got Commit"), + file_store::file_sink::Message::Rollback(_) => bail!("got Rollback"), + }, + Ok(None) => bail!("got none"), + Err(reason) => bail!("got error {reason}"), } } @@ -209,6 +239,35 @@ impl TestClient { } } + pub async fn submit_unique_connections( + &mut self, + pubkey: Vec, + start_timestamp: DateTime, + end_timestamp: DateTime, + unique_connections: u64, + ) -> anyhow::Result { + let mut req = UniqueConnectionsReqV1 { + pubkey, + start_timestamp: start_timestamp.timestamp_millis() as u64, + end_timestamp: end_timestamp.timestamp_millis() as u64, + unique_connections, + timestamp: 0, + carrier_key: self.key_pair.public_key().into(), + signature: vec![], + }; + + req.signature = self.key_pair.sign(&req.encode_to_vec()).expect("sign"); + + let mut request = Request::new(req); + let metadata = request.metadata_mut(); + + metadata.insert("authorization", self.authorization.clone()); + + let res = self.client.submit_unique_connections(request).await?; + + Ok(res.into_inner()) + } + pub async fn submit_verified_subscriber_mapping_event( &mut self, subscriber_id: Vec, diff --git a/ingest/tests/mobile_ingest.rs b/ingest/tests/mobile_ingest.rs index cdb7842fe..aa9fcb478 100644 --- a/ingest/tests/mobile_ingest.rs +++ b/ingest/tests/mobile_ingest.rs @@ -1,3 +1,4 @@ +use chrono::Utc; use helium_crypto::PublicKeyBinary; use std::str::FromStr; @@ -5,6 +6,35 @@ mod common; const PUBKEY1: &str = "113HRxtzxFbFUjDEJJpyeMRZRtdAW38LAUnB5mshRwi6jt7uFbt"; +#[tokio::test] +async fn submit_unique_connections() -> anyhow::Result<()> { + let (mut client, trigger) = common::setup_mobile().await?; + + let pubkey = PublicKeyBinary::from_str(PUBKEY1)?; + let timestamp = Utc::now(); + let end = timestamp - chrono::Duration::days(1); + let start = end - chrono::Duration::days(7); + + const UNIQUE_CONNECTIONS: u64 = 42; + + let response = client + .submit_unique_connections(pubkey.into(), start, end, UNIQUE_CONNECTIONS) + .await?; + + let report = client.unique_connection_recv().await?; + + let Some(inner_report) = report.report else { + anyhow::bail!("No report found") + }; + + assert_eq!(inner_report.timestamp, response.timestamp); + assert_eq!(inner_report.unique_connections, UNIQUE_CONNECTIONS); + + trigger.trigger(); + + Ok(()) +} + #[tokio::test] async fn submit_verified_subscriber_mapping_event() -> anyhow::Result<()> { let (mut client, trigger) = common::setup_mobile().await?; diff --git a/mobile_verifier/migrations/39_unique_connections-up.sql b/mobile_verifier/migrations/39_unique_connections-up.sql new file mode 100644 index 000000000..ac3122be0 --- /dev/null +++ b/mobile_verifier/migrations/39_unique_connections-up.sql @@ -0,0 +1,9 @@ +CREATE TABLE IF NOT EXISTS unique_connections ( + hotspot_pubkey TEXT NOT NULL, + unique_connections BIGINT NOT NULL, + start_timestamp TIMESTAMPTZ NOT NULL, + end_timestamp TIMESTAMPTZ NOT NULL, + received_timestamp TIMESTAMPTZ NOT NULL, + primary key(hotspot_pubkey, received_timestamp) +); + diff --git a/mobile_verifier/src/cli/reward_from_db.rs b/mobile_verifier/src/cli/reward_from_db.rs index d0f9dfa60..ef94596cf 100644 --- a/mobile_verifier/src/cli/reward_from_db.rs +++ b/mobile_verifier/src/cli/reward_from_db.rs @@ -6,6 +6,7 @@ use crate::{ rewarder::boosted_hex_eligibility::BoostedHexEligibility, sp_boosted_rewards_bans::BannedRadios, speedtests_average::SpeedtestAverages, + unique_connections::UniqueConnectionCounts, Settings, }; use anyhow::Result; @@ -50,6 +51,7 @@ impl Cmd { &BoostedHexes::default(), &BoostedHexEligibility::default(), &BannedRadios::default(), + &UniqueConnectionCounts::default(), &epoch, ) .await?; diff --git a/mobile_verifier/src/cli/server.rs b/mobile_verifier/src/cli/server.rs index b7ab815fd..539f0eb16 100644 --- a/mobile_verifier/src/cli/server.rs +++ b/mobile_verifier/src/cli/server.rs @@ -12,7 +12,9 @@ use crate::{ speedtests::SpeedtestDaemon, subscriber_location::SubscriberLocationIngestor, subscriber_verified_mapping_event::SubscriberVerifiedMappingEventDaemon, - telemetry, Settings, + telemetry, + unique_connections::ingestor::UniqueConnectionsIngestor, + Settings, }; use anyhow::Result; use file_store::{ @@ -193,6 +195,16 @@ impl Cmd { ) .await?, ) + .add_task( + UniqueConnectionsIngestor::create_managed_task( + pool.clone(), + settings, + file_upload.clone(), + report_ingest.clone(), + auth_client.clone(), + ) + .await?, + ) .add_task(DataSessionIngestor::create_managed_task(pool.clone(), settings).await?) .add_task( ServiceProviderBoostedRewardsBanIngestor::create_managed_task( diff --git a/mobile_verifier/src/lib.rs b/mobile_verifier/src/lib.rs index 9fc3757c0..d14bf4ee3 100644 --- a/mobile_verifier/src/lib.rs +++ b/mobile_verifier/src/lib.rs @@ -17,6 +17,7 @@ pub mod speedtests_average; pub mod subscriber_location; pub mod subscriber_verified_mapping_event; pub mod telemetry; +pub mod unique_connections; pub use settings::Settings; diff --git a/mobile_verifier/src/radio_threshold.rs b/mobile_verifier/src/radio_threshold.rs index f3cfe8fe5..0b9873eab 100644 --- a/mobile_verifier/src/radio_threshold.rs +++ b/mobile_verifier/src/radio_threshold.rs @@ -344,7 +344,7 @@ pub async fn save( .bind(ingest_report.report.subscriber_threshold as i32) .bind(ingest_report.report.threshold_timestamp) .bind(ingest_report.received_timestamp) - .execute(&mut *db) + .execute(db) .await?; Ok(()) } diff --git a/mobile_verifier/src/reward_shares.rs b/mobile_verifier/src/reward_shares.rs index c2ded2b9c..5d3901981 100644 --- a/mobile_verifier/src/reward_shares.rs +++ b/mobile_verifier/src/reward_shares.rs @@ -1,9 +1,14 @@ use crate::{ - coverage::CoveredHexStream, data_session::HotspotMap, heartbeats::HeartbeatReward, - rewarder::boosted_hex_eligibility::BoostedHexEligibility, seniority::Seniority, - sp_boosted_rewards_bans::BannedRadios, speedtests_average::SpeedtestAverages, + coverage::CoveredHexStream, + data_session::HotspotMap, + heartbeats::HeartbeatReward, + rewarder::boosted_hex_eligibility::BoostedHexEligibility, + seniority::Seniority, + sp_boosted_rewards_bans::BannedRadios, + speedtests_average::SpeedtestAverages, subscriber_location::SubscriberValidatedLocations, subscriber_verified_mapping_event::VerifiedSubscriberVerifiedMappingEventShares, + unique_connections::{self, UniqueConnectionCounts}, }; use chrono::{DateTime, Duration, Utc}; use coverage_point_calculator::{OracleBoostingStatus, SPBoostedRewardEligibility}; @@ -355,7 +360,8 @@ pub fn coverage_point_to_mobile_reward_share( ), speedtests: coverage_points.proto_speedtests(), speedtest_multiplier: Some(coverage_points.speedtest_multiplier.proto_decimal()), - boosted_hex_status: coverage_points.proto_boosted_hex_status().into(), + sp_boosted_hex_status: coverage_points.proto_sp_boosted_hex_status().into(), + oracle_boosted_hex_status: coverage_points.proto_oracle_boosted_hex_status().into(), covered_hexes: coverage_points.proto_covered_hexes(), speedtest_average: Some(coverage_points.proto_speedtest_avg()), }); @@ -398,6 +404,7 @@ pub struct CoverageShares { } impl CoverageShares { + #[allow(clippy::too_many_arguments)] pub async fn new( hex_streams: &impl CoveredHexStream, heartbeats: impl Stream>, @@ -405,6 +412,7 @@ impl CoverageShares { boosted_hexes: &BoostedHexes, boosted_hex_eligibility: &BoostedHexEligibility, banned_radios: &BannedRadios, + unique_connections: &UniqueConnectionCounts, reward_period: &Range>, ) -> anyhow::Result { let mut radio_infos: HashMap = HashMap::new(); @@ -475,11 +483,14 @@ impl CoverageShares { }) .collect(); - let oracle_boosting_status = if banned_radios.contains(&pubkey, cbsd_id.as_deref()) { - OracleBoostingStatus::Banned - } else { - OracleBoostingStatus::Eligible - }; + let oracle_boosting_status = + if unique_connections::is_qualified(unique_connections, &pubkey, &radio_type) { + OracleBoostingStatus::Qualified + } else if banned_radios.contains(&pubkey, cbsd_id.as_deref()) { + OracleBoostingStatus::Banned + } else { + OracleBoostingStatus::Eligible + }; let sp_boosted_reward_eligibility = boosted_hex_eligibility.eligibility(pubkey, cbsd_id); @@ -798,6 +809,14 @@ mod test { } } + fn bad_hex_assignments_mock() -> HexAssignments { + HexAssignments { + footfall: Assignment::C, + urbanized: Assignment::C, + landtype: Assignment::C, + } + } + #[test] fn ensure_correct_conversion_of_bytes_to_bones() { assert_eq!( @@ -1112,6 +1131,24 @@ mod test { }] } + fn bad_hex_coverage<'a>(key: impl Into>, hex: u64) -> Vec { + let key = key.into(); + let radio_key = key.to_owned(); + let hex = hex.try_into().expect("valid h3 cell"); + + vec![HexCoverage { + uuid: Uuid::new_v4(), + hex, + indoor: true, + radio_key, + signal_level: crate::coverage::SignalLevel::Low, + signal_power: 0, + coverage_claim_time: DateTime::::MIN_UTC, + inserted_at: DateTime::::MIN_UTC, + assignments: bad_hex_assignments_mock(), + }] + } + #[tokio::test] async fn check_speedtest_avg_in_radio_reward_v2() { let owner1: PublicKeyBinary = "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6" @@ -1187,6 +1224,7 @@ mod test { &BoostedHexes::default(), &BoostedHexEligibility::default(), &BannedRadios::default(), + &UniqueConnectionCounts::default(), &epoch, ) .await @@ -1590,6 +1628,7 @@ mod test { &BoostedHexes::default(), &BoostedHexEligibility::default(), &BannedRadios::default(), + &UniqueConnectionCounts::default(), &epoch, ) .await @@ -1771,6 +1810,7 @@ mod test { &BoostedHexes::default(), &BoostedHexEligibility::default(), &BannedRadios::default(), + &UniqueConnectionCounts::default(), &epoch, ) .await @@ -1905,6 +1945,7 @@ mod test { &BoostedHexes::default(), &BoostedHexEligibility::default(), &BannedRadios::default(), + &UniqueConnectionCounts::default(), &epoch, ) .await @@ -2040,6 +2081,7 @@ mod test { &BoostedHexes::default(), &BoostedHexEligibility::default(), &BannedRadios::default(), + &UniqueConnectionCounts::default(), &epoch, ) .await @@ -2075,6 +2117,132 @@ mod test { assert_eq!(owner2_reward, 410_958_904_109); } + #[tokio::test] + async fn qualified_wifi_exempt_from_oracle_boosting_bad() { + // init owners + let owner1: PublicKeyBinary = "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6" + .parse() + .expect("failed owner1 parse"); + let owner2: PublicKeyBinary = "11sctWiP9r5wDJVuDe1Th4XSL2vaawaLLSQF8f8iokAoMAJHxqp" + .parse() + .expect("failed owner2 parse"); + // init hotspots + let gw1: PublicKeyBinary = "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6" + .parse() + .expect("failed gw1 parse"); + let gw2: PublicKeyBinary = "11sctWiP9r5wDJVuDe1Th4XSL2vaawaLLSQF8f8iokAoMAJHxqp" + .parse() + .expect("failed gw2 parse"); + // link gws to owners + let mut owners = HashMap::new(); + owners.insert(gw1.clone(), owner1.clone()); + owners.insert(gw2.clone(), owner2.clone()); + + let now = Utc::now(); + let timestamp = now - Duration::minutes(20); + + let g1_cov_obj = Uuid::new_v4(); + let g2_cov_obj = Uuid::new_v4(); + + // setup heartbeats + let heartbeat_rewards = vec![ + // add qualified wifi indoor HB + HeartbeatReward { + cbsd_id: None, + hotspot_key: gw1.clone(), + cell_type: CellType::NovaGenericWifiOutdoor, + coverage_object: g1_cov_obj, + distances_to_asserted: Some(vec![0]), + trust_score_multipliers: vec![dec!(1.0)], + }, + // add unqualified wifi indoor HB + HeartbeatReward { + cbsd_id: None, + hotspot_key: gw2.clone(), + cell_type: CellType::NovaGenericWifiOutdoor, + coverage_object: g2_cov_obj, + distances_to_asserted: None, + trust_score_multipliers: vec![dec!(1.0)], + }, + ] + .into_iter() + .map(Ok) + .collect::>>(); + + // setup speedtests + let last_speedtest = timestamp - Duration::hours(12); + let gw1_speedtests = vec![ + acceptable_speedtest(gw1.clone(), last_speedtest), + acceptable_speedtest(gw1.clone(), timestamp), + ]; + let gw2_speedtests = vec![ + acceptable_speedtest(gw2.clone(), last_speedtest), + acceptable_speedtest(gw2.clone(), timestamp), + ]; + + let gw1_average = SpeedtestAverage::from(gw1_speedtests); + let gw2_average = SpeedtestAverage::from(gw2_speedtests); + let mut averages = HashMap::new(); + averages.insert(gw1.clone(), gw1_average); + averages.insert(gw2.clone(), gw2_average); + + let speedtest_avgs = SpeedtestAverages { averages }; + let mut hex_coverage: HashMap<(OwnedKeyType, Uuid), Vec> = Default::default(); + hex_coverage.insert( + (OwnedKeyType::from(gw1.clone()), g1_cov_obj), + bad_hex_coverage(&gw1, 0x8a1fb46622dffff), + ); + hex_coverage.insert( + (OwnedKeyType::from(gw2.clone()), g2_cov_obj), + bad_hex_coverage(&gw2, 0x8a1fb46642dffff), + ); + + // calculate the rewards for the group + let mut owner_rewards = HashMap::::new(); + let duration = Duration::hours(1); + let epoch = (now - duration)..now; + + let reward_shares = DataTransferAndPocAllocatedRewardBuckets::new_poc_only(&epoch); + let unique_connection_counts = HashMap::from([(gw1.clone(), 42)]); + for (_reward_amount, _mobile_reward_v1, mobile_reward_v2) in CoverageShares::new( + &hex_coverage, + stream::iter(heartbeat_rewards), + &speedtest_avgs, + &BoostedHexes::default(), + &BoostedHexEligibility::default(), + &BannedRadios::default(), + &unique_connection_counts, + &epoch, + ) + .await + .unwrap() + .into_rewards(reward_shares, &epoch) + .unwrap() + .1 + { + let radio_reward = match mobile_reward_v2.reward { + Some(MobileReward::RadioRewardV2(radio_reward)) => radio_reward, + _ => unreachable!(), + }; + let owner = owners + .get(&PublicKeyBinary::from(radio_reward.hotspot_key)) + .expect("Could not find owner") + .clone(); + + let base = radio_reward.base_poc_reward; + let boosted = radio_reward.boosted_poc_reward; + *owner_rewards.entry(owner).or_default() += base + boosted; + } + + // qualified wifi + let owner1_reward = owner_rewards.get(&owner1); + assert!(owner1_reward.is_some()); + + // unqualified wifi + let owner2_reward = owner_rewards.get(&owner2); + assert!(owner2_reward.is_none()); + } + /// Test to ensure that rewards that are zeroed are not written out. #[tokio::test] async fn ensure_zeroed_rewards_are_not_written() { diff --git a/mobile_verifier/src/reward_shares/radio_reward_v2.rs b/mobile_verifier/src/reward_shares/radio_reward_v2.rs index 629e3ce13..22447511f 100644 --- a/mobile_verifier/src/reward_shares/radio_reward_v2.rs +++ b/mobile_verifier/src/reward_shares/radio_reward_v2.rs @@ -1,7 +1,7 @@ use file_store::traits::TimestampEncode; use helium_proto::services::poc_mobile::{ radio_reward_v2::{CoveredHex, LocationTrustScore}, - BoostedHexStatus, Speedtest, + OracleBoostedHexStatus, SpBoostedHexStatus, Speedtest, }; use rust_decimal::prelude::ToPrimitive; @@ -20,7 +20,8 @@ pub trait RadioRewardV2Ext { fn proto_location_trust_scores(&self) -> Vec; fn proto_speedtests(&self) -> Vec; fn proto_speedtest_avg(&self) -> Speedtest; - fn proto_boosted_hex_status(&self) -> BoostedHexStatus; + fn proto_sp_boosted_hex_status(&self) -> SpBoostedHexStatus; + fn proto_oracle_boosted_hex_status(&self) -> OracleBoostedHexStatus; fn proto_covered_hexes(&self) -> Vec; } @@ -57,20 +58,34 @@ impl RadioRewardV2Ext for coverage_point_calculator::CoveragePoints { .collect() } - fn proto_boosted_hex_status(&self) -> BoostedHexStatus { - match self.boosted_hex_eligibility { - coverage_point_calculator::SpBoostedHexStatus::Eligible => BoostedHexStatus::Eligible, + fn proto_sp_boosted_hex_status(&self) -> SpBoostedHexStatus { + match self.sp_boosted_hex_eligibility { + coverage_point_calculator::SpBoostedHexStatus::Eligible => SpBoostedHexStatus::Eligible, coverage_point_calculator::SpBoostedHexStatus::WifiLocationScoreBelowThreshold(_) => { - BoostedHexStatus::LocationScoreBelowThreshold + SpBoostedHexStatus::LocationScoreBelowThreshold } coverage_point_calculator::SpBoostedHexStatus::RadioThresholdNotMet => { - BoostedHexStatus::RadioThresholdNotMet + SpBoostedHexStatus::RadioThresholdNotMet } coverage_point_calculator::SpBoostedHexStatus::ServiceProviderBanned => { - BoostedHexStatus::ServiceProviderBan + SpBoostedHexStatus::ServiceProviderBan } coverage_point_calculator::SpBoostedHexStatus::AverageAssertedDistanceOverLimit(_) => { - BoostedHexStatus::AverageAssertedDistanceOverLimit + SpBoostedHexStatus::AverageAssertedDistanceOverLimit + } + } + } + + fn proto_oracle_boosted_hex_status(&self) -> OracleBoostedHexStatus { + match self.oracle_boosted_hex_eligibility { + coverage_point_calculator::OracleBoostingStatus::Eligible => { + OracleBoostedHexStatus::Eligible + } + coverage_point_calculator::OracleBoostingStatus::Banned => { + OracleBoostedHexStatus::Banned + } + coverage_point_calculator::OracleBoostingStatus::Qualified => { + OracleBoostedHexStatus::Qualified } } } diff --git a/mobile_verifier/src/rewarder.rs b/mobile_verifier/src/rewarder.rs index 8b1d3ccaa..80b26461c 100644 --- a/mobile_verifier/src/rewarder.rs +++ b/mobile_verifier/src/rewarder.rs @@ -10,7 +10,8 @@ use crate::{ service_provider::{self, ServiceProviderDCSessions, ServiceProviderPromotions}, sp_boosted_rewards_bans, speedtests, speedtests_average::SpeedtestAverages, - subscriber_location, subscriber_verified_mapping_event, telemetry, Settings, + subscriber_location, subscriber_verified_mapping_event, telemetry, unique_connections, + Settings, }; use anyhow::bail; use chrono::{DateTime, TimeZone, Utc}; @@ -50,6 +51,7 @@ use tokio::time::sleep; use self::boosted_hex_eligibility::BoostedHexEligibility; pub mod boosted_hex_eligibility; +mod db; const REWARDS_NOT_CURRENT_DELAY_PERIOD: i64 = 5; @@ -193,42 +195,28 @@ where &self, reward_period: &Range>, ) -> anyhow::Result { - // Check if we have heartbeats and speedtests past the end of the reward period + // Check if we have heartbeats and speedtests and unique connections past the end of the reward period if reward_period.end >= self.disable_complete_data_checks_until().await? { - if sqlx::query_scalar::<_, i64>( - "SELECT COUNT(*) FROM cbrs_heartbeats WHERE latest_timestamp >= $1", - ) - .bind(reward_period.end) - .fetch_one(&self.pool) - .await? - == 0 - { + if db::no_cbrs_heartbeats(&self.pool, reward_period).await? { tracing::info!("No cbrs heartbeats found past reward period"); return Ok(false); } - if sqlx::query_scalar::<_, i64>( - "SELECT COUNT(*) FROM wifi_heartbeats WHERE latest_timestamp >= $1", - ) - .bind(reward_period.end) - .fetch_one(&self.pool) - .await? - == 0 - { + if db::no_wifi_heartbeats(&self.pool, reward_period).await? { tracing::info!("No wifi heartbeats found past reward period"); return Ok(false); } - if sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM speedtests WHERE timestamp >= $1") - .bind(reward_period.end) - .fetch_one(&self.pool) - .await? - == 0 - { + if db::no_speedtests(&self.pool, reward_period).await? { tracing::info!("No speedtests found past reward period"); return Ok(false); } + if db::no_unique_connections(&self.pool, reward_period).await? { + tracing::info!("No unique connections found past reward period"); + return Ok(false); + } + if check_for_unprocessed_data_sets(&self.pool, reward_period.end).await? { tracing::info!("Data sets still need to be processed"); return Ok(false); @@ -302,6 +290,7 @@ where coverage::clear_coverage_objects(&mut transaction, &reward_period.start).await?; sp_boosted_rewards_bans::clear_bans(&mut transaction, reward_period.start).await?; subscriber_verified_mapping_event::clear(&mut transaction, &reward_period.start).await?; + unique_connections::db::clear(&mut transaction, &reward_period.start).await?; // subscriber_location::clear_location_shares(&mut transaction, &reward_period.end).await?; let next_reward_period = scheduler.next_reward_period(); @@ -450,6 +439,8 @@ async fn reward_poc( ) .await?; + let unique_connections = unique_connections::db::get(pool, reward_period).await?; + let coverage_shares = CoverageShares::new( pool, heartbeats, @@ -457,6 +448,7 @@ async fn reward_poc( &boosted_hexes, &boosted_hex_eligibility, &poc_banned_radios, + &unique_connections, reward_period, ) .await?; diff --git a/mobile_verifier/src/rewarder/db.rs b/mobile_verifier/src/rewarder/db.rs new file mode 100644 index 000000000..2735b4772 --- /dev/null +++ b/mobile_verifier/src/rewarder/db.rs @@ -0,0 +1,220 @@ +use std::ops::Range; + +use chrono::{DateTime, Utc}; +use sqlx::PgPool; + +pub async fn no_cbrs_heartbeats( + pool: &PgPool, + reward_period: &Range>, +) -> anyhow::Result { + let count = sqlx::query_scalar::<_, i64>( + "SELECT COUNT(*) FROM cbrs_heartbeats WHERE latest_timestamp >= $1", + ) + .bind(reward_period.end) + .fetch_one(pool) + .await?; + + Ok(count == 0) +} + +pub async fn no_wifi_heartbeats( + pool: &PgPool, + reward_period: &Range>, +) -> anyhow::Result { + let count = sqlx::query_scalar::<_, i64>( + "SELECT COUNT(*) FROM wifi_heartbeats WHERE latest_timestamp >= $1", + ) + .bind(reward_period.end) + .fetch_one(pool) + .await?; + + Ok(count == 0) +} + +pub async fn no_speedtests( + pool: &PgPool, + reward_period: &Range>, +) -> Result { + let count = + sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM speedtests WHERE timestamp >= $1") + .bind(reward_period.end) + .fetch_one(pool) + .await?; + + Ok(count == 0) +} + +pub async fn no_unique_connections( + pool: &PgPool, + reward_period: &Range>, +) -> anyhow::Result { + let count = sqlx::query_scalar::<_, i64>( + "SELECT COUNT(*) from unique_connections WHERE received_timestamp >= $1", + ) + .bind(reward_period.end) + .fetch_one(pool) + .await?; + + Ok(count == 0) +} + +#[cfg(test)] +mod tests { + + use helium_crypto::{KeyTag, Keypair, PublicKeyBinary}; + use rand::rngs::OsRng; + use rust_decimal_macros::dec; + + use crate::{cell_type, heartbeats, speedtests, unique_connections}; + + mod file_store { + pub use file_store::{ + speedtest::CellSpeedtest, + unique_connections::{UniqueConnectionReq, UniqueConnectionsIngestReport}, + }; + } + + mod proto { + pub use helium_proto::services::poc_mobile::{HeartbeatValidity, LocationSource}; + } + + use super::*; + + #[sqlx::test] + async fn test_empty_db(pool: PgPool) -> anyhow::Result<()> { + let reward_period = Utc::now() - chrono::Duration::days(1)..Utc::now(); + + // Reports not found + assert!(no_cbrs_heartbeats(&pool, &reward_period).await?); + assert!(no_wifi_heartbeats(&pool, &reward_period).await?); + assert!(no_speedtests(&pool, &reward_period).await?); + assert!(no_unique_connections(&pool, &reward_period).await?); + + Ok(()) + } + + #[sqlx::test] + async fn test_single_report_from_today(pool: PgPool) -> anyhow::Result<()> { + let reward_period = Utc::now() - chrono::Duration::days(1)..Utc::now(); + + let (cbrs_heartbeat, wifi_heartbeat, speedtest, unique_connection) = + create_with_timestamp(Utc::now()); + + let mut txn = pool.begin().await?; + cbrs_heartbeat.save(&mut txn).await?; + wifi_heartbeat.save(&mut txn).await?; + speedtests::save_speedtest(&speedtest, &mut txn).await?; + unique_connections::db::save(&mut txn, &[unique_connection]).await?; + txn.commit().await?; + + // Reports found + assert!(!no_cbrs_heartbeats(&pool, &reward_period).await?); + assert!(!no_wifi_heartbeats(&pool, &reward_period).await?); + assert!(!no_speedtests(&pool, &reward_period).await?); + assert!(!no_unique_connections(&pool, &reward_period).await?); + + Ok(()) + } + + #[sqlx::test] + async fn test_single_report_from_yesterday(pool: PgPool) -> anyhow::Result<()> { + let reward_period = Utc::now() - chrono::Duration::days(1)..Utc::now(); + + let (cbrs_heartbeat, wifi_heartbeat, speedtest, unique_connection) = + create_with_timestamp(Utc::now() - chrono::Duration::days(1)); + + let mut txn = pool.begin().await?; + cbrs_heartbeat.save(&mut txn).await?; + wifi_heartbeat.save(&mut txn).await?; + speedtests::save_speedtest(&speedtest, &mut txn).await?; + unique_connections::db::save(&mut txn, &[unique_connection]).await?; + txn.commit().await?; + + // Reports not found + assert!(no_cbrs_heartbeats(&pool, &reward_period).await?); + assert!(no_wifi_heartbeats(&pool, &reward_period).await?); + assert!(no_speedtests(&pool, &reward_period).await?); + assert!(no_unique_connections(&pool, &reward_period).await?); + + Ok(()) + } + + fn create_with_timestamp( + timestamp: DateTime, + ) -> ( + heartbeats::ValidatedHeartbeat, + heartbeats::ValidatedHeartbeat, + file_store::CellSpeedtest, + file_store::UniqueConnectionsIngestReport, + ) { + let cbrs_keypair = Keypair::generate(KeyTag::default(), &mut OsRng); + let cbrs_pubkey_bin: PublicKeyBinary = cbrs_keypair.public_key().to_owned().into(); + + let wifi_keypair = Keypair::generate(KeyTag::default(), &mut OsRng); + let wifi_pubkey_bin: PublicKeyBinary = wifi_keypair.public_key().to_owned().into(); + + let cbrs_heartbeat = heartbeats::ValidatedHeartbeat { + heartbeat: heartbeats::Heartbeat { + hb_type: heartbeats::HbType::Cbrs, + hotspot_key: cbrs_pubkey_bin.clone(), + cbsd_id: Some("cbsd-id".to_string()), + operation_mode: true, + lat: 0.0, + lon: 0.0, + coverage_object: Some(uuid::Uuid::new_v4()), + location_validation_timestamp: Some(Utc::now()), + location_source: proto::LocationSource::Asserted, + timestamp, + }, + cell_type: cell_type::CellType::Nova430I, + location_trust_score_multiplier: dec!(1), + distance_to_asserted: Some(0), + coverage_meta: None, + validity: proto::HeartbeatValidity::Valid, + }; + + let wifi_heartbeat = heartbeats::ValidatedHeartbeat { + heartbeat: heartbeats::Heartbeat { + hb_type: heartbeats::HbType::Wifi, + hotspot_key: wifi_pubkey_bin, + cbsd_id: None, + operation_mode: true, + lat: 0.0, + lon: 0.0, + coverage_object: Some(uuid::Uuid::new_v4()), + location_validation_timestamp: Some(Utc::now()), + location_source: proto::LocationSource::Asserted, + timestamp, + }, + cell_type: cell_type::CellType::Nova430I, + location_trust_score_multiplier: dec!(1), + distance_to_asserted: Some(0), + coverage_meta: None, + validity: proto::HeartbeatValidity::Valid, + }; + + let speedtest = file_store::CellSpeedtest { + pubkey: cbrs_pubkey_bin.clone(), + serial: "cbrs-serial".to_string(), + timestamp, + upload_speed: 1_000_000, + download_speed: 1_000_000, + latency: 0, + }; + + let unique_connection = file_store::UniqueConnectionsIngestReport { + received_timestamp: timestamp, + report: file_store::UniqueConnectionReq { + pubkey: cbrs_pubkey_bin.clone(), + start_timestamp: Utc::now() - chrono::Duration::days(7), + end_timestamp: Utc::now(), + unique_connections: 42, + timestamp: Utc::now(), + carrier_key: cbrs_pubkey_bin, + signature: vec![], + }, + }; + + (cbrs_heartbeat, wifi_heartbeat, speedtest, unique_connection) + } +} diff --git a/mobile_verifier/src/sp_boosted_rewards_bans.rs b/mobile_verifier/src/sp_boosted_rewards_bans.rs index 597de524f..e26a068e6 100644 --- a/mobile_verifier/src/sp_boosted_rewards_bans.rs +++ b/mobile_verifier/src/sp_boosted_rewards_bans.rs @@ -38,7 +38,7 @@ use crate::{ const CLEANUP_DAYS: i64 = 7; -struct BannedRadioReport { +pub struct BannedRadioReport { received_timestamp: DateTime, pubkey: PublicKeyBinary, key: OwnedKeyType, @@ -367,7 +367,7 @@ pub mod db { .map_err(anyhow::Error::from) } - pub(super) async fn update_report( + pub async fn update_report( transaction: &mut Transaction<'_, Postgres>, report: &BannedRadioReport, ) -> anyhow::Result<()> { diff --git a/mobile_verifier/src/unique_connections/db.rs b/mobile_verifier/src/unique_connections/db.rs new file mode 100644 index 000000000..cc36fade2 --- /dev/null +++ b/mobile_verifier/src/unique_connections/db.rs @@ -0,0 +1,155 @@ +use std::ops::Range; + +use chrono::{DateTime, Utc}; +use file_store::unique_connections::UniqueConnectionsIngestReport; +use futures::TryStreamExt; +use helium_crypto::PublicKeyBinary; +use sqlx::{FromRow, PgPool, Postgres, QueryBuilder, Transaction}; + +use super::UniqueConnectionCounts; + +pub async fn get( + db: &PgPool, + reward_period: &Range>, +) -> anyhow::Result { + #[derive(FromRow)] + struct UniqueConnections { + hotspot_pubkey: PublicKeyBinary, + #[sqlx(try_from = "i64")] + unique_connections: u64, + } + + let rows = sqlx::query_as::<_, UniqueConnections>( + r#" + SELECT DISTINCT ON(hotspot_pubkey) + hotspot_pubkey, unique_connections + FROM unique_connections + WHERE received_timestamp >= $1 AND received_timestamp < $2 + ORDER BY hotspot_pubkey, received_timestamp DESC + "#, + ) + .bind(reward_period.start) + .bind(reward_period.end) + .fetch(db) + .and_then(|row| async move { Ok((row.hotspot_pubkey, row.unique_connections)) }) + .try_collect() + .await?; + + Ok(rows) +} + +pub async fn save( + txn: &mut Transaction<'_, Postgres>, + reports: &[UniqueConnectionsIngestReport], +) -> Result<(), sqlx::Error> { + const BATCH_SIZE: usize = (u16::MAX / 5) as usize; + + for chunk in reports.chunks(BATCH_SIZE) { + QueryBuilder::new( + r#" + INSERT INTO unique_connections + (hotspot_pubkey, unique_connections, start_timestamp, end_timestamp, received_timestamp) + "#, + ) + .push_values(chunk, |mut b, report| { + b.push_bind(report.report.pubkey.to_string()) + .push_bind(report.report.unique_connections as i64) + .push_bind(report.report.start_timestamp) + .push_bind(report.report.end_timestamp) + .push_bind(report.received_timestamp); + }) + .push( + r#" + ON CONFLICT + (hotspot_pubkey, received_timestamp) + DO NOTHING + "#, + ) + .build() + .execute(&mut *txn) + .await?; + } + + Ok(()) +} + +pub async fn clear( + txn: &mut Transaction<'_, Postgres>, + timestamp: &DateTime, +) -> anyhow::Result<()> { + sqlx::query( + r#" + DELETE FROM unique_connections + WHERE received_timestamp < $1 + "#, + ) + .bind(timestamp) + .execute(txn) + .await?; + Ok(()) +} + +#[cfg(test)] +mod tests { + use file_store::unique_connections::UniqueConnectionReq; + use helium_crypto::{KeyTag, Keypair}; + use rand::rngs::OsRng; + + use super::*; + + #[sqlx::test] + fn only_use_latest_within_window(pool: PgPool) -> anyhow::Result<()> { + // In the case connection counts need to be reprocessed, + // make sure we grab only the latest count for a radio + // when there may be more than one row for a radio in the window. + + let keypair = Keypair::generate(KeyTag::default(), &mut OsRng); + let pubkey_bin: PublicKeyBinary = keypair.public_key().to_owned().into(); + + let now = Utc::now(); + + let base_report = UniqueConnectionsIngestReport { + received_timestamp: Utc::now(), + report: UniqueConnectionReq { + pubkey: pubkey_bin.clone(), + start_timestamp: now - chrono::Duration::days(7), + end_timestamp: now, + unique_connections: 0, + timestamp: now, + carrier_key: pubkey_bin.clone(), + signature: vec![], + }, + }; + + // Prepare two reports for the same window. + // Both will be saved, but only the connection count in the second should be used. + let first = now - chrono::Duration::hours(5); + let second = now - chrono::Duration::hours(2); + + let report_one = UniqueConnectionsIngestReport { + received_timestamp: first, + report: UniqueConnectionReq { + unique_connections: 2, + ..base_report.report.clone() + }, + }; + let report_two = UniqueConnectionsIngestReport { + received_timestamp: second, + report: UniqueConnectionReq { + unique_connections: 1, + ..report_one.report.clone() + }, + }; + + let mut txn = pool.begin().await?; + save(&mut txn, &[report_one, report_two]).await?; + txn.commit().await?; + + let epoch = (now - chrono::Duration::days(1))..now; + let uniq_conns = get(&pool, &epoch).await?; + let conns = uniq_conns.get(&pubkey_bin).cloned().unwrap(); + assert_eq!(1, conns); + + Ok(()) + } +} diff --git a/mobile_verifier/src/unique_connections/ingestor.rs b/mobile_verifier/src/unique_connections/ingestor.rs new file mode 100644 index 000000000..36a599c04 --- /dev/null +++ b/mobile_verifier/src/unique_connections/ingestor.rs @@ -0,0 +1,185 @@ +use chrono::Utc; +use file_store::{ + file_info_poller::{FileInfoStream, LookbackBehavior}, + file_sink::FileSinkClient, + file_source, + file_upload::FileUpload, + traits::{FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt}, + unique_connections::{ + UniqueConnectionReq, UniqueConnectionsIngestReport, VerifiedUniqueConnectionsIngestReport, + }, + FileStore, FileType, +}; +use futures::{StreamExt, TryFutureExt}; +use helium_crypto::PublicKeyBinary; +use helium_proto::services::{ + mobile_config::NetworkKeyRole, + poc_mobile::{ + VerifiedUniqueConnectionsIngestReportStatus, VerifiedUniqueConnectionsIngestReportV1, + }, +}; +use mobile_config::client::authorization_client::AuthorizationVerifier; +use sqlx::PgPool; +use task_manager::{ManagedTask, TaskManager}; +use tokio::sync::mpsc::Receiver; + +use crate::Settings; + +use super::db; + +pub struct UniqueConnectionsIngestor { + pool: PgPool, + unique_connections_receiver: Receiver>, + verified_unique_connections_sink: FileSinkClient, + authorization_verifier: AV, +} + +impl ManagedTask for UniqueConnectionsIngestor +where + AV: AuthorizationVerifier + Send + Sync + 'static, +{ + fn start_task( + self: Box, + shutdown: triggered::Listener, + ) -> futures_util::future::LocalBoxFuture<'static, anyhow::Result<()>> { + let handle = tokio::spawn(self.run(shutdown)); + Box::pin( + handle + .map_err(anyhow::Error::from) + .and_then(|result| async move { result.map_err(anyhow::Error::from) }), + ) + } +} + +impl UniqueConnectionsIngestor +where + AV: AuthorizationVerifier + Send + Sync + 'static, +{ + pub async fn create_managed_task( + pool: PgPool, + settings: &Settings, + file_upload: FileUpload, + file_store: FileStore, + authorization_verifier: AV, + ) -> anyhow::Result { + let (verified_unique_connections, verified_unique_conections_server) = + VerifiedUniqueConnectionsIngestReportV1::file_sink( + settings.store_base_path(), + file_upload.clone(), + FileSinkCommitStrategy::Manual, + FileSinkRollTime::Default, + env!("CARGO_PKG_NAME"), + ) + .await?; + + let (unique_connections_ingest, unique_connections_server) = + file_source::Continuous::msg_source::() + .state(pool.clone()) + .store(file_store.clone()) + .lookback(LookbackBehavior::StartAfter(settings.start_after)) + .prefix(FileType::UniqueConnectionsReport.to_string()) + .create() + .await?; + + let radio_threshold_ingestor = Self::new( + pool.clone(), + unique_connections_ingest, + verified_unique_connections, + authorization_verifier, + ); + + Ok(TaskManager::builder() + .add_task(verified_unique_conections_server) + .add_task(radio_threshold_ingestor) + .add_task(unique_connections_server) + .build()) + } + + pub fn new( + pool: PgPool, + unique_connections_receiver: Receiver>, + verified_unique_connections_sink: FileSinkClient, + authorization_verifier: AV, + ) -> Self { + Self { + pool, + unique_connections_receiver, + verified_unique_connections_sink, + authorization_verifier, + } + } + + async fn run(mut self, shutdown: triggered::Listener) -> anyhow::Result<()> { + tracing::info!("starting unique connections ingestor"); + loop { + tokio::select! { + biased; + _ = shutdown.clone() => break, + Some(file) = self.unique_connections_receiver.recv() => { + self.process_unique_connections_file(file).await?; + } + } + } + tracing::info!("stopping unique connections ingestor"); + Ok(()) + } + + async fn process_unique_connections_file( + &self, + file_info_stream: FileInfoStream, + ) -> anyhow::Result<()> { + let mut txn = self.pool.begin().await?; + let mut stream = file_info_stream.into_stream(&mut txn).await?; + + let mut verified = vec![]; + + while let Some(unique_connections_report) = stream.next().await { + let verified_report_status = self + .verify_unique_connection_report(&unique_connections_report.report) + .await; + + if matches!( + verified_report_status, + VerifiedUniqueConnectionsIngestReportStatus::Valid + ) { + verified.push(unique_connections_report.clone()); + } + + let verified_report_proto = VerifiedUniqueConnectionsIngestReport { + timestamp: Utc::now(), + report: unique_connections_report, + status: verified_report_status, + }; + + self.verified_unique_connections_sink + .write( + verified_report_proto.into(), + &[("report_status", verified_report_status.as_str_name())], + ) + .await?; + } + + db::save(&mut txn, &verified).await?; + txn.commit().await?; + self.verified_unique_connections_sink.commit().await?; + + Ok(()) + } + + async fn verify_unique_connection_report( + &self, + report: &UniqueConnectionReq, + ) -> VerifiedUniqueConnectionsIngestReportStatus { + if !self.verify_known_carrier_key(&report.carrier_key).await { + return VerifiedUniqueConnectionsIngestReportStatus::InvalidCarrierKey; + } + VerifiedUniqueConnectionsIngestReportStatus::Valid + } + + async fn verify_known_carrier_key(&self, public_key: &PublicKeyBinary) -> bool { + self.authorization_verifier + .verify_authorized_key(public_key, NetworkKeyRole::MobileCarrier) + .await + .unwrap_or_default() + } +} diff --git a/mobile_verifier/src/unique_connections/mod.rs b/mobile_verifier/src/unique_connections/mod.rs new file mode 100644 index 000000000..096187a5b --- /dev/null +++ b/mobile_verifier/src/unique_connections/mod.rs @@ -0,0 +1,22 @@ +pub mod db; +pub mod ingestor; + +use coverage_point_calculator::RadioType; +use helium_crypto::PublicKeyBinary; +use std::collections::HashMap; + +pub type UniqueConnectionCounts = HashMap; + +// hip-134: +// https://github.com/helium/HIP/blob/main/0134-reward-mobile-carrier-offload-hotspots.md +// A Hotspot serving >25 unique connections, as defined by the Carrier utlizing the hotspots for Carrier Offload, on a seven day rolling average. +const MINIMUM_UNIQUE_CONNECTIONS: u64 = 25; + +pub fn is_qualified( + unique_connections: &UniqueConnectionCounts, + pubkey: &PublicKeyBinary, + radio_type: &RadioType, +) -> bool { + let uniq_conns = unique_connections.get(pubkey).cloned().unwrap_or_default(); + radio_type.is_wifi() && uniq_conns > MINIMUM_UNIQUE_CONNECTIONS +} diff --git a/mobile_verifier/tests/integrations/boosting_oracles.rs b/mobile_verifier/tests/integrations/boosting_oracles.rs index 9df359402..6905057aa 100644 --- a/mobile_verifier/tests/integrations/boosting_oracles.rs +++ b/mobile_verifier/tests/integrations/boosting_oracles.rs @@ -24,6 +24,7 @@ use mobile_verifier::{ sp_boosted_rewards_bans::BannedRadios, speedtests::Speedtest, speedtests_average::{SpeedtestAverage, SpeedtestAverages}, + unique_connections::UniqueConnectionCounts, }; use rust_decimal::Decimal; use rust_decimal_macros::dec; @@ -386,6 +387,7 @@ async fn test_footfall_and_urbanization_and_landtype(pool: PgPool) -> anyhow::Re &BoostedHexes::default(), &BoostedHexEligibility::default(), &BannedRadios::default(), + &UniqueConnectionCounts::default(), &epoch, ) .await diff --git a/mobile_verifier/tests/integrations/common/mod.rs b/mobile_verifier/tests/integrations/common/mod.rs index d1e73dd47..7e1563929 100644 --- a/mobile_verifier/tests/integrations/common/mod.rs +++ b/mobile_verifier/tests/integrations/common/mod.rs @@ -102,13 +102,10 @@ impl MockFileSinkReceiver { impl MockFileSinkReceiver { pub async fn receive_radio_reward_v1(&mut self) -> RadioReward { match self.receive("receive_radio_reward_v1").await { - Some(mobile_reward) => { - println!("mobile_reward: {:?}", mobile_reward); - match mobile_reward.reward { - Some(MobileReward::RadioReward(r)) => r, - _ => panic!("failed to get radio reward"), - } - } + Some(mobile_reward) => match mobile_reward.reward { + Some(MobileReward::RadioReward(r)) => r, + err => panic!("failed to get radio reward: {err:?}"), + }, None => panic!("failed to receive radio reward"), } } @@ -127,7 +124,7 @@ impl MockFileSinkReceiver { ); reward } - _ => panic!("failed to get radio reward"), + err => panic!("failed to get radio reward: {err:?}"), }, None => panic!("failed to receive radio reward"), } @@ -255,6 +252,15 @@ pub fn mock_hex_boost_data_default() -> HexBoostData HexBoostData { + HexBoostData::builder() + .urbanization(Assignment::C) + .footfall(Assignment::C) + .landtype(Assignment::C) + .build() + .unwrap() +} + type MockAssignmentMap = HashMap; #[allow(dead_code)] diff --git a/mobile_verifier/tests/integrations/modeled_coverage.rs b/mobile_verifier/tests/integrations/modeled_coverage.rs index 1056f9d8b..7c91ad67d 100644 --- a/mobile_verifier/tests/integrations/modeled_coverage.rs +++ b/mobile_verifier/tests/integrations/modeled_coverage.rs @@ -27,6 +27,7 @@ use mobile_verifier::{ sp_boosted_rewards_bans::BannedRadios, speedtests::Speedtest, speedtests_average::{SpeedtestAverage, SpeedtestAverages}, + unique_connections::UniqueConnectionCounts, IsAuthorized, }; use rust_decimal_macros::dec; @@ -484,6 +485,7 @@ async fn scenario_one(pool: PgPool) -> anyhow::Result<()> { &BoostedHexes::default(), &BoostedHexEligibility::default(), &BannedRadios::default(), + &UniqueConnectionCounts::default(), &reward_period, ) .await?; @@ -587,6 +589,7 @@ async fn scenario_two(pool: PgPool) -> anyhow::Result<()> { &BoostedHexes::default(), &BoostedHexEligibility::default(), &BannedRadios::default(), + &UniqueConnectionCounts::default(), &reward_period, ) .await?; @@ -876,6 +879,7 @@ async fn scenario_three(pool: PgPool) -> anyhow::Result<()> { &boosted_hexes, &BoostedHexEligibility::default(), &BannedRadios::default(), + &UniqueConnectionCounts::default(), &reward_period, ) .await?; @@ -966,6 +970,7 @@ async fn scenario_four(pool: PgPool) -> anyhow::Result<()> { &BoostedHexes::default(), &BoostedHexEligibility::default(), &BannedRadios::default(), + &UniqueConnectionCounts::default(), &reward_period, ) .await?; @@ -1068,6 +1073,7 @@ async fn scenario_five(pool: PgPool) -> anyhow::Result<()> { &BoostedHexes::default(), &BoostedHexEligibility::default(), &BannedRadios::default(), + &UniqueConnectionCounts::default(), &reward_period, ) .await?; @@ -1318,6 +1324,7 @@ async fn scenario_six(pool: PgPool) -> anyhow::Result<()> { &BoostedHexes::default(), &BoostedHexEligibility::default(), &BannedRadios::default(), + &UniqueConnectionCounts::default(), &reward_period, ) .await?; diff --git a/mobile_verifier/tests/integrations/rewarder_poc_dc.rs b/mobile_verifier/tests/integrations/rewarder_poc_dc.rs index e26af88b8..7265cef8e 100644 --- a/mobile_verifier/tests/integrations/rewarder_poc_dc.rs +++ b/mobile_verifier/tests/integrations/rewarder_poc_dc.rs @@ -1,26 +1,40 @@ +use std::ops::Range; + use crate::common::{self, MockFileSinkReceiver, MockHexBoostingClient, RadioRewardV2Ext}; use chrono::{DateTime, Duration as ChronoDuration, Utc}; use file_store::{ coverage::{CoverageObject as FSCoverageObject, KeyType, RadioHexSignalLevel}, speedtest::CellSpeedtest, + unique_connections::{UniqueConnectionReq, UniqueConnectionsIngestReport}, }; use helium_crypto::PublicKeyBinary; -use helium_proto::services::poc_mobile::{ - CoverageObjectValidity, GatewayReward, HeartbeatValidity, LocationSource, MobileRewardShare, - RadioRewardV2, SeniorityUpdateReason, SignalLevel, -}; use mobile_verifier::{ cell_type::CellType, coverage::CoverageObject, data_session, heartbeats::{HbType, Heartbeat, ValidatedHeartbeat}, - reward_shares, rewarder, speedtests, + reward_shares, rewarder, + sp_boosted_rewards_bans::{self, BannedRadioReport}, + speedtests, unique_connections, }; use rust_decimal::prelude::*; use rust_decimal_macros::dec; use sqlx::{PgPool, Postgres, Transaction}; use uuid::Uuid; +pub mod proto { + pub use helium_proto::services::poc_mobile::{ + service_provider_boosted_rewards_banned_radio_req_v1::KeyType, + service_provider_boosted_rewards_banned_radio_req_v1::{ + SpBoostedRewardsBannedRadioBanType, SpBoostedRewardsBannedRadioReason, + }, + CoverageObjectValidity, GatewayReward, HeartbeatValidity, LocationSource, + MobileRewardShare, RadioRewardV2, SeniorityUpdateReason, + ServiceProviderBoostedRewardsBannedRadioIngestReportV1, + ServiceProviderBoostedRewardsBannedRadioReqV1, SignalLevel, + }; +} + const HOTSPOT_1: &str = "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6"; const HOTSPOT_2: &str = "11uJHS2YaEWJqgqC7yza9uvSmpv5FWoMQXiP8WbxBGgNUmifUJf"; const HOTSPOT_3: &str = "112E7TxoNHV46M6tiPA8N1MkeMeQxc9ztb4JQLXBVAAUfq1kJLoF"; @@ -55,7 +69,7 @@ async fn test_poc_and_dc_rewards(pool: PgPool) -> anyhow::Result<()> { &epoch, dec!(0.0001) ), - receive_expected_rewards(&mut mobile_rewards) + receive_expected_rewards_with_counts(&mut mobile_rewards, 3, 3, false) ); if let Ok((poc_rewards, dc_rewards)) = rewards { // assert poc reward outputs @@ -123,29 +137,103 @@ async fn test_poc_and_dc_rewards(pool: PgPool) -> anyhow::Result<()> { Ok(()) } -async fn receive_expected_rewards( - mobile_rewards: &mut MockFileSinkReceiver, -) -> anyhow::Result<(Vec, Vec)> { - // get the filestore outputs from rewards run +#[sqlx::test] +async fn test_qualified_wifi_poc_rewards(pool: PgPool) -> anyhow::Result<()> { + tracing_subscriber::fmt::init(); + sqlx::migrate!().run(&pool).await?; - // expect 3 gateway rewards for dc transfer - let dc_reward1 = mobile_rewards.receive_gateway_reward().await; - let dc_reward2 = mobile_rewards.receive_gateway_reward().await; - let dc_reward3 = mobile_rewards.receive_gateway_reward().await; - let mut dc_rewards = vec![dc_reward1, dc_reward2, dc_reward3]; - dc_rewards.sort_by(|a, b| b.hotspot_key.cmp(&a.hotspot_key)); + let (mobile_rewards_client, mut mobile_rewards) = common::create_file_sink(); + let (speedtest_avg_client, _speedtest_avg_server) = common::create_file_sink(); + let now = Utc::now(); + let epoch = (now - ChronoDuration::hours(24))..now; + let pubkey: PublicKeyBinary = HOTSPOT_3.to_string().parse().unwrap(); // wifi hotspot - // we will have 3 radio rewards, 1 wifi radio and 2 cbrs radios - let radio_reward1 = mobile_rewards.receive_radio_reward().await; - let radio_reward2 = mobile_rewards.receive_radio_reward().await; - let radio_reward3 = mobile_rewards.receive_radio_reward().await; - // ordering is not guaranteed, so stick the rewards into a vec and sort - let mut poc_rewards = vec![radio_reward1, radio_reward2, radio_reward3]; - // after sorting reward 1 = cbrs radio1, 2 = cbrs radio2, 3 = wifi radio - poc_rewards.sort_by(|a, b| b.hotspot_key.cmp(&a.hotspot_key)); + // seed all the things + let mut txn = pool.clone().begin().await?; + seed_heartbeats(epoch.start, &mut txn).await?; + seed_speedtests(epoch.end, &mut txn).await?; + seed_data_sessions(epoch.start, &mut txn).await?; + txn.commit().await?; + update_assignments_bad(&pool).await?; + + // Run rewards with no unique connections, no poc rewards, expect unallocated + let boosted_hexes = vec![]; + let hex_boosting_client = MockHexBoostingClient::new(boosted_hexes); + let (_, _rewards) = tokio::join!( + rewarder::reward_poc_and_dc( + &pool, + &hex_boosting_client, + &mobile_rewards_client, + &speedtest_avg_client, + &epoch, + dec!(0.0001) + ), + // expecting NO poc rewards, expecting unallocated + receive_expected_rewards_with_counts(&mut mobile_rewards, 3, 0, true) + ); + + // seed single unique conections report within epoch + let mut txn = pool.begin().await?; + seed_unique_connections(&mut txn, &[(pubkey.clone(), 42)], &epoch).await?; + txn.commit().await?; + + // SP ban radio, unique connections should supersede banning + let mut txn = pool.begin().await?; + ban_wifi_radio_for_epoch(&mut txn, pubkey.clone(), &epoch).await?; + txn.commit().await?; + + let (_, rewards) = tokio::join!( + // run rewards for poc and dc + rewarder::reward_poc_and_dc( + &pool, + &hex_boosting_client, + &mobile_rewards_client, + &speedtest_avg_client, + &epoch, + dec!(0.0001) + ), + // expecting single radio with poc rewards, no unallocated + receive_expected_rewards_with_counts(&mut mobile_rewards, 3, 1, false) + ); + let Ok((poc_rewards, dc_rewards)) = rewards else { + panic!("rewards failed"); + }; + + let poc_sum: u64 = poc_rewards.iter().map(|r| r.total_poc_reward()).sum(); + let dc_sum: u64 = dc_rewards.iter().map(|r| r.dc_transfer_reward).sum(); + let total = poc_sum + dc_sum; + + let expected_sum = reward_shares::get_scheduled_tokens_for_poc(epoch.end - epoch.start) + .to_u64() + .unwrap(); + assert_eq!(expected_sum, total); + + Ok(()) +} - // should be no further msgs - mobile_rewards.assert_no_messages(); +async fn receive_expected_rewards_with_counts( + mobile_rewards: &mut MockFileSinkReceiver, + expected_dc_reward_count: usize, + expected_poc_reward_count: usize, + expect_unallocated: bool, +) -> anyhow::Result<(Vec, Vec)> { + let mut dc_rewards = vec![]; + let mut poc_rewards = vec![]; + + for _ in 0..expected_dc_reward_count { + dc_rewards.push(mobile_rewards.receive_gateway_reward().await); + } + + for _ in 0..expected_poc_reward_count { + poc_rewards.push(mobile_rewards.receive_radio_reward().await); + } + + if expect_unallocated { + mobile_rewards.receive_unallocated_reward().await; + } + + dc_rewards.sort_by(|a, b| b.hotspot_key.cmp(&a.hotspot_key)); + poc_rewards.sort_by(|a, b| b.hotspot_key.cmp(&a.hotspot_key)); Ok((poc_rewards, dc_rewards)) } @@ -175,13 +263,13 @@ async fn seed_heartbeats( coverage_object: Some(cov_obj_1.coverage_object.uuid), location_validation_timestamp: None, timestamp: ts + ChronoDuration::hours(n), - location_source: LocationSource::Gps, + location_source: proto::LocationSource::Gps, }, cell_type: CellType::SercommIndoor, distance_to_asserted: None, coverage_meta: None, location_trust_score_multiplier: dec!(1.0), - validity: HeartbeatValidity::Valid, + validity: proto::HeartbeatValidity::Valid, }; let hotspot_key2: PublicKeyBinary = HOTSPOT_2.to_string().parse().unwrap(); @@ -204,13 +292,13 @@ async fn seed_heartbeats( coverage_object: Some(cov_obj_2.coverage_object.uuid), location_validation_timestamp: None, timestamp: ts + ChronoDuration::hours(n), - location_source: LocationSource::Gps, + location_source: proto::LocationSource::Gps, }, cell_type: CellType::SercommOutdoor, distance_to_asserted: None, coverage_meta: None, location_trust_score_multiplier: dec!(1.0), - validity: HeartbeatValidity::Valid, + validity: proto::HeartbeatValidity::Valid, }; let hotspot_key3: PublicKeyBinary = HOTSPOT_3.to_string().parse().unwrap(); @@ -232,13 +320,13 @@ async fn seed_heartbeats( coverage_object: Some(cov_obj_3.coverage_object.uuid), location_validation_timestamp: Some(ts - ChronoDuration::hours(24)), timestamp: ts + ChronoDuration::hours(n), - location_source: LocationSource::Skyhook, + location_source: proto::LocationSource::Skyhook, }, cell_type: CellType::NovaGenericWifiIndoor, distance_to_asserted: Some(10), coverage_meta: None, location_trust_score_multiplier: dec!(1.0), - validity: HeartbeatValidity::Valid, + validity: proto::HeartbeatValidity::Valid, }; save_seniority_object(ts + ChronoDuration::hours(n), &wifi_heartbeat, txn).await?; @@ -265,6 +353,15 @@ async fn update_assignments(pool: &PgPool) -> anyhow::Result<()> { Ok(()) } +async fn update_assignments_bad(pool: &PgPool) -> anyhow::Result<()> { + let _ = common::set_unassigned_oracle_boosting_assignments( + pool, + &common::mock_hex_boost_data_bad(), + ) + .await?; + Ok(()) +} + async fn seed_speedtests( ts: DateTime, txn: &mut Transaction<'_, Postgres>, @@ -338,6 +435,30 @@ async fn seed_data_sessions( Ok(()) } +async fn seed_unique_connections( + txn: &mut Transaction<'_, Postgres>, + things: &[(PublicKeyBinary, u64)], + epoch: &Range>, +) -> anyhow::Result<()> { + let mut reports = vec![]; + for (pubkey, unique_connections) in things { + reports.push(UniqueConnectionsIngestReport { + received_timestamp: epoch.start + chrono::Duration::hours(1), + report: UniqueConnectionReq { + pubkey: pubkey.clone(), + start_timestamp: Utc::now(), + end_timestamp: Utc::now(), + unique_connections: *unique_connections, + timestamp: Utc::now(), + carrier_key: pubkey.clone(), + signature: vec![], + }, + }); + } + unique_connections::db::save(txn, &reports).await?; + Ok(()) +} + fn create_coverage_object( ts: DateTime, cbsd_id: Option, @@ -357,7 +478,7 @@ fn create_coverage_object( coverage_claim_time: ts, coverage: vec![RadioHexSignalLevel { location, - signal_level: SignalLevel::High, + signal_level: proto::SignalLevel::High, signal_power: 1000, }], indoor, @@ -366,7 +487,7 @@ fn create_coverage_object( }; CoverageObject { coverage_object: report, - validity: CoverageObjectValidity::Valid, + validity: proto::CoverageObjectValidity::Valid, } } @@ -389,9 +510,33 @@ async fn save_seniority_object( .bind(hb.heartbeat.coverage_object) .bind(ts) .bind(ts) - .bind(SeniorityUpdateReason::NewCoverageClaimTime as i32) + .bind(proto::SeniorityUpdateReason::NewCoverageClaimTime as i32) .bind(hb.heartbeat.hb_type) .execute(&mut *exec) .await?; Ok(()) } + +async fn ban_wifi_radio_for_epoch( + txn: &mut Transaction<'_, Postgres>, + pubkey: PublicKeyBinary, + epoch: &Range>, +) -> anyhow::Result<()> { + let until = (epoch.start + chrono::Duration::days(7)).timestamp_millis() as u64; + let received_timestamp = (epoch.start + chrono::Duration::hours(2)).timestamp_millis() as u64; + + let ban_report = proto::ServiceProviderBoostedRewardsBannedRadioIngestReportV1 { + received_timestamp, + report: Some(proto::ServiceProviderBoostedRewardsBannedRadioReqV1 { + pubkey: pubkey.clone().into(), + reason: proto::SpBoostedRewardsBannedRadioReason::NoNetworkCorrelation.into(), + until, + signature: vec![], + ban_type: proto::SpBoostedRewardsBannedRadioBanType::Poc.into(), + key_type: Some(proto::KeyType::HotspotKey(pubkey.into())), + }), + }; + let ban_report = BannedRadioReport::try_from(ban_report)?; + sp_boosted_rewards_bans::db::update_report(txn, &ban_report).await?; + Ok(()) +}