Skip to content

Commit

Permalink
remove weighted random selection
Browse files Browse the repository at this point in the history
  • Loading branch information
Theodus committed Apr 15, 2024
1 parent 4b6ce4a commit 22a8db8
Show file tree
Hide file tree
Showing 11 changed files with 70 additions and 132 deletions.
2 changes: 0 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# candidate-selection

Utilities for picking candidates out of a set, based on weighted random selection. The weights used for selection are computed using a [weighted product model](https://en.wikipedia.org/wiki/Weighted_product_model) over criteria such as success rate, latency, etc.
Utilities for picking candidates out of a set based on the [weighted product model](https://en.wikipedia.org/wiki/Weighted_product_model) over criteria such as success rate, latency, etc.

This library is heavily influenced by, and intended to replace, the original indexer selection algorithm used by the Graph Gateway. The original algorithm was designed by Zachary Burns & Theodore Butler.
4 changes: 0 additions & 4 deletions candidate-selection/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,3 @@ arrayvec = "0.7.4"
ordered-float = { version = "4.2.0", default-features = false }
permutation = "0.4.1"
proptest = "1.4.0"
rand = { version = "0.8.5", default-features = false, features = ["alloc"] }

[dev-dependencies]
rand = { version = "0.8.5", default-features = true, features = ["small_rng"] }
91 changes: 42 additions & 49 deletions candidate-selection/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ mod test;

pub use crate::num::Normalized;
pub use arrayvec::ArrayVec;
use rand::seq::SliceRandom as _;
use std::collections::BTreeMap;
use ordered_float::NotNan;

/// An item that `select` can choose between. Implementors expose an identifier, a fee, an
/// individual score, and a way to score several candidates taken together.
pub trait Candidate {
/// Identifier type. `select` compares ids so an already-selected candidate is not picked again.
type Id: Eq + Ord;
/// Returns this candidate's identifier.
fn id(&self) -> Self::Id;
/// Returns this candidate's fee. When nonzero, `select` divides the candidate's marginal score
/// by it to rank candidates.
fn fee(&self) -> Normalized;
/// Returns the score of this candidate considered on its own.
fn score(&self) -> Normalized;
/// Returns the combined score of the given candidates considered as one selection.
/// `select` passes its own `LIMIT` (the maximum number of selections) as the const parameter.
fn score_many<const LIMIT: usize>(candidates: &[&Self]) -> Normalized;
}
Expand All @@ -24,60 +24,53 @@ pub trait Candidate {
///
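/// Candidates are added greedily, each time taking the unselected candidate with the highest
/// marginal score per unit of fee. Selection stops at `LIMIT` candidates, or as soon as the best
/// remaining candidate's fee exceeds half of its marginal score relative to the current score.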
pub fn select<'c, Rng, Candidate, const LIMIT: usize>(
rng: &mut Rng,
pub fn select<'c, Candidate, const LIMIT: usize>(
candidates: &'c [Candidate],
min_score_cutoff: Normalized,
) -> ArrayVec<&'c Candidate, LIMIT>
where
Rng: rand::Rng,
Candidate: crate::Candidate,
{
assert!(LIMIT > 0);
// Collect into a map to remove duplicate candidates.
let candidates: BTreeMap<Candidate::Id, (&Candidate, Normalized)> = candidates
.iter()
.map(|candidate| {
let score = Candidate::score(candidate);
(candidate.id(), (candidate, score))
})
.filter(|(_, (_, score))| score > &Normalized::ZERO)
.collect();
if candidates.is_empty() {
return ArrayVec::new();
}
let max_score = *candidates.values().map(|(_, score)| score).max().unwrap();
let cutoff_score = max_score * min_score_cutoff;
// Collect into a vec because `choose_weighted` requires a slice to pick from.
let mut candidates: Vec<(&Candidate, Normalized)> = candidates
.into_iter()
.filter(|(_, (_, score))| *score >= cutoff_score)
.map(|(_, (candidate, score))| (candidate, score))
.collect();
// At this point we have reduced the candidates to those with a nonzero score above the cutoff.

let (first_selection, combined_score) = *candidates
.choose_weighted(rng, |(_, score)| score.as_f64())
.unwrap();
let mut selections: ArrayVec<&Candidate, LIMIT> = Default::default();
selections.push(first_selection);
candidates.retain(|(candidate, _)| candidate.id() != first_selection.id());
let marginal_score = |current_score: Normalized,
selected: &ArrayVec<&'c Candidate, LIMIT>,
candidate: &'c Candidate| {
let mut buf = selected.clone();
buf.push(candidate);
let potential_score = Candidate::score_many::<LIMIT>(&buf);
NotNan::new(potential_score.as_f64() - current_score.as_f64()).unwrap()
};

// Sample sets of candidates to find combinations that increase the combined score.
let sample_limit = candidates.len().min(LIMIT * 5);
for _ in 0..sample_limit {
if (selections.len() == LIMIT) || candidates.is_empty() {
break;
}
let (picked, _) = *candidates
.choose_weighted(rng, |(_, score)| score.as_f64())
.unwrap();
selections.push(picked);
if Candidate::score_many::<LIMIT>(&selections) > combined_score {
candidates.retain(|(candidate, _)| candidate.id() != picked.id());
} else {
selections.pop();
}
let mut selected: ArrayVec<&Candidate, LIMIT> = Default::default();
while selected.len() < LIMIT {
let current_score = match selected.len() {
0 => Normalized::ZERO,
1 => Candidate::score(selected[0]),
_ => Candidate::score_many::<LIMIT>(&selected),
};
let selection = candidates
.iter()
.filter(|c| selected.iter().all(|s| s.id() != c.id()))
.map(|c| (c, marginal_score(current_score, &selected, c)))
.max_by_key(|(c, marginal_score)| {
if c.fee() == Normalized::ZERO {
return *marginal_score;
}
marginal_score / c.fee().as_f64()
})
.filter(|(c, marginal_score)| {
if current_score == Normalized::ZERO {
return true;
}
let max_score = 0.5 * *(marginal_score / current_score.as_f64());
c.fee().as_f64() <= max_score
});
match selection {
Some((selection, _)) => {
selected.push(selection);
}
_ => break,
};
}
selections
selected
}
2 changes: 1 addition & 1 deletion candidate-selection/src/num.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ impl Normalized {
self == &Self::ZERO
}

pub fn arbitrary() -> impl Strategy<Value = Normalized> {
pub fn arbitrary() -> impl Strategy<Value = Self> {
(0.0..=1.0).prop_map(|n| Normalized::new(n).unwrap())
}
}
Expand Down
31 changes: 11 additions & 20 deletions candidate-selection/src/test.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
use crate::{select, ArrayVec, Candidate, Normalized};
use proptest::{prelude::prop, prop_assert_eq, prop_compose, proptest};
use rand::{rngs::SmallRng, SeedableRng as _};

#[derive(Debug)]
struct TestCandidate {
id: usize,
id: u8,
fee: Normalized,
score: Normalized,
}

impl Candidate for TestCandidate {
type Id = usize;
type Id = u8;
fn id(&self) -> Self::Id {
self.id
}
fn fee(&self) -> Normalized {
self.fee
}
fn score(&self) -> Normalized {
self.score
}
Expand All @@ -26,35 +29,23 @@ impl Candidate for TestCandidate {
}

prop_compose! {
fn candidates()(scores in prop::collection::vec(Normalized::arbitrary(), 1..32)) -> Vec<TestCandidate> {
scores.into_iter().enumerate().map(|(id, score)| TestCandidate { id, score }).collect()
fn candidate()(id: u8, fee in Normalized::arbitrary(), score in Normalized::arbitrary()) -> TestCandidate {
TestCandidate { id, fee, score }
}
}
proptest! {
#[test]
fn acceptable_candidates_selected(
seed: u64,
candidates in candidates(),
min_score_cutoff in Normalized::arbitrary(),
candidates in prop::collection::vec(candidate(), 1..16),
) {
let mut rng = SmallRng::seed_from_u64(seed);
let exists_acceptable_candidate = candidates.iter().any(|c| c.score > Normalized::ZERO);
let min_score = candidates
.iter()
.filter(|c| c.score > Normalized::ZERO)
.map(|c| c.score)
.max()
.map(|s| s * min_score_cutoff)
.unwrap_or(Normalized::ZERO);

let selections: ArrayVec<&TestCandidate, 1> = select(&mut rng, &candidates, min_score_cutoff);
let selections: ArrayVec<&TestCandidate, 1> = select(&candidates);
prop_assert_eq!(exists_acceptable_candidate, !selections.is_empty());
prop_assert_eq!(true, selections.iter().all(|s| s.score > Normalized::ZERO));
prop_assert_eq!(true, selections.iter().all(|s| s.score >= min_score));

let selections: ArrayVec<&TestCandidate, 3> = select(&mut rng, &candidates, min_score_cutoff);
let selections: ArrayVec<&TestCandidate, 3> = select(&candidates);
prop_assert_eq!(true, selections.iter().all(|s| s.score > Normalized::ZERO));
prop_assert_eq!(exists_acceptable_candidate, !selections.is_empty());
prop_assert_eq!(true, selections.iter().all(|s| s.score >= min_score));
}
}
2 changes: 0 additions & 2 deletions indexer-selection/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,8 @@ edition = "2021"
[dependencies]
candidate-selection = { path = "../candidate-selection" }
custom_debug = "0.6.1"
rand = { version = "0.8.5", default-features = false }
thegraph-core = "0.3.0"
url = "2.5.0"

[dev-dependencies]
proptest = "1.4.0"
rand = { version = "0.8.5", default-features = true, features = ["small_rng"] }
39 changes: 10 additions & 29 deletions indexer-selection/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,8 @@ pub struct Candidate {
pub zero_allocation: bool,
}

const MIN_SCORE_CUTOFF: f64 = 0.25;

pub fn select<'c, Rng, const LIMIT: usize>(
rng: &mut Rng,
candidates: &'c [Candidate],
) -> ArrayVec<&'c Candidate, LIMIT>
where
Rng: rand::Rng,
{
candidate_selection::select(rng, candidates, Normalized::new(MIN_SCORE_CUTOFF).unwrap())
pub fn select<const LIMIT: usize>(candidates: &[Candidate]) -> ArrayVec<&Candidate, LIMIT> {
candidate_selection::select(candidates)
}

impl candidate_selection::Candidate for Candidate {
Expand All @@ -51,11 +43,14 @@ impl candidate_selection::Candidate for Candidate {
hasher.finish()
}

/// Returns this candidate's fee, read directly from the `fee` field.
fn fee(&self) -> Normalized {
self.fee
}

fn score(&self) -> Normalized {
[
score_success_rate(self.perf.success_rate),
score_latency(self.perf.latency_ms()),
score_fee(self.fee),
score_seconds_behind(self.seconds_behind),
score_slashable_grt(self.slashable_grt),
score_subgraph_versions_behind(self.subgraph_versions_behind),
Expand All @@ -67,10 +62,9 @@ impl candidate_selection::Candidate for Candidate {

fn score_many<const LIMIT: usize>(candidates: &[&Self]) -> Normalized {
let fee = candidates.iter().map(|c| c.fee.as_f64()).sum::<f64>();
let fee = match Normalized::new(fee) {
Some(fee) => fee,
None => return Normalized::ZERO,
};
if Normalized::new(fee).is_none() {
return Normalized::ZERO;
}

let perf: ArrayVec<ExpectedPerformance, LIMIT> =
candidates.iter().map(|c| c.perf).collect();
Expand Down Expand Up @@ -113,7 +107,6 @@ impl candidate_selection::Candidate for Candidate {
[
score_success_rate(success_rate),
score_latency(latency),
score_fee(fee),
score_seconds_behind(seconds_behind),
score_slashable_grt(slashable_grt),
score_subgraph_versions_behind(subgraph_versions_behind),
Expand All @@ -124,21 +117,9 @@ impl candidate_selection::Candidate for Candidate {
}
}

/// Converts a `fee`, expressed as a fraction of some budget, into a score: a zero fee scores
/// (approximately) 1 and a fee equal to the whole budget scores (approximately) 0. The weight
/// chosen for WPM should be set to target the "optimal" value shown as the vertical line in the
/// following plot.
/// https://www.desmos.com/calculator/wf0tsp1sxh
pub fn score_fee(fee: Normalized) -> Normalized {
    // Golden-ratio conjugate, (5_f64.sqrt() - 1.0) / 2.0. It satisfies 1/S - S = 1 and
    // 1/(1 + S) - S = 0, which pins the curve to 1 at fee = 0 and to 0 at fee = 1.
    const S: f64 = 0.6180339887498949;
    // Floor for the result: loss of precision can produce a very small negative value when the
    // fee approaches the budget, which would not be a valid `Normalized`.
    const MIN_SCORE: f64 = 1e-18;

    let shifted_fee = fee.as_f64() + S;
    let raw_score = shifted_fee.recip() - S;
    let clamped_score = raw_score.max(MIN_SCORE);
    Normalized::new(clamped_score).unwrap()
}

/// Avoid serving deployments at versions behind, unless newer versions have poor indexer support.
fn score_subgraph_versions_behind(subgraph_versions_behind: u8) -> Normalized {
Normalized::new(MIN_SCORE_CUTOFF.powi(subgraph_versions_behind as i32)).unwrap()
Normalized::new(0.25_f64.powi(subgraph_versions_behind as i32)).unwrap()
}

/// https://www.desmos.com/calculator/wmgkasfvza
Expand Down
21 changes: 1 addition & 20 deletions indexer-selection/src/test.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,10 @@
use crate::*;
use candidate_selection::num::assert_within;
use proptest::{prop_assert, prop_compose, proptest};
use rand::{rngs::SmallRng, SeedableRng};

mod limits {
use super::*;

// Pins `score_fee` at both ends of the normalized fee range, and just inside each end:
// a zero or negligible fee is expected to score ~1.0, a fee at or just under 1.0 to score ~0.0.
// NOTE(review): `assert_within` presumably takes (actual, expected, tolerance) — confirm in `num`.
#[test]
fn fee() {
// Lower end: exactly zero, then a tiny positive fee.
assert_within(score_fee(Normalized::ZERO).as_f64(), 1.0, 1e-12);
assert_within(
score_fee(Normalized::new(1e-18).unwrap()).as_f64(),
1.0,
1e-12,
);
// Upper end: exactly one, then a fee nominally just below one.
assert_within(score_fee(Normalized::ONE).as_f64(), 0.0, 1e-12);
assert_within(
score_fee(Normalized::new(1.0 - 1e-18).unwrap()).as_f64(),
0.0,
1e-12,
);
}

#[test]
fn success_rate() {
assert_within(score_success_rate(Normalized::ZERO).as_f64(), 0.01, 0.001);
Expand Down Expand Up @@ -78,11 +61,9 @@ prop_compose! {
proptest! {
#[test]
fn select(
seed: u64,
candidates in candidates(),
) {
let mut rng = SmallRng::seed_from_u64(seed);
let selections: ArrayVec<&Candidate, 3> = crate::select(&mut rng, &candidates);
let selections: ArrayVec<&Candidate, 3> = crate::select(&candidates);
println!("{:#?}", selections.iter().map(|c| c.indexer).collect::<Vec<_>>());

let valid_candidate = |c: &Candidate| -> bool {
Expand Down
2 changes: 1 addition & 1 deletion simulator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@ edition = "2021"
[dependencies]
candidate-selection = { path = "../candidate-selection" }
indexer-selection = { path = "../indexer-selection" }
rand = { version = "0.8.5", default-features = true, features = ["small_rng"] }
rand = "0.8.5"
thegraph-core = "0.3.0"
6 changes: 3 additions & 3 deletions simulator/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{collections::BTreeMap, io::stdin, time::Instant};

use rand::{rngs::SmallRng, Rng, SeedableRng};
use rand::{thread_rng, Rng as _};
use thegraph_core::types::alloy_primitives::Address;

use candidate_selection::{
Expand Down Expand Up @@ -45,7 +45,7 @@ fn main() {
})
.collect();

let mut rng = SmallRng::from_entropy();
let mut rng = thread_rng();

let mut perf: BTreeMap<Address, Performance> = characteristics
.iter()
Expand Down Expand Up @@ -97,7 +97,7 @@ fn main() {
.collect();

let t0 = Instant::now();
let selections: ArrayVec<&Candidate, 3> = select(&mut rng, &candidates);
let selections: ArrayVec<&Candidate, 3> = select(&candidates);
total_selection_μs += Instant::now().duration_since(t0).as_micros();
total_fees_usd += selections
.iter()
Expand Down

0 comments on commit 22a8db8

Please sign in to comment.