From 31e226f9167adaa3493e9e7f0b74dcb234bb2be5 Mon Sep 17 00:00:00 2001 From: shr1ftyy Date: Thu, 9 Jan 2025 19:22:41 +0600 Subject: [PATCH 1/3] feat: challenge scoring --- storb/protocol.py | 1 + storb/validator/reward.py | 37 +++++++++++++ storb/validator/validator.py | 100 ++++++++++++++++++++++++++++++----- 3 files changed, 125 insertions(+), 13 deletions(-) diff --git a/storb/protocol.py b/storb/protocol.py index 52d2e6a..62f6a0a 100644 --- a/storb/protocol.py +++ b/storb/protocol.py @@ -36,6 +36,7 @@ class NewChallenge(BaseModel): challenge_id: str piece_id: str validator_id: int + miner_id: int challenge_deadline: str public_key: int public_exponent: int diff --git a/storb/validator/reward.py b/storb/validator/reward.py index 203b4d8..e4dc1b2 100644 --- a/storb/validator/reward.py +++ b/storb/validator/reward.py @@ -39,3 +39,40 @@ def get_response_rate_scores( scores = np.asarray(weighted_rate_sums) / sum_max return uids, scores + + +def get_challenge_scores( + self, + miner_stats: dict, +) -> tuple[np.ndarray, np.ndarray]: + """Returns an array of scores for the given miner stats + + Parameters + ---------- + miner_stats : dict + A dictionary of miner statistics + + Returns + ------- + np.ndarray + An array of rewards for the given query and responses. + """ + + uids = [] + success_rates = [] + + uids_filter = list(range(len(self.metagraph.nodes))) + for uid, miner_stats in miner_stats.items(): + if uid not in uids_filter: + continue + uids.append(uid) + challenge_attempts = max(miner_stats.get("challenge_attempts", 1), 1) + success_rate = abs( + miner_stats.get("challenge_successes", 0) / challenge_attempts + ) + success_rates.append(success_rate) + + uids = np.array(uids) + scores = np.asarray(success_rates) / max(*success_rates) + + return uids, scores diff --git a/storb/validator/validator.py b/storb/validator/validator.py index d761e34..562c9bf 100644 --- a/storb/validator/validator.py +++ b/storb/validator/validator.py @@ -68,7 +68,7 @@ process_response, ) from storb.util.uids import get_random_hotkeys -from storb.validator.reward import get_response_rate_scores +from storb.validator.reward import get_challenge_scores, get_response_rate_scores logger = get_logger(__name__) @@ -119,6 +119,7 @@ def __init__(self): self.final_latency_scores = np.zeros( len(self.metagraph.nodes), dtype=np.float32 ) + self.challenge_scores = np.zeros(len(self.metagraph.nodes), dtype=np.float32) # Initialize Challenge dictionary to store challenges sent to miners self.challenges: dict[str, protocol.NewChallenge] = {} @@ -299,6 +300,7 @@ def sync_metagraph(self): new_store_latency_scores = np.zeros((len(self.metagraph.nodes))) new_ret_latency_scores = np.zeros((len(self.metagraph.nodes))) new_final_latency_scores = np.zeros((len(self.metagraph.nodes))) + new_challenge_scores = np.zeros((len(self.metagraph.nodes))) min_len = min(len(old_hotkeys), len(self.scores)) len_store_latencies = min( @@ -316,6 +318,9 @@ def sync_metagraph(self): len_final_latency_scores = min( len(old_hotkeys), len(self.final_latency_scores) ) + len_challenge_scores = min( + len(old_hotkeys), len(self.challenge_scores) + ) new_moving_average[:min_len] = self.scores[:min_len] new_store_latencies[:len_store_latencies] = self.store_latencies[ @@ -333,6 +338,9 @@ def sync_metagraph(self): new_final_latency_scores[:len_final_latency_scores] = ( self.final_latency_scores[:len_final_latency_scores] ) + new_challenge_scores[:len_challenge_scores] = self.challenge_scores[ + :len_challenge_scores + ] self.scores = new_moving_average @@ -341,6 +349,7 @@ def sync_metagraph(self): self.store_latency_scores = new_store_latency_scores self.ret_latency_scores = new_ret_latency_scores self.final_latency_scores = new_final_latency_scores + self.challenge_scores = new_challenge_scores # TODO: use debug for these logger.debug(f"(len: {len(self.scores)}) New scores: {self.scores}") logger.debug( @@ -358,6 +367,9 @@ def sync_metagraph(self): logger.debug( f"(len: {len(self.final_latency_scores)}) New latency scores: {self.final_latency_scores}" ) + logger.debug( + f"(len: {len(self.challenge_scores)}) New challenge scores: {self.challenge_scores}" + ) except Exception as e: logger.error(f"Failed to sync metagraph: {str(e)}") @@ -654,6 +666,7 @@ async def challenge_miner(self, miner_id: int, piece_id: str, tag: str): challenge_id=uuid.uuid4().hex, piece_id=piece_id, validator_id=self.uid, + miner_id=miner_id, challenge_deadline=challenge_deadline, public_key=self.challenge.key.rsa.public_key().public_numbers().n, public_exponent=self.challenge.key.rsa.public_key().public_numbers().e, @@ -676,6 +689,16 @@ async def challenge_miner(self, miner_id: int, piece_id: str, tag: str): logger.info( f"Sent challenge {challenge_message.challenge_id} to miner {miner_id} for piece {piece_id}" ) + + async with db.get_db_connection(db_dir=self.db_dir) as conn: + miner_stats = await db.get_miner_stats( + conn=conn, miner_uid=challenge_message.miner_id + ) + miner_stats["challenge_attempts"] += 1 + await db.update_stats( + conn=conn, miner_uid=challenge_message.miner_id, stats=miner_stats + ) + logger.debug(f"PRF KEY: {payload.data.challenge.prf_key}") _, response = await self.query_miner( miner_hotkey, "/challenge", payload, method="POST" @@ -703,11 +726,12 @@ async def forward(self): - Updating the scores """ - # TODO: challenge miners - based on: https://dl.acm.org/doi/10.1145/1315245.1315318 - # TODO: should we lock the db when scoring? - # scoring + # remove expired challenges + await self.remove_expired_challenges() + + # scoring # obtain all miner stats from the validator database async with db.get_db_connection(self.db_dir) as conn: miner_stats = await db.get_all_miner_stats(conn) @@ -725,6 +749,11 @@ async def forward(self): self, miner_stats ) + challenge_uids, self.challenge_scores = get_challenge_scores(self, miner_stats) + + # TODO: error handlin'? + assert (response_rate_uids == challenge_uids).all() + if ( len(self.store_latencies) < len(response_rate_scores) or len(self.retrieve_latencies) < len(response_rate_scores) @@ -737,10 +766,12 @@ async def forward(self): new_store_latency_scores = np.zeros(new_len) new_ret_latency_scores = np.zeros(new_len) new_final_latency_scores = np.zeros(new_len) + new_challenge_scores = np.zeros(new_len) new_scores = np.zeros(new_len) len_lat = len(self.store_latencies) len_lat_scores = len(self.store_latency_scores) + len_challenge_scores = len(self.challenge_scores) with self.scores_lock: len_scores = len(self.scores) @@ -756,6 +787,9 @@ async def forward(self): new_final_latency_scores[:len_lat_scores] = self.final_latency_scores[ :len_lat_scores ] + new_challenge_scores[:len_challenge_scores] = self.challenge_scores[ + :len_challenge_scores + ] new_scores[:len_scores] = self.scores[:len_scores] self.store_latencies = new_store_latencies @@ -769,7 +803,7 @@ async def forward(self): self.final_latency_scores = np.nan_to_num( new_final_latency_scores, nan=0.0 ) - + self.challenge_scores = np.nan_to_num(new_challenge_scores, nan=0.0) self.scores = new_scores latency_scores = ( @@ -780,7 +814,11 @@ async def forward(self): ) # TODO: this should also take the "pdp challenge score" into account - rewards = 0.2 * self.final_latency_scores + 0.3 * response_rate_scores + rewards = ( + 0.2 * self.final_latency_scores + + 0.3 * response_rate_scores + + 0.5 * self.challenge_scores + ) logger.info(f"response rate scores: {response_rate_scores}") logger.info(f"moving avg. store latencies: {self.store_latencies}") @@ -790,6 +828,7 @@ async def forward(self): f"moving avg. retrieve latency scores: {self.retrieve_latency_scores}" ) logger.info(f"moving avg. latency scores: {self.final_latency_scores}") + logger.info(f"challenge rate scores: {self.challenge_scores}") self.update_scores(rewards, response_rate_uids) @@ -1093,9 +1132,30 @@ async def handle_batch_requests(): piece_hashes=piece_hashes, processed_pieces=processed_pieces ) + async def remove_expired_challenges(self): + """ + Remove expired challenges from the `self.challenges` dictionary + + Returns: + None + """ + now = datetime.now(UTC).isoformat() + keys_to_remove = [] + + for key, challenge in self.challenges.items(): + if challenge.challenge_deadline < now: + keys_to_remove.append(key) + + if not keys_to_remove: + return + + for key in keys_to_remove: + challenge = self.challenges[key] + del self.challenges[key] + """API Routes""" - def verify_challenge(self, challenge_request: protocol.ProofResponse) -> bool: + async def verify_challenge(self, challenge_request: protocol.ProofResponse) -> bool: """Verify the challenge proof from the miner Parameters @@ -1110,20 +1170,25 @@ def verify_challenge(self, challenge_request: protocol.ProofResponse) -> bool: """ logger.debug(f"Verifying challenge proof: {challenge_request.challenge_id}") - proof = challenge_request.proof - try: - proof = Proof.model_validate(proof) - except Exception as e: - logger.error(f"Invalid proof: {e}") - return False challenge: protocol.NewChallenge = self.challenges.get( challenge_request.challenge_id ) + if not challenge: logger.error(f"Challenge {challenge_request.challenge_id} not found") return False + async with db.get_db_connection(db_dir=self.db_dir) as conn: + miner_stats = await db.get_miner_stats(conn, challenge.miner_id) + + proof = challenge_request.proof + try: + proof = Proof.model_validate(proof) + except Exception as e: + logger.error(f"Invalid proof: {e}") + return False + if challenge.challenge_deadline < datetime.now(UTC).isoformat(): logger.error(f"Challenge {challenge_request.challenge_id} has expired") return False @@ -1152,6 +1217,15 @@ def verify_challenge(self, challenge_request: protocol.ProofResponse) -> bool: f"Proof verification successful for challenge {challenge.challenge_id}" ) + async with db.get_db_connection(db_dir=self.db_dir) as conn: + miner_stats["challenge_successes"] += 1 + await db.update_stats( + conn=conn, miner_uid=challenge.miner_id, stats=miner_stats + ) + + # remove challenge from memory + del self.challenges[challenge_request.challenge_id] + return True def status(self) -> str: From 0a7ac77a23097bd09d8278bb3871bb203b176560 Mon Sep 17 00:00:00 2001 From: shr1ftyy Date: Sun, 12 Jan 2025 21:40:35 +0600 Subject: [PATCH 2/3] chore: update docs + address okamoto-san's review --- docs/validator.md | 13 ++----------- storb/validator/reward.py | 2 +- storb/validator/validator.py | 2 -- 3 files changed, 3 insertions(+), 14 deletions(-) diff --git a/docs/validator.md b/docs/validator.md index cd71636..7e498db 100644 --- a/docs/validator.md +++ b/docs/validator.md @@ -2,18 +2,9 @@ Validators play a crucial role in the Storb network by serving as gateways to the storage subnet. They handle the storage and retrieval of files, ensuring data integrity and availability. -## Types of Validators - -- **Organic Validator:** Directly participates in the network by storing and retrieving files. -- **Synthetic (Challenge) Validator:** *(Coming Soon)* Designed for specific challenge scenarios where they verify that piece are stored by the designated miners. [Learn more](https://github.com/fr34kcoders/storb/pull/20) - --- -## Organic Validator - -Organic validators are the backbone of the Storb storage subnet, responsible for managing file storage and retrieval operations. - -### Testnet Setup +### Setup 1. **Activate Your Virtual Environment** @@ -43,7 +34,7 @@ Organic validators are the backbone of the Storb storage subnet, responsible for SUBTENSOR_ADDRESS= \ MIN_STAKE_THRESHOLD=-1 \ python storb/validator \ - --netuid 1 \ + --netuid 269 \ # 269 on testnet --subtensor.network \ --subtensor.address \ --wallet_name validator \ diff --git a/storb/validator/reward.py b/storb/validator/reward.py index e4dc1b2..40fe2ae 100644 --- a/storb/validator/reward.py +++ b/storb/validator/reward.py @@ -15,7 +15,7 @@ def get_response_rate_scores( Returns ------- np.ndarray - An array of rewards for the given query and responses. + Arrays of uids and rewards for the given query and responses. """ uids = [] diff --git a/storb/validator/validator.py b/storb/validator/validator.py index 562c9bf..4ebcb1d 100644 --- a/storb/validator/validator.py +++ b/storb/validator/validator.py @@ -350,7 +350,6 @@ def sync_metagraph(self): self.ret_latency_scores = new_ret_latency_scores self.final_latency_scores = new_final_latency_scores self.challenge_scores = new_challenge_scores - # TODO: use debug for these logger.debug(f"(len: {len(self.scores)}) New scores: {self.scores}") logger.debug( f"(len: {len(self.store_latencies)}) New store latencies: {self.store_latencies}" @@ -813,7 +812,6 @@ async def forward(self): latency_scores.max() if latency_scores.max() != 0 else 1 ) - # TODO: this should also take the "pdp challenge score" into account rewards = ( 0.2 * self.final_latency_scores + 0.3 * response_rate_scores From 43fc2ee40cfd7e07fa1e8e6dee124803a1fc66c1 Mon Sep 17 00:00:00 2001 From: Ray Okamoto Date: Mon, 13 Jan 2025 19:57:49 +1030 Subject: [PATCH 3/3] docs: update docs --- docs/miner.md | 2 +- docs/validator.md | 2 -- storb/validator/reward.py | 4 ++-- 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/docs/miner.md b/docs/miner.md index 82017ab..a77f1fe 100644 --- a/docs/miner.md +++ b/docs/miner.md @@ -26,7 +26,7 @@ Follow these steps to run a miner on the Storb Testnet: SUBTENSOR_ADDRESS= \ MIN_STAKE_THRESHOLD=-1 \ python storb/miner \ - --netuid 1 \ + --netuid 269 \ # 269 on testnet --subtensor.network \ --subtensor.address \ --wallet_name miner \ diff --git a/docs/validator.md b/docs/validator.md index 7e498db..8e0fa9b 100644 --- a/docs/validator.md +++ b/docs/validator.md @@ -2,8 +2,6 @@ Validators play a crucial role in the Storb network by serving as gateways to the storage subnet. They handle the storage and retrieval of files, ensuring data integrity and availability. ---- - ### Setup 1. **Activate Your Virtual Environment** diff --git a/storb/validator/reward.py b/storb/validator/reward.py index 40fe2ae..be68e47 100644 --- a/storb/validator/reward.py +++ b/storb/validator/reward.py @@ -54,8 +54,8 @@ def get_challenge_scores( Returns ------- - np.ndarray - An array of rewards for the given query and responses. + tuple[np.ndarray, np.ndarray] + A tuple of arrays, corresponding to the given query and responses. """ uids = []