From f2ee82fffb30a4b36ebb30a92758e0cfb435d8fe Mon Sep 17 00:00:00 2001 From: haozturk Date: Mon, 27 Feb 2023 10:06:39 +0100 Subject: [PATCH 1/3] Change tpe in makeACDC --- Unified/recoveror.py | 5 +++++ makeACDC.py | 13 ++++++------- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/Unified/recoveror.py b/Unified/recoveror.py index 9714ec818..711000f62 100755 --- a/Unified/recoveror.py +++ b/Unified/recoveror.py @@ -125,6 +125,11 @@ def singleRecovery(url, task , initial, actions, do=False): else: payload['TrustPUSitelists'] = False + if action.startswith('tpe'): + tpe = float(action.split("-")[1]) + if tpe: + payload['TimePerEvent'] = tpe + acdc_round = 0 initial_string = payload['RequestString'] if initial_string.startswith('ACDC'): diff --git a/makeACDC.py b/makeACDC.py index 00d2d2391..d9171fa22 100755 --- a/makeACDC.py +++ b/makeACDC.py @@ -7,7 +7,6 @@ import logging import os, sys from optparse import OptionParser -#from reqmgr import ReqMgrClient logging.basicConfig(level=logging.WARNING) import reqMgrClient from utils import workflowInfo @@ -40,7 +39,7 @@ def makeACDC(**args): if acdc: return acdc else: - print("Issue while creating the acdc for",task) + print(("Issue while creating the acdc for",task)) return None def main(): @@ -116,7 +115,7 @@ def main(): sys.exit(1) - for wfname,tasks in wf_and_task.items(): + for wfname,tasks in list(wf_and_task.items()): wfi = workflowInfo(url, wfname) if tasks == None: where,how_much,how_much_where = wfi.getRecoveryInfo() @@ -125,8 +124,8 @@ def main(): tasks = sorted(tasks) created = {} - print("Workflow:",wfname) - print("Tasks:",tasks) + print(("Workflow:",wfname)) + print(("Tasks:",tasks)) # FIXME: eventually, we want to be able to target each task # with different options @@ -141,7 +140,7 @@ def main(): mcore = options.mcore, xrootd = options.xrootd) if not r: - print("Error in creating ACDC for",task,"on",wfname) + print(("Error in creating ACDC for",task,"on",wfname)) break created[task] = r @@ -151,7 +150,7 @@ def main(): print("Created:") for task in created: - print(created[task],"for",task) + print((created[task],"for",task)) with open(outACDClist, 'a') as f: f.write(str(created[task])+"\n") if __name__ == '__main__': From 8cd67953b82f3c9dc5f77a33cbb75ed56512d7db Mon Sep 17 00:00:00 2001 From: haozturk Date: Mon, 27 Feb 2023 10:13:47 +0100 Subject: [PATCH 2/3] Add tpe in makeACDC --- makeACDC.py | 117 +++++++++++++++++++++++----------------------------- 1 file changed, 51 insertions(+), 66 deletions(-) diff --git a/makeACDC.py b/makeACDC.py index d9171fa22..7ccfb9fbe 100755 --- a/makeACDC.py +++ b/makeACDC.py @@ -5,66 +5,66 @@ It will copy all the original workflow parameters unless specified """ import logging -import os, sys +import sys from optparse import OptionParser + logging.basicConfig(level=logging.WARNING) import reqMgrClient from utils import workflowInfo -from collections import defaultdict +from collections import defaultdict prod_url = 'cmsweb.cern.ch' testbed_url = 'cmsweb-testbed.cern.ch' from Unified.recoveror import singleRecovery from utils import workflowInfo + + def makeACDC(**args): url = args.get('url') wfi = args.get('wfi') task = args.get('task') initial = wfi actions = [] - memory = args.get('memory',None) + memory = args.get('memory', None) if memory: - #increment = initial.request['Memory'] - memory - #actions.append( 'mem-%d'% increment ) - actions.append( 'mem-%s'% memory ) - mcore = args.get('mcore',None) + # increment = initial.request['Memory'] - memory + # actions.append( 'mem-%d'% increment ) + actions.append('mem-%s' % memory) + mcore = args.get('mcore', None) if mcore: - actions.append( 'core-%s'% mcore) - xrootd = args.get('xrootd',None) - if xrootd: - actions.append( 'xrootd-%s'% xrootd) - + actions.append('core-%s' % mcore) + tpe = args.get('tpe', None) + if tpe: + actions.append('tpe-%s' % tpe) acdc = singleRecovery(url, task, initial.request, actions, do=True) if acdc: return acdc else: - print(("Issue while creating the acdc for",task)) + print("Issue while creating the acdc for", task) return None -def main(): - #Create option parser +def main(): + # Create option parser usage = "usage: %prog (-w workflow|-f filelist) (-t TASK|--all) [--tesbed]" parser = OptionParser(usage=usage) - parser.add_option("-f","--file", dest="file", default=None, - help="Text file with a list of workflows") - parser.add_option("-w","--workflow", default=None, + parser.add_option("-f", "--file", dest="file", default=None, + help="Text file with a list of workflows") + parser.add_option("-w", "--workflow", default=None, help="Coma separated list of wf to handle") - parser.add_option("-t","--task", default=None, + parser.add_option("-t", "--task", default=None, help="Coma separated task to be recovered") - parser.add_option("-p","--path", default=None, + parser.add_option("-p", "--path", default=None, help="Coma separated list of paths to recover") - parser.add_option("-a","--all", - help="Make acdc for all tasks to be recovered",default=False, action='store_true') - parser.add_option("-m","--memory", dest="memory", default=None, type=int, - help="Memory to override the original request memory") - parser.add_option("-c","--mcore", dest="mcore", default=None, + parser.add_option("-a", "--all", + help="Make acdc for all tasks to be recovered", default=False, action='store_true') + parser.add_option("-m", "--memory", dest="memory", default=None, type=int, + help="Memory to override the original request memory") + parser.add_option("-c", "--mcore", dest="mcore", default=None, help="Multicore to override the original request multicore") - parser.add_option("-x","--xrootd", dest="xrootd", default=None, type=int, - help="Enable xrootd") - parser.add_option("-o","--out", dest="out", default='acdc_wf_list.txt', type=str, - help="Output file, to be filled with workflows for which ACDC was submitted.") + parser.add_option("--tpe", dest="tpe", default=None, + help="Time per event") parser.add_option("--testbed", default=False, action="store_true") (options, args) = parser.parse_args() @@ -72,11 +72,7 @@ def main(): global url url = testbed_url if options.testbed else prod_url - outACDClist = options.out - if os.path.isfile(outACDClist): - sys.exit("Make a new name for output file.") - - if options.all : options.task = 'all' + if options.all: options.task = 'all' if not options.task: parser.error("Provide the -t Task Name or --all") @@ -85,7 +81,7 @@ def main(): if not ((options.workflow) or (options.path) or (options.file)): parser.error("Provide the -w Workflow Name or the -p path or the -f workflow filelist") sys.exit(1) - + wfs = None wf_and_task = defaultdict(set) if options.file: @@ -96,63 +92,52 @@ def main(): ## self contained paths = options.path.split(',') for p in paths: - _,wf,t = p.split('/',2) - wf_and_task[wf].add('/%s/%s'%(wf,t)) + _, wf, t = p.split('/', 2) + wf_and_task[wf].add('/%s/%s' % (wf, t)) else: parser.error("Either provide a -f filelist or a -w workflow or -p path") sys.exit(1) if not wf_and_task: if options.task == 'all': - for wfname in wfs: + for wfname in wfs: wf_and_task[wfname] = None else: - for wfname in wfs: - wf_and_task[wfname].update( [('/%s/%s'%(wfname,task)).replace('//','/') for task in options.task.split(',')] ) + for wfname in wfs: + wf_and_task[wfname].update( + [('/%s/%s' % (wfname, task)).replace('//', '/') for task in options.task.split(',')]) if not wf_and_task: parser.error("Provide the -w Workflow Name and the -t Task Name or --all") - sys.exit(1) + sys.exit(1) - - for wfname,tasks in list(wf_and_task.items()): + for wfname, tasks in list(wf_and_task.items()): wfi = workflowInfo(url, wfname) if tasks == None: - where,how_much,how_much_where = wfi.getRecoveryInfo() + where, how_much, how_much_where = wfi.getRecoveryInfo() tasks = sorted(how_much.keys()) else: tasks = sorted(tasks) created = {} - print(("Workflow:",wfname)) - print(("Tasks:",tasks)) - - # FIXME: eventually, we want to be able to target each task - # with different options - if len(tasks) != 1: - print("WARNING: Multiple tasks were found in this workflow \ - be sure that you want to submit identical ACDCs for all tasks.") - - # create an ACDC workflow + print("Workflow:", wfname) + print("Tasks:", tasks) for task in tasks: r = makeACDC(url=url, wfi=wfi, task=task, - memory = options.memory, - mcore = options.mcore, - xrootd = options.xrootd) - if not r: - print(("Error in creating ACDC for",task,"on",wfname)) + memory=options.memory, + mcore=options.mcore, + tpe=options.tpe) + if not r: + print("Error in creating ACDC for", task, "on", wfname) break created[task] = r - - if len(created)!=len(tasks): + if len(created) != len(tasks): print("Error in creating all required ACDCs") sys.exit(1) - print("Created:") for task in created: - print((created[task],"for",task)) - with open(outACDClist, 'a') as f: f.write(str(created[task])+"\n") + print(created[task], "for", task) -if __name__ == '__main__': - main() +if __name__ == '__main__': + main() \ No newline at end of file From 6a0344bbc55a709f24c990733a14315fda5a0605 Mon Sep 17 00:00:00 2001 From: haozturk Date: Mon, 27 Feb 2023 10:17:03 +0100 Subject: [PATCH 3/3] Add tpe in makeACDC --- makeACDC.py | 29 +++++++++++++++++++++++++++-- 1 file changed, 27 insertions(+), 2 deletions(-) diff --git a/makeACDC.py b/makeACDC.py index 7ccfb9fbe..5526d0434 100755 --- a/makeACDC.py +++ b/makeACDC.py @@ -5,9 +5,10 @@ It will copy all the original workflow parameters unless specified """ import logging -import sys +import os, sys from optparse import OptionParser +# from reqmgr import ReqMgrClient logging.basicConfig(level=logging.WARNING) import reqMgrClient from utils import workflowInfo @@ -34,9 +35,13 @@ def makeACDC(**args): mcore = args.get('mcore', None) if mcore: actions.append('core-%s' % mcore) + xrootd = args.get('xrootd', None) + if xrootd: + actions.append('xrootd-%s' % xrootd) tpe = args.get('tpe', None) if tpe: actions.append('tpe-%s' % tpe) + acdc = singleRecovery(url, task, initial.request, actions, do=True) if acdc: return acdc @@ -63,6 +68,10 @@ def main(): help="Memory to override the original request memory") parser.add_option("-c", "--mcore", dest="mcore", default=None, help="Multicore to override the original request multicore") + parser.add_option("-x", "--xrootd", dest="xrootd", default=None, type=int, + help="Enable xrootd") + parser.add_option("-o", "--out", dest="out", default='acdc_wf_list.txt', type=str, + help="Output file, to be filled with workflows for which ACDC was submitted.") parser.add_option("--tpe", dest="tpe", default=None, help="Time per event") parser.add_option("--testbed", default=False, action="store_true") @@ -72,6 +81,10 @@ def main(): global url url = testbed_url if options.testbed else prod_url + outACDClist = options.out + if os.path.isfile(outACDClist): + sys.exit("Make a new name for output file.") + if options.all: options.task = 'all' if not options.task: @@ -111,7 +124,7 @@ def main(): parser.error("Provide the -w Workflow Name and the -t Task Name or --all") sys.exit(1) - for wfname, tasks in list(wf_and_task.items()): + for wfname, tasks in wf_and_task.items(): wfi = workflowInfo(url, wfname) if tasks == None: where, how_much, how_much_where = wfi.getRecoveryInfo() @@ -122,21 +135,33 @@ def main(): created = {} print("Workflow:", wfname) print("Tasks:", tasks) + + # FIXME: eventually, we want to be able to target each task + # with different options + if len(tasks) != 1: + print("WARNING: Multiple tasks were found in this workflow \ + be sure that you want to submit identical ACDCs for all tasks.") + + # create an ACDC workflow for task in tasks: r = makeACDC(url=url, wfi=wfi, task=task, memory=options.memory, mcore=options.mcore, + xrootd=options.xrootd, tpe=options.tpe) if not r: print("Error in creating ACDC for", task, "on", wfname) break created[task] = r + if len(created) != len(tasks): print("Error in creating all required ACDCs") sys.exit(1) + print("Created:") for task in created: print(created[task], "for", task) + with open(outACDClist, 'a') as f: f.write(str(created[task]) + "\n") if __name__ == '__main__':