Incorporate challenge into scoring (#33)
Shr1ftyy authored Jan 13, 2025
2 parents 5f3c581 + 43fc2ee commit 2e68833
Showing 5 changed files with 129 additions and 30 deletions.
2 changes: 1 addition & 1 deletion docs/miner.md
@@ -26,7 +26,7 @@ Follow these steps to run a miner on the Storb Testnet:
SUBTENSOR_ADDRESS=<subtensor_address> \
MIN_STAKE_THRESHOLD=-1 \
python storb/miner \
--netuid 1 \
--netuid 269 \
--subtensor.network <subtensor_network> \
--subtensor.address <subtensor_address> \
--wallet_name miner \
15 changes: 2 additions & 13 deletions docs/validator.md
@@ -2,18 +2,7 @@

Validators play a crucial role in the Storb network by serving as gateways to the storage subnet. They handle the storage and retrieval of files, ensuring data integrity and availability.

## Types of Validators

- **Organic Validator:** Directly participates in the network by storing and retrieving files.
- **Synthetic (Challenge) Validator:** *(Coming Soon)* Designed for specific challenge scenarios where they verify that pieces are stored by the designated miners. [Learn more](https://github.com/fr34kcoders/storb/pull/20)

---

## Organic Validator

Organic validators are the backbone of the Storb storage subnet, responsible for managing file storage and retrieval operations.

### Testnet Setup
### Setup

1. **Activate Your Virtual Environment**

@@ -43,7 +32,7 @@ Organic validators are the backbone of the Storb storage subnet, responsible for
SUBTENSOR_ADDRESS=<subtensor_address> \
MIN_STAKE_THRESHOLD=-1 \
python storb/validator \
--netuid 1 \
--netuid 269 \
--subtensor.network <subtensor_network> \
--subtensor.address <subtensor_address> \
--wallet_name validator \
1 change: 1 addition & 0 deletions storb/protocol.py
@@ -36,6 +36,7 @@ class NewChallenge(BaseModel):
challenge_id: str
piece_id: str
validator_id: int
miner_id: int
challenge_deadline: str
public_key: int
public_exponent: int
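For context, a minimal sketch of how the extended model might be instantiated on the validator side. This is illustrative only: the field values are placeholders, the import path is assumed, and it presumes the fields shown above are the only required ones.

```python
import uuid
from datetime import UTC, datetime, timedelta

from storb import protocol  # assumed import path

# Placeholder values; in the validator these come from self.uid, the
# stored piece, and the validator's RSA challenge key.
challenge = protocol.NewChallenge(
    challenge_id=uuid.uuid4().hex,
    piece_id="example-piece-id",
    validator_id=0,
    miner_id=42,  # the field added by this commit
    challenge_deadline=(datetime.now(UTC) + timedelta(minutes=10)).isoformat(),
    public_key=3233,  # RSA modulus n (toy value)
    public_exponent=65537,
)
```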
39 changes: 38 additions & 1 deletion storb/validator/reward.py
@@ -15,7 +15,7 @@ def get_response_rate_scores(
Returns
-------
tuple[np.ndarray, np.ndarray]
An array of rewards for the given query and responses.
Arrays of uids and rewards for the given query and responses.
"""

uids = []
@@ -39,3 +39,40 @@
scores = np.asarray(weighted_rate_sums) / sum_max

return uids, scores


def get_challenge_scores(
self,
miner_stats: dict,
) -> tuple[np.ndarray, np.ndarray]:
"""Returns an array of scores for the given miner stats
Parameters
----------
miner_stats : dict
A dictionary of miner statistics
Returns
-------
tuple[np.ndarray, np.ndarray]
A tuple of arrays, corresponding to the given query and responses.
"""

uids = []
success_rates = []

uids_filter = list(range(len(self.metagraph.nodes)))
for uid, stats in miner_stats.items():
if uid not in uids_filter:
continue
uids.append(uid)
challenge_attempts = max(stats.get("challenge_attempts", 1), 1)
success_rate = abs(
stats.get("challenge_successes", 0) / challenge_attempts
)
success_rates.append(success_rate)

uids = np.array(uids)
# normalize by the best miner, guarding against empty or all-zero rates
max_rate = max(success_rates, default=0.0)
scores = np.asarray(success_rates) / (max_rate if max_rate > 0 else 1.0)

return uids, scores
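As a standalone illustration of the scoring above (made-up stats, with the success-rate computation inlined; the real `get_challenge_scores` additionally filters uids against the metagraph):

```python
import numpy as np

# Hypothetical per-miner stats, shaped like the validator's database rows.
miner_stats = {
    0: {"challenge_attempts": 10, "challenge_successes": 9},
    1: {"challenge_attempts": 4, "challenge_successes": 1},
    2: {"challenge_attempts": 0, "challenge_successes": 0},
}

success_rates = []
for uid, stats in miner_stats.items():
    attempts = max(stats.get("challenge_attempts", 1), 1)  # guard div-by-zero
    success_rates.append(stats.get("challenge_successes", 0) / attempts)

# Normalize by the best-performing miner, as above.
max_rate = max(success_rates, default=0.0)
scores = np.asarray(success_rates) / (max_rate if max_rate > 0 else 1.0)
print(scores.round(3))  # [1.    0.278 0.   ]
```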
102 changes: 87 additions & 15 deletions storb/validator/validator.py
@@ -68,7 +68,7 @@
process_response,
)
from storb.util.uids import get_random_hotkeys
from storb.validator.reward import get_response_rate_scores
from storb.validator.reward import get_challenge_scores, get_response_rate_scores

logger = get_logger(__name__)

@@ -119,6 +119,7 @@ def __init__(self):
self.final_latency_scores = np.zeros(
len(self.metagraph.nodes), dtype=np.float32
)
self.challenge_scores = np.zeros(len(self.metagraph.nodes), dtype=np.float32)

# Initialize Challenge dictionary to store challenges sent to miners
self.challenges: dict[str, protocol.NewChallenge] = {}
@@ -299,6 +300,7 @@ def sync_metagraph(self):
new_store_latency_scores = np.zeros((len(self.metagraph.nodes)))
new_ret_latency_scores = np.zeros((len(self.metagraph.nodes)))
new_final_latency_scores = np.zeros((len(self.metagraph.nodes)))
new_challenge_scores = np.zeros((len(self.metagraph.nodes)))
min_len = min(len(old_hotkeys), len(self.scores))

len_store_latencies = min(
@@ -316,6 +318,9 @@
len_final_latency_scores = min(
len(old_hotkeys), len(self.final_latency_scores)
)
len_challenge_scores = min(
len(old_hotkeys), len(self.challenge_scores)
)

new_moving_average[:min_len] = self.scores[:min_len]
new_store_latencies[:len_store_latencies] = self.store_latencies[
@@ -333,6 +338,9 @@
new_final_latency_scores[:len_final_latency_scores] = (
self.final_latency_scores[:len_final_latency_scores]
)
new_challenge_scores[:len_challenge_scores] = self.challenge_scores[
:len_challenge_scores
]

self.scores = new_moving_average

@@ -341,7 +349,7 @@
self.store_latency_scores = new_store_latency_scores
self.ret_latency_scores = new_ret_latency_scores
self.final_latency_scores = new_final_latency_scores
# TODO: use debug for these
self.challenge_scores = new_challenge_scores
logger.debug(f"(len: {len(self.scores)}) New scores: {self.scores}")
logger.debug(
f"(len: {len(self.store_latencies)}) New store latencies: {self.store_latencies}"
@@ -358,6 +366,9 @@
logger.debug(
f"(len: {len(self.final_latency_scores)}) New latency scores: {self.final_latency_scores}"
)
logger.debug(
f"(len: {len(self.challenge_scores)}) New challenge scores: {self.challenge_scores}"
)

except Exception as e:
logger.error(f"Failed to sync metagraph: {str(e)}")
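The resize-and-copy pattern repeated above is easier to see in isolation; a small sketch with made-up sizes:

```python
import numpy as np

# When the metagraph gains nodes, each per-miner vector is grown to the
# new length, and existing values are preserved by prefix copy.
old_scores = np.array([0.4, 0.9], dtype=np.float32)
new_len = 4  # e.g. len(self.metagraph.nodes) after a sync
new_scores = np.zeros(new_len, dtype=np.float32)
n = min(len(old_scores), new_len)
new_scores[:n] = old_scores[:n]
print(new_scores)  # [0.4 0.9 0.  0. ]
```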
@@ -654,6 +665,7 @@ async def challenge_miner(self, miner_id: int, piece_id: str, tag: str):
challenge_id=uuid.uuid4().hex,
piece_id=piece_id,
validator_id=self.uid,
miner_id=miner_id,
challenge_deadline=challenge_deadline,
public_key=self.challenge.key.rsa.public_key().public_numbers().n,
public_exponent=self.challenge.key.rsa.public_key().public_numbers().e,
@@ -676,6 +688,16 @@
logger.info(
f"Sent challenge {challenge_message.challenge_id} to miner {miner_id} for piece {piece_id}"
)

async with db.get_db_connection(db_dir=self.db_dir) as conn:
miner_stats = await db.get_miner_stats(
conn=conn, miner_uid=challenge_message.miner_id
)
miner_stats["challenge_attempts"] += 1
await db.update_stats(
conn=conn, miner_uid=challenge_message.miner_id, stats=miner_stats
)

logger.debug(f"PRF KEY: {payload.data.challenge.prf_key}")
_, response = await self.query_miner(
miner_hotkey, "/challenge", payload, method="POST"
@@ -703,11 +725,12 @@ async def forward(self):
- Updating the scores
"""

# TODO: challenge miners - based on: https://dl.acm.org/doi/10.1145/1315245.1315318

# TODO: should we lock the db when scoring?
# scoring

# remove expired challenges
await self.remove_expired_challenges()

# scoring
# obtain all miner stats from the validator database
async with db.get_db_connection(self.db_dir) as conn:
miner_stats = await db.get_all_miner_stats(conn)
@@ -725,6 +748,11 @@
self, miner_stats
)

challenge_uids, self.challenge_scores = get_challenge_scores(self, miner_stats)

# TODO: error handlin'?
assert (response_rate_uids == challenge_uids).all()

if (
len(self.store_latencies) < len(response_rate_scores)
or len(self.retrieve_latencies) < len(response_rate_scores)
@@ -737,10 +765,12 @@
new_store_latency_scores = np.zeros(new_len)
new_ret_latency_scores = np.zeros(new_len)
new_final_latency_scores = np.zeros(new_len)
new_challenge_scores = np.zeros(new_len)
new_scores = np.zeros(new_len)

len_lat = len(self.store_latencies)
len_lat_scores = len(self.store_latency_scores)
len_challenge_scores = len(self.challenge_scores)

with self.scores_lock:
len_scores = len(self.scores)
@@ -756,6 +786,9 @@
new_final_latency_scores[:len_lat_scores] = self.final_latency_scores[
:len_lat_scores
]
new_challenge_scores[:len_challenge_scores] = self.challenge_scores[
:len_challenge_scores
]
new_scores[:len_scores] = self.scores[:len_scores]

self.store_latencies = new_store_latencies
@@ -769,7 +802,7 @@
self.final_latency_scores = np.nan_to_num(
new_final_latency_scores, nan=0.0
)

self.challenge_scores = np.nan_to_num(new_challenge_scores, nan=0.0)
self.scores = new_scores

latency_scores = (
@@ -779,8 +812,11 @@
latency_scores.max() if latency_scores.max() != 0 else 1
)

# TODO: this should also take the "pdp challenge score" into account
rewards = 0.2 * self.final_latency_scores + 0.3 * response_rate_scores
rewards = (
0.2 * self.final_latency_scores
+ 0.3 * response_rate_scores
+ 0.5 * self.challenge_scores
)

logger.info(f"response rate scores: {response_rate_scores}")
logger.info(f"moving avg. store latencies: {self.store_latencies}")
@@ -790,6 +826,7 @@
f"moving avg. retrieve latency scores: {self.retrieve_latency_scores}"
)
logger.info(f"moving avg. latency scores: {self.final_latency_scores}")
logger.info(f"challenge rate scores: {self.challenge_scores}")

self.update_scores(rewards, response_rate_uids)
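To make the new 0.2 / 0.3 / 0.5 weighting concrete, a small standalone example with made-up score vectors, each already normalized to [0, 1]:

```python
import numpy as np

final_latency_scores = np.array([0.8, 0.5])
response_rate_scores = np.array([0.9, 0.6])
challenge_scores = np.array([1.0, 0.0])

# Same blend as in forward(): a miner that fails every challenge
# forfeits half of its attainable reward.
rewards = (
    0.2 * final_latency_scores
    + 0.3 * response_rate_scores
    + 0.5 * challenge_scores
)
print(rewards)  # [0.93 0.28]
```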

@@ -1093,9 +1130,30 @@ async def handle_batch_requests():
piece_hashes=piece_hashes, processed_pieces=processed_pieces
)

async def remove_expired_challenges(self):
"""
Remove expired challenges from the `self.challenges` dictionary
Returns:
None
"""
now = datetime.now(UTC).isoformat()
keys_to_remove = []

for key, challenge in self.challenges.items():
if challenge.challenge_deadline < now:
keys_to_remove.append(key)

if not keys_to_remove:
return

for key in keys_to_remove:
del self.challenges[key]
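One detail worth noting: the deadline check compares ISO-8601 strings rather than datetime objects. That is sound as long as both sides are produced the same way (UTC, identical format), because such timestamps sort lexicographically. A quick standalone check:

```python
from datetime import UTC, datetime, timedelta

now = datetime.now(UTC)
future = (now + timedelta(minutes=5)).isoformat()
past = (now - timedelta(minutes=5)).isoformat()

# Lexicographic order matches chronological order for uniformly
# formatted UTC timestamps, so string comparison suffices here.
assert future > now.isoformat()  # still live
assert past < now.isoformat()    # would be pruned as expired
```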

"""API Routes"""

def verify_challenge(self, challenge_request: protocol.ProofResponse) -> bool:
async def verify_challenge(self, challenge_request: protocol.ProofResponse) -> bool:
"""Verify the challenge proof from the miner
Parameters
@@ -1110,20 +1168,25 @@ def verify_challenge(self, challenge_request: protocol.ProofResponse) -> bool:
"""

logger.debug(f"Verifying challenge proof: {challenge_request.challenge_id}")
proof = challenge_request.proof
try:
proof = Proof.model_validate(proof)
except Exception as e:
logger.error(f"Invalid proof: {e}")
return False

challenge: protocol.NewChallenge = self.challenges.get(
challenge_request.challenge_id
)

if not challenge:
logger.error(f"Challenge {challenge_request.challenge_id} not found")
return False

async with db.get_db_connection(db_dir=self.db_dir) as conn:
miner_stats = await db.get_miner_stats(conn, challenge.miner_id)

proof = challenge_request.proof
try:
proof = Proof.model_validate(proof)
except Exception as e:
logger.error(f"Invalid proof: {e}")
return False

if challenge.challenge_deadline < datetime.now(UTC).isoformat():
logger.error(f"Challenge {challenge_request.challenge_id} has expired")
return False
@@ -1152,6 +1215,15 @@ def verify_challenge(self, challenge_request: protocol.ProofResponse) -> bool:
f"Proof verification successful for challenge {challenge.challenge_id}"
)

async with db.get_db_connection(db_dir=self.db_dir) as conn:
miner_stats["challenge_successes"] += 1
await db.update_stats(
conn=conn, miner_uid=challenge.miner_id, stats=miner_stats
)

# remove challenge from memory
del self.challenges[challenge_request.challenge_id]

return True
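The proof parsing above leans on Pydantic raising when input is malformed. A minimal standalone analogue of that pattern (the `Proof` fields here are placeholders, not the real `storb` model):

```python
from pydantic import BaseModel, ValidationError

class Proof(BaseModel):  # stand-in for the real storb Proof model
    tag: str
    sigma: int

def parse_proof(raw: dict) -> Proof | None:
    try:
        return Proof.model_validate(raw)
    except ValidationError as err:
        print(f"Invalid proof: {err}")
        return None

assert parse_proof({"tag": "t1", "sigma": 3}) is not None
assert parse_proof({"tag": "t1"}) is None  # missing field is rejected
```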

def status(self) -> str:
