Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set output datasets to VALID in DBS3 before announcing a standard workflow #10394

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 49 additions & 3 deletions src/python/WMCore/MicroService/MSOutput/MSOutput.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from WMCore.MicroService.MSOutput.MSOutputTemplate import MSOutputTemplate
from WMCore.WMException import WMException
from WMCore.Services.AlertManager.AlertManagerAPI import AlertManagerAPI
from WMCore.Services.DBS.DBS3Writer import DBS3Writer


class MSOutputException(WMException):
Expand Down Expand Up @@ -108,6 +109,9 @@ def __init__(self, msConfig, mode, reqCache, logger=None):
for endpoint, quota in viewitems(self.msConfig['tapePledges']):
self.tapeStatus[endpoint] = dict(quota=quota, usage=0, remaining=0)

self.dbs3Writer = DBS3Writer(url=self.msConfig["dbsReadUrl"],
writeUrl=self.msConfig["dbsWriteUrl"])

msOutIndex = IndexModel('RequestName', unique=True)
msOutDBConfig = {
'database': 'msOutDB',
Expand Down Expand Up @@ -248,6 +252,44 @@ def _executeConsumer(self, summary):
self.logger.exception(msg)
self.updateReportDict(summary, "error", msg)

def setDBSStatus(self, workflow):
"""
The function to set the DBS status of outputs as VALID
:param workflow: a MSOutputTemplate object workflow
:return: the MSOutputTemplate object itself (with the necessary updates in place)
"""
if not isinstance(workflow, MSOutputTemplate):
msg = "Unsupported type object '{}' for workflows! ".format(type(workflow))
msg += "It needs to be of type: MSOutputTemplate"
raise UnsupportedError(msg)

# if anything fail along the way, set it back to False
dbsUpdateStatus = True
for dMap in workflow['OutputMap']:

res = self.dbs3Writer.setDBSStatus(dataset=dMap["Dataset"],
status=self.msConfig['dbsStatus']["valid"])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And here I would simply replace self.msConfig['dbsStatus']["valid"] by VALID.


if res:
dMap["DBSStatus"] = self.msConfig['dbsStatus']["valid"]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understood it right, you are defining a new property for the datasets dictionary to be stored in MongoDB, right?
In that case, I would suggest to drop it from the schema, the DBS status will be one and only one, for every single dataset. So, I'd keep only the top level key DBSUpdateStatus, which would contain the overall DBS status of the output datasets. What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, do you know what happens if you set a VALID dataset to VALID once again? I believe DBS won't complain, right?

In addition to that, should we take into consideration datasets that have not been produced in the workflow, thus not being available in the DBS database?

else:
# There is at least one dataset whose dbs status update is unsuccessful
dbsUpdateStatus = False

# Finally, update the MSOutput template document with either partial or
# complete dbs statuses
self.docKeyUpdate(workflow, OutputMap=workflow['OutputMap'])
workflow.updateTime()
if dbsUpdateStatus:
self.logger.info("All the DBS status updates succeeded for: %s. Marking it as 'done'",
workflow['RequestName'])
self.docKeyUpdate(workflow, DBSUpdateStatus=True)
else:
self.logger.info("DBS status updates partially successful for: %s. Keeping it 'pending'",
workflow['RequestName'])

return workflow

def makeSubscriptions(self, workflow):
"""
The common function to make the final subscriptions
Expand Down Expand Up @@ -448,28 +490,32 @@ def msOutputConsumer(self):
# Done: To build it through a pipe
# Done: To write back the updated document to MonogoDB
msPipelineRelVal = Pipeline(name="MSOutputConsumer PipelineRelVal",
funcLine=[Functor(self.makeSubscriptions),
funcLine=[Functor(self.setDBSStatus),
Functor(self.makeSubscriptions),
Functor(self.makeTapeSubscriptions),
Functor(self.docUploader,
update=True,
keys=['LastUpdate',
'TransferStatus',
'DBSUpdateStatus',
'OutputMap']),
Functor(self.docDump, pipeLine='PipelineRelVal'),
Functor(self.docCleaner)])
msPipelineNonRelVal = Pipeline(name="MSOutputConsumer PipelineNonRelVal",
funcLine=[Functor(self.makeSubscriptions),
funcLine=[Functor(self.setDBSStatus),
Functor(self.makeSubscriptions),
Functor(self.makeTapeSubscriptions),
Functor(self.docUploader,
update=True,
keys=['LastUpdate',
'TransferStatus',
'DBSUpdateStatus',
'OutputMap']),
Functor(self.docDump, pipeLine='PipelineNonRelVal'),
Functor(self.docCleaner)])

wfCounterTotal = 0
mQueryDict = {'TransferStatus': 'pending'}
mQueryDict = { "$or": [ { "TransferStatus": "pending" }, { "DBSUpdateStatus": False } ] }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TransferStatus is meant to consider the overall status of a given workflow in MSOutput. In other words, TransferStatus will only be marked as done once ALL the necessary actions have already been taken and the workflow is ready to move to the next status (and move away of the MSOutput watch).

So I'd suggest to keep the original query, and make sure that its status remains pending as long as there are still actions to be taken on it.

pipeCollections = [(msPipelineRelVal, self.msOutRelValColl),
(msPipelineNonRelVal, self.msOutNonRelValColl)]
for pipeColl in pipeCollections:
Expand Down
10 changes: 7 additions & 3 deletions src/python/WMCore/MicroService/MSOutput/MSOutputTemplate.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ def docSchema(self):
'Copies': 1,
...}],
"TransferStatus": "pending"|"done,
"DBSUpdateStatus": False|True,
"RequestType": ""
}
:return: a list of tuples
Expand All @@ -116,7 +117,8 @@ def docSchema(self):
('IsRelVal', False, bool),
('OutputDatasets', [], list),
('OutputMap', [], list),
('TransferStatus', "pending", (bytes, str))]
('TransferStatus', "pending", (bytes, str)),
('DBSUpdateStatus', False, bool)]
return docTemplate

def outputMapSchema(self):
Expand All @@ -135,7 +137,8 @@ def outputMapSchema(self):
'DiskDestination': "",
'TapeDestination': "",
'DiskRuleID': "",
'TapeRuleID': ""}
'TapeRuleID': "",
'DBSStatus': ""}
:return: a list of tuples
"""
outMapTemplate = [
Expand All @@ -146,7 +149,8 @@ def outputMapSchema(self):
('DiskDestination', "", (bytes, str)),
('TapeDestination', "", (bytes, str)),
('DiskRuleID', "", (bytes, str)),
('TapeRuleID', "", (bytes, str))]
('TapeRuleID', "", (bytes, str)),
('DBSStatus', "", (bytes, str))]
return outMapTemplate

def _checkAttr(self, myDoc, update=False, throw=False, **kwargs):
Expand Down
22 changes: 22 additions & 0 deletions src/python/WMCore/Services/DBS/DBS3Reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -971,3 +971,25 @@ def getParentDatasetTrio(self, childDataset):
frozenKey = frozenset(runLumiPair)
parentFrozenData[frozenKey] = fileId
return parentFrozenData

def getDBSStatus(self, dataset):
"""
The function to get the DBS status of outputs
:param dataset: dataset name
:return: DBS status of the given dataset
"""

response = None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
response = None
dbsStatus = None

try:
response = self.dbs.listDatasets(dataset=dataset, dataset_access_type='*', detail=True)
except Exception as ex:
msg = "Exception while getting the status of following dataset on DBS: {} ".format(dataset)
msg += "Error: {}".format(str(ex))
self.logger.exception(msg)

if response:
dbsStatus = response[0]['dataset_access_type']
self.logger.info("%s is %s", dataset, dbsStatus)
return dbsStatus
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and here we could replace this if/else block by something like:

if response:
    dbsStatus = response[0]['dataset_access_type']
    self.logger.info("%s is %s", dataset, dbsStatus)
return dbsStatus

else:
return None
53 changes: 53 additions & 0 deletions src/python/WMCore/Services/DBS/DBS3Writer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
#!/usr/bin/env python
"""
_DBSReader_
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wrong copy/pasting? :-D You might just remove it as well.


Read/Write DBS Interface

"""
from __future__ import print_function, division
from builtins import str
import logging

from dbs.apis.dbsClient import DbsApi
from dbs.exceptions.dbsClientException import dbsClientException

from WMCore.Services.DBS.DBSErrors import DBSWriterError, formatEx3
from WMCore.Services.DBS.DBS3Reader import DBS3Reader


class DBS3Writer(DBS3Reader):
"""
_DBSReader_

General API for writing data to DBS
"""

def __init__(self, url, writeUrl, logger=None, **contact):

# instantiate dbs api object
try:
super(DBS3Writer, self).__init__(url=url)
self.dbs = DbsApi(writeUrl, **contact)
self.logger = logger or logging.getLogger(self.__class__.__name__)
except dbsClientException as ex:
msg = "Error in DBSWriter with DbsApi\n"
msg += "%s\n" % formatEx3(ex)
raise DBSWriterError(msg)

def setDBSStatus(self, dataset, status):
"""
The function to set the DBS status of an output dataset
:param dataset: Dataset name
:return: True if operation is successful, False otherwise
"""
try:
# This API call returns None if successful, throws exception o/w
self.dbs.updateDatasetType(dataset=dataset,
dataset_access_type=status)
return True
except Exception as ex:
msg = "Exception while setting the status of following dataset on DBS: {} ".format(dataset)
msg += "Error: {}".format(str(ex))
self.logger.exception(msg)
return False
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,15 @@
{'block_name': '/Cosmics/Commissioning2015-PromptReco-v1/RECO#004ac3ba-d09e-11e4-afad-001e67ac06a0'}],
['listBlockParents',
{'block_name': '/Cosmics/Commissioning2015-v1/RAW#942d76fe-cf0e-11e4-afad-001e67ac06a0'}],
['listDatasets',
{'dataset_access_type': '*',
'dataset': '/ggXToJPsiJPsi_JPsiToMuMu_M6p2_JPCZeroMinusPlus_TuneCP5_13TeV-pythia8-JHUGen/RunIIFall17pLHE-93X_mc2017_realistic_v3-v2/LHE',
'detail': True}],
['listDatasets',
{'dataset_access_type': '*',
'dataset': '/DYToEE_M-50_NNPDF31_TuneCP5_14TeV-powheg-pythia8/Run3Summer19DRPremix-BACKFILL_2024Scenario_106X_mcRun3_2024_realistic_v4-v6/GEN-SIM-RAW',
'detail': True}],


# Exception throwing calls

Expand Down
Loading