From 4705f30ec801d7cff1d2beef145b3b383dc3e9ed Mon Sep 17 00:00:00 2001 From: Phillip Jensen Date: Thu, 22 Feb 2024 21:12:43 +0100 Subject: [PATCH] Add uptime tracking --- requirements.pip | 2 +- .../api2/migrations/0013_nodestatushistory.py | 23 + .../migrations/0014_node_uptime_created_at.py | 20 + stats-backend/api2/models.py | 17 +- stats-backend/api2/scanner.py | 411 ++++++++++++++++++ stats-backend/api2/scoring.py | 29 ++ stats-backend/api2/serializers.py | 32 +- stats-backend/api2/tasks.py | 217 ++------- stats-backend/api2/urls.py | 1 + stats-backend/api2/views.py | 101 +++++ stats-backend/api2/yapapi_utils.py | 111 +++++ stats-backend/core/celery.py | 16 +- .../examples/low-level-api/v2/list-offers.py | 3 +- 13 files changed, 770 insertions(+), 213 deletions(-) create mode 100644 stats-backend/api2/migrations/0013_nodestatushistory.py create mode 100644 stats-backend/api2/migrations/0014_node_uptime_created_at.py create mode 100644 stats-backend/api2/scanner.py create mode 100644 stats-backend/api2/scoring.py create mode 100644 stats-backend/api2/yapapi_utils.py diff --git a/requirements.pip b/requirements.pip index 87e7ef3..fc4c5a5 100644 --- a/requirements.pip +++ b/requirements.pip @@ -86,4 +86,4 @@ eth-account eth-tester PyJWT djangorestframework-simplejwt -hcloud \ No newline at end of file +django-ninja \ No newline at end of file diff --git a/stats-backend/api2/migrations/0013_nodestatushistory.py b/stats-backend/api2/migrations/0013_nodestatushistory.py new file mode 100644 index 0000000..4328164 --- /dev/null +++ b/stats-backend/api2/migrations/0013_nodestatushistory.py @@ -0,0 +1,23 @@ +# Generated by Django 4.1.7 on 2024-02-20 23:05 + +from django.db import migrations, models +import django.db.models.deletion + + +class Migration(migrations.Migration): + + dependencies = [ + ('api2', '0012_offer_cheaper_than_offer_times_cheaper'), + ] + + operations = [ + migrations.CreateModel( + name='NodeStatusHistory', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('is_online', models.BooleanField()), + ('timestamp', models.DateTimeField(auto_now_add=True)), + ('provider', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='api2.node')), + ], + ), + ] diff --git a/stats-backend/api2/migrations/0014_node_uptime_created_at.py b/stats-backend/api2/migrations/0014_node_uptime_created_at.py new file mode 100644 index 0000000..9a3169e --- /dev/null +++ b/stats-backend/api2/migrations/0014_node_uptime_created_at.py @@ -0,0 +1,20 @@ +# Generated by Django 4.1.7 on 2024-02-21 08:17 + +from django.db import migrations, models +import django.utils.timezone + + +class Migration(migrations.Migration): + + dependencies = [ + ('api2', '0013_nodestatushistory'), + ] + + operations = [ + migrations.AddField( + model_name='node', + name='uptime_created_at', + field=models.DateTimeField(auto_now_add=True, default=django.utils.timezone.now), + preserve_default=False, + ), + ] diff --git a/stats-backend/api2/models.py b/stats-backend/api2/models.py index 4423550..2d6912f 100644 --- a/stats-backend/api2/models.py +++ b/stats-backend/api2/models.py @@ -15,6 +15,7 @@ class Node(models.Model): version = models.CharField(max_length=7) updated_at = models.DateTimeField(auto_now=True) created_at = models.DateTimeField(auto_now_add=True) + uptime_created_at = models.DateTimeField(auto_now_add=True) class EC2Instance(models.Model): @@ -26,6 +27,7 @@ class EC2Instance(models.Model): def __str__(self): return self.name + class 
Offer(models.Model): properties = models.JSONField(null=True) runtime = models.CharField(max_length=42) @@ -35,11 +37,16 @@ class Offer(models.Model): monthly_price_glm = models.FloatField(null=True, blank=True) monthly_price_usd = models.FloatField(null=True, blank=True) is_overpriced = models.BooleanField(default=False) - overpriced_compared_to = models.ForeignKey(EC2Instance, on_delete=models.CASCADE, null=True) + overpriced_compared_to = models.ForeignKey( + EC2Instance, on_delete=models.CASCADE, null=True + ) suggest_env_per_hour_price = models.FloatField(null=True) times_more_expensive = models.FloatField(null=True) - cheaper_than = models.ForeignKey(EC2Instance, on_delete=models.CASCADE, null=True, related_name='cheaper_offers') + cheaper_than = models.ForeignKey( + EC2Instance, on_delete=models.CASCADE, null=True, related_name="cheaper_offers" + ) times_cheaper = models.FloatField(null=True) + class Meta: unique_together = ( "runtime", @@ -59,4 +66,10 @@ class GLM(models.Model): current_price = models.FloatField(null=True) +class NodeStatusHistory(models.Model): + provider = models.ForeignKey(Node, on_delete=models.CASCADE) + is_online = models.BooleanField() + timestamp = models.DateTimeField(auto_now_add=True) + def __str__(self): + return f"{self.provider.node_id} - {'Online' if self.is_online else 'Offline'} at {self.timestamp}" diff --git a/stats-backend/api2/scanner.py b/stats-backend/api2/scanner.py new file mode 100644 index 0000000..e33f4a9 --- /dev/null +++ b/stats-backend/api2/scanner.py @@ -0,0 +1,411 @@ +#!/usr/bin/env python3 +import asyncio +import csv +import json +import pathlib +import sys +import subprocess +from datetime import datetime, timedelta +from django.utils import timezone +from .models import Node, NodeStatusHistory, GLM, EC2Instance, Offer +from asgiref.sync import sync_to_async +from yapapi import props as yp +from yapapi.config import ApiConfig +from yapapi.log import enable_default_logger +from yapapi.props.builder import DemandBuilder +from yapapi.rest import Configuration, Market +from core.celery import app +from django.db.models import Q +from django.db.models import Case, When, Value, F +from django.db.models.functions import Abs +from django.db import transaction +import calendar + + +@app.task(queue="yagna", options={"queue": "yagna", "routing_key": "yagna"}) +def update_providers_info(node_props): + unique_providers = set() # Initialize a set to track unique providers + now = timezone.now() + days_in_current_month = calendar.monthrange(now.year, now.month)[1] + seconds_current_month = days_in_current_month * 24 * 60 * 60 + glm_usd_value = GLM.objects.get(id=1) + print(f"Updating {len(node_props)} providers") + for prop in node_props: + data = json.loads(prop) + provider_id = data["node_id"] + wallet = data["wallet"] + unique_providers.add(provider_id) # Add provider to the set + obj, created = Node.objects.get_or_create(node_id=provider_id) + if created: + print(f"Created new provider: {prop}") + offerobj = Offer.objects.create( + properties=data, provider=obj, runtime=data["golem.runtime.name"] + ) + if data["golem.runtime.name"] == "vm": + vectors = {} + for key, value in enumerate(data["golem.com.usage.vector"]): + vectors[value] = key + monthly_pricing = ( + ( + data["golem.com.pricing.model.linear.coeffs"][ + vectors["golem.usage.duration_sec"] + ] + * seconds_current_month + ) + + ( + data["golem.com.pricing.model.linear.coeffs"][ + vectors["golem.usage.cpu_sec"] + ] + * seconds_current_month + * data["golem.inf.cpu.threads"] + ) + + 
data["golem.com.pricing.model.linear.coeffs"][-1] + ) + if not monthly_pricing: + print(f"Monthly price is {monthly_pricing}") + offerobj.monthly_price_glm = monthly_pricing + offerobj.monthly_price_usd = ( + monthly_pricing * glm_usd_value.current_price + ) + vcpu_needed = data.get("golem.inf.cpu.threads", 0) + memory_needed = data.get("golem.inf.mem.gib", 0.0) + closest_ec2 = ( + EC2Instance.objects.annotate( + cpu_diff=Abs(F("vcpu") - vcpu_needed), + memory_diff=Abs(F("memory") - memory_needed), + ) + .order_by("cpu_diff", "memory_diff", "price_usd") + .first() + ) + + # Compare and update the Offer object + if closest_ec2 and monthly_pricing: + offer_price_usd = monthly_pricing * glm_usd_value.current_price + ec2_monthly_price = closest_ec2.price_usd * 730 + + offer_is_more_expensive = offer_price_usd > ec2_monthly_price + offer_is_cheaper = offer_price_usd < ec2_monthly_price + + # Update Offer object fields for expensive comparison + offerobj.is_overpriced = offer_is_more_expensive + offerobj.overpriced_compared_to = ( + closest_ec2 if offer_is_more_expensive else None + ) + offerobj.times_more_expensive = ( + offer_price_usd / float(ec2_monthly_price) + if offer_is_more_expensive + else None + ) + + # Update Offer object fields for cheaper comparison + offerobj.cheaper_than = closest_ec2 if offer_is_cheaper else None + offerobj.times_cheaper = ( + float(ec2_monthly_price) / offer_price_usd + if offer_is_cheaper + else None + ) + + else: + # print( + # "No matching EC2Instance found or monthly pricing is not available." + # ) + offerobj.is_overpriced = False + offerobj.overpriced_compared_to = None + offerobj.save() + obj.wallet = wallet + # Verify each node's status + is_online = check_node_status(obj.node_id) + + obj.online = is_online + obj.save() + else: + offerobj, offercreated = Offer.objects.get_or_create( + provider=obj, runtime=data["golem.runtime.name"] + ) + if data["golem.runtime.name"] == "vm": + vectors = {} + for key, value in enumerate(data["golem.com.usage.vector"]): + vectors[value] = key + monthly_pricing = ( + ( + data["golem.com.pricing.model.linear.coeffs"][ + vectors["golem.usage.duration_sec"] + ] + * seconds_current_month + ) + + ( + data["golem.com.pricing.model.linear.coeffs"][ + vectors["golem.usage.cpu_sec"] + ] + * seconds_current_month + * data["golem.inf.cpu.threads"] + ) + + data["golem.com.pricing.model.linear.coeffs"][-1] + ) + if not monthly_pricing: + print(f"Monthly price is {monthly_pricing}") + offerobj.monthly_price_glm = monthly_pricing + offerobj.monthly_price_usd = ( + monthly_pricing * glm_usd_value.current_price + ) + + vcpu_needed = data.get("golem.inf.cpu.threads", 0) + memory_needed = data.get("golem.inf.mem.gib", 0.0) + closest_ec2 = ( + EC2Instance.objects.annotate( + cpu_diff=Abs(F("vcpu") - vcpu_needed), + memory_diff=Abs(F("memory") - memory_needed), + ) + .order_by("cpu_diff", "memory_diff", "price_usd") + .first() + ) + + # Compare and update the Offer object + if closest_ec2 and monthly_pricing: + offer_price_usd = monthly_pricing * glm_usd_value.current_price + ec2_monthly_price = closest_ec2.price_usd * 730 + + offer_is_more_expensive = offer_price_usd > ec2_monthly_price + offer_is_cheaper = offer_price_usd < ec2_monthly_price + + # Update Offer object fields for expensive comparison + offerobj.is_overpriced = offer_is_more_expensive + offerobj.overpriced_compared_to = ( + closest_ec2 if offer_is_more_expensive else None + ) + offerobj.times_more_expensive = ( + offer_price_usd / float(ec2_monthly_price) + if 
offer_is_more_expensive + else None + ) + + # Update Offer object fields for cheaper comparison + offerobj.cheaper_than = closest_ec2 if offer_is_cheaper else None + offerobj.times_cheaper = ( + float(ec2_monthly_price) / offer_price_usd + if offer_is_cheaper + else None + ) + + else: + # print( + # "No matching EC2Instance found or monthly pricing is not available." + # ) + offerobj.is_overpriced = False + offerobj.overpriced_compared_to = None + + offerobj.properties = data + offerobj.save() + obj.runtime = data["golem.runtime.name"] + obj.wallet = wallet + # Verify each node's status + is_online = check_node_status(obj.node_id) + obj.online = is_online + obj.save() + + online_nodes = Node.objects.filter(online=True) + for node in online_nodes: + if not node.node_id in unique_providers: + command = f"yagna net find {node.node_id}" + result = subprocess.run(command, shell=True, capture_output=True, text=True) + is_online = "Exiting..., error details: Request failed" not in result.stderr + node.online = is_online + node.computing_now = False + node.save(update_fields=["online", "computing_now"]) + print(f"Done updating {len(unique_providers)} providers") + + +TESTNET_KEYS = [ + "golem.com.payment.platform.erc20-goerli-tglm.address", + "golem.com.payment.platform.erc20-mumbai-tglm.address", + "golem.com.payment.platform.erc20-holesky-tglm.address", + "golem.com.payment.platform.erc20next-goerli-tglm.address", + "golem.com.payment.platform.erc20next-mumbai-tglm.address", + "golem.com.payment.platform.erc20next-holesky-tglm.address", +] + +examples_dir = pathlib.Path(__file__).resolve().parent.parent +sys.path.append(str(examples_dir)) +from .yapapi_utils import build_parser, print_env_info, format_usage # noqa: E402 + +import redis + + +def update_nodes_status(provider_id, is_online_now): + provider, created = Node.objects.get_or_create(node_id=provider_id) + + # Check the last status in the NodeStatusHistory + last_status = NodeStatusHistory.objects.filter(provider=provider).last() + + if not last_status or last_status.is_online != is_online_now: + # Create a new status entry if there's a change in status + NodeStatusHistory.objects.create(provider=provider, is_online=is_online_now) + + +@app.task(queue="yagna", options={"queue": "yagna", "routing_key": "yagna"}) +def update_nodes_data(node_props): + r = redis.Redis(host="redis", port=6379, db=0) + + for props in node_props: + props = json.loads(props) + issuer_id = props["node_id"] + is_online_now = check_node_status(issuer_id) + try: + update_nodes_status(issuer_id, is_online_now) + r.set(f"provider:{issuer_id}:status", str(is_online_now)) + except Exception as e: + print(f"Error updating NodeStatus for {issuer_id}: {e}") + print(f"Done updating {len(node_props)} providers") + # Deserialize each element in node_props into a dictionary + deserialized_node_props = [json.loads(props) for props in node_props] + + # Now create the set + provider_ids_in_props = {props["node_id"] for props in deserialized_node_props} + previously_online_providers_ids = ( + Node.objects.filter(nodestatushistory__is_online=True) + .distinct() + .values_list("node_id", flat=True) + ) + + provider_ids_not_in_scan = ( + set(previously_online_providers_ids) - provider_ids_in_props + ) + + for issuer_id in provider_ids_not_in_scan: + is_online_now = check_node_status(issuer_id) + + try: + update_nodes_status(issuer_id, is_online_now) + r.set(f"provider:{issuer_id}:status", str(is_online_now)) + except Exception as e: + print(f"Error verifying/updating NodeStatus for 
{issuer_id}: {e}") + print(f"Done updating {len(provider_ids_not_in_scan)} OFFLINE providers") + + +def check_node_status(issuer_id): + try: + process = subprocess.run( + ["yagna", "net", "find", issuer_id], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + timeout=5, # 5-second timeout for the subprocess + ) + + # Process finished, return True if it was successful and "seen:" is in the output + return process.returncode == 0 and "seen:" in process.stdout.decode() + except subprocess.TimeoutExpired as e: + print("Timeout reached while checking node status", e) + return False + except Exception as e: + print(f"Unexpected error checking node status: {e}") + return False + + +async def list_offers( + conf: Configuration, subnet_tag: str, current_scan_providers, node_props +): + async with conf.market() as client: + market_api = Market(client) + dbuild = DemandBuilder() + dbuild.add(yp.NodeInfo(name="some scanning node", subnet_tag=subnet_tag)) + dbuild.add(yp.Activity(expiration=datetime.now(timezone.utc))) + + async with market_api.subscribe( + dbuild.properties, dbuild.constraints + ) as subscription: + async for event in subscription.events(): + data = event.props + if not event.issuer in current_scan_providers: + current_scan_providers.add(event.issuer) + if "golem.com.payment.platform.zksync-mainnet-glm.address" in str( + event.props + ): + data["wallet"] = event.props[ + "golem.com.payment.platform.zksync-mainnet-glm.address" + ] + elif "golem.com.payment.platform.zksync-rinkeby-tglm.address" in str( + event.props + ): + data["wallet"] = event.props[ + "golem.com.payment.platform.zksync-rinkeby-tglm.address" + ] + elif "golem.com.payment.platform.erc20-mainnet-glm.address" in str( + event.props + ): + data["wallet"] = event.props[ + "golem.com.payment.platform.erc20-mainnet-glm.address" + ] + elif "golem.com.payment.platform.erc20-polygon-glm.address" in str( + event.props + ): + data["wallet"] = event.props[ + "golem.com.payment.platform.erc20-polygon-glm.address" + ] + elif "golem.com.payment.platform.erc20-goerli-tglm.address" in str( + event.props + ): + data["wallet"] = event.props[ + "golem.com.payment.platform.erc20-goerli-tglm.address" + ] + elif "golem.com.payment.platform.erc20-rinkeby-tglm.address" in str( + event.props + ): + data["wallet"] = event.props[ + "golem.com.payment.platform.erc20-rinkeby-tglm.address" + ] + elif "golem.com.payment.platform.polygon-polygon-glm.address" in str( + event.props + ): + data["wallet"] = event.props[ + "golem.com.payment.platform.polygon-polygon-glm.address" + ] + elif "golem.com.payment.platform.erc20next-mainnet-glm.address" in str( + event.props + ): + data["wallet"] = event.props[ + "golem.com.payment.platform.erc20next-mainnet-glm.address" + ] + elif "golem.com.payment.platform.erc20next-polygon-glm.address" in str( + event.props + ): + data["wallet"] = event.props[ + "golem.com.payment.platform.erc20next-polygon-glm.address" + ] + elif "golem.com.payment.platform.erc20next-goerli-tglm.address" in str( + event.props + ): + data["wallet"] = event.props[ + "golem.com.payment.platform.erc20next-goerli-tglm.address" + ] + elif "golem.com.payment.platform.erc20next-rinkeby-tglm.address" in str( + event.props + ): + data["wallet"] = event.props[ + "golem.com.payment.platform.erc20next-rinkeby-tglm.address" + ] + data["node_id"] = event.issuer + node_props.append(json.dumps(data)) + + +async def monitor_nodes_status(subnet_tag: str = "public"): + node_props = [] + current_scan_providers = set() + + # Call list_offers with a timeout + 
try: + await asyncio.wait_for( + list_offers( + Configuration(api_config=ApiConfig()), + subnet_tag=subnet_tag, + node_props=node_props, + current_scan_providers=current_scan_providers, + ), + timeout=60, # 30-second timeout for each scan + ) + except asyncio.TimeoutError: + print("Scan timeout reached") + print(f"In the current scan, we found {len(current_scan_providers)} providers") + # Delay update_nodes_data call using Celery + + update_providers_info.delay(node_props) + update_nodes_data.delay(node_props) diff --git a/stats-backend/api2/scoring.py b/stats-backend/api2/scoring.py new file mode 100644 index 0000000..e45f3b7 --- /dev/null +++ b/stats-backend/api2/scoring.py @@ -0,0 +1,29 @@ +from .models import Node, Offer, EC2Instance, NodeStatusHistory +from datetime import timedelta +from django.utils import timezone + + +def calculate_uptime_percentage(node_id, node=None): + if node is None: + node = Node.objects.get(node_id=node_id) + statuses = NodeStatusHistory.objects.filter(provider=node).order_by("timestamp") + + online_duration = timedelta(0) + last_online_time = None + + for status in statuses: + if status.is_online: + last_online_time = status.timestamp + elif last_online_time: + online_duration += status.timestamp - last_online_time + last_online_time = None + + # Check if the node is currently online and add the duration + if last_online_time is not None: + online_duration += timezone.now() - last_online_time + + total_duration = timezone.now() - node.uptime_created_at + uptime_percentage = ( + online_duration.total_seconds() / total_duration.total_seconds() + ) * 100 + return uptime_percentage diff --git a/stats-backend/api2/serializers.py b/stats-backend/api2/serializers.py index c6e6bf7..8ab1755 100644 --- a/stats-backend/api2/serializers.py +++ b/stats-backend/api2/serializers.py @@ -1,30 +1,45 @@ from rest_framework import serializers -from .models import Node, Offer, EC2Instance +from .models import Node, Offer, EC2Instance, NodeStatusHistory +from .scoring import calculate_uptime_percentage + class EC2InstanceSerializer(serializers.ModelSerializer): class Meta: model = EC2Instance - fields = '__all__' + fields = "__all__" + class OfferSerializer(serializers.ModelSerializer): overpriced_compared_to = EC2InstanceSerializer(read_only=True) - cheaper_than = EC2InstanceSerializer(read_only=True) # Serialize the cheaper_than field + cheaper_than = EC2InstanceSerializer( + read_only=True + ) # Serialize the cheaper_than field class Meta: model = Offer fields = [ - "runtime", "monthly_price_glm", "properties", - "updated_at", "monthly_price_usd", "is_overpriced", - "overpriced_compared_to", "suggest_env_per_hour_price", "times_more_expensive", - "cheaper_than", "times_cheaper", + "runtime", + "monthly_price_glm", + "properties", + "updated_at", + "monthly_price_usd", + "is_overpriced", + "overpriced_compared_to", + "suggest_env_per_hour_price", + "times_more_expensive", + "cheaper_than", + "times_cheaper", ] + class NodeSerializer(serializers.ModelSerializer): runtimes = serializers.SerializerMethodField("get_offers") + uptime = serializers.SerializerMethodField("get_uptime") class Meta: model = Node fields = [ + "uptime", "earnings_total", "node_id", "online", @@ -39,3 +54,6 @@ class Meta: def get_offers(self, node): offers = Offer.objects.filter(provider=node) return {offer.runtime: OfferSerializer(offer).data for offer in offers} + + def get_uptime(self, node): + return calculate_uptime_percentage(node.node_id, node) diff --git a/stats-backend/api2/tasks.py 
b/stats-backend/api2/tasks.py index 67fa609..7e9436e 100644 --- a/stats-backend/api2/tasks.py +++ b/stats-backend/api2/tasks.py @@ -16,7 +16,13 @@ from django.db.models import F from django.db.models.functions import Abs from decimal import Decimal -from .utils import get_pricing, get_ec2_products, find_cheapest_price, has_vcpu_memory, round_to_three_decimals +from .utils import ( + get_pricing, + get_ec2_products, + find_cheapest_price, + has_vcpu_memory, + round_to_three_decimals, +) pool = redis.ConnectionPool(host="redis", port=6379, db=0) r = redis.Redis(connection_pool=pool) @@ -332,7 +338,7 @@ def get_current_glm_price(): response = requests.get(url) if response.status_code == 200: data = response.json() - price = str(data['market_data']['current_price']['usd'])[0:5] + price = str(data["market_data"]["current_price"]["usd"])[0:5] obj, created = GLM.objects.get_or_create(id=1) obj.current_price = price obj.save() @@ -340,182 +346,14 @@ def get_current_glm_price(): print("Failed to retrieve data") -@app.task -def v2_offer_scraper(): - os.chdir("/stats-backend/yapapi/examples/low-level-api/v2") - with open("data.config") as f: - for line in f: - command = line - proc = subprocess.Popen(command, shell=True) - proc.wait() - content = r.get("offers_v2") - serialized = json.loads(content) - now = datetime.datetime.now() - days_in_current_month = calendar.monthrange(now.year, now.month)[1] - seconds_current_month = days_in_current_month * 24 * 60 * 60 - glm_usd_value = GLM.objects.get(id=1) - for line in serialized: - data = json.loads(line) - provider = data["id"] - wallet = data["wallet"] - obj, created = Node.objects.get_or_create(node_id=provider) - if created: - offerobj = Offer.objects.create( - properties=data, provider=obj, runtime=data["golem.runtime.name"] - ) - if data["golem.runtime.name"] == "vm": - vectors = {} - for key, value in enumerate(data["golem.com.usage.vector"]): - vectors[value] = key - monthly_pricing = ( - ( - data["golem.com.pricing.model.linear.coeffs"][ - vectors["golem.usage.duration_sec"] - ] - * seconds_current_month - ) - + ( - data["golem.com.pricing.model.linear.coeffs"][ - vectors["golem.usage.cpu_sec"] - ] - * seconds_current_month - * data["golem.inf.cpu.threads"] - ) - + data["golem.com.pricing.model.linear.coeffs"][-1] - ) - if not monthly_pricing: - print(f"Monthly price is {monthly_pricing}") - offerobj.monthly_price_glm = monthly_pricing - offerobj.monthly_price_usd = monthly_pricing * glm_usd_value.current_price - vcpu_needed = data.get("golem.inf.cpu.threads", 0) - memory_needed = data.get("golem.inf.mem.gib", 0.0) - closest_ec2 = EC2Instance.objects.annotate( - cpu_diff=Abs(F('vcpu') - vcpu_needed), - memory_diff=Abs(F('memory') - memory_needed) - ).order_by('cpu_diff', 'memory_diff', 'price_usd').first() - - # Compare and update the Offer object - if closest_ec2 and monthly_pricing: - offer_price_usd = monthly_pricing * glm_usd_value.current_price - ec2_monthly_price = closest_ec2.price_usd * 730 - - offer_is_more_expensive = offer_price_usd > ec2_monthly_price - offer_is_cheaper = offer_price_usd < ec2_monthly_price - - # Update Offer object fields for expensive comparison - offerobj.is_overpriced = offer_is_more_expensive - offerobj.overpriced_compared_to = closest_ec2 if offer_is_more_expensive else None - offerobj.times_more_expensive = offer_price_usd / float(ec2_monthly_price) if offer_is_more_expensive else None - - # Update Offer object fields for cheaper comparison - offerobj.cheaper_than = closest_ec2 if offer_is_cheaper else None - 
offerobj.times_cheaper = float(ec2_monthly_price) / offer_price_usd if offer_is_cheaper else None - - else: - print("No matching EC2Instance found or monthly pricing is not available.") - offerobj.is_overpriced = False - offerobj.overpriced_compared_to = None - offerobj.save() - obj.wallet = wallet - # Verify each node's status - command = f"yagna net find {provider}" - result = subprocess.run(command, shell=True, capture_output=True, text=True) - is_online = "Exiting..., error details: Request failed" not in result.stderr - - obj.online = is_online - obj.save() - else: - offerobj, offercreated = Offer.objects.get_or_create( - provider=obj, runtime=data["golem.runtime.name"] - ) - if data["golem.runtime.name"] == "vm": - vectors = {} - for key, value in enumerate(data["golem.com.usage.vector"]): - vectors[value] = key - monthly_pricing = ( - ( - data["golem.com.pricing.model.linear.coeffs"][ - vectors["golem.usage.duration_sec"] - ] - * seconds_current_month - ) - + ( - data["golem.com.pricing.model.linear.coeffs"][ - vectors["golem.usage.cpu_sec"] - ] - * seconds_current_month - * data["golem.inf.cpu.threads"] - ) - + data["golem.com.pricing.model.linear.coeffs"][-1] - ) - if not monthly_pricing: - print(f"Monthly price is {monthly_pricing}") - offerobj.monthly_price_glm = monthly_pricing - offerobj.monthly_price_usd = monthly_pricing * glm_usd_value.current_price - - - vcpu_needed = data.get("golem.inf.cpu.threads", 0) - memory_needed = data.get("golem.inf.mem.gib", 0.0) - closest_ec2 = EC2Instance.objects.annotate( - cpu_diff=Abs(F('vcpu') - vcpu_needed), - memory_diff=Abs(F('memory') - memory_needed) - ).order_by('cpu_diff', 'memory_diff', 'price_usd').first() +import asyncio +from .scanner import monitor_nodes_status - # Compare and update the Offer object - if closest_ec2 and monthly_pricing: - offer_price_usd = monthly_pricing * glm_usd_value.current_price - ec2_monthly_price = closest_ec2.price_usd * 730 - offer_is_more_expensive = offer_price_usd > ec2_monthly_price - offer_is_cheaper = offer_price_usd < ec2_monthly_price - - # Update Offer object fields for expensive comparison - offerobj.is_overpriced = offer_is_more_expensive - offerobj.overpriced_compared_to = closest_ec2 if offer_is_more_expensive else None - offerobj.times_more_expensive = offer_price_usd / float(ec2_monthly_price) if offer_is_more_expensive else None - - # Update Offer object fields for cheaper comparison - offerobj.cheaper_than = closest_ec2 if offer_is_cheaper else None - offerobj.times_cheaper = float(ec2_monthly_price) / offer_price_usd if offer_is_cheaper else None - - - else: - print("No matching EC2Instance found or monthly pricing is not available.") - offerobj.is_overpriced = False - offerobj.overpriced_compared_to = None - - offerobj.properties = data - offerobj.save() - obj.runtime = data["golem.runtime.name"] - obj.wallet = wallet - # Verify each node's status - command = f"yagna net find {provider}" - result = subprocess.run(command, shell=True, capture_output=True, text=True) - is_online = "Exiting..., error details: Request failed" not in result.stderr - obj.online = is_online - obj.save() - # Find offline providers - str1 = "".join(serialized) - fd, path = tempfile.mkstemp() - try: - with os.fdopen(fd, "w") as tmp: - # do stuff with temp file - tmp.write(str1) - online_nodes = Node.objects.filter(online=True) - for node in online_nodes: - if not node.node_id in str1: - command = f"yagna net find {node.node_id}" - result = subprocess.run( - command, shell=True, capture_output=True, text=True 
- ) - is_online = ( - "Exiting..., error details: Request failed" not in result.stderr - ) - node.online = is_online - node.computing_now = False - node.save(update_fields=["online", "computing_now"]) - finally: - os.remove(path) +@app.task +def v2_offer_scraper(subnet_tag="public"): + # Run the asyncio function using asyncio.run() + asyncio.run(monitor_nodes_status(subnet_tag)) @app.task(queue="yagna") @@ -535,41 +373,38 @@ def healthcheck_provider(node_id, network, taskId): return rc - - - @app.task def store_ec2_info(): ec2_info = {} products_data = get_ec2_products() for product in products_data: - details = product.get('details', {}) + details = product.get("details", {}) if not has_vcpu_memory(details): continue print(product) - product_id = product['id'] - category = product.get('category') - name = product.get('name') + product_id = product["id"] + category = product.get("category") + name = product.get("name") pricing_data = get_pricing(product_id) - cheapest_price = find_cheapest_price(pricing_data['prices']) + cheapest_price = find_cheapest_price(pricing_data["prices"]) # Convert memory to float and price to Decimal - memory_gb = float(details['memory']) - price = cheapest_price['amount'] if cheapest_price else None + memory_gb = float(details["memory"]) + price = cheapest_price["amount"] if cheapest_price else None # Use get_or_create to store or update the instance in the database instance, created = EC2Instance.objects.get_or_create( name=name, - defaults={'vcpu': details['vcpu'], 'memory': memory_gb, 'price_usd': price} + defaults={"vcpu": details["vcpu"], "memory": memory_gb, "price_usd": price}, ) ec2_info[product_id] = { - 'category': category, - 'name': name, - 'details': details, - 'cheapest_price': cheapest_price + "category": category, + "name": name, + "details": details, + "cheapest_price": cheapest_price, } return ec2_info diff --git a/stats-backend/api2/urls.py b/stats-backend/api2/urls.py index 21b249a..bd8ffd5 100644 --- a/stats-backend/api2/urls.py +++ b/stats-backend/api2/urls.py @@ -15,6 +15,7 @@ path("network/offers/cheapest/cores", views.cheapest_by_cores), path("provider/wallet/", views.node_wallet), path("provider/node/", views.node), + path("provider/uptime/", views.node_uptime), path("website/globe_data", views.globe_data), path("website/index", views.golem_main_website_index), path("network/offers/cheapest", views.cheapest_offer), diff --git a/stats-backend/api2/views.py b/stats-backend/api2/views.py index 95f10aa..b78ea24 100644 --- a/stats-backend/api2/views.py +++ b/stats-backend/api2/views.py @@ -13,6 +13,107 @@ pool = redis.ConnectionPool(host="redis", port=6379, db=0) r = redis.Redis(connection_pool=pool) +from datetime import timedelta +from typing import List +from .models import Node, NodeStatusHistory +from django.utils import timezone + +# from ninja import NinjaAPI, Schema + +# api = NinjaAPI() + + +# class TrackerSchema(Schema): +# color: str +# tooltip: str +from rest_framework.decorators import api_view +from rest_framework.response import Response +from django.db import models + + +from django.utils.timesince import timesince +from django.db.models.functions import Coalesce +from math import ceil +from .scoring import calculate_uptime_percentage + + +@api_view(["GET"]) +def node_uptime(request, yagna_id): + node = Node.objects.filter(node_id=yagna_id).first() + if not node: + return Response({"first_seen": None, "data": []}) + + statuses = NodeStatusHistory.objects.filter(provider=node).order_by("timestamp") + response_data = [] + + 
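+    # The code below buckets the time since uptime_created_at into fixed windows:
+    # the window size targets roughly 90 data points (minimum_data_points) and is then
+    # snapped up to the next preset interval (1 min .. 1 week), so the actual number of
+    # points can be smaller. A window is reported as "Issue Detected" if any offline
+    # status was recorded inside it, otherwise as "Operational".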
total_span_seconds = (timezone.now() - node.uptime_created_at).total_seconds() + minimum_data_points = 90 + + # Calculating granularity to achieve more granular data points. + granularity_options = [ + 60, + 300, + 600, + 1800, + 3600, + 86400, + 604800, + ] # 1 min, 5 mins, 10 mins, 30 mins, 1 hour, 1 day, 1 week + granularity = max( + 60, ceil(total_span_seconds / minimum_data_points) + ) # Ensuring minimum granularity of 1 minute. + for g in granularity_options: + if granularity <= g: + granularity = g + break + else: + granularity = granularity_options[-1] + + next_check_time = node.uptime_created_at + while next_check_time < timezone.now(): + end_time = next_check_time + timedelta(seconds=granularity) + window_statuses = statuses.filter( + timestamp__gte=next_check_time, timestamp__lt=end_time + ) + + has_offline = window_statuses.filter(is_online=False).exists() + + # Improved time difference message + if granularity >= 86400: # More than or equal to 1 day + time_diff = f"{(next_check_time - node.uptime_created_at).days} days ago" + elif granularity >= 3600: # More than or equal to 1 hour but less than 1 day + hours_ago = int((timezone.now() - next_check_time).total_seconds() / 3600) + time_diff = f"{hours_ago} hours ago" if hours_ago > 1 else "1 hour ago" + else: # Less than an hour + minutes_ago = int((timezone.now() - next_check_time).total_seconds() / 60) + time_diff = ( + f"{minutes_ago} minutes ago" if minutes_ago > 1 else "1 minute ago" + ) + + if has_offline: + response_data.append( + { + "tooltip": time_diff, + "status": "Issue Detected", + } + ) + else: + response_data.append( + { + "tooltip": time_diff, + "status": "Operational", + } + ) + next_check_time = end_time + + return Response( + { + "first_seen": node.uptime_created_at.strftime("%Y-%m-%d %H:%M:%S"), + "uptime_percentage": calculate_uptime_percentage(yagna_id, node), + "data": response_data, + } + ) + def globe_data(request): # open json file and return data diff --git a/stats-backend/api2/yapapi_utils.py b/stats-backend/api2/yapapi_utils.py new file mode 100644 index 0000000..13bdbb3 --- /dev/null +++ b/stats-backend/api2/yapapi_utils.py @@ -0,0 +1,111 @@ +"""Utilities for yapapi example scripts.""" + +import argparse +import asyncio +import tempfile +from datetime import datetime, timezone +from pathlib import Path + +import colorama # type: ignore + +from yapapi import Golem, NoPaymentAccountError +from yapapi import __version__ as yapapi_version +from yapapi import windows_event_loop_fix +from yapapi.log import enable_default_logger + +TEXT_COLOR_RED = "\033[31;1m" +TEXT_COLOR_GREEN = "\033[32;1m" +TEXT_COLOR_YELLOW = "\033[33;1m" +TEXT_COLOR_BLUE = "\033[34;1m" +TEXT_COLOR_MAGENTA = "\033[35;1m" +TEXT_COLOR_CYAN = "\033[36;1m" +TEXT_COLOR_WHITE = "\033[37;1m" + +TEXT_COLOR_DEFAULT = "\033[0m" + +colorama.init() + + +def build_parser(description: str) -> argparse.ArgumentParser: + current_time_str = datetime.now(tz=timezone.utc).strftime("%Y%m%d_%H%M%S%z") + default_log_path = Path(tempfile.gettempdir()) / f"yapapi_{current_time_str}.log" + + parser = argparse.ArgumentParser(description=description) + parser.add_argument( + "--payment-driver", "--driver", help="Payment driver name, for example `erc20`" + ) + parser.add_argument( + "--payment-network", + "--network", + help="Payment network name, for example `goerli`", + ) + parser.add_argument("--subnet-tag", help="Subnet name, for example `public`") + parser.add_argument( + "--log-file", + default=str(default_log_path), + help="Log file for YAPAPI; default: 
%(default)s", + ) + return parser + + +def format_usage(usage): + return { + "current_usage": usage.current_usage, + "timestamp": usage.timestamp.isoformat(sep=" ") if usage.timestamp else None, + } + + +def print_env_info(golem: Golem): + print( + f"yapapi version: {TEXT_COLOR_YELLOW}{yapapi_version}{TEXT_COLOR_DEFAULT}\n" + f"Using subnet: {TEXT_COLOR_YELLOW}{golem.subnet_tag}{TEXT_COLOR_DEFAULT}, " + f"payment driver: {TEXT_COLOR_YELLOW}{golem.payment_driver}{TEXT_COLOR_DEFAULT}, " + f"and network: {TEXT_COLOR_YELLOW}{golem.payment_network}{TEXT_COLOR_DEFAULT}\n" + ) + + +def run_golem_example(example_main, log_file=None): + # This is only required when running on Windows with Python prior to 3.8: + windows_event_loop_fix() + + if log_file: + enable_default_logger( + log_file=log_file, + debug_activity_api=True, + debug_market_api=True, + debug_payment_api=True, + debug_net_api=True, + ) + + loop = asyncio.get_event_loop() + task = loop.create_task(example_main) + + try: + loop.run_until_complete(task) + except NoPaymentAccountError as e: + handbook_url = ( + "https://handbook.golem.network/requestor-tutorials/" + "flash-tutorial-of-requestor-development" + ) + print( + f"{TEXT_COLOR_RED}" + f"No payment account initialized for driver `{e.required_driver}` " + f"and network `{e.required_network}`.\n\n" + f"See {handbook_url} on how to initialize payment accounts for a requestor node." + f"{TEXT_COLOR_DEFAULT}" + ) + except KeyboardInterrupt: + print( + f"{TEXT_COLOR_YELLOW}" + "Shutting down gracefully, please wait a short while " + "or press Ctrl+C to exit immediately..." + f"{TEXT_COLOR_DEFAULT}" + ) + task.cancel() + try: + loop.run_until_complete(task) + print( + f"{TEXT_COLOR_YELLOW}Shutdown completed, thank you for waiting!{TEXT_COLOR_DEFAULT}" + ) + except (asyncio.CancelledError, KeyboardInterrupt): + pass diff --git a/stats-backend/core/celery.py b/stats-backend/core/celery.py index 84a3de5..a0b1051 100644 --- a/stats-backend/core/celery.py +++ b/stats-backend/core/celery.py @@ -51,7 +51,7 @@ def setup_periodic_tasks(sender, **kwargs): v2_cheapest_offer, v2_network_online_to_redis_flatmap, get_current_glm_price, - store_ec2_info + store_ec2_info, ) sender.add_periodic_task( @@ -92,7 +92,7 @@ def setup_periodic_tasks(sender, **kwargs): options={"queue": "default", "routing_key": "default"}, ) sender.add_periodic_task( - 30.0, + 60.0, v2_offer_scraper.s(), queue="yagna", options={"queue": "yagna", "routing_key": "yagna"}, @@ -130,10 +130,8 @@ def setup_periodic_tasks(sender, **kwargs): # sender.add_periodic_task( # 10.0, # save_endpoint_logs_to_db.s(), - # queue='default', - # options={ - # 'queue': 'default', - # 'routing_key': 'default'} + # queue="default", + # options={"queue": "default", "routing_key": "default"}, # ) sender.add_periodic_task( 60, @@ -144,10 +142,8 @@ def setup_periodic_tasks(sender, **kwargs): # sender.add_periodic_task( # 10.0, # requests_served.s(), - # queue='default', - # options={ - # 'queue': 'default', - # 'routing_key': 'default'} + # queue="default", + # options={"queue": "default", "routing_key": "default"}, # ) sender.add_periodic_task( 15.0, diff --git a/stats-backend/yapapi/examples/low-level-api/v2/list-offers.py b/stats-backend/yapapi/examples/low-level-api/v2/list-offers.py index d3925f1..9cb06de 100755 --- a/stats-backend/yapapi/examples/low-level-api/v2/list-offers.py +++ b/stats-backend/yapapi/examples/low-level-api/v2/list-offers.py @@ -13,7 +13,6 @@ from yapapi.config import ApiConfig - data = [] test = [] @@ -114,7 +113,7 @@ def 
main():
     except TimeoutError:
         pass
 
-    print(len(test))
+    print(f"TOTAL COUNT Offers: {len(test)}")
     if len(test) > 0:
         serialized = json.dumps(test)
         r = redis.Redis(host="redis", port=6379, db=0)
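
Note on the uptime calculation added in stats-backend/api2/scoring.py: NodeStatusHistory only stores status changes (update_nodes_status() skips the write when the status is unchanged), so calculate_uptime_percentage() walks the change log, sums the online intervals, and divides by the time elapsed since node.uptime_created_at. Illustrative numbers: a node first tracked 10 days ago that went offline after 6 days and came back 1 day later has online_duration = 6d + 3d = 9d, i.e. 90% uptime.

The sketch below is not part of the patch; it shows how these pieces are meant to be used from Django code, assuming a configured project with the api2 app importable. The node id and the record_probe helper are placeholders for illustration only.

    # Hypothetical usage of the uptime-tracking additions (assumes Django settings are loaded).
    from api2.models import Node, NodeStatusHistory
    from api2.scoring import calculate_uptime_percentage

    def record_probe(node_id: str, is_online: bool) -> None:
        # Same pattern as scanner.update_nodes_status(): write a history row only when
        # the status flips, so NodeStatusHistory stays a compact changelog.
        node, _ = Node.objects.get_or_create(node_id=node_id)
        last = NodeStatusHistory.objects.filter(provider=node).last()
        if last is None or last.is_online != is_online:
            NodeStatusHistory.objects.create(provider=node, is_online=is_online)

    # Uptime since tracking began for this node, as also exposed by views.node_uptime.
    print(calculate_uptime_percentage("0xplaceholder_node_id"))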