From 696f294338df68b40d2c186be7f9e23a3e7a4895 Mon Sep 17 00:00:00 2001 From: zhaoyu Date: Fri, 13 Dec 2024 14:18:33 -0500 Subject: [PATCH] Add support for ART Local tests in the APIs --- core/art/constants.py | 3 + core/art/modelsART.py | 1 + core/art/utils.py | 19 +++++- core/art/views.py | 144 ++++++++++++++++++++++++++---------------- 4 files changed, 113 insertions(+), 54 deletions(-) diff --git a/core/art/constants.py b/core/art/constants.py index 9ecadf01..d5d5b6d0 100644 --- a/core/art/constants.py +++ b/core/art/constants.py @@ -9,6 +9,8 @@ CACHE_TIMEOUT_MINUTES = 15 RETENTION_PERIOD_DAYS = 180 # 6 months EOS_PREFIX = 'https://atlas-art-data.web.cern.ch/atlas-art-data/grid-output/' +EOS_PREFIX_LOCAL = 'https://atlas-art-data.web.cern.ch/atlas-art-data/local-output/' +INITIAL_LOCAL_ID = 1 # dicts DATETIME_FORMAT = MappingProxyType({ @@ -118,3 +120,4 @@ "egammaValidation": "Reconstruction/egamma/egammaValidation/test" }) +AUTHORIZED_HOSTS = ['aibuild16-025', 'aibuild16-026'] diff --git a/core/art/modelsART.py b/core/art/modelsART.py index 946f0609..21c8ebae 100644 --- a/core/art/modelsART.py +++ b/core/art/modelsART.py @@ -28,6 +28,7 @@ class ARTTests(models.Model): attemptnr = models.DecimalField(decimal_places=0, max_digits=3, db_column='attemptnr') maxattempt = models.DecimalField(decimal_places=0, max_digits=3, db_column='maxattempt') status = models.DecimalField(decimal_places=0, max_digits=3, db_column='status') + test_type = models.CharField(max_length=1000, db_column='test_type', null=True, blank=True) class Meta: db_table = f'"{settings.DB_SCHEMA}"."art_tests"' diff --git a/core/art/utils.py b/core/art/utils.py index e50533cc..911ea958 100644 --- a/core/art/utils.py +++ b/core/art/utils.py @@ -138,6 +138,9 @@ def setupView(request): if p == f: query[f] = v + # For transiton period to integrate ART Local tests, temporarily requiring test_type=grid to only show ART Grid test + query['test_type'] = "grid" + return query, query_str @@ -324,4 +327,18 @@ def clean_tests_list(tests, add_link_previous_attempt=False): t['linktopreviousattemptlogs'] = f"?pandaid={min(tmp_dict[m])}" tests_filtered.append(t) - return tests_filtered \ No newline at end of file + return tests_filtered + + +def get_client_ip(request): + # Get the 'X-Forwarded-For' header (which contains the real client IP if behind a proxy) + x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR') + if x_forwarded_for: + # If there are multiple IP addresses in the header (because of multiple proxies), + # the first one is usually the original client IP + ip = x_forwarded_for.split(',')[0] + else: + # Otherwise, use the direct connection's IP address (REMOTE_ADDR) + ip = request.META.get('REMOTE_ADDR') + + return ip diff --git a/core/art/views.py b/core/art/views.py index b3a729aa..a803d6a9 100644 --- a/core/art/views.py +++ b/core/art/views.py @@ -5,6 +5,7 @@ import json import re import time +import socket import multiprocessing from datetime import datetime, timedelta @@ -37,7 +38,7 @@ from core.common.models import Filestable4 from core.art.utils import setupView, get_test_diff, get_result_for_multijob_test, concat_branch, \ - find_last_successful_test, build_gitlab_link, clean_tests_list + find_last_successful_test, build_gitlab_link, clean_tests_list, get_client_ip from django.conf import settings import core.art.constants as art_const @@ -1131,16 +1132,50 @@ def registerARTTest(request): tarindex = None inputfileid = None gitlabid = None + test_type = "grid" # log all the req params for debug _logger.debug('[ART] registerARTtest requestParams: ' + str(request.session['requestParams'])) # Checking whether params were provided - if 'requestParams' in request.session and 'pandaid' in request.session['requestParams'] and 'testname' in request.session['requestParams']: - pandaid = request.session['requestParams']['pandaid'] + if 'requestParams' in request.session and 'test_type' in request.session['requestParams']: + test_type = request.session['requestParams']['test_type'] + + if 'requestParams' in request.session and 'testname' in request.session['requestParams']: testname = request.session['requestParams']['testname'] else: - data = {'exit_code': -1, 'message': "There were not recieved any pandaid and testname"} + data = {'exit_code': -1, 'message': "There were not received any testname"} + _logger.warning(data['message'] + str(request.session['requestParams'])) + return HttpResponse(json.dumps(data), status=400, content_type='application/json') + + if 'requestParams' in request.session and 'pandaid' in request.session['requestParams']: + pandaid = request.session['requestParams']['pandaid'] + elif test_type == 'local': + #session_id = request.COOKIES.get('sessionid') + #if session_id: + client_ip = get_client_ip(request) + try: + # Perform reverse DNS lookup to get the client's hostname + art_host = socket.gethostbyaddr(client_ip)[0] + except socket.herror: + # If reverse DNS lookup fails, return the IP address as fallback + art_host = client_ip + + if art_host.split('.')[0] not in art_const.AUTHORIZED_HOSTS: + return JsonResponse({"error": "Invalid ART API user!"}, status=403) + + # Generate job ID for ART Local + query = {'test_type': 'local'} + pandaid = ARTTests.objects.filter(test_type='local').aggregate(Max('pandaid')) + if len(pandaid) > 0 and pandaid['pandaid__max'] is not None: + pandaid = int(pandaid['pandaid__max']) + 1 + else: + pandaid = art_const.INITIAL_LOCAL_ID + _logger.info("JobID: {} was generated".format(pandaid)) + attemptnr = 1 + computingsite = "ART Local" + else: + data = {'exit_code': -1, 'message': "There were not received any pandaid"} _logger.warning(data['message'] + str(request.session['requestParams'])) return HttpResponse(json.dumps(data), status=400, content_type='application/json') @@ -1206,56 +1241,58 @@ def registerARTTest(request): _logger.warning(data['message'] + str(request.session['requestParams'])) return HttpResponse(json.dumps(data), status=422, content_type='application/json') - # Checking if provided pandaid exists in panda db - query = {'pandaid': pandaid} - values = ('pandaid', 'jeditaskid', 'username', 'computingsite', 'jobname') - jobs = [] - jobs.extend(CombinedWaitActDefArch4.objects.filter(**query).values(*values)) - try: - job = jobs[0] - except: - data = {'exit_code': -1, 'message': "Provided pandaid does not exists"} - _logger.warning(data['message'] + str(request.session['requestParams'])) - return HttpResponse(json.dumps(data), status=422, content_type='application/json') - - # Checking whether provided pandaid is art job - if 'username' in job and job['username'] != 'artprod': - data = {'exit_code': -1, 'message': "Provided pandaid is not art job"} - _logger.warning(data['message'] + str(request.session['requestParams'])) - return HttpResponse(json.dumps(data), status=422, content_type='application/json') - - # Preparing params to register art job + # Only check ART Grid tests branch = concat_branch({'nightly_release_short':nightly_release_short, 'project': project, 'platform': platform}) - if 'computingsite' in job: - computingsite = job['computingsite'] - if 'jeditaskid' in job: - jeditaskid = job['jeditaskid'] - - # get files -> extract log tarball name, attempts - files = [] - fquery = {'jeditaskid': jeditaskid, 'pandaid': pandaid, 'type__in': ('pseudo_input', 'input', 'log')} - files.extend(Filestable4.objects.filter(**fquery).values('jeditaskid', 'pandaid', 'fileid', 'lfn', 'type', 'attemptnr')) - # count of attempts starts from 0, for readability change it to start from 1 - if len(files) > 0: - input_files = [f for f in files if f['type'] in ('pseudo_input', 'input')] - if len(input_files) > 0: - attemptnr = 1 + max([f['attemptnr'] for f in input_files]) - inputfileid = max([f['fileid'] for f in input_files]) - log_lfn = [f['lfn'] for f in files if f['type'] == 'log'] - if len(log_lfn) > 0: - try: - tarindex = int(re.search('.([0-9]{6}).log.', log_lfn[0]).group(1)) - except: - _logger.info('Failed to extract tarindex from log lfn') - tarindex = None - if 'jobname' in job: + if test_type and test_type == 'grid': + # Checking if provided pandaid exists in panda db + query = {'pandaid': pandaid} + values = ('pandaid', 'jeditaskid', 'username', 'computingsite', 'jobname') + jobs = [] + jobs.extend(CombinedWaitActDefArch4.objects.filter(**query).values(*values)) try: - gitlabid = int(re.search('.([0-9]{6,8}).', job['jobname']).group(1)) + job = jobs[0] except: - _logger.info('Failed to extract tarindex from log lfn') - gitlabid = None - _logger.info(f"""Got job-related metadata for test {pandaid}: - computingsite={computingsite}, tarindex={tarindex}, inputfileid={inputfileid}, attemptnr={attemptnr}""") + data = {'exit_code': -1, 'message': "Provided pandaid does not exists"} + _logger.warning(data['message'] + str(request.session['requestParams'])) + return HttpResponse(json.dumps(data), status=422, content_type='application/json') + + # Checking whether provided pandaid is art job + if 'username' in job and job['username'] != 'artprod': + data = {'exit_code': -1, 'message': "Provided pandaid is not art job"} + _logger.warning(data['message'] + str(request.session['requestParams'])) + return HttpResponse(json.dumps(data), status=422, content_type='application/json') + + # Preparing params to register art job + if 'computingsite' in job: + computingsite = job['computingsite'] + if 'jeditaskid' in job: + jeditaskid = job['jeditaskid'] + + # get files -> extract log tarball name, attempts + files = [] + fquery = {'jeditaskid': jeditaskid, 'pandaid': pandaid, 'type__in': ('pseudo_input', 'input', 'log')} + files.extend(Filestable4.objects.filter(**fquery).values('jeditaskid', 'pandaid', 'fileid', 'lfn', 'type', 'attemptnr')) + # count of attempts starts from 0, for readability change it to start from 1 + if len(files) > 0: + input_files = [f for f in files if f['type'] in ('pseudo_input', 'input')] + if len(input_files) > 0: + attemptnr = 1 + max([f['attemptnr'] for f in input_files]) + inputfileid = max([f['fileid'] for f in input_files]) + log_lfn = [f['lfn'] for f in files if f['type'] == 'log'] + if len(log_lfn) > 0: + try: + tarindex = int(re.search('.([0-9]{6}).log.', log_lfn[0]).group(1)) + except: + _logger.info('Failed to extract tarindex from log lfn') + tarindex = None + if 'jobname' in job: + try: + gitlabid = int(re.search('.([0-9]{6,8}).', job['jobname']).group(1)) + except: + _logger.info('Failed to extract tarindex from log lfn') + gitlabid = None + _logger.info(f"""Got job-related metadata for test {pandaid}: + computingsite={computingsite}, tarindex={tarindex}, inputfileid={inputfileid}, attemptnr={attemptnr}""") # extract datetime from str nightly time nightly_tag_date = None @@ -1289,9 +1326,10 @@ def registerARTTest(request): gitlabid=gitlabid, computingsite=computingsite, status=art_const.TEST_STATUS_INDEX['active'], + test_type=test_type, ) insertRow.save() - data = {'exit_code': 0, 'message': "Provided pandaid has been successfully registered"} + data = {'exit_code': 0, 'pandaid': pandaid, 'message': "ART test has been successfully registered"} _logger.info(data['message'] + str(request.session['requestParams'])) except Exception as e: data = {'exit_code': 0, 'message': "Failed to register test, can not save the row to DB"} @@ -1644,4 +1682,4 @@ def fill_table(request): _logger.exception(f"""Failed to update test {pandaid} with gitlabid={gitlabid}\n{str(ex)}""") return JsonResponse({'message': f"Failed to update info for test {pandaid}"}, status=500) - return JsonResponse({'message': f"Updated {len(tests_to_update)} tests for ntag={ntag}, it took {(datetime.now()-start).total_seconds()}s"}, status=200) \ No newline at end of file + return JsonResponse({'message': f"Updated {len(tests_to_update)} tests for ntag={ntag}, it took {(datetime.now()-start).total_seconds()}s"}, status=200)