#!/usr/bin/env python

from __future__ import print_function

import networkx as nx
from copy import deepcopy

import configurationClassErrors
from configurationClassErrors import *

import nodeAttributes
from nodeAttributes import *

import edgeAttributes
from edgeAttributes import *

import toolAttributes
from toolAttributes import *

import json
import os
import sys

# Define a class to store general pipeline attributes.
class pipelineAttributes:
  def __init__(self):

    # Provide a description of the pipeline.
    self.description = None

    # As with the tools, untested pipelines can be listed as experimental until they
    # have been refined. This ensures that users are aware that the pipeline should
    # be used with caution.
    self.isDevelopmental = False

    # If the pipeline should be hidden in the help.
    self.isHiddenInHelp = False

    # Define the categories to which pipelines belong.
    self.categories = []

# Define a class to store task attributes.
class taskAttributes:
  def __init__(self):
    self.tool = {}
    self.outputStream = False

# Define a class to store information on pipeline nodes.
class pipelineNodeAttributes:
  def __init__(self):

    # Store a description of the configuration node.
    self.description = None

    # Store information on extensions used by the different arguments.
    # This is used to link arguments to filename stub arguments.
    self.extensions = None

    # Store whether files created should be deleted after being used.
    self.deleteFiles = False

    # Provide a unique ID for the configuration node.
    self.ID = None

    # Store a list of greedy tasks. These are tasks which will consume all
    # iterations of values from predecessor nodes.
    self.greedyTasks = None

    # Store if the argument is required. This will supersede instructions from the
    # tool configuration files.
    self.isRequired = False

    # The long and short forms of the pipeline arguments.
    self.longFormArgument = None
    self.shortFormArgument = None

    # Store the task/arguments associated with the configuration node.
    self.tasks = None

    # Store information on additional edges.
    self.originatingEdges = None

    # Store information on additional nodes. These are included in the pipeline configuration
    # file and are used to essentially create a new node for an argument that already exists.
    self.additionalNodes = {}

    # Store information about evaluating a command.
    self.evaluateCommand = None

# Define a class to store information for evaluating commands at run-time.
class evaluateCommandAttributes:
  def __init__(self):

    # Define the command to be evaluated.
    self.command = None

    # Store the values to be added to the command.
    self.values = {}

# Define a class to store pipeline argument attributes.
class argumentAttributes:
  def __init__(self):

    # A description of the argument.
    self.description = None

    # If the argument is required by the pipeline, even if not by the tool.
    self.isRequired = False

    # Store the ID for the argument.
    self.ID = None

    # The node ID in the configuration file.
    self.configNodeID = None

    # The argument's short form.
    self.shortFormArgument = None

class pipelineConfiguration:
  def __init__(self):

    # Define the attributes for a pipeline.
    self.attributes = {}

    # Define the task attributes.
    self.taskAttributes = {}

    # Define the node attributes.
    self.nodeAttributes = {}

    # Define commands to be evaluated at run time.
    self.evaluateCommands = {}

    # Define the errors class for handling errors.
    self.errors = configurationClassErrors()

    # Define a structure to store pipeline argument information.
    self.pipelineArguments = {}
    self.unassignedArguments = []

    # Define a structure to store which tasks and arguments are common to a node. For
    # example, if two tasks use the same input file, they appear in the configuration
    # file in the same node in the tasks (or greedy tasks) section. Duplicate the
    # greedy tasks in their own data structure.
    self.commonNodes = {}
    self.greedyTasks = {}

    # Store information on additional edges to be created in the pipeline graph.
    self.originatingEdges = {}
    self.originatingConfigID = {}

    # Store information on additional nodes. These are included in the pipeline configuration
    # file and are used to essentially create a new node for an argument that already exists.
    self.hasAdditionalNodes = False
    self.additionalNodes = {}

    # Define the pipeline workflow.
    self.workflow = []

    # Define a dictionary to store information about extensions.
    self.linkedExtension = {}

    # Define a structure that links the task and argument described in the 'tasks' list in
    # the nodes section with the pipeline argument. Also define the reverse, that provides
    # a list of all tasks and respective arguments for each pipeline argument.
    self.taskArgument = {}
    self.pipelineToTaskArgument = {}

    # Keep track of tasks that output to the stream.
    self.tasksOutputtingToStream = {}

    # Define a structure to keep track of which task arguments are linked to others.
    self.linkedTaskArguments = {}

    # Define a variable to determine whether termination should result if an error in a
    # configuration file is found.
    self.allowTermination = True

    # Define the methods to operate on the graph nodes and edges.
    self.edgeMethods = edgeClass()
    self.nodeMethods = nodeClass()

    #TODO REQUIRED?
    self.filename = ''
    self.pipelineName = ''

  # Validate the contents of the pipeline configuration file.
  def processConfigurationData(self, data, pipeline, toolFiles, allowedCategories, allowTermination):

    # Set the allowTermination variable. Each of the following subroutines checks a different
    # aspect of the configuration file. If problems are found, termination will result with
    # an error message unless allowTermination is set to False.
    self.allowTermination = allowTermination
    success = True

    # Parse the pipeline configuration file and check that all fields are valid. Ensure that there are
    # no errors, omissions or inconsistencies. Store the information in the relevant data structures
    # as the checks are performed.
    #
    # Check the general pipeline information.
    success, self.attributes = self.checkGeneralAttributes(pipeline, data)

    # Check the 'tasks' section of the configuration file.
    if success: success = self.checkTasks(pipeline, data['tasks'], toolFiles)

    # Check the contents of the nodes section.
    if success: success = self.checkNodes(pipeline, data['nodes'])

    # Check evaluate commands information.
    if success: success = self.checkEvaluateCommands(pipeline)

    # Check and store the pipeline arguments.
    if success: self.setPipelineArguments()

    # From the node data, define which arguments are greedy.
    if success: success = self.getNodeTasks(pipeline)

    # Check any of the configuration file nodes for information on additional edges.
    if success: success = self.checkOriginatingEdges(pipeline)

    # Check if any additional nodes have been defined.
    if success: success = self.checkAdditionalNodes(pipeline)

    # Check that some edges can be defined for the node. It is permitted that the 'tasks'
    # field is empty, for example, but 'greedy tasks' or 'originating edges' must then be
    # populated.
    if success: success = self.checkEdgesCanBeConstructed(pipeline)

    # Check that the category to which the pipeline is assigned is valid.
    if success: success = self.checkCategory(pipeline, allowedCategories)

    return success

  # Check the general pipeline attributes.
  def checkGeneralAttributes(self, pipeline, data):

    # Set the general pipeline attributes.
    attributes = pipelineAttributes()

    # Define the allowed general attributes.
    allowedAttributes = {}
    allowedAttributes['description'] = (str, True, True, 'description')
    allowedAttributes['developmental'] = (bool, False, True, 'isDevelopmental')
    allowedAttributes['categories'] = (list, True, True, 'categories')
    allowedAttributes['hide in help'] = (bool, False, True, 'isHiddenInHelp')
    allowedAttributes['parameter sets'] = (list, True, False, None)
    allowedAttributes['nodes'] = (list, True, False, None)
    allowedAttributes['tasks'] = (dict, True, False, None)
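
    # Each allowedAttributes entry is a tuple of the form:
    #   (expected type, attribute is required, store the value, destination attribute in the attributes object).
    # The same convention is used by the other check methods below.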

    # Keep track of the observed required values.
    observedAttributes = {}

    # Loop over all of the attributes in the configuration file.
    for attribute in data:

      # If the attribute is not in allowedAttributes, it is not an allowed value and execution
      # should be terminated with an error.
      if attribute not in allowedAttributes:
        if self.allowTermination: self.errors.invalidGeneralAttributeInConfigurationFile(pipeline, attribute, allowedAttributes, True)
        else: return False, attributes

      # Mark this value as having been observed.
      observedAttributes[attribute] = True

      # Check that the value given to the attribute is of the correct type. If the value is unicode,
      # convert to a string first.
      value = str(data[attribute]) if isinstance(data[attribute], unicode) else data[attribute]
      if allowedAttributes[attribute][0] != type(value):
        if self.allowTermination:
          self.errors.incorrectTypeInPipelineConfigurationFile(pipeline, attribute, value, allowedAttributes[attribute][0], 'general')
        else: return False, attributes

      # At this point, the attribute in the configuration file is allowed and of valid type. Check that
      # the value itself is valid (if necessary) and store the value.
      if allowedAttributes[attribute][2]: self.setAttribute(attributes, allowedAttributes[attribute][3], value)

    # Having parsed all of the general attributes, check that all those that are required
    # are present.
    for attribute in allowedAttributes:
      if allowedAttributes[attribute][1] and attribute not in observedAttributes:
        if self.allowTermination: self.errors.missingGeneralAttributeInConfigurationFile(pipeline, attribute, allowedAttributes, True)
        return False, attributes

    return True, attributes

  # Check the 'tasks' section of the configuration file.
  def checkTasks(self, pipeline, tasks, toolFiles):

    # Define the allowed task attributes.
    allowedAttributes = {}
    allowedAttributes['tool'] = (str, True, True, 'tool')
    allowedAttributes['output to stream'] = (bool, False, True, 'outputStream')
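
    # The 'tasks' section maps each task name to a dictionary of the attributes above. The task and
    # tool names below are illustrative only:
    #
    #   "tasks" : {
    #     "align" : { "tool" : "bwa", "output to stream" : false }
    #   }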

    for task in tasks:

      # Define the taskAttributes object.
      attributes = taskAttributes()

      # Keep track of the observed required values.
      observedAttributes = {}

      # Check that the task name is accompanied by a dictionary.
      if not isinstance(tasks[task], dict):
        if self.allowTermination: self.errors.taskIsNotDictionary(pipeline, task)
        else: return False

      # Loop over the included attributes.
      for attribute in tasks[task]:
        if attribute not in allowedAttributes:
          if self.allowTermination: self.errors.invalidAttributeInTasks(pipeline, task, attribute, allowedAttributes)
          return False

        # Check that the value given to the attribute is of the correct type. If the value is unicode,
        # convert to a string first.
        value = str(tasks[task][attribute]) if isinstance(tasks[task][attribute], unicode) else tasks[task][attribute]
        if allowedAttributes[attribute][0] != type(value):
          if self.allowTermination:
            self.errors.incorrectTypeInPipelineConfigurationFile(pipeline, attribute, value, allowedAttributes[attribute][0], 'tasks')
          else: return False

        # Mark the attribute as seen.
        observedAttributes[attribute] = True

        # Store the given attribute.
        if allowedAttributes[attribute][2]: self.setAttribute(attributes, allowedAttributes[attribute][3], tasks[task][attribute])

      # Having parsed all of the task attributes, check that all those that are required
      # are present.
      for attribute in allowedAttributes:
        if allowedAttributes[attribute][1] and attribute not in observedAttributes:
          if self.allowTermination: self.errors.missingAttributeInPipelineConfigurationFile(pipeline, attribute, allowedAttributes, 'tasks', None)
          else: return False

      # Check that each task has a tool defined and that a tool configuration file exists for this tool.
      tool = tasks[task]['tool']
      if tool + '.json' not in toolFiles:
        if self.allowTermination: self.errors.invalidToolInPipelineConfigurationFile(pipeline, task, tool)
        else: return False

      # Store the attributes for the task.
      self.taskAttributes[task] = attributes

    return True

  # Check the contents of the nodes section.
  def checkNodes(self, pipeline, nodes):

    # Define the allowed node attributes.
    allowedAttributes = {}
    allowedAttributes['additional nodes'] = (dict, False, True, 'additionalNodes')
    allowedAttributes['ID'] = (str, True, True, 'ID')
    allowedAttributes['description'] = (str, True, True, 'description')
    allowedAttributes['evaluate command'] = (dict, False, True, 'evaluateCommand')
    allowedAttributes['extensions'] = (dict, False, True, 'extensions')
    allowedAttributes['greedy tasks'] = (dict, False, True, 'greedyTasks')
    allowedAttributes['delete files'] = (bool, False, True, 'deleteFiles')
    allowedAttributes['long form argument'] = (str, False, True, 'longFormArgument')
    allowedAttributes['originating edges'] = (dict, False, True, 'originatingEdges')
    allowedAttributes['required'] = (bool, False, True, 'isRequired')
    allowedAttributes['short form argument'] = (str, False, True, 'shortFormArgument')
    allowedAttributes['tasks'] = (dict, True, True, 'tasks')
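
    # A minimal node entry ties a pipeline argument to one or more task/argument pairs. The
    # values below are illustrative only:
    #
    #   {
    #     "ID" : "input file",
    #     "description" : "The input file for the pipeline.",
    #     "long form argument" : "--in",
    #     "short form argument" : "-i",
    #     "tasks" : { "task name" : "--tool-argument" }
    #   }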

    # Loop over all of the defined nodes.
    for node in nodes:

      # Check that the node is a dictionary.
      if not isinstance(node, dict):
        if self.allowTermination: self.errors.nodeIsNotADictionary(pipeline)
        else: return False

      # Define the attributes object.
      attributes = pipelineNodeAttributes()

      # Keep track of the observed required values.
      observedAttributes = {}

      # Check that the node has an ID. This will be used to identify the node in error messages.
      try: ID = node['ID']
      except:
        if self.allowTermination: self.errors.noIDInPipelineNode(pipeline)
        else: return False

      # Loop over all attributes in the node.
      for attribute in node:
        if attribute not in allowedAttributes:
          if self.allowTermination: self.errors.invalidAttributeInNodes(pipeline, ID, attribute, allowedAttributes)
          else: return False

        # Check that the value given to the attribute is of the correct type. If the value is unicode,
        # convert to a string first.
        value = str(node[attribute]) if isinstance(node[attribute], unicode) else node[attribute]
        if allowedAttributes[attribute][0] != type(value):
          if self.allowTermination:
            self.errors.incorrectTypeInPipelineConfigurationFile(pipeline, attribute, value, allowedAttributes[attribute][0], 'nodes')
          else: return False

        # Mark the attribute as seen.
        observedAttributes[attribute] = True

        # Store the given attribute.
        if allowedAttributes[attribute][2]: self.setAttribute(attributes, allowedAttributes[attribute][3], node[attribute])

      # Having parsed all of the node attributes, check that all those that are required
      # are present.
      for attribute in allowedAttributes:
        if allowedAttributes[attribute][1] and attribute not in observedAttributes:
          if self.allowTermination: self.errors.missingAttributeInPipelineConfigurationFile(pipeline, attribute, allowedAttributes, 'nodes', ID)
          else: return False

      # Store the attributes.
      self.nodeAttributes[ID] = attributes

    return True

  # Check evaluate commands information.
  def checkEvaluateCommands(self, pipeline):

    # Define the allowed evaluate command attributes.
    allowedAttributes = {}
    allowedAttributes['command'] = (str, False, True, '')
    allowedAttributes['add values'] = (dict, False, False, '')
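
    # An 'evaluate command' block supplies a command whose evaluation provides the value for the
    # node's task/argument pairs; each entry in 'add values' links an ID appearing in the command
    # to a task/argument pair. The command and names below are illustrative only:
    #
    #   "evaluate command" : {
    #     "command" : "wc -l FILE",
    #     "add values" : [ { "ID" : "FILE", "task" : "task name", "argument" : "--in" } ]
    #   }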

    # Loop over all of the defined nodes.
    for ID in self.nodeAttributes:
      if self.nodeAttributes[ID].evaluateCommand:

        # Get the task/argument pairs whose values are to be set as the evaluation of a command.
        taskList = self.nodeAttributes[ID].tasks

        # Check that the task/argument pair has not already had a command assigned.
        for task in taskList:
          argument = taskList[task]
          if task in self.evaluateCommands:
            if argument in self.evaluateCommands[task]:
              if self.allowTermination: self.errors.multipleEvaluationsForArgument(pipeline, ID, task, argument)
              else: return False

        # Keep track of the observed required values.
        observedAttributes = {}

        # Loop over all the attributes.
        for attribute in self.nodeAttributes[ID].evaluateCommand:
          value = self.nodeAttributes[ID].evaluateCommand[attribute]

          # Check that the attribute is valid.
          if attribute not in allowedAttributes:
            if self.allowTermination: self.errors.invalidAttributeInEvaluateCommand(pipeline, ID, attribute, allowedAttributes)
            else: return False

          # Record that the attribute was observed.
          observedAttributes[attribute] = True

        # Having parsed all of the attributes, check that all those that are required
        # are present.
        for attribute in allowedAttributes:
          if allowedAttributes[attribute][1] and attribute not in observedAttributes:
            if self.allowTermination: self.errors.missingAttributeInEvaluateCommand(pipeline, ID, attribute, allowedAttributes)
            else: return False

        # If the add values attribute is present, check that the task/argument it points to are valid
        # and that the ID associated with the value is present in the command and is unique.
        observedIDs = []
        for valueDictionary in self.nodeAttributes[ID].evaluateCommand['add values']:
          success, observedID = self.checkEvaluateCommandValues(pipeline, ID, valueDictionary, self.nodeAttributes[ID].evaluateCommand['command'])
          if observedID:
            if observedID in observedIDs:
              if self.allowTermination: self.errors.nonUniqueEvaluateCommandID(pipeline, ID, observedID)
              else: return False

            # Store the observedID.
            observedIDs.append(observedID)

        # Store the command information.
        for task in taskList:
          argument = taskList[task]
          attributes = evaluateCommandAttributes()
          attributes.command = self.nodeAttributes[ID].evaluateCommand['command']
          attributes.values = {}
          for valueDictionary in self.nodeAttributes[ID].evaluateCommand['add values']:
            attributes.values[valueDictionary['ID']] = (valueDictionary['task'], valueDictionary['argument'])
          if task not in self.evaluateCommands: self.evaluateCommands[task] = {}
          self.evaluateCommands[task][argument] = attributes

    return True

  # Check the values supplied for evaluate commands.
  def checkEvaluateCommandValues(self, pipeline, ID, dictionary, command):

    # Define the allowed attributes.
    allowedAttributes = {}
    allowedAttributes['argument'] = (str, True)
    allowedAttributes['ID'] = (str, True)
    allowedAttributes['task'] = (str, True)

    # Keep track of the observed required values.
    observedAttributes = {}

    # Loop through the attributes.
    for attribute in dictionary:
      value = dictionary[attribute]

      # Check that the attribute is valid.
      if attribute not in allowedAttributes:
        if self.allowTermination: self.errors.invalidAttributeInEvaluateCommandValues(pipeline, ID, attribute, allowedAttributes)

      # Record that the attribute was observed.
      observedAttributes[attribute] = True

    # Having parsed all of the attributes, check that all those that are required
    # are present.
    for attribute in allowedAttributes:
      if allowedAttributes[attribute][1] and attribute not in observedAttributes:
        if self.allowTermination: self.errors.missingAttributeInEvaluateCommandValues(pipeline, ID, attribute, allowedAttributes)
        else: return False, ''

    # Having determined that the 'add values' section is complete and valid, check that the task is valid and that the
    # ID defining the value is present in the command. Checking that the argument is valid is performed later, when all
    # of the tool configuration files have been evaluated.
    if dictionary['task'] not in self.taskAttributes.keys():
      if self.allowTermination: self.errors.unknownTaskInEvaluateCommandValues(pipeline, ID, dictionary['task'])
      else: return False, ''

    # Check that the ID is in the command.
    if dictionary['ID'] not in command:
      if self.allowTermination: self.errors.unknownIDInEvaluateCommandValues(pipeline, ID, dictionary['ID'])
      else: return False, ''

    return True, dictionary['ID']

  # Check the validity and completeness of the pipeline argument definitions.
  def setPipelineArguments(self):
    observedLongFormArguments = []
    observedShortFormArguments = []

    # Loop over all of the nodes and set the pipeline arguments.
    for nodeID in self.nodeAttributes:

      # The long form argument will be used as the key in this dictionary.
      longFormArgument = self.nodeAttributes[nodeID].longFormArgument

      # Set the other attributes only if the long form argument is present.
      if longFormArgument != None:
        shortFormArgument = self.nodeAttributes[nodeID].shortFormArgument

        # Check that the long form argument hasn't already been seen in the pipeline configuration
        # file. All command line arguments must be unique.
        if longFormArgument in observedLongFormArguments: self.errors.nonUniquePipelineFormArgument(nodeID, longFormArgument, shortFormArgument, True)
        if shortFormArgument in observedShortFormArguments: self.errors.nonUniquePipelineFormArgument(nodeID, longFormArgument, shortFormArgument, False)
        observedLongFormArguments.append(longFormArgument)
        observedShortFormArguments.append(shortFormArgument)

        # Define the structure to hold the argument information for this pipeline.
        attributes = argumentAttributes()
        self.setAttribute(attributes, 'description', self.nodeAttributes[nodeID].description)
        self.setAttribute(attributes, 'configNodeID', nodeID)
        self.setAttribute(attributes, 'shortFormArgument', shortFormArgument)
        self.setAttribute(attributes, 'isRequired', self.nodeAttributes[nodeID].isRequired)

        # Store the information.
        self.pipelineArguments[longFormArgument] = attributes
def getNodeTasks(self, pipeline):
observedArguments = {}
# Loop over all of the nodes.
for configNodeID in self.nodeAttributes:
self.commonNodes[configNodeID] = []
numberOfTasks = len(self.nodeAttributes[configNodeID].tasks) if self.nodeAttributes[configNodeID].tasks else 0
numberOfGreedyTasks = len(self.nodeAttributes[configNodeID].greedyTasks) if self.nodeAttributes[configNodeID].greedyTasks else 0
isLinked = True if (numberOfTasks + numberOfGreedyTasks) > 1 else False
# Parse the tasks.
if self.nodeAttributes[configNodeID].tasks:
for task in self.nodeAttributes[configNodeID].tasks:
# Check that the task is valid.
if task not in self.taskAttributes.keys():
if self.allowTermination: self.errors.invalidTaskInNode(pipeline, configNodeID, task, False)
else: return False
# Link the pipeline argument to the task/arguments listed with the node.
taskArgument = self.nodeAttributes[configNodeID].tasks[task]
longFormArgument = self.nodeAttributes[configNodeID].longFormArgument
if task not in self.taskArgument: self.taskArgument[task] = {}
# If there is no long form argument, store None in taskArguments and nothing in pipelineToTaskArgument.
if longFormArgument:
self.taskArgument[task][str(taskArgument)] = str(longFormArgument)
if str(longFormArgument) not in self.pipelineToTaskArgument: self.pipelineToTaskArgument[str(longFormArgument)] = []
self.pipelineToTaskArgument[str(longFormArgument)].append((str(task), str(taskArgument)))
else: self.taskArgument[task][str(taskArgument)] = None
# Store the task and argument.
self.commonNodes[configNodeID].append((str(task), str(taskArgument)))
# Store the task/argument pair in the observedOptions dictionary. If this task/argument pair has already been seen
if str(task) not in observedArguments: observedArguments[str(task)] = {}
if str(taskArgument) not in observedArguments[str(task)]: observedArguments[str(task)][str(taskArgument)] = []
observedArguments[str(task)][str(taskArgument)].append(str(configNodeID))
# If the taskArgument is linked to another argument, store this in the pipelineArguments structure.
if isLinked:
if task not in self.linkedTaskArguments: self.linkedTaskArguments[task] = []
if taskArgument not in self.linkedTaskArguments[task]: self.linkedTaskArguments[task].append(taskArgument)
# Then parse the greedy tasks.
if self.nodeAttributes[configNodeID].greedyTasks:
for task in self.nodeAttributes[configNodeID].greedyTasks:
# Check that the task is valid.
if task not in self.taskAttributes.keys():
if self.allowTermination: self.errors.invalidTaskInNode(pipeline, configNodeID, task, True)
else: return False
# Link the pipeline argument to the task/arguments listed with the node.
if task not in self.taskArgument: self.taskArgument[task] = {}
taskArgument = self.nodeAttributes[configNodeID].greedyTasks[task]
longFormArgument = self.nodeAttributes[configNodeID].longFormArgument
if longFormArgument:
self.taskArgument[task][str(taskArgument)] = self.nodeAttributes[configNodeID].longFormArgument
if str(longFormArgument) not in self.pipelineToTaskArgument: self.pipelineToTaskArgument[str(longFormArgument)] = []
self.pipelineToTaskArgument[str(longFormArgument)].append((str(task), str(taskArgument)))
else: self.taskArgument[task][str(taskArgument)] = None
# Store the task and argument.
self.commonNodes[configNodeID].append((str(task), str(taskArgument)))
if task not in self.greedyTasks: self.greedyTasks[task] = []
self.greedyTasks[task].append(str(taskArgument))
# Store the task/argument pair in the observedOptions dictionary. If this task/argument pair has already been seen
if str(task) not in observedArguments: observedArguments[str(task)] = {}
if str(taskArgument) not in observedArguments[str(task)]: observedArguments[str(task)][str(taskArgument)] = []
observedArguments[str(task)][str(taskArgument)].append(str(configNodeID))
# If the taskArgument is linked to another argument, store this in the pipelineArguments structure.
if isLinked:
if task not in self.linkedTaskArguments: self.linkedTaskArguments[task] = []
if taskArgument not in self.linkedTaskArguments[task]: self.linkedTaskArguments[task].append(taskArgument)
# Each node in the pipeline configuration file contains a list of task/argument pairs that take the
# same value and can thus be merged into a single node in the pipeline graph. If a task/argument pair
# appears in multiple nodes, the results can be unexpected, so this isn't permitted.
for task in observedArguments:
for argument in observedArguments[task]:
if len(observedArguments[task][argument]) > 1:
if self.allowTermination: self.errors.repeatedArgumentInNode(task, argument, observedArguments[task][argument])
else: return False
return True
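
  # As an illustration of the structures populated by getNodeTasks: for a node linking a pipeline
  # argument '--in' to the task/argument pair ('align', '--fastq') (names invented for illustration),
  # the method would record:
  #
  #   self.commonNodes[configNodeID]        -> [('align', '--fastq')]
  #   self.taskArgument['align']['--fastq'] -> '--in'
  #   self.pipelineToTaskArgument['--in']   -> [('align', '--fastq')]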

  # Get information about any associated extensions. Check that this only occurs for nodes with a
  # filename stub and that all linked arguments that are not stubs themselves are given an extension.
  def checkCommonNodes(self, tools):
    isFilenameStub = False

    # Loop over all of the nodes.
    for configNodeID in self.commonNodes:
      hasFilenameStub = False
      stubArguments = []
      for task, argument in self.commonNodes[configNodeID]:
        tool = self.taskAttributes[task].tool

        # Check if the argument is 'read json file'. If the argument has this value set, then it
        # isn't referring to a specific argument, but means that the output of one tool in the node
        # is a json file, and the current tool will use the json file to set parameters.
        if argument == 'read json file':

          # Check that one of the arguments in the node outputs a json file.
          hasJsonOutput = False
          for checkTask, checkArgument in self.commonNodes[configNodeID]:
            if checkTask != task and checkArgument != argument:
              checkTool = self.taskAttributes[checkTask].tool
              isOutput = tools.getArgumentAttribute(checkTool, checkArgument, 'isOutput')
              extensions = tools.getArgumentAttribute(checkTool, checkArgument, 'extensions')
              for extension in extensions:
                if isOutput and extension.endswith('json'): hasJsonOutput = True

          # If no json files are output as part of this node, the task reading in the json will not get
          # the required information, so fail.
          if not hasJsonOutput: self.errors.noJsonOutput(configNodeID, task)

        # First check if the argument is valid.
        elif argument not in tools.getArguments(tool): self.errors.invalidToolArgument(configNodeID, task, tool, argument, tools.getArguments(tool))
        else: isFilenameStub = tools.getArgumentAttribute(tool, argument, 'isFilenameStub')

        # Check if any of the arguments are for filename stubs.
        if isFilenameStub:
          hasFilenameStub = True
          stubArguments.append((task, argument))

      # If the configuration file node contains an argument with a filename stub, loop over the
      # task/argument pairs again and check that all non-filename stub arguments are provided with
      # a valid extension.
      if hasFilenameStub:
        for task, argument in self.commonNodes[configNodeID]:
          tool = self.taskAttributes[task].tool
          isFilenameStub = tools.getArgumentAttribute(tool, argument, 'isFilenameStub')

          # If the argument is also for a filename stub, no further action is necessary. If it is not,
          # then check that the extension required by the argument is specified. This is necessary
          # as the argument will point to a single file, and the filename stub points to multiple files,
          # so the particular file needs to be specified.
          if not isFilenameStub:

            # If the argument does not have an extension, terminate.
            try: extension = self.nodeAttributes[configNodeID].extensions[task][argument]
            except: self.errors.noExtensionInNode(configNodeID, task, argument, stubArguments)

            # Store the extension.
            self.linkedExtension[configNodeID] = self.nodeAttributes[configNodeID].extensions

  # A task argument is only permitted in a single 'tasks' section in the configuration file. The reason for this
  # is that if the argument appears in multiple configuration file nodes, the pipeline graph node containing the
  # attributes of this argument will be merged with all other nodes appearing in the configuration file node. If the
  # argument then appears in another configuration file node, all pipeline graph nodes listed here will be merged
  # with the already merged set. This is not desired behaviour. All nodes that should be merged will appear in the
  # same configuration file node. If the argument appears again, it is because the node is to be connected to another
  # task, but independent of the previous definitions. This case is handled by allowing edges to be defined. These
  # edges will be included in the pipeline graph, but after all node merging has been completed.
  def checkOriginatingEdges(self, pipeline):

    # Loop over all of the configuration nodes.
    for configNodeID in self.nodeAttributes:

      # Take all 'originating edges' and store information on the task/argument pairs that should be connected
      # with an edge.
      if self.nodeAttributes[configNodeID].originatingEdges:
        for task in self.nodeAttributes[configNodeID].originatingEdges:
          argument = self.nodeAttributes[configNodeID].originatingEdges[task]

          # Check that the task is valid. The validity of the argument will be checked at a later time.
          if task not in self.taskAttributes.keys():
            if self.allowTermination: self.errors.invalidTaskInOriginatingEdges(configNodeID, task)

          # Store the values keyed on the originating task, then argument.
          if task not in self.originatingEdges:
            self.originatingEdges[str(task)] = {}
            self.originatingConfigID[str(task)] = {}
          if argument not in self.originatingEdges[task]:
            self.originatingEdges[task][str(argument)] = []
            self.originatingConfigID[task][str(argument)] = str(configNodeID)

          # Loop over all of the arguments in the node's tasks and store information on the required edge.
          for targetTask in self.nodeAttributes[configNodeID].tasks:
            targetArgument = self.nodeAttributes[configNodeID].tasks[targetTask]
            self.originatingEdges[task][argument].append((str(targetTask), str(targetArgument)))

    return True

  # Check for additional nodes defined in the configuration file.
  def checkAdditionalNodes(self, pipeline):

    # Loop over all of the configuration nodes.
    for configNodeID in self.nodeAttributes:

      # Take all 'additional nodes' and store information on the task/argument pairs that require a new node.
      if self.nodeAttributes[configNodeID].additionalNodes:

        # Record that the pipeline has additional nodes.
        self.hasAdditionalNodes = True

        # Loop over all the task/argument pairs.
        for task in self.nodeAttributes[configNodeID].additionalNodes:
          argument = self.nodeAttributes[configNodeID].additionalNodes[task]

          # Check that the task is valid. The validity of the argument will be checked at a later time.
          if task not in self.taskAttributes.keys():
            if self.allowTermination: self.errors.invalidTaskInAdditionalNodes(configNodeID, task)

          # Store the information.
          if str(configNodeID) not in self.additionalNodes: self.additionalNodes[str(configNodeID)] = {}
          if str(task) not in self.additionalNodes[str(configNodeID)]: self.additionalNodes[str(configNodeID)][str(task)] = []
          self.additionalNodes[str(configNodeID)][str(task)].append(str(argument))

    return True

  # Check that each node can have some edges defined.
  def checkEdgesCanBeConstructed(self, pipeline):

    # Loop over all of the configuration nodes.
    for configNodeID in self.nodeAttributes:
      numberOfGreedyTasks = 0
      numberOfOriginatingEdges = 0
      numberOfAdditionalNodes = 0
      numberOfTasks = len(self.nodeAttributes[configNodeID].tasks)
      if self.nodeAttributes[configNodeID].originatingEdges: numberOfOriginatingEdges = len(self.nodeAttributes[configNodeID].originatingEdges)
      if self.nodeAttributes[configNodeID].greedyTasks: numberOfGreedyTasks = len(self.nodeAttributes[configNodeID].greedyTasks)
      if self.nodeAttributes[configNodeID].additionalNodes: numberOfAdditionalNodes = len(self.nodeAttributes[configNodeID].additionalNodes)

      # If no edges can be created, terminate.
      if (numberOfTasks + numberOfGreedyTasks + numberOfOriginatingEdges + numberOfAdditionalNodes) == 0:
        if self.allowTermination: self.errors.nodeHasNoConnections(configNodeID)

    return True

  # Check that the defined category and help group are valid.
  def checkCategory(self, pipeline, allowedCategories):
    categories = self.attributes.categories
    for category in categories:
      if category not in allowedCategories:
        if self.allowTermination: self.errors.invalidCategory(pipeline, category, allowedCategories, True)
        else: return False

    return True

  # Set the workflow and the taskAttributes for a tool.
  def definePipelineAttributesForTool(self, name):
    attributes = taskAttributes()
    attributes.tool = name
    self.taskAttributes[name] = attributes
    self.workflow.append(name)

  # Set a value in the supplied attributes object.
  def setAttribute(self, attributes, attribute, value):
    try: test = getattr(attributes, attribute)

    # If the attribute can't be set, determine the source of the problem and provide an
    # error message.
    except:

      # TODO: If the attribute is not available.
      self.errors.invalidAttributeInSetAttribute(attribute, True)
      self.errors.terminate()

    # Set the attribute.
    setattr(attributes, attribute, value)

    return attributes

  # Get a task attribute.
  def getTaskAttribute(self, task, attribute):
    try: value = getattr(self.taskAttributes[task], attribute)
    except:
      #TODO ERRORS
      # If the task doesn't exist.
      if task not in self.taskAttributes: print('config.pipeline.getTaskAttribute error', task, attribute); self.errors.terminate()

      # If the attribute is not available.
      if attribute not in self.taskAttributes[task]: print('config.pipeline.getTaskAttribute error attribute', task, attribute); self.errors.terminate()

    return value

  # Get a node attribute.
  def getNodeAttribute(self, nodeID, attribute):
    try: value = getattr(self.nodeAttributes[nodeID], attribute)
    except:
      #TODO ERRORS
      if nodeID not in self.nodeAttributes: print('config.pipeline.getNodeAttribute error', nodeID, attribute); self.errors.terminate()
      if attribute not in self.nodeAttributes[nodeID]:
        print('config.pipeline.getNodeAttribute error attribute', nodeID, attribute); self.errors.terminate()

    return value

  # Get the long form argument for a command given on the command line.
  def getLongFormArgument(self, graph, argument, allowTermination = True):

    # Check if the argument is a pipeline argument (as defined in the configuration file).
    for pipelineArgument in self.pipelineArguments:
      if pipelineArgument == argument: return pipelineArgument, self.pipelineArguments[pipelineArgument].shortFormArgument
      elif self.pipelineArguments[pipelineArgument].shortFormArgument == argument: return pipelineArgument, argument

    if allowTermination: self.errors.unknownPipelineArgument(argument)
    return None, None

  # Check if an argument is a pipeline argument. If so, return the nodeID.
  def isArgumentAPipelineArgument(self, argument):
    try: return self.pipelineArguments[argument].ID
    except: return None

  # Check if a given task and argument correspond to a pipeline argument. If so, return the
  # long and short forms.
  def getPipelineArgument(self, task, argument):
    try: longFormArgument = self.taskArgument[task][argument]
    except: return None, None
    if longFormArgument == None: return None, None

    return longFormArgument, self.pipelineArguments[longFormArgument].shortFormArgument

  # Get the extension associated with a task/argument pair from the nodes section.
  def getExtension(self, task, argument, extensions):
    try: return extensions[task][argument]
    except: self.errors.invalidExtensionRequest(task, argument, extensions)

  # Clear a pipeline from storage.
  def clearPipeline(self):
    self.__init__()

  # Get a pipeline attribute.
  def getPipelineAttribute(self, attribute):
    try: value = getattr(self.attributes, attribute)
    except:
      #TODO ERROR
      print('pipeline.getPipelineAttribute')
      self.errors.terminate()

    return value
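
# A minimal, illustrative sketch of how this module is expected to be driven. It assumes the
# companion configurationClass modules (configurationClassErrors, nodeAttributes, edgeAttributes,
# toolAttributes) are importable and a Python 2 interpreter, as implied by the unicode checks above.
# The pipeline name, tool file list, categories and configuration dictionary are invented for
# demonstration only; in practice the dictionary comes from a pipeline configuration JSON file.
if __name__ == '__main__':
  exampleData = {
    'description' : 'An example pipeline.',
    'categories' : ['Alignment'],
    'parameter sets' : [],
    'tasks' : { 'align' : { 'tool' : 'bwa' } },
    'nodes' : [
      { 'ID' : 'input fastq',
        'description' : 'The input fastq file.',
        'long form argument' : '--fastq',
        'short form argument' : '-q',
        'tasks' : { 'align' : '--fastq' } }
    ]
  }

  # With allowTermination set to False, problems in the configuration cause a False return value
  # rather than termination with an error message.
  config = pipelineConfiguration()
  success = config.processConfigurationData(exampleData, 'example', ['bwa.json'], ['Alignment'], False)
  print('Configuration is valid:', success)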