Skip to content

Commit

Permalink
Computing status, earnings PoC, Non-blocking ec2
Browse files Browse the repository at this point in the history
  • Loading branch information
cryptobench committed Mar 20, 2024
1 parent a081bb9 commit 7529b0f
Show file tree
Hide file tree
Showing 6 changed files with 200 additions and 111 deletions.
98 changes: 59 additions & 39 deletions stats-backend/api2/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@
from django.db.models.functions import Abs
from decimal import Decimal
from .utils import (
get_pricing,
get_ec2_products,
find_cheapest_price,
has_vcpu_memory,
round_to_three_decimals,
identify_network_by_offer,
is_provider_online,
process_and_store_product_data,
extract_pricing_from_vm_properties,
identify_network,
)

pool = redis.ConnectionPool(host="redis", port=6379, db=0)
Expand Down Expand Up @@ -685,41 +685,38 @@ def healthcheck_provider(node_id, network, taskId):
return rc


@app.task
def store_ec2_info():
ec2_info = {}
products_data = get_ec2_products()

for product in products_data:
details = product.get("details", {})
if not has_vcpu_memory(details):
continue
print(product)
product_id = product["id"]
category = product.get("category")
name = product.get("name")

pricing_data = get_pricing(product_id)
cheapest_price = find_cheapest_price(pricing_data["prices"])

# Convert memory to float and price to Decimal
memory_gb = float(details["memory"])
price = cheapest_price["amount"] if cheapest_price else None

# Use get_or_create to store or update the instance in the database
instance, created = EC2Instance.objects.get_or_create(
name=name,
defaults={"vcpu": details["vcpu"], "memory": memory_gb, "price_usd": price},
)

ec2_info[product_id] = {
"category": category,
"name": name,
"details": details,
"cheapest_price": cheapest_price,
}
@app.task(
bind=True,
autoretry_for=(requests.exceptions.RequestException,),
retry_backoff=True,
retry_jitter=True,
retry_kwargs={"max_retries": 5},
)
def store_ec2_info(self):
url = "https://api.vantage.sh/v2/products?service_id=aws-ec2"
headers = {
"accept": "application/json",
"authorization": f'Bearer {os.environ.get("VANTAGE_API_KEY")}',
}
try:
response = requests.get(url, headers=headers)
# Check for rate limiting error
if response.status_code == 429:
reset_time = int(response.headers.get("x-rate-limit-reset", 0))
current_time = time.time()
retry_after = max(
reset_time - current_time, 1
) # Ensure there's at least a 1-second wait
# Schedule the next retry to align with the rate limit reset time
raise self.retry(countdown=retry_after)
response.raise_for_status()

return ec2_info
products_data = response.json().get("products", [])
for product in products_data:
process_and_store_product_data.delay(product)
except requests.RequestException as exc:
# Reraise with self.retry to utilize Celery's built-in retry mechanism
raise self.retry(exc=exc)


import time
Expand Down Expand Up @@ -1276,3 +1273,26 @@ def count_cpu_architecture():
cpu_architecture_json = json.dumps(cpu_architecture_count)

r.set("cpu_architecture_count", cpu_architecture_json)


import urllib.parse


@app.task
def online_nodes_computing():
end = round(time.time())
start = end - 10
query = 'activity_provider_created{job="community.1"} - activity_provider_destroyed{job="community.1"}'
url = f"{os.environ.get('STATS_URL')}api/datasources/proxy/40/api/v1/query_range?query={urllib.parse.quote(query)}&start={start}&end={end}&step=1"
data = get_stats_data(url)

if data[1] == 200 and data[0]["status"] == "success" and data[0]["data"]["result"]:
computing_node_ids = [
node["metric"]["instance"]
for node in data[0]["data"]["result"]
if node["values"][-1][1] == "1"
]
Node.objects.filter(node_id__in=computing_node_ids).update(computing_now=True)
Node.objects.exclude(node_id__in=computing_node_ids).update(computing_now=False)
NodeV1.objects.filter(node_id__in=computing_node_ids).update(computing_now=True)
NodeV1.objects.exclude(node_id__in=computing_node_ids).update(computing_now=False)
1 change: 1 addition & 0 deletions stats-backend/api2/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
path("provider/wallet/<wallet>", views.node_wallet),
path("provider/node/<yagna_id>", views.node),
path("provider/uptime/<yagna_id>", views.node_uptime),
path("provider/earnings/<node_id>/<epoch>", views.get_transfer_sum),
path("website/globe_data", views.globe_data),
path("website/index", views.golem_main_website_index),
path("network/historical/stats", views.network_historical_stats),
Expand Down
149 changes: 89 additions & 60 deletions stats-backend/api2/utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import subprocess
from django.conf import settings
import os
from .models import Offer
from .models import Offer, EC2Instance


def identify_network_by_offer(offer):
Expand Down Expand Up @@ -55,87 +55,116 @@ def identify_network(provider):
return "testnet"


from celery import shared_task
from decimal import Decimal, ROUND_DOWN
import requests
import os
import time


def make_request_with_rate_limit_handling(url, headers):
while True:
response = requests.get(url, headers=headers)
if response.status_code == 429: # Rate limit hit

reset_time = int(response.headers.get("x-rate-limit-reset", 0))
sleep_duration = max(reset_time - time.time(), 0)
print(f"Ratelimited waiting for {sleep_duration}")
time.sleep(sleep_duration + 1) # Sleep until the limit resets, then retry
else:
return response
from core.celery import app
import requests
import os


def get_ec2_products():
products = []
@app.task(
bind=True,
autoretry_for=(requests.exceptions.RequestException,),
retry_backoff=True,
max_retries=5,
)
def fetch_and_store_ec2_product_list(self):
url = "https://api.vantage.sh/v2/products?service_id=aws-ec2"
headers = {
"accept": "application/json",
"authorization": f'Bearer {os.environ.get("VANTAGE_API_KEY")}',
}
headers = headers_setup()

try:
response = make_request_with_rate_limit_handling(url, headers, self)
products_data = response.json().get("products", [])
for product in products_data:
process_and_store_product_data.delay(product)
except requests.RequestException as e:
raise self.retry(exc=e)


@app.task(
bind=True,
autoretry_for=(requests.exceptions.RequestException,),
retry_backoff=True,
max_retries=5,
)
def process_and_store_product_data(self, product):
details = product.get("details", {})
if not has_vcpu_memory(details):
return

product_id, category, name = item_details(product)
fetch_pricing_data.delay(product_id, category, name, details)


@app.task(
bind=True,
autoretry_for=(requests.exceptions.RequestException,),
retry_backoff=True,
max_retries=5,
)
def fetch_pricing_data(self, product_id, category, name, details):
url = f"https://api.vantage.sh/v2/products/{product_id}/prices"
headers = headers_setup()

while url:
response = make_request_with_rate_limit_handling(url, headers)
data = response.json()
print("Got product list")
products.extend(data.get("products", []))
url = data["links"].get("next") # Get the next page URL
try:
response = make_request_with_rate_limit_handling(url, headers, self)
pricing_data = response.json()
store_ec2_instance_data.delay(pricing_data, product_id, category, name, details)
except requests.RequestException as e:
raise self.retry(exc=e)

return products

@app.task
def store_ec2_instance_data(pricing_data, product_id, category, name, details):
cheapest_price = find_cheapest_price(pricing_data["prices"])
memory_gb, price = details_conversion(details, cheapest_price)

def get_pricing(product_id):
url = f"https://api.vantage.sh/v2/products/{product_id}/prices"
headers = {
"accept": "application/json",
"authorization": f'Bearer {os.environ.get("VANTAGE_API_KEY")}',
}
response = make_request_with_rate_limit_handling(url, headers)
print("Got price")
return response.json()
# Adjust to match the actual model fields; removed the non-existent 'product_id'
instance, created = EC2Instance.objects.get_or_create(
name=name,
defaults={"vcpu": details["vcpu"], "memory": memory_gb, "price_usd": price},
)


def find_cheapest_price(prices):
return min(prices, key=lambda x: x["amount"]) if prices else None
def make_request_with_rate_limit_handling(url, headers, task_instance=None):
response = requests.get(url, headers=headers)
if response.status_code == 429:
if task_instance:
reset_time = int(response.headers.get("x-rate-limit-reset", 0))
current_time = time.time()
retry_after = max(reset_time - current_time, 1) # Ensure at least 1 second
raise task_instance.retry(
countdown=retry_after, exc=Exception("Rate limit exceeded")
)
response.raise_for_status()
return response


def has_vcpu_memory(details):
return "vcpu" in details and "memory" in details


from decimal import Decimal, ROUND_DOWN
def find_cheapest_price(prices):
return min(prices, key=lambda x: x["amount"]) if prices else None


def round_to_three_decimals(value):
# Convert to Decimal
decimal_value = Decimal(value)
def details_conversion(details, cheapest_price):
return float(details["memory"]), (
cheapest_price["amount"] if cheapest_price else None
)

# If the value is less than 1 and not zero, handle the first non-zero digit
if 0 < decimal_value < 1:
# Convert to scientific notation to find the first non-zero digit
value_scientific = format(decimal_value, ".6e")
exponent = int(value_scientific.split("e")[-1])

# If the exponent is significantly small, handle as a special case
if exponent <= -6:
rounded_value = decimal_value
else:
# Calculate the number of decimal places to keep
decimal_places = abs(exponent) + 2 # 2 more than the exponent
quantize_pattern = Decimal("1e-" + str(decimal_places))
def item_details(product):
return product["id"], product.get("category"), product.get("name")

# Rounding the value
rounded_value = decimal_value.quantize(
quantize_pattern, rounding=ROUND_DOWN
)
else:
# If the value is 1 or more, or exactly 0, round to a maximum of three decimal places
rounded_value = decimal_value.quantize(Decimal("0.001"), rounding=ROUND_DOWN)

return rounded_value
def headers_setup():
return {
"accept": "application/json",
"authorization": f'Bearer {os.environ.get("VANTAGE_API_KEY")}',
}
30 changes: 30 additions & 0 deletions stats-backend/api2/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,36 @@ async def cpu_architecture_stats(request):
return HttpResponse(status=400)


from collector.models import Requestors


def get_transfer_sum(request, node_id, epoch):
try:
url = f"http://polygongas.org:14059/erc20/api/stats/transfers?chain=137&account={node_id}&from={epoch}"
response = requests.get(url)
if response.status_code != 200:
return JsonResponse({'error': 'Failed to get data from API'}, status=500)
data = response.json()
from_addrs = [t["fromAddr"] for t in data.get("transfers", [])]
from_addrs_in_db = Requestors.objects.filter(node_id__in=from_addrs).values_list("node_id", flat=True)
total_amount_wei_matched = sum(
int(t["tokenAmount"])
for t in data.get("transfers", [])
if t["fromAddr"] in from_addrs_in_db
)
total_amount_wei_not_matched = sum(
int(t["tokenAmount"])
for t in data.get("transfers", [])
if t["fromAddr"] not in from_addrs_in_db
)
return JsonResponse({
'total_amount_matched': total_amount_wei_matched / 1e18,
'total_amount_not_matched': total_amount_wei_not_matched / 1e18
})
except Exception as e:
return JsonResponse({'error': str(e)}, status=500)


async def network_online(request):
if request.method == "GET":
pool = aioredis.ConnectionPool.from_url(
Expand Down
16 changes: 13 additions & 3 deletions stats-backend/collector/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -637,24 +637,34 @@ def provider_accepted_invoices_1h():
r.set("provider_accepted_invoice_percentage", serialized)


import urllib.parse


@app.task
def online_nodes_computing():
end = round(time.time())
start = end - 60
start = end - 10
providers = Node.objects.filter(online=True)
computing_node_ids = []

for node in providers:
url = f"api/datasources/proxy/40/api/v1/query_range?query=sum(changes(activity_provider_created%7Bjob%3D~%22community.1%22%2C%20instance%3D~%22{node.node_id}%22%7D[60m]))&start={start}&end={end}&step=30"
query = (
f'activity_provider_created{{instance=~"{node.node_id}", job=~"community.1"}}'
" - "
f'activity_provider_destroyed{{instance=~"{node.node_id}", job=~"community.1"}}'
)
encoded_query = urllib.parse.quote(query)
url = f"api/datasources/proxy/40/api/v1/query_range?query={encoded_query}&start={start}&end={end}&step=120"
domain = os.environ.get("STATS_URL") + url
data = get_stats_data(domain)

if (
data[1] == 200
and data[0]["status"] == "success"
and data[0]["data"]["result"]
):
values = data[0]["data"]["result"][0]["values"]
if values[-1][1] != "0":
if any(float(value[1]) > 0 for value in values):
computing_node_ids.append(node.pk)

Node.objects.filter(pk__in=computing_node_ids).update(computing_now=True)
Expand Down
Loading

0 comments on commit 7529b0f

Please sign in to comment.