diff --git a/stats-backend/api/views.py b/stats-backend/api/views.py index 9900be0..dc8549c 100644 --- a/stats-backend/api/views.py +++ b/stats-backend/api/views.py @@ -198,7 +198,6 @@ async def payments_last_n_hours_provider(request, yagna_id, hours): + f'api/datasources/proxy/40/api/v1/query?query=sum(increase(payment_amount_received%7Binstance%3D~"{yagna_id}"%2C%20job%3D~"community.1"%7D%5B{hours}h%5D)%2F10%5E9)&time={now}' ) data = await get_yastats_data(domain) - print(data) if data[1] == 200: if data[0]["data"]["result"]: content = {"earnings": data[0]["data"]["result"][0]["value"][1]} @@ -240,7 +239,6 @@ async def payments_earnings_provider(request, yagna_id): for interval in time_intervals: query_url = f'{base_url}?query=sum(increase(payment_amount_received%7Binstance%3D~"{yagna_id}"%2C%20job%3D~"community.1"%7D%5B{interval}h%5D)%2F10%5E9)&time={now}' data = await get_yastats_data(query_url) - print(data) if data[1] == 200 and data[0]["data"]["result"]: earnings[interval] = data[0]["data"]["result"][0]["value"][1] @@ -752,39 +750,18 @@ async def network_earnings_overview(request): return HttpResponse(status=400) -def network_earnings_overview_new(request): +async def network_earnings_overview_new(request): if request.method == "GET": - time_frames = [6, 24, 168, 720, 2160] - response_data = {} - - for frame in time_frames: - end_time = now() - start_time = end_time - timedelta(hours=frame) - total_earnings = ( - GolemTransactions.objects.filter( - timestamp__range=[start_time, end_time], tx_from_golem=True - ).aggregate(Sum("amount"))["amount__sum"] - or 0.0 - ) - - response_data[f"network_earnings_{frame}h"] = { - "total_earnings": float(total_earnings) - } - - all_time_earnings = ( - GolemTransactions.objects.filter(tx_from_golem=True).aggregate( - Sum("amount") - )["amount__sum"] - or 0.0 + pool = aioredis.ConnectionPool.from_url( + "redis://redis:6379/0", decode_responses=True ) - - print(f"all_time_earnings: {all_time_earnings}") - - response_data["network_total_earnings"] = { - "total_earnings": float(all_time_earnings) - } - - return JsonResponse(response_data) + r = aioredis.Redis(connection_pool=pool) + content = await r.get("network_earnings_overview_new") + data = json.loads(content) + pool.disconnect() + return JsonResponse(data, safe=False, json_dumps_params={"indent": 4}) + else: + return HttpResponse(status=400) async def requestors(request): diff --git a/stats-backend/api2/tasks.py b/stats-backend/api2/tasks.py index 5c1f625..c7d4067 100644 --- a/stats-backend/api2/tasks.py +++ b/stats-backend/api2/tasks.py @@ -15,6 +15,8 @@ from collector.models import Node as NodeV1 from django.db.models import F from django.db.models.functions import Abs +from django.core.serializers.json import DjangoJSONEncoder + from decimal import Decimal from .utils import ( identify_network_by_offer, @@ -1498,3 +1500,87 @@ def fetch_latest_glm_tx(): print(f"New transactions added. Latest timestamp: {latest_block_timestamp}") except Exception as e: raise e + + +@app.task +def average_transaction_value_over_time(): + data = ( + GolemTransactions.objects.annotate(date=TruncDay("timestamp")) + .values("date") + .annotate(average_value=Avg("amount")) + .order_by("date") + ) + r.set("average_transaction_value", json.dumps(list(data), cls=DjangoJSONEncoder)) + + +from django.db.models import IntegerField, ExpressionWrapper, Case, When, Avg + + +@app.task +def daily_transaction_type_counts(): + data = ( + GolemTransactions.objects.annotate(date=TruncDay("timestamp")) + .values("date") + .annotate( + singleTransfer=Sum( + ExpressionWrapper( + Case( + When(transaction_type="singleTransfer", then=1), + default=0, + output_field=IntegerField(), + ), + output_field=IntegerField(), + ) + ), + batched=Sum( + ExpressionWrapper( + Case( + When(transaction_type="batched", then=1), + default=0, + output_field=IntegerField(), + ), + output_field=IntegerField(), + ) + ), + ) + .order_by("date") + ) + + r.set( + "daily_transaction_type_counts", json.dumps(list(data), cls=DjangoJSONEncoder) + ) + + +@app.task +def transaction_type_comparison(): + data = ( + GolemTransactions.objects.filter( + transaction_type__in=["singleTransfer", "batched"] + ) + .values("transaction_type") + .annotate(total=Count("scanner_id")) + .order_by("transaction_type") + ) + r.set("transaction_type_comparison", json.dumps(list(data), cls=DjangoJSONEncoder)) + + +@app.task +def amount_transferred_over_time(): + data = ( + GolemTransactions.objects.annotate(date=TruncDay("timestamp")) + .values("date") + .annotate(total_amount=Sum("amount")) + .order_by("date") + ) + r.set("amount_transferred_over_time", json.dumps(list(data), cls=DjangoJSONEncoder)) + + +@app.task +def transaction_volume_over_time(): + data = ( + GolemTransactions.objects.annotate(date=TruncDay("timestamp")) + .values("date") + .annotate(total_transactions=Count("scanner_id")) + .order_by("date") + ) + r.set("transaction_volume_over_time", json.dumps(list(data), cls=DjangoJSONEncoder)) diff --git a/stats-backend/api2/urls.py b/stats-backend/api2/urls.py index 236d727..5ce9c7d 100644 --- a/stats-backend/api2/urls.py +++ b/stats-backend/api2/urls.py @@ -34,7 +34,6 @@ path("network/token/golemvschain", views.daily_volume_golem_vs_chain), path("network/transactions/volume", views.transaction_volume_over_time), path("network/amount/transfer", views.amount_transferred_over_time), - path("initblockchain", views.init_golem_tx_manually), path("network/transactions/type/comparison", views.transaction_type_comparison), path("network/transactions/daily-type-counts", views.daily_transaction_type_counts), path( diff --git a/stats-backend/api2/views.py b/stats-backend/api2/views.py index 934a8a7..e9f0873 100644 --- a/stats-backend/api2/views.py +++ b/stats-backend/api2/views.py @@ -703,98 +703,74 @@ def daily_volume_golem_vs_chain(request): from django.db.models import Count -def transaction_volume_over_time(request): - try: - data = ( - GolemTransactions.objects.annotate(date=TruncDay("timestamp")) - .values("date") - .annotate(total_transactions=Count("scanner_id")) - .order_by("date") +async def transaction_volume_over_time(request): + if request.method == "GET": + pool = aioredis.ConnectionPool.from_url( + "redis://redis:6379/0", decode_responses=True ) - return JsonResponse(list(data), safe=False) - except Exception as e: - return JsonResponse({"error": str(e)}, status=500) + r = aioredis.Redis(connection_pool=pool) + content = await r.get("transaction_volume_over_time") + data = json.loads(content) + pool.disconnect() + return JsonResponse(data, safe=False, json_dumps_params={"indent": 4}) + else: + return HttpResponse(status=400) -def amount_transferred_over_time(request): - try: - data = ( - GolemTransactions.objects.annotate(date=TruncDay("timestamp")) - .values("date") - .annotate(total_amount=Sum("amount")) - .order_by("date") +async def amount_transferred_over_time(request): + if request.method == "GET": + pool = aioredis.ConnectionPool.from_url( + "redis://redis:6379/0", decode_responses=True ) - return JsonResponse(list(data), safe=False) - except Exception as e: - return JsonResponse({"error": str(e)}, status=500) + r = aioredis.Redis(connection_pool=pool) + content = await r.get("amount_transferred_over_time") + data = json.loads(content) + pool.disconnect() + return JsonResponse(data, safe=False, json_dumps_params={"indent": 4}) + else: + return HttpResponse(status=400) -def transaction_type_comparison(request): - try: - data = ( - GolemTransactions.objects.filter( - transaction_type__in=["singleTransfer", "batched"] - ) - .values("transaction_type") - .annotate(total=Count("scanner_id")) - .order_by("transaction_type") +async def transaction_type_comparison(request): + if request.method == "GET": + pool = aioredis.ConnectionPool.from_url( + "redis://redis:6379/0", decode_responses=True ) - return JsonResponse(list(data), safe=False) - except Exception as e: - return JsonResponse({"error": str(e)}, status=500) + r = aioredis.Redis(connection_pool=pool) + content = await r.get("transaction_type_comparison") + data = json.loads(content) + pool.disconnect() + return JsonResponse(data, safe=False, json_dumps_params={"indent": 4}) + else: + return HttpResponse(status=400) from django.db.models import IntegerField, ExpressionWrapper, Case, When, Avg -def daily_transaction_type_counts(request): - try: - data = ( - GolemTransactions.objects.annotate(date=TruncDay("timestamp")) - .values("date") - .annotate( - singleTransfer=Sum( - ExpressionWrapper( - Case( - When(transaction_type="singleTransfer", then=1), - default=0, - output_field=IntegerField(), - ), - output_field=IntegerField(), - ) - ), - batched=Sum( - ExpressionWrapper( - Case( - When(transaction_type="batched", then=1), - default=0, - output_field=IntegerField(), - ), - output_field=IntegerField(), - ) - ), - ) - .order_by("date") +async def daily_transaction_type_counts(request): + if request.method == "GET": + pool = aioredis.ConnectionPool.from_url( + "redis://redis:6379/0", decode_responses=True ) - return JsonResponse(list(data), safe=False) - except Exception as e: - return JsonResponse({"error": str(e)}, status=500) + r = aioredis.Redis(connection_pool=pool) + content = await r.get("daily_transaction_type_counts") + data = json.loads(content) + pool.disconnect() + return JsonResponse(data, safe=False, json_dumps_params={"indent": 4}) + else: + return HttpResponse(status=400) -def average_transaction_value_over_time(request): - try: - data = ( - GolemTransactions.objects.annotate(date=TruncDay("timestamp")) - .values("date") - .annotate(average_value=Avg("amount")) - .order_by("date") +async def average_transaction_value_over_time(request): + if request.method == "GET": + pool = aioredis.ConnectionPool.from_url( + "redis://redis:6379/0", decode_responses=True ) - return JsonResponse(list(data), safe=False) - except Exception as e: - return JsonResponse({"error": str(e)}, status=500) - - -from .tasks import init_golem_tx_scraping -def init_golem_tx_manually(request): - init_golem_tx_scraping.delay() - return JsonResponse({"status": "success"}, status=200) \ No newline at end of file + r = aioredis.Redis(connection_pool=pool) + content = await r.get("average_transaction_value") + data = json.loads(content) + pool.disconnect() + return JsonResponse(data, safe=False, json_dumps_params={"indent": 4}) + else: + return HttpResponse(status=400) diff --git a/stats-backend/collector/tasks.py b/stats-backend/collector/tasks.py index 4a9ab7b..3b6f3e9 100644 --- a/stats-backend/collector/tasks.py +++ b/stats-backend/collector/tasks.py @@ -455,7 +455,6 @@ def network_versions_to_redis(): versions_nonsorted = [] versions = [] data = content[0]["data"]["result"] - print(data) # Append to array so we can sort for obj in data: versions_nonsorted.append( @@ -545,6 +544,42 @@ def network_total_earnings(): ) update_total_earnings(domain) +from api2.models import GolemTransactions +from django.db.models import Sum +from django.utils.timezone import now + +@app.task +def network_earnings_overview_new(): + time_frames = [6, 24, 168, 720, 2160] + response_data = {} + + for frame in time_frames: + end_time = now() + start_time = end_time - timedelta(hours=frame) + total_earnings = ( + GolemTransactions.objects.filter( + timestamp__range=[start_time, end_time], tx_from_golem=True + ).aggregate(Sum("amount"))["amount__sum"] + or 0.0 + ) + + response_data[f"network_earnings_{frame}h"] = { + "total_earnings": float(total_earnings) + } + + all_time_earnings = ( + GolemTransactions.objects.filter(tx_from_golem=True).aggregate( + Sum("amount") + )["amount__sum"] + or 0.0 + ) + + + response_data["network_total_earnings"] = { + "total_earnings": float(all_time_earnings) + } + + r.set("network_earnings_overview_new", json.dumps(response_data)) def update_total_earnings(domain): data = get_stats_data(domain) diff --git a/stats-backend/core/celery.py b/stats-backend/core/celery.py index 1412e29..6ef00db 100644 --- a/stats-backend/core/celery.py +++ b/stats-backend/core/celery.py @@ -38,6 +38,7 @@ def setup_periodic_tasks(sender, **kwargs): paid_invoices_1h, provider_accepted_invoices_1h, fetch_yagna_release, + network_earnings_overview_new, ) from api2.tasks import ( v2_offer_scraper, @@ -66,6 +67,48 @@ def setup_periodic_tasks(sender, **kwargs): fetch_and_store_relay_nodes, init_golem_tx_scraping, fetch_latest_glm_tx, + transaction_volume_over_time, + amount_transferred_over_time, + transaction_type_comparison, + daily_transaction_type_counts, + average_transaction_value_over_time, + ) + + sender.add_periodic_task( + 60, + transaction_volume_over_time.s(), + queue="default", + options={"queue": "default", "routing_key": "default"}, + ) + sender.add_periodic_task( + 60, + network_earnings_overview_new.s(), + queue="default", + options={"queue": "default", "routing_key": "default"}, + ) + sender.add_periodic_task( + 60, + daily_transaction_type_counts.s(), + queue="default", + options={"queue": "default", "routing_key": "default"}, + ) + sender.add_periodic_task( + 60, + average_transaction_value_over_time.s(), + queue="default", + options={"queue": "default", "routing_key": "default"}, + ) + sender.add_periodic_task( + 60, + transaction_type_comparison.s(), + queue="default", + options={"queue": "default", "routing_key": "default"}, + ) + sender.add_periodic_task( + 60, + amount_transferred_over_time.s(), + queue="default", + options={"queue": "default", "routing_key": "default"}, ) sender.add_periodic_task( @@ -86,12 +129,12 @@ def setup_periodic_tasks(sender, **kwargs): queue="default", options={"queue": "default", "routing_key": "default"}, ) - # sender.add_periodic_task( - # 30, - # fetch_and_store_relay_nodes.s(), - # queue="default", - # options={"queue": "default", "routing_key": "default"}, - # ) + sender.add_periodic_task( + 30, + fetch_and_store_relay_nodes.s(), + queue="default", + options={"queue": "default", "routing_key": "default"}, + ) sender.add_periodic_task( crontab(minute="*/60"), fetch_yagna_release.s(),