ATLASExperiment.py
# Class definition:
# ATLASExperiment
# This class is the ATLAS experiment class inheriting from Experiment
# Instances are generated with ExperimentFactory via pUtil::getExperiment()
# Implemented as a singleton class
# http://stackoverflow.com/questions/42558/python-and-the-singleton-pattern
# Import relevant python/pilot modules
from Experiment import Experiment # Main experiment class
from pUtil import tolog # Logging method that sends text to the pilot log
from pUtil import readpar # Used to read values from the schedconfig DB (queuedata)
from pUtil import isAnalysisJob # Is the current job a user analysis job or a production job?
from pUtil import grep # Grep function - reimplement using cli command
from pUtil import getCmtconfig # Get the cmtconfig from the job def or queuedata
from pUtil import verifyReleaseString # To verify the release string (move to Experiment later)
from pUtil import timedCommand # Protect cmd with timed_command
from pUtil import getSiteInformation # Get the SiteInformation object corresponding to the given experiment
from pUtil import isBuildJob # Is the current job a build job?
from pUtil import remove # Used to remove redundant file before log file creation
from pUtil import isAGreaterOrEqualToB #
from pUtil import convert_unicode_string # Needed to avoid unicode strings in the memory output text file
from PilotErrors import PilotErrors # Error codes
from FileHandling import readFile, writeFile # File handling methods
from FileHandling import updatePilotErrorReport # Used to set the priority of an error
from FileHandling import getJSONDictionary # Used by getUtilityInfo()
from RunJobUtilities import dumpOutput # ASCII dump
from RunJobUtilities import getStdoutFilename #
from RunJobUtilities import findVmPeaks #
from RunJobUtilities import getSourceSetup #
# Standard python modules
import re
import os
import time
import commands
from glob import glob
class ATLASExperiment(Experiment):
# private data members
__experiment = "ATLAS"
__instance = None # Boolean used by subclasses to become a Singleton
__warning = ""
__analysisJob = False
__job = None # Current Job object
__error = PilotErrors() # PilotErrors object
__doFileLookups = False # True for LFC based file lookups
__atlasEnv = False # True for releases beginning with "Atlas-"
# Required methods
# def __init__(self, *args, **kwargs):
def __init__(self):
""" Default initialization """
pass
# super(ATLASExperiment, self).__init__(self, *args, **kwargs)
def __new__(cls, *args, **kwargs):
""" Override the __new__ method to make the class a singleton """
if not cls.__instance:
cls.__instance = super(ATLASExperiment, cls).__new__(cls, *args, **kwargs)
# cls.__instance = super(ATLASExperiment, cls).__new__(cls)
return cls.__instance
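# Illustrative note (not part of the original code): because __new__ caches the first
# instance, repeated construction returns the same object, e.g.
#   a = ATLASExperiment()
#   b = ATLASExperiment()
#   assert a is b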
def getExperiment(self):
""" Return a string with the experiment name """
return self.__experiment
def setParameters(self, *args, **kwargs):
""" Set any internally needed variables """
# set initial values
self.__job = kwargs.get('job', None)
if self.__job:
self.__analysisJob = isAnalysisJob(self.__job.trf)
else:
self.__warning = "setParameters found no job object"
def addMAKEFLAGS(self, jobCoreCount, cmd2):
""" Correct for multi-core if necessary (especially important in case coreCount=1 to limit parallel make) """
# ATHENA_PROC_NUMBER is set in Node.py using the schedconfig value
try:
coreCount = int(os.environ['ATHENA_PROC_NUMBER'])
except:
coreCount = -1
if coreCount == -1:
try:
coreCount = int(jobCoreCount)
except:
pass
else:
if coreCount >= 1:
# Note: the original request (AF) was to use j%d and not -j%d, now using the latter
cmd2 += 'export MAKEFLAGS="-j%d QUICK=1 -l1";' % (coreCount)
tolog("Added multi-core support to cmd2: %s" % (cmd2))
# make sure that MAKEFLAGS is always set
if not "MAKEFLAGS=" in cmd2:
cmd2 += 'export MAKEFLAGS="-j1 QUICK=1 -l1";'
return cmd2
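# Illustrative examples (hypothetical values, not from the original code): with
# ATHENA_PROC_NUMBER unset in the environment,
#   addMAKEFLAGS(4, "")    -> 'export MAKEFLAGS="-j4 QUICK=1 -l1";'
#   addMAKEFLAGS(None, "") -> 'export MAKEFLAGS="-j1 QUICK=1 -l1";'  (single-core fallback)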
def isNightliesRelease(self, homePackage):
""" Is the homePackage for a nightlies release? """
# The pilot will regard the release as a nightlies if the homePackage contains a time-stamp
# (the time stamp is actually a sub-directory;
# e.g. /cvmfs/atlas-nightlies.cern.ch/repo/sw/21.0.X/2016-12-01T2101/)
status = False
# If a timestamp can be extracted from the homePackage, it is a nightlies release
if self.extractNightliesTimestamp(homePackage) != "":
status = True
return status
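# Illustrative example: a homePackage such as
#   /cvmfs/atlas-nightlies.cern.ch/repo/sw/21.0.X/2016-12-01T2101/
# carries a time-stamp sub-directory and would be reported as a nightlies release (True),
# whereas e.g. AtlasProduction/20.1.5.10 would not (False).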
# Optional
def shouldPilotPrepareASetup(self, noExecStrCnv, jobPars):
""" Should pilot be in charge of preparing asetup? """
# If noExecStrCnv is set, then jobPars is expected to contain asetup.sh + options
prepareASetup = True
if noExecStrCnv:
if "asetup.sh" in jobPars:
tolog("asetup will be taken from jobPars")
prepareASetup = False
else:
tolog("noExecStrCnv is set but asetup command was not found in jobPars (pilot will prepare asetup)")
prepareASetup = True
else:
tolog("Pilot will prepare asetup")
prepareASetup = True
return prepareASetup
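# Illustrative examples (hypothetical jobPars, not from the original code):
#   shouldPilotPrepareASetup(True,  "asetup.sh 21.0.15,notest; Reco_tf.py ...") -> False
#   shouldPilotPrepareASetup(True,  "Reco_tf.py ...")                           -> True (with warning)
#   shouldPilotPrepareASetup(False, "Reco_tf.py ...")                           -> True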
def getJobExecutionCommand(self, job, jobSite, pilot_initdir):
""" Define and test the command(s) that will be used to execute the payload """
pilotErrorDiag = ""
cmd = ""
special_setup_cmd = ""
JEM = "NO"
# homePackage variants:
# user jobs
# 1. AnalysisTransforms (using asetup)
# 2. AnalysisTransforms-<project>_<cache>, e.g. project=AthAnalysisBase,AtlasDerivation,AtlasProduction,MCProd,TrigMC; cache=20.1.6.2,..
# 3. AnalysisTransforms-<project>_rel_<N>, e.g. project=AtlasOffline; N=0,1,2,..
# 4. [homePackage not set]
#
# production jobs
# 1. <project>/<cache>, e.g. AtlasDerivation/20.1.6.2, AtlasProd1/20.1.5.10.1
# 2. <project>,rel_<N>[,devval], e.g. AtlasProduction,rel_4,devval
#
# Tested (USER ANALYSIS)
# homePackage = AnalysisTransforms
# setup = export ATLAS_LOCAL_ROOT_BASE=/cvmfs/atlas.cern.ch/repo/ATLASLocalRootBase;source ${ATLAS_LOCAL_ROOT_BASE}/user/atlasLocalSetup.sh --quiet;
# source $AtlasSetup/scripts/asetup.sh 17.2.7,notest --platform x86_64-slc5-gcc43-opt --makeflags=\"$MAKEFLAGS\";
# export MAKEFLAGS=\"-j1 QUICK=1 -l1\";[proxy export];./runAthena-00-00-11 ..
# homePackage = AnalysisTransforms-AtlasDerivation_20.1.5.7
# setup = export ATLAS_LOCAL_ROOT_BASE=/cvmfs/atlas.cern.ch/repo/ATLASLocalRootBase;source ${ATLAS_LOCAL_ROOT_BASE}/user/atlasLocalSetup.sh --quiet;
# source $AtlasSetup/scripts/asetup.sh AtlasDerivation,20.1.5.7,notest --platform x86_64-slc6-gcc48-opt --makeflags=\"$MAKEFLAGS\";
# export MAKEFLAGS=\"-j1 QUICK=1 -l1\";[proxy export];./runAthena-00-00-11 ..
# homePackage=AnalysisTransforms-AtlasDerivation_rel_1
# setup = export ATLAS_LOCAL_ROOT_BASE=/cvmfs/atlas.cern.ch/repo/ATLASLocalRootBase;source ${ATLAS_LOCAL_ROOT_BASE}/user/atlasLocalSetup.sh --quiet;
# source $AtlasSetup/scripts/asetup.sh AtlasDerivation,20.1.X.Y-VAL,rel_1,notest --platform x86_64-slc6-gcc48-opt --makeflags=\"$MAKEFLAGS\";
# export MAKEFLAGS=\"-j1 QUICK=1 -l1\";source /cvmfs/atlas.cern.ch/repo/sw/local/xrootdsetup.sh;[proxy export];./runAthena-00-00-11 ..
# homePackage not set, release not set
# setup = source /cvmfs/atlas.cern.ch/repo/sw/local/setup.sh; [proxy export];./runGen-00-00-02 ..
# homePackage = AnalysisTransforms-AthAnalysisBase_2.3.11 (release = AthAnalysisBase/x86_64-slc6-gcc48-opt/2.3.11)
# setup = export ATLAS_LOCAL_ROOT_BASE=/cvmfs/atlas.cern.ch/repo/ATLASLocalRootBase;source ${ATLAS_LOCAL_ROOT_BASE}/user/atlasLocalSetup.sh --quiet;
# source $AtlasSetup/scripts/asetup.sh AthAnalysisBase,2.3.11,notest --platform x86_64-slc6-gcc48-opt --makeflags="$MAKEFLAGS";
# export MAKEFLAGS="-j1 QUICK=1 -l1";source /cvmfs/atlas.cern.ch/repo/sw/local/xrootdsetup.sh;[proxy export];./runAthena-00-00-11 ..
# Tested (PRODUCTION)
# homePackage = AtlasProduction/17.7.3.12
# setup = export ATLAS_LOCAL_ROOT_BASE=/cvmfs/atlas.cern.ch/repo/ATLASLocalRootBase;source ${ATLAS_LOCAL_ROOT_BASE}/user/atlasLocalSetup.sh --quiet;
# source $AtlasSetup/scripts/asetup.sh AtlasProduction,17.7.3.12 --platform x86_64-slc6-gcc46-opt --makeflags=\"$MAKEFLAGS\";Sim_tf.py ..
# homePackage = AtlasDerivation/20.1.5.7
# setup = export ATLAS_LOCAL_ROOT_BASE=/cvmfs/atlas.cern.ch/repo/ATLASLocalRootBase;source ${ATLAS_LOCAL_ROOT_BASE}/user/atlasLocalSetup.sh --quiet;
# source $AtlasSetup/scripts/asetup.sh AtlasDerivation,20.1.5.7 --platform x86_64-slc6-gcc48-opt --makeflags=\"$MAKEFLAGS\";Reco_tf.py ..
# TRF's:
# AtlasG4_tf.py:
# PandaID=2675460595
# Release=17.7.3
# homePackage=MCProd/17.7.3.9.6
# Sim_tf.py:
# PandaID=2675460792
# Release=1.0.3
# homePackage=AthSimulationBase/1.0.3
# Sim_tf.py, event service:
# PandaID=2676267339
# Release=Atlas-20.3.3
# homePackage=AtlasProduction/20.3.3.2
# Reco_tf.py:
# PandaID=2675925382
# Release=20.1.5
# homePackage=AtlasProduction/20.1.5.10
# Should the pilot do the asetup or do the jobPars already contain the information?
prepareASetup = self.shouldPilotPrepareASetup(job.noExecStrCnv, job.jobPars)
# Is it a user job or not?
analysisJob = isAnalysisJob(job.trf)
# Get the cmtconfig value
cmtconfig = getCmtconfig(job.cmtconfig)
# Define the setup for asetup, i.e. including full path to asetup and setting of ATLAS_LOCAL_ROOT_BASE
asetup_path = self.getModernASetup(asetup=prepareASetup)
asetup_options = " "
# Is it a standard ATLAS job? (i.e. with swRelease = 'Atlas-...')
if self.__atlasEnv:
# Normal setup (production and user jobs)
tolog("Preparing normal production/analysis job setup command")
cmd = asetup_path
if prepareASetup:
options = self.getASetupOptions(job.release, job.homePackage)
asetup_options = " " + options + " --platform " + cmtconfig
# Always set the --makeflags option (to prevent asetup from overwriting it)
asetup_options += ' --makeflags=\"$MAKEFLAGS\"'
cmd += asetup_options
# Verify that the setup works
exitcode, output = timedCommand(cmd, timeout=5*60)
if exitcode != 0:
if "No release candidates found" in output:
pilotErrorDiag = "No release candidates found"
tolog("!!WARNING!!3434!! %s" % (pilotErrorDiag))
return self.__error.ERR_NORELEASEFOUND, pilotErrorDiag, "", special_setup_cmd, JEM, cmtconfig
else:
tolog("Verified setup command")
if analysisJob:
# Set the INDS env variable (used by runAthena)
self.setINDS(job.realDatasetsIn)
# Try to download the trf
ec, pilotErrorDiag, trfName = self.getAnalysisTrf('wget', job.trf, pilot_initdir)
if ec != 0:
return ec, pilotErrorDiag, "", special_setup_cmd, JEM, cmtconfig
if prepareASetup:
ec, pilotErrorDiag, _cmd = self.getAnalysisRunCommand(job, jobSite, trfName)
if ec != 0:
return ec, pilotErrorDiag, "", special_setup_cmd, JEM, cmtconfig
else:
_cmd = job.jobPars
tolog("_cmd = %s" % (_cmd))
# Correct for multi-core if necessary (especially important in case coreCount=1 to limit parallel make)
cmd += "; " + self.addMAKEFLAGS(job.coreCount, "") + _cmd
else:
# Add Database commands if they are set by the local site
cmd += os.environ.get('PILOT_DB_LOCAL_SETUP_CMD','')
# Add the transform and the job parameters (production jobs)
if prepareASetup:
cmd += ";%s %s" % (job.trf, job.jobPars)
else:
cmd += "; " + job.jobPars
cmd = cmd.replace(';;', ';')
else: # Generic, non-ATLAS specific jobs, or at least a job with undefined swRelease
tolog("Generic job")
# Set python executable
ec, pilotErrorDiag, pybin = self.setPython(job.release, job.homePackage, cmtconfig, jobSite.sitename)
if ec == self.__error.ERR_MISSINGINSTALLATION:
return ec, pilotErrorDiag, "", special_setup_cmd, JEM, cmtconfig
if analysisJob:
# Try to download the analysis trf
status, pilotErrorDiag, trfName = self.getAnalysisTrf('wget', job.trf, pilot_initdir)
if status != 0:
return status, pilotErrorDiag, "", special_setup_cmd, JEM, cmtconfig
# Set up the run command
if (job.prodSourceLabel == 'ddm' or job.prodSourceLabel == 'software') and prepareASetup:
cmd = '%s %s %s' % (pybin, trfName, job.jobPars)
else:
if prepareASetup:
ec, pilotErrorDiag, cmd = self.getAnalysisRunCommand(job, jobSite, trfName)
if ec != 0:
return ec, pilotErrorDiag, "", special_setup_cmd, JEM, cmtconfig
else:
cmd = job.jobPars
# correct for multi-core if necessary (especially important in case coreCount=1 to limit parallel make)
cmd2 = self.addMAKEFLAGS(job.coreCount, "")
tolog("cmd2 = %s" % (cmd2))
cmd = cmd2 + cmd
# should asetup be used? If so, squeeze it into the run command (rather than moving the entire getAnalysisRunCommand() into this class)
if prepareASetup:
m_cacheDirVer = re.search('AnalysisTransforms-([^/]+)', job.homePackage)
if m_cacheDirVer != None:
# homePackage="AnalysisTransforms-AthAnalysisBase_2.0.14"
# -> cacheDir = AthAnalysisBase, cacheVer = 2.0.14
cacheDir, cacheVer = self.getCacheInfo(m_cacheDirVer, "dummy_atlasRelease")
if cacheDir != "" and cacheVer != "":
asetup = self.getModernASetup()
asetup += " %s,%s --platform=%s;" % (cacheDir, cacheVer, cmtconfig)
# now squeeze it back in
cmd = cmd.replace('./' + trfName, asetup + './' + trfName)
tolog("Updated run command for special homePackage: %s" % (cmd))
else:
tolog("asetup not needed (mo special home package: %s)" % (job.homePackage))
else:
tolog("asetup not needed (no special homePackage)")
elif verifyReleaseString(job.homePackage) != 'NULL' and job.homePackage != ' ':
if 'HPC_' in readpar("catchall"):
cmd = {"interpreter": pybin,
"payload": ("%s/%s" % (job.homePackage, job.trf)),
"parameters": job.jobPars }
else:
if prepareASetup:
cmd = "%s %s/%s %s" % (pybin, job.homePackage, job.trf, job.jobPars)
else:
cmd = job.jobPars
else:
if 'HPC_' in readpar("catchall"):
cmd = {"interpreter": pybin,
"payload": job.trf,
"parameters": job.jobPars }
else:
if prepareASetup:
cmd = "%s %s %s" % (pybin, job.trf, job.jobPars)
else:
cmd = job.jobPars
# Add FRONTIER debugging and RUCIO env variables
if 'HPC_' in readpar("catchall") and 'HPC_HPC' not in readpar("catchall"):
cmd['environment'] = self.getEnvVars2Cmd(job.jobId, job.taskID, job.processingType, jobSite.sitename, analysisJob)
else:
cmd = self.addEnvVars2Cmd(cmd, job.jobId, job.taskID, job.processingType, jobSite.sitename, analysisJob)
if 'HPC_HPC' in readpar("catchall"):
cmd = 'export JOB_RELEASE=%s;export JOB_HOMEPACKAGE=%s;JOB_CMTCONFIG=%s;%s' % (job.release, job.homePackage, cmtconfig, cmd)
ver = os.environ.get('ALRB_asetupVersion', None)
if ver is not None:
cmd = 'export ALRB_asetupVersion=%s;%s' % (ver, cmd)
# Explicitly add the ATHENA_PROC_NUMBER (or JOB value)
cmd = self.addAthenaProcNumber(cmd)
# cmd = "export XRD_LOGLEVEL=Debug;" + cmd
# Wrap the job execution command with Singularity if necessary
from Singularity import singularityWrapper
cmd = singularityWrapper(cmd, cmtconfig, job.workdir)
tolog("\nCommand to run the job is: \n%s" % (cmd))
return 0, pilotErrorDiag, cmd, special_setup_cmd, JEM, cmtconfig
def addAthenaProcNumber(self, cmd):
"""
Add the ATHENA_PROC_NUMBER to the payload command if necessary
:param cmd: payload execution command
:return: updated payload execution command
"""
if not "ATHENA_PROC_NUMBER" in cmd:
if "ATHENA_PROC_NUMBER" in os.environ:
cmd = 'export ATHENA_PROC_NUMBER=%s;' % os.environ['ATHENA_PROC_NUMBER'] + cmd
elif "ATHENA_PROC_NUMBER_JOB" in os.environ:
try:
value = int(os.environ['ATHENA_PROC_NUMBER_JOB'])
except:
tolog("!!WARNING!!3433!! Failed to convert ATHENA_PROC_NUMBER_JOB=%s to int" % (os.environ['ATHENA_PROC_NUMBER_JOB']))
else:
if value > 1:
cmd = 'export ATHENA_PROC_NUMBER=%d;' % value + cmd
else:
tolog("Will not add ATHENA_PROC_NUMBER to cmd since the value is %d" % value)
else:
tolog("!!WARNING!!3434!! Don't know how to set ATHENA_PROC_NUMBER (could not find it in os.environ)")
else:
tolog("ATHENA_PROC_NUMBER already in job command")
return cmd
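# Illustrative example (hypothetical values): if the payload command does not mention
# ATHENA_PROC_NUMBER and the environment has ATHENA_PROC_NUMBER=8, then
#   "Sim_tf.py ..."  ->  "export ATHENA_PROC_NUMBER=8;Sim_tf.py ..."
# If only ATHENA_PROC_NUMBER_JOB=1 is set, nothing is prepended since the value is not > 1.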
def getAnalysisRunCommand(self, job, jobSite, trfName):
""" Get the run command for analysis jobs """
# The run command is used to set up the user job transform
from RunJobUtilities import updateCopysetups
ec = 0
pilotErrorDiag = ""
run_command = ""
# get the queuedata info
# (directAccess info is stored in the copysetup variable)
# get relevant file transfer info
dInfo, useCopyTool, useDirectAccess, dummy, oldPrefix, newPrefix, copysetup, usePFCTurl =\
self.getFileTransferInfo(job.transferType, isBuildJob(job.outFiles))
# add the user proxy
if os.environ.has_key('X509_USER_PROXY'):
run_command += 'export X509_USER_PROXY=%s;' % os.environ['X509_USER_PROXY']
else:
tolog("Could not add user proxy to the run command (proxy does not exist)")
# set up analysis trf
run_command += './%s %s' % (trfName, job.jobPars)
# sort out direct access info for non-FAX cases
if dInfo:
# in case of forced usePFCTurl
if usePFCTurl and not '--usePFCTurl' in run_command:
oldPrefix = ""
newPrefix = ""
run_command += ' --usePFCTurl'
tolog("reset old/newPrefix (forced TURL mode (1))")
# sort out when directIn should be used
if useDirectAccess and '--directIn' not in job.jobPars and '--directIn' not in run_command:
run_command += ' --directIn'
# old style copysetups will contain oldPrefix and newPrefix needed for the old style remote I/O
if oldPrefix != "" and newPrefix != "":
run_command += ' --oldPrefix "%s" --newPrefix %s' % (oldPrefix, newPrefix)
else:
# --directIn should be used in combination with --usePFCTurl, but not --old/newPrefix
if usePFCTurl and not '--usePFCTurl' in run_command:
run_command += ' --usePFCTurl'
if job.transferType == 'fax' and readpar('direct_access_wan').lower() == 'true' and '--directIn' not in run_command:
run_command += ' --directIn'
if job.transferType == 'direct':
# update the copysetup
# transferType is only needed if copysetup does not contain remote I/O info
updateCopysetups(run_command, transferType=job.transferType, useCT=False, directIn=useDirectAccess)
# add options for file stager if necessary (ignore if transferType = direct)
if "accessmode" in job.jobPars and job.transferType != 'direct':
accessmode_useCT = None
accessmode_directIn = None
_accessmode_dic = { "--accessmode=copy":["copy-to-scratch mode", ""],
"--accessmode=direct":["direct access mode", " --directIn"]}
# update run_command according to jobPars
for _mode in _accessmode_dic.keys():
if _mode in job.jobPars:
# any accessmode set in jobPars should overrule schedconfig
tolog("Enforcing %s" % (_accessmode_dic[_mode][0]))
if _mode == "--accessmode=copy":
# make sure direct access and file stager get turned off
usePFCTurl = False
accessmode_useCT = True
accessmode_directIn = False
elif _mode == "--accessmode=direct":
# make sure copy-to-scratch and file stager get turned off
usePFCTurl = True
accessmode_useCT = False
accessmode_directIn = True
else:
usePFCTurl = False
accessmode_useCT = False
accessmode_directIn = False
# update run_command (do not send the accessmode switch to runAthena)
run_command += _accessmode_dic[_mode][1]
if _mode in run_command:
run_command = run_command.replace(_mode, "")
if "directIn" in run_command and not dInfo:
if not usePFCTurl:
usePFCTurl = True
tolog("WARNING: accessmode mode specified directIn but direct access mode is not specified in copysetup (will attempt to create TURL based PFC later)")
if not "usePFCTurl" in run_command:
run_command += ' --usePFCTurl'
# need to add proxy if not there already
if "--directIn" in run_command and not "export X509_USER_PROXY" in run_command:
if os.environ.has_key('X509_USER_PROXY'):
run_command = run_command.replace("./%s" % (trfName), "export X509_USER_PROXY=%s;./%s" % (os.environ['X509_USER_PROXY'], trfName))
else:
tolog("Did not add user proxy to the run command (proxy does not exist)")
# update the copysetup
updateCopysetups(run_command, transferType=None, useCT=accessmode_useCT, directIn=accessmode_directIn)
# add guids when needed
# get the correct guids list (with only the direct access files)
if not isBuildJob(job.outFiles):
_guids = self.getGuidsFromJobPars(job.jobPars, job.inFiles, job.inFilesGuids)
run_command += ' --inputGUIDs \"%s\"' % (str(_guids))
# if both direct access and the accessmode loop added a directIn switch, remove the first one from the string
if run_command.count("directIn") > 1:
run_command = run_command.replace("--directIn", "", 1)
return ec, pilotErrorDiag, run_command
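# Illustrative example of a resulting run command (proxy path and switches are hypothetical;
# the exact form depends on the copysetup-derived settings and on jobPars):
#   export X509_USER_PROXY=/tmp/x509up_u12345;./runAthena-00-00-11 <jobPars> --directIn --usePFCTurl --inputGUIDs "[...]"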
def getFileLookups(self):
""" Return the file lookup boolean """
return self.__doFileLookups
def doFileLookups(self, doFileLookups):
""" Update the file lookups boolean """
self.__doFileLookups = doFileLookups
def willDoFileLookups(self):
""" Should (LFC) file lookups be done by the pilot or not? """
return self.getFileLookups()
def willDoAlternativeFileLookups(self):
""" Should file lookups be done using alternative methods? """
# E.g. in the migration period where LFC lookups are halted in favour of other methods in the Rucio API
# (for ATLAS), this method could be useful. See the usage in Mover::getReplicaDictionary() which is called
# after Experiment::willDoFileLookups() defined above. The motivation is that direct LFC calls are not to be
# used any longer by the pilot, and in the migration period the actual LFC calls will be done in the Rucio
# API. Eventually this API will switch to alternative file lookups.
tolog("Using alternative file catalog lookups")
return True
def willDoFileRegistration(self):
""" Should (LFC) file registration be done by the pilot or not? """
status = False
# should the LFC file registration be done by the pilot or by the server?
if readpar('lfcregister') != "server":
status = True
# make sure that the lcgcpSiteMover (and thus lcg-cr) is not used
if readpar('copytool') == "lcgcp" or readpar('copytool') == "lcg-cp":
status = False
return status
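# Illustrative examples (hypothetical queuedata values): lfcregister='server' -> False;
# lfcregister='' with copytool='rucio' -> True; lfcregister='' with copytool='lcg-cp' -> False.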
# Additional optional methods
def removeRedundantFiles(self, workdir):
""" Remove redundant files and directories """
tolog("Removing redundant files prior to log creation")
dir_list = ["AtlasProduction*",
"AtlasPoint1",
"AtlasTier0",
"buildJob*",
"CDRelease*",
"csc*.log",
"DBRelease*",
"EvgenJobOptions",
"external",
"fort.*",
"geant4",
"geomDB",
"geomDB_sqlite",
"home",
"o..pacman..o",
"pacman-*",
"python",
"runAthena*",
"share",
"sources.*",
"sqlite*",
"sw",
"tcf_*",
"triggerDB",
"trusted.caches",
"workdir",
"*.data*",
"*.events",
"*.py",
"*.pyc",
"*.root*",
"JEM",
"tmp*",
"*.tmp",
"*.TMP",
"MC11JobOptions",
"scratch",
"jobState-*-test.pickle",
"*.writing",
"pwg*",
"pwhg*",
"*PROC*",
"madevent",
"HPC",
"objectstore*.json",
"saga",
"radical",
"movers",
"_joproxy15",
"ckpt*",
"HAHM_*",
"Process",
"merged_lhef._0.events-new"]
# remove core and pool.root files from AthenaMP sub directories
try:
self.cleanupAthenaMP(workdir)
except Exception, e:
tolog("!!WARNING!!2341!! Failed to execure cleanupAthenaMP(): %s" % (e))
# explicitly remove any soft linked archives (.a files) since they will be dereferenced by the tar command (--dereference option)
matches = []
import fnmatch
for root, dirnames, filenames in os.walk(workdir):
for filename in fnmatch.filter(filenames, '*.a'):
matches.append(os.path.join(root, filename))
for root, dirnames, filenames in os.walk(os.path.dirname(workdir)):
for filename in fnmatch.filter(filenames, 'EventService_premerge_*.tar'):
matches.append(os.path.join(root, filename))
if matches != []:
tolog("!!WARNING!!4990!! Encountered %d archive files - will be purged" % len(matches))
tolog("To be removed: %s" % (matches))
rc = remove(matches)
if not rc:
tolog("WARNING: Failed to remove redundant files")
else:
tolog("Found no archive files")
# note: these should be partial file/dir names, not containing any wildcards
exceptions_list = ["runargs", "runwrapper", "jobReport", "log."]
for _dir in dir_list:
files = glob(os.path.join(workdir, _dir))
exclude = []
# remove any dirs/files from the exceptions list
if files:
for exc in exceptions_list:
for f in files:
if exc in f:
exclude.append(f)
if exclude != []:
tolog('To be excluded from removal: %s' % (exclude))
_files = []
for f in files:
if not f in exclude:
_files.append(f)
files = _files
tolog("To be removed: %s" % (files))
rc = remove(files)
if not rc:
tolog("IGNORE: Failed to remove redundant file(s): %s" % (files))
# run a second pass to clean up any broken links
broken = []
for root, dirs, files in os.walk(workdir):
for filename in files:
path = os.path.join(root,filename)
if os.path.islink(path):
target_path = os.readlink(path)
# Resolve relative symlinks
if not os.path.isabs(target_path):
target_path = os.path.join(os.path.dirname(path),target_path)
if not os.path.exists(target_path):
broken.append(path)
else:
# If it's not a symlink we're not interested.
continue
if broken != []:
tolog("!!WARNING!!4991!! Encountered %d broken soft links - will be purged" % len(broken))
rc = remove(broken)
if not rc:
tolog("WARNING: Failed to remove broken soft links")
else:
tolog("Found no broken links")
def getWarning(self):
""" Return any warning message passed to __warning """
return self.__warning
def displayChangeLog(self):
""" Display the cvmfs ChangeLog is possible """
# 'head' the ChangeLog on cvmfs (/cvmfs/atlas.cern.ch/repo/sw/ChangeLog)
# get the site information object
si = getSiteInformation(self.__experiment)
appdir = readpar('appdir')
if appdir == "":
if os.environ.has_key('VO_ATLAS_SW_DIR'):
appdir = os.environ['VO_ATLAS_SW_DIR']
else:
appdir = ""
if appdir != "":
# there might be more than one appdir, try them all
appdirs = si.getAppdirs(appdir)
tolog("appdirs = %s" % str(appdirs))
for appdir in appdirs:
path = os.path.join(appdir, 'ChangeLog')
if os.path.exists(path):
try:
rs = commands.getoutput("head %s" % (path))
except Exception, e:
tolog("!!WARNING!!1232!! Failed to read the ChangeLog: %s" % (e))
else:
rs = "\n"+"-"*80 + "\n" + rs
rs += "\n"+"-"*80
tolog("head of %s: %s" % (path, rs))
else:
tolog("No such path: %s (ignore)" % (path))
else:
tolog("Can not display ChangeLog: Found no appdir")
def getCVMFSPath(self):
""" Return the proper cvmfs path """
# get the site information object
si = getSiteInformation(self.__experiment)
return si.getFileSystemRootPath()
def testCVMFS(self):
""" Run the CVMFS diagnostics tool """
status = False
timeout = 5*60
cmd = "export ATLAS_LOCAL_ROOT_BASE=%s/atlas.cern.ch/repo/ATLASLocalRootBase;$ATLAS_LOCAL_ROOT_BASE/utilities/checkValidity.sh" % \
(self.getCVMFSPath())
tolog("Executing command: %s (time-out: %d)" % (cmd, timeout))
exitcode, output = timedCommand(cmd, timeout=timeout)
if exitcode != 0:
if "No such file or directory" in output:
tolog("!!WARNING!!1235!! Command checkValidity.sh was not found (can not run CVMFS validity test)")
status = True
elif "timed out" in output:
tolog("!!WARNING!!1236!! Command checkValidity.sh timed out: %s (ignore)" % (output))
status = True
else:
tolog("!!WARNING!!1234!! CVMFS diagnostics tool failed: %d, %s" % (exitcode, output))
else:
tolog("Diagnostics tool has verified CVMFS")
status = True
return status
def getNumberOfEvents(self, **kwargs):
""" Return the number of events """
# ..and a string of the form N|N|..|N with the number of events for each trf
job = kwargs.get('job', None)
number_of_jobs = kwargs.get('number_of_jobs', 1)
if not job:
tolog("!!WARNING!!2332!! getNumberOfEvents did not receive a job object")
return 0, 0, ""
tolog("Looking for number of processed events (pass -1: jobReport.json)")
from FileHandling import getNumberOfEvents
nEventsRead = getNumberOfEvents(job.workdir)
nEventsWritten = 0
if nEventsRead > 0:
return nEventsRead, nEventsWritten, str(nEventsRead)
else:
nEventsRead = 0
tolog("Looking for number of processed events (pass 0: metadata.xml)")
nEventsRead = self.processMetadata(job.workdir)
nEventsWritten = 0
if nEventsRead > 0:
return nEventsRead, nEventsWritten, str(nEventsRead)
else:
nEventsRead = 0
tolog("Looking for number of processed events (pass 1: Athena summary file(s))")
nEventsRead, nEventsWritten = self.processAthenaSummary(job.workdir)
if nEventsRead > 0:
return nEventsRead, nEventsWritten, str(nEventsRead)
tolog("Looking for number of processed events (pass 2: Resorting to brute force grepping of payload stdout)")
nEvents_str = ""
for i in range(number_of_jobs):
_stdout = job.stdout
if number_of_jobs > 1:
_stdout = _stdout.replace(".txt", "_%d.txt" % (i + 1))
filename = os.path.join(job.workdir, _stdout)
N = 0
if os.path.exists(filename):
tolog("Processing stdout file: %s" % (filename))
matched_lines = grep(["events processed so far"], filename)
if len(matched_lines) > 0:
if "events read and" in matched_lines[-1]:
# event #415044, run #142189 2 events read and 0 events processed so far
N = int(re.match('.* run #\d+ \d+ events read and (\d+) events processed so far.*', matched_lines[-1]).group(1))
else:
# event #4, run #0 3 events processed so far
N = int(re.match('.* run #\d+ (\d+) events processed so far.*', matched_lines[-1]).group(1))
if len(nEvents_str) == 0:
nEvents_str = str(N)
else:
nEvents_str += "|%d" % (N)
nEventsRead += N
return nEventsRead, nEventsWritten, nEvents_str
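# Illustrative example (hypothetical numbers): if the earlier passes find nothing and a job
# with two sub-jobs reports 250 and 300 processed events in its stdout files, the brute-force
# pass returns (550, 0, "250|300").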
def processMetadata(self, workdir):
""" Extract number of events from metadata.xml """
N = 0
filename = os.path.join(workdir, "metadata.xml")
if os.path.exists(filename):
# Get the metadata
try:
f = open(filename, "r")
except IOError, e:
tolog("!!WARNING!!1222!! Exception: %s" % (e))
else:
xmlIN = f.read()
f.close()
# Get the XML objects
from xml.dom import minidom
xmldoc = minidom.parseString(xmlIN)
fileList = xmldoc.getElementsByTagName("File")
# Loop over all files, assume that the number of events is the same in all files
for _file in fileList:
lrc_metadata_dom = _file.getElementsByTagName("metadata")
for i in range(len(lrc_metadata_dom)):
_key = str(_file.getElementsByTagName("metadata")[i].getAttribute("att_name"))
_value = str(_file.getElementsByTagName("metadata")[i].getAttribute("att_value"))
if _key == "events" and _value:
try:
N = int(_value)
except Exception, e:
tolog("!!WARNING!!1222!! Number of events not an integer: %s" % (e))
else:
tolog("Number of events from metadata file: %d" % (N))
break
else:
tolog("%s does not exist" % (filename))
return N
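# Illustrative metadata.xml fragment (hypothetical values) of the kind parsed above:
#   <File ...>
#     <metadata att_name="events" att_value="5000"/>
#   </File>
# would give N = 5000 (the number of events is assumed to be the same in all listed files).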
def processAthenaSummary(self, workdir):
""" extract number of events etc from athena summary file(s) """
N1 = 0
N2 = 0
file_pattern_list = ['AthSummary*', 'AthenaSummary*']
file_list = []
# loop over all patterns in the list to find all possible summary files
for file_pattern in file_pattern_list:
# get all the summary files for the current file pattern
files = glob(os.path.join(workdir, file_pattern))
# append all found files to the file list
for summary_file in files:
file_list.append(summary_file)
if file_list == [] or file_list == ['']:
tolog("Did not find any athena summary files")
else:
# find the most recent and the oldest files
oldest_summary_file = ""
recent_summary_file = ""
oldest_time = 9999999999
recent_time = 0
if len(file_list) > 1:
for summary_file in file_list:
# get the modification time
try:
st_mtime = os.path.getmtime(summary_file)
except Exception, e:
tolog("!!WARNING!!1800!! Could not read modification time of file %s: %s" % (summary_file, str(e)))
else:
if st_mtime > recent_time:
recent_time = st_mtime
recent_summary_file = summary_file
if st_mtime < oldest_time:
oldest_time = st_mtime
oldest_summary_file = summary_file
else:
oldest_summary_file = file_list[0]
recent_summary_file = oldest_summary_file
oldest_time = os.path.getmtime(oldest_summary_file)
recent_time = oldest_time
if oldest_summary_file == recent_summary_file:
tolog("Summary file: %s: Will be processed for errors and number of events" %\
(os.path.basename(oldest_summary_file)))
else:
tolog("Most recent summary file: %s (updated at %d): Will be processed for errors" %\
(os.path.basename(recent_summary_file), recent_time))
tolog("Oldest summary file: %s (updated at %d): Will be processed for number of events" %\
(os.path.basename(oldest_summary_file), oldest_time))
# Get the number of events from the oldest summary file
try:
f = open(oldest_summary_file, "r")
except Exception, e:
tolog("!!WARNING!!1800!! Failed to get number of events from summary file. Could not open file: %s" % str(e))
else:
lines = f.readlines()
f.close()
if len(lines) > 0:
for line in lines:
if "Events Read:" in line:
N1 = int(re.match('Events Read\: *(\d+)', line).group(1))
if "Events Written:" in line:
N2 = int(re.match('Events Written\: *(\d+)', line).group(1))
if N1 > 0 and N2 > 0:
break
else:
tolog("!!WARNING!!1800!! Failed to get number of events from summary file. Encountered an empty summary file.")
tolog("Number of events: %d (read)" % (N1))
tolog("Number of events: %d (written)" % (N2))
# Get the errors from the most recent summary file
# ...
return N1, N2
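# Illustrative summary-file lines (hypothetical numbers) matching the parsing above:
#   Events Read: 5000
#   Events Written: 4999
# would give N1 = 5000 (read) and N2 = 4999 (written).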
def isOutOfMemory(self, **kwargs):
""" Try to identify out of memory errors in the stderr/out """
# (Used by ErrorDiagnosis)
# make this function shorter, basically same code twice
out_of_memory = False
job = kwargs.get('job', None)
number_of_jobs = kwargs.get('number_of_jobs', 1)
if not job:
tolog("!!WARNING!!3222!! isOutOfMemory() did not receive a job object")
return False
tolog("Checking for memory errors in stderr")
for i in range(number_of_jobs):
_stderr = job.stderr
if number_of_jobs > 1:
_stderr = _stderr.replace(".txt", "_%d.txt" % (i + 1))
filename = os.path.join(job.workdir, _stderr)
if os.path.exists(filename):
tolog("Processing stderr file: %s" % (filename))
if os.path.getsize(filename) > 0:
tolog("WARNING: %s produced stderr, will dump to log" % (job.payload))
stderr_output = dumpOutput(filename)
if stderr_output.find("MemoryRescueSvc") >= 0 and \
stderr_output.find("FATAL out of memory: taking the application down") > 0:
out_of_memory = True
else:
tolog("Warning: File %s does not exist" % (filename))
# try to identify out of memory errors in the stdout
tolog("Checking for memory errors in stdout..")
for i in range(number_of_jobs):
_stdout = job.stdout
if number_of_jobs > 1:
_stdout = _stdout.replace(".txt", "_%d.txt" % (i + 1))
filename = os.path.join(job.workdir, _stdout)
if os.path.exists(filename):
tolog("Processing stdout file: %s" % (filename))
matched_lines = grep(["St9bad_alloc", "std::bad_alloc"], filename)
if len(matched_lines) > 0:
tolog("Identified an out of memory error in %s stdout:" % (job.payload))
for line in matched_lines:
tolog(line)
out_of_memory = True
else:
tolog("Warning: File %s does not exist" % (filename))
return out_of_memory
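# Illustrative stdout line (hypothetical) that would set the out-of-memory flag above,
# since it contains one of the grep patterns ("std::bad_alloc"):
#   terminate called after throwing an instance of 'std::bad_alloc'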
def getCacheInfo(self, m_cacheDirVer, atlasRelease):
""" Get the cacheDir and cacheVer """