diff --git a/AAAOps/XfedKibana/XRDFED-kibana-probe.py b/AAAOps/XfedKibana/XRDFED-kibana-probe.py
index fffd610..1206a09 100755
--- a/AAAOps/XfedKibana/XRDFED-kibana-probe.py
+++ b/AAAOps/XfedKibana/XRDFED-kibana-probe.py
@@ -23,7 +23,7 @@
import threading
import tempfile
-html_dir = '/var/www/html/aaa-probe/' # will create per-service xml files here
+html_dir = '/root/ogarzonm/' # will create per-service xml files here
#CERN_eosfile_rucio='/atlas/rucio/user/ivukotic:user.ivukotic.xrootd.cern-prod-1M'
@@ -132,9 +132,11 @@ def xrd_info(redirector):
[redirector,
"query","1", # 1:kXR_QStats
"a"]) # a_ll stats
- if not errtext:
+ if not out:
+ out = "1"
+ if not errtext:
try:
- dom = xml.dom.minidom.parseString(out)
+ dom = xml.dom.minidom.parseString(out)
root_node = dom.documentElement
if root_node.tagName == 'statistics':
v_attr = root_node.getAttributeNode('ver')
diff --git a/AAAOps/XfedKibana/XRDFED-kibana-probe_to_JSON-Copy1.py b/AAAOps/XfedKibana/XRDFED-kibana-probe_to_JSON-Copy1.py
new file mode 100644
index 0000000..ff061dc
--- /dev/null
+++ b/AAAOps/XfedKibana/XRDFED-kibana-probe_to_JSON-Copy1.py
@@ -0,0 +1,338 @@
+#!/usr/bin/python
+# functional probe and SLS extractor for the "federation" xroot services
+# highlights:
+# - stateless (i.e. run from cron whenever needed)
+# - will try to prevent parallel runs via lockfile
+# - multithreaded, one thread per service to be tested
+# - overall runtime cap at 10min
+# - could extract some statistics from xroot directly, but these are ever-increasing counters
+# Problems:
+# - need to update the code whenever a service is addded/deleted/changed
+# - uses "random" files on various Xroot services all over the world, these are (for now) the same as used by the experiments but these might change..
+
+import xml.dom.minidom
+import subprocess
+import os
+import sys
+import signal
+import re
+import time
+import Lemon.XMLAPI
+import socket
+import atexit
+import threading
+import tempfile
+import json
+
+html_dir = '/var/www/html/aaa-probe/' # will create per-service json files here
+
+LOCKFILE='/var/lock/subsys/xrdfed-kibana-probe'
+
+class Alarm(Exception):
+ pass
+
+def alarm_handler(signum, frame):
+ print "ERROR: caught overall timeout after "+str(timeout_sec)+"s\n"
+ clear_lock()
+ sys.exit(2)
+ raise Alarm
+
+def clear_lock():
+ try:
+ os.unlink(LOCKFILE)
+ except Exception,e:
+ print "could not remove lockfile:"+str(e)
+
+def env_setup():
+ os.environ['X509_USER_CERT']='/root/.globus/slsprobe-cert.pem'
+ os.environ['X509_USER_KEY']='/root/.globus/slsprobe-key.pem'
+ os.environ['X509_USER_PROXY']='/root/.globus/slsprobe.proxy'
+ os.environ['KRB5CCNAME']='FILE:/dev/null'
+ os.environ['PATH']=os.environ['PATH']+":/opt/globus/bin/"
+
+def get_proxy():
+ dev_null = open('/dev/null', 'rw')
+ (proxyfd,proxy)=tempfile.mkstemp(prefix='x509_xrdfed_',suffix='.pem')
+ os.close(proxyfd)
+ os.environ['X509_USER_PROXY']=proxy
+ ret = subprocess.call(['grid-proxy-init','-pwstdin'],stdin=dev_null,)
+ if ret > 0:
+ raise Exception("Cannot get X509 proxy")
+ dev_null.close()
+
+def cleanup_proxy():
+ try:
+ os.unlink(os.environ['X509_USER_PROXY'])
+ except Exception,e:
+ print "could not remove proxy file:"+str(e)
+
+def try_lock():
+ ret = subprocess.call(['lockfile','-5','-r2',LOCKFILE])
+ if ret > 0:
+ print "could not create lockfile"
+ return False
+ return True
+
+def prepare_dictionary(servicename):
+ dic={'serviceName':servicename}
+ return dic
+def dnsalias_to_nodes(redirector):
+ (host,port) = redirector.split(':')
+ all_hosts = []
+ data=socket.getaddrinfo(host,port,0, 0, socket.SOL_TCP )
+ for addr in data:
+ (family, socktype, proto, canonname, sockaddr) = addr
+ (hostname, aliaslist, ipaddrlist) = socket.gethostbyaddr(sockaddr[0])
+ if not hostname in all_hosts:
+ all_hosts.append(hostname)
+ return all_hosts
+
+def xrdcp_test(redirector,file):
+ (errtext,out,err,elapsed) = run_xrd_commands("xrdcp",
+ ["-d","2",
+ "-f",
+ "-DIReadCacheSize","0",
+ "-DIRedirCntTimeout","180",
+ "root://"+redirector+'/'+file,
+ '/dev/null'])
+ return (errtext,err,elapsed)
+
+def xrd_info(redirector):
+ version = "(unknown)"
+ (errtext,out,err,elapsed) = run_xrd_commands("xrd",
+ [redirector,
+ "query","1", # 1:kXR_QStats
+ "a"]) # a_ll stats
+ if not errtext:
+ try:
+ dom = xml.dom.minidom.parseString(out)
+ root_node = dom.documentElement
+ if root_node.tagName == 'statistics':
+ v_attr = root_node.getAttributeNode('ver')
+ version = v_attr.nodeValue
+ except Exception,e:
+ errtext = "ERROR: cannot parse answer:"+str(e)
+ return (errtext,version,out)
+
+def run_xrd_commands(cmd,args):
+ dev_null = open('/dev/null', 'r')
+ errtxt = ''
+ elapsed = -1.0
+ xrd_args = [ 'perl','-e',"alarm 180; exec @ARGV", cmd, # one-line wrapper that *actually* kills the command
+ "-DIConnectTimeout","30",
+ "-DITransactionTimeout","60",
+ "-DIRequestTimeout","60" ] + args
+ try:
+ start = time.time()
+ proc = subprocess.Popen(xrd_args,
+ stdin=dev_null,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ (out, err) = proc.communicate()
+ ret = proc.returncode
+ elapsed = (time.time() - start)
+ err_redir_index = err.rfind('Received redirection to')
+ err_index3010 = err.rfind('(error code: 3010') # (permission denied) may be sort-of-OK - we are talking to final storage already - UK
+ err_index3005 = err.rfind('(error code: 3005') # (no user mapping) - INFN
+ if err_redir_index >= 0 and (err_index3010 >= 0 or err_index3005 >= 0):
+ errtxt = ''
+ else:
+ if(ret > 0):
+ errtxt = "client-side error - exit code "+str(ret)+"\n"
+ err_index = err.rfind('Last server error')
+ if err_index >= 0:
+ err_end_index=err.find("\n",err_index)
+ errtxt = errtxt + err[err_index:err_end_index]
+ except Exception,e:
+        errtxt = errtxt + "Exception: "+str(e)
+ dev_null.close()
+
+ return (errtxt,out,err,elapsed)
+
+def test_redirector(servicename, redirector, file_below=None, file_above=None, extra_notes=""):
+ servicename=servicename.upper()
+ availability = 'available'
+ availinfo = ''
+
+ # prepare the dictionary.
+ dicci = prepare_dictionary(servicename)
+ notes_text = "Redirector:"+redirector
+
+ # run the functional tests - first some simple check to get the version, if OK look for files
+ (err_info,version,dump_info) = xrd_info(redirector)
+ if(err_info):
+ print('error_info_true')
+ availability = 'unavailable'
+ availinfo=availinfo+" Error getting info from redirector "+err_info
+ dicci["xrdcp_below_time"] = 0
+ #dicci["status"] = "unavailable"
+
+ else:
+ availinfo="Version check: "+version
+ if (file_below):
+ notes_text = notes_text + "File 'below': " + file_below
+ (err_below,dump_below,elapsed_below) = xrdcp_test(redirector, file_below)
+ if err_below:
+ availability = 'degraded'
+ availinfo=availinfo+" Error below redirector "+err_below
+ dump_sane = re.sub('---*','__',dump_below)
+ c = "Detailed output for file BELOW "+redirector+":"+file_below+" "+err_below+" "+dump_sane
+ #dicci['comment'] = c
+ else:
+ availinfo=availinfo+" File below: OK "
+ dicci['xrdcp_below_time'] = str(elapsed_below)
+ else:
+ availinfo=availinfo+" File below: not tested."
+ if(file_above):
+ notes_text = notes_text + "File 'elsewhere': " + file_above
+ (err_above,dump_above,elapsed_above) = xrdcp_test(redirector, file_above)
+ if err_above :
+ #We've changed availability from number to string so this below won't work; Marian commented out on 2015-11-06
+ #availability = availability * 0.8 # less important if some remote site is failing..
+ availinfo=availinfo+" Error above redirector "+err_above
+ # sanitize the raw output in order to not trigger XML errors.. in a comment.
+ dump_sane = re.sub('---*','__',dump_above)
+ #c = "Detailed output for file ABOVE "+redirector+":"+file_above+"\n"+
+ #err_above+"\n"
+ #+dump_sane
+ #dicci = {**dicci, **{'comment': c}}
+ #serviceUpdate.appendChild(c)
+ #need_xml_link=1
+ else:
+ availinfo=availinfo+" File above: OK "
+ #nValue = doc.createElement("numericvalue")
+ #nValue.setAttribute("name", "xrdcp_above_time")
+ #nValue.setAttribute("desc", "Time to copy a file elsewhere in the federation")
+ #nValue.appendChild(doc.createTextNode(str(elapsed_above)))
+ dicci['xrdcp_above_time'] = str(elapsed_above)
+ #data.appendChild(nValue)
+ else:
+ availinfo=availinfo+" File above: not tested."
+
+ # save functional test info to XML
+ #if need_xml_link:
+ # myhostname = socket.gethostname()
+    #    notes_text = notes_text + "Details for failed test: http://" + myhostname + "/aaa-probe/" + servicename + ".xml
\n" + "Details for recently failed test : http://vocms039.cern.ch/aaa-probe/err/
\n"
+ availinfo = availinfo + " " + notes_text
+ dicci['status']= str(availability)
+ #dicci['availabilityinfo']=availinfo
+ return dicci
+
+
+def main():
+ debug = 0
+ atexit.register(clear_lock)
+ if len(sys.argv) > 1:
+ if sys.argv[1] == '-d':
+ debug=1
+ if not try_lock():
+ sys.exit(1)
+ if not os.path.exists(html_dir):
+ os.makedirs(html_dir)
+ env_setup()
+ # get a proxy cert
+ # get_proxy()
+
+ timeout_sec = 10 * 60 # limit overall runtime to 10min
+ signal.signal(signal.SIGALRM, alarm_handler)
+
+ ATLASLINK="%BR%Monitoring:%BR%\n http://atl-prod07.slac.stanford.edu:8080/display?page=xrd_report/aggregated/total_xrootd_lgn %BR%\n http://dashb-atlas-xrootd-transfers.cern.ch/ui %BR%\nhttp://dashb-atlas-ssb.cern.ch/dashboard/request.py/siteview#currentView=FAX+redirectors&highlight=false %BR%\n"
+ CMSLINK="%BR%Monitoring:%BR%\n http://xrootd.t2.ucsd.edu/dashboard/ %BR%\n http://dashb-cms-xrootd-transfers.cern.ch/ui %BR%\n"
+ FILEABOVE="/store/mc/SAM/GenericTTbar/AODSIM/CMSSW_9_2_6_91X_mcRun1_realistic_v2-v1/00000/A64CCCF2-5C76-E711-B359-0CC47A78A3F8.root"
+ FILEBELOW="/store/mc/SAM/GenericTTbar/AODSIM/CMSSW_9_2_6_91X_mcRun1_realistic_v2-v1/00000/A64CCCF2-5C76-E711-B359-0CC47A78A3F8.root"
+
+ services = {
+ "XRDFED_CMS-GLOBAL01-NEW":{'redirector':'cms-xrd-global01.cern.ch:1094',
+ 'file_below': FILEABOVE,
+ 'file_above': FILEBELOW,
+ 'extra_notes':CMSLINK},
+
+ "XRDFED_CMS-GLOBAL02-NEW":{'redirector':'cms-xrd-global02.cern.ch:1094',
+ 'file_below': FILEABOVE,
+ 'file_above': FILEBELOW,
+ 'extra_notes':CMSLINK},
+
+ "XRDFED_CMS-US-FNAL":{'redirector':'cmsxrootd2.fnal.gov:1094',
+ 'file_below': FILEABOVE,
+ 'file_above': FILEBELOW,
+ 'extra_notes':CMSLINK},
+
+ "XRDFED_CMS-US-UNL":{'redirector':'xrootd.unl.edu:1094',
+ 'file_below': FILEABOVE,
+ 'file_above': FILEBELOW,
+ 'extra_notes':CMSLINK},
+
+ "XRDFED_CMS-EU-BARI":{'redirector':'xrootd.ba.infn.it:1094',
+ 'file_below': FILEBELOW,
+ 'file_above': FILEABOVE,
+ 'extra_notes':CMSLINK},
+
+ "XRDFED_CMS-EU-LLR":{'redirector':'llrxrd-redir.in2p3.fr:1094',
+ 'file_below': FILEBELOW,
+ 'file_above': FILEABOVE,
+ 'extra_notes':CMSLINK},
+
+ "XRDFED_CMS-EU-PISA":{'redirector':'xrootd-redic.pi.infn.it:1094',
+ 'file_below': FILEBELOW,
+ 'file_above': FILEABOVE,
+ 'extra_notes':CMSLINK},
+
+ "XRDFED_CMS-GLOBAL":{'redirector':'cms-xrd-global.cern.ch:1094',
+ 'file_below': FILEABOVE,
+ 'file_above': FILEBELOW,
+ 'extra_notes':CMSLINK},
+
+ "XRDFED_CMS-US":{'redirector':'cmsxrootd.fnal.gov:1094',
+ 'file_below': FILEABOVE,
+ 'file_above': FILEBELOW,
+ 'extra_notes':CMSLINK},
+
+ "XRDFED_CMS-EU":{'redirector':'xrootd-cms.infn.it:1094',
+ 'file_below': FILEBELOW,
+ 'file_above': FILEABOVE,
+ 'extra_notes':CMSLINK},
+
+ "XRDFED_CMS-EU-IPv6":{ 'redirector':'xrootd-cms-redir-01.cr.cnaf.infn.it:1094',
+ 'file_below': FILEBELOW,
+ 'file_above': FILEABOVE,
+ 'extra_notes':CMSLINK},
+ "XRDFED_CMS-TRANSIT":{'redirector':'cms-xrd-transit.cern.ch:1094',
+ 'file_below': FILEBELOW,
+ 'file_above': FILEABOVE,
+ 'extra_notes':CMSLINK},
+
+ "XRDFED_CMS-TRANSIT01":{'redirector':'vocms031.cern.ch:1094',
+ 'file_below': FILEBELOW,
+ 'file_above': FILEABOVE,
+ 'extra_notes':CMSLINK},
+
+ "XRDFED_CMS-TRANSIT02":{'redirector':'vocms032.cern.ch:1094',
+ 'file_below': FILEBELOW,
+ 'file_above': FILEABOVE,
+ 'extra_notes':CMSLINK},
+
+ }
+ signal.alarm(timeout_sec)
+ try:
+ with open('KIBANA_PROBES.js', 'w') as outfile:
+ for xrd in services:
+ services[xrd].update(servicename=xrd)
+ if debug:
+ dic = test_redirector(** services[xrd])
+ json.dump(dic, outfile)
+ outfile.write('\n')
+ else:
+ t = threading.Thread(target=test_redirector,
+ kwargs = services[xrd]) # read: "run a thread with the test function and all the parameters above as arguments"
+ t.start()
+
+
+ except Alarm:
+ print "ERROR: caught overall timeout after "+str(timeout_sec)+"s\n"
+ clear_lock()
+ sys.exit(2)
+ signal.alarm(0)
+ # not cleaning up the proxy files (are shared via the ENV, and we don't want an extra thread to just remove that file, or wait for the individual tests to finish...
+
+if __name__ == '__main__':
+ main()
diff --git a/AAAOps/XfedKibana/XRDFED-kibana-probe_to_JSON-Copy2.py b/AAAOps/XfedKibana/XRDFED-kibana-probe_to_JSON-Copy2.py
new file mode 100644
index 0000000..88f3c8e
--- /dev/null
+++ b/AAAOps/XfedKibana/XRDFED-kibana-probe_to_JSON-Copy2.py
@@ -0,0 +1,349 @@
+#!/usr/bin/python
+# functional probe and SLS extractor for the "federation" xroot services
+# highlights:
+# - stateless (i.e. run from cron whenever needed)
+# - will try to prevent parallel runs via lockfile
+# - multithreaded, one thread per service to be tested
+# - overall runtime cap at 10min
+# - could extract some statistics from xroot directly, but these are ever-increasing counters
+# Problems:
+# - need to update the code whenever a service is addded/deleted/changed
+# - uses "random" files on various Xroot services all over the world, these are (for now) the same as used by the experiments but these might change..
+
+import xml.dom.minidom
+import subprocess
+import os
+import sys
+import signal
+import re
+import time
+import Lemon.XMLAPI
+import socket
+import atexit
+import threading
+import tempfile
+import json
+import shutil
+
+html_dir = '/root/ogarzonm/' # will create per-service json files here
+
+LOCKFILE='/var/lock/subsys/xrdfed-kibana-probe'
+
+class Alarm(Exception):
+ pass
+
+def alarm_handler(signum, frame):
+ print "ERROR: caught overall timeout after "+str(timeout_sec)+"s\n"
+ clear_lock()
+ sys.exit(2)
+ raise Alarm
+
+def clear_lock():
+ try:
+ os.unlink(LOCKFILE)
+ except Exception,e:
+ print "could not remove lockfile:"+str(e)
+
+def env_setup():
+ os.environ['X509_USER_CERT']='/root/.globus/slsprobe-cert.pem'
+ os.environ['X509_USER_KEY']='/root/.globus/slsprobe-key.pem'
+ os.environ['X509_USER_PROXY']='/root/.globus/slsprobe.proxy'
+ os.environ['KRB5CCNAME']='FILE:/dev/null'
+ os.environ['PATH']=os.environ['PATH']+":/opt/globus/bin/"
+
+def get_proxy():
+ dev_null = open('/dev/null', 'rw')
+ (proxyfd,proxy)=tempfile.mkstemp(prefix='x509_xrdfed_',suffix='.pem')
+ os.close(proxyfd)
+ os.environ['X509_USER_PROXY']=proxy
+ ret = subprocess.call(['grid-proxy-init','-pwstdin'],stdin=dev_null,)
+ if ret > 0:
+ raise Exception("Cannot get X509 proxy")
+ dev_null.close()
+
+def cleanup_proxy():
+ try:
+ os.unlink(os.environ['X509_USER_PROXY'])
+ except Exception,e:
+ print "could not remove proxy file:"+str(e)
+
+def try_lock():
+ ret = subprocess.call(['lockfile','-5','-r2',LOCKFILE])
+ if ret > 0:
+ print "could not create lockfile"
+ return False
+ return True
+
+def prepare_dictionary(servicename):
+ dic={'service':servicename}
+ return dic
+def dnsalias_to_nodes(redirector):
+ (host,port) = redirector.split(':')
+ all_hosts = []
+ data=socket.getaddrinfo(host,port,0, 0, socket.SOL_TCP )
+ for addr in data:
+ (family, socktype, proto, canonname, sockaddr) = addr
+ (hostname, aliaslist, ipaddrlist) = socket.gethostbyaddr(sockaddr[0])
+ if not hostname in all_hosts:
+ all_hosts.append(hostname)
+ return all_hosts
+
+def xrdcp_test(redirector,file):
+ (errtext,out,err,elapsed) = run_xrd_commands("xrdcp",
+ ["-d","1",
+ "-f",
+ "-DIReadCacheSize","0",
+ "-DIRedirCntTimeout","180",
+ "root://"+redirector+'/'+file,
+ '/dev/null'])
+ return (errtext,err,elapsed)
+
+def xrd_info(redirector):
+ version = "(unknown)"
+ (errtext,out,err,elapsed) = run_xrd_commands("xrd",
+ [redirector,
+ "query","1", # 1:kXR_QStats
+ "a"]) # a_ll stats
+ if not errtext:
+ try:
+ dom = xml.dom.minidom.parseString(out)
+ root_node = dom.documentElement
+ if root_node.tagName == 'statistics':
+ v_attr = root_node.getAttributeNode('ver')
+ version = v_attr.nodeValue
+ except Exception,e:
+ errtext = "ERROR: cannot parse answer:"+str(e)
+ return (errtext,version,out)
+
+def run_xrd_commands(cmd,args):
+ dev_null = open('/dev/null', 'r')
+ errtxt = ''
+ elapsed = -1.0
+ xrd_args = [ 'perl','-e',"alarm 180; exec @ARGV", cmd, # one-line wrapper that *actually* kills the command
+ "-DIConnectTimeout","30",
+ "-DITransactionTimeout","60",
+ "-DIRequestTimeout","60" ] + args
+ err = ''
+ out = ''
+ try:
+ ran_try = True
+ start = time.time()
+ proc = subprocess.Popen(xrd_args,
+ stdin=dev_null,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ (out, err) = proc.communicate()
+
+ ret = proc.returncode
+ elapsed = (time.time() - start)
+ err_redir_index = err.rfind('Received redirection to')
+ err_index3010 = err.rfind('(error code: 3010') # (permission denied) may be sort-of-OK - we are talking to final storage already - UK
+ err_index3005 = err.rfind('(error code: 3005') # (no user mapping) - INFN
+ if err_redir_index >= 0 and (err_index3010 >= 0 or err_index3005 >= 0):
+ errtxt = ''
+ else:
+ if(ret > 0):
+ errtxt = "client-side error - exit code "+str(ret)+"\n"
+ err_index = err.rfind('Last server error')
+ if err_index >= 0:
+ err_end_index=err.find("\n",err_index)
+ errtxt = errtxt + err[err_index:err_end_index]
+ except Exception,e:
+        errtxt = errtxt + "Exception: "+str(e)
+ dev_null.close()
+ return (errtxt,out,err,elapsed)
+
+def test_redirector(servicename, redirector, file_below=None, file_above=None, extra_notes=""):
+ servicename=servicename.upper()
+ notes_text = "Redirector:"+redirector
+ availability = 'available'
+ availinfo = ''
+ c = 'No comment'
+
+ # prepare the dictionary.
+ dicci = prepare_dictionary(servicename)
+ dicci['Host'] = redirector.split(':')[0]
+
+ # run the functional tests - first some simple check to get the version, if OK look for files
+ (err_info,version,dump_info) = xrd_info(redirector)
+ if(err_info):
+
+ availability = 'unavailable'
+ availinfo=availinfo+" Error getting info from redirector "+err_info
+ dicci["xrdcp_below_time"] = 0
+ #dicci["status"] = "unavailable"
+
+ else:
+ availinfo="Version check: "+version
+ if (file_below):
+ notes_text = notes_text + "File 'below': " + file_below
+ (err_below,dump_below,elapsed_below) = xrdcp_test(redirector, file_below)
+ if err_below:
+ availability = 'degraded'
+ availinfo=availinfo+" Error below redirector "+err_below
+ dump_sane = re.sub('---*','__',dump_below)
+ c = "Detailed output for file BELOW "+redirector+":"+file_below+" "+err_below+" "+dump_sane
+ #dicci['comment'] = c
+ else:
+ availinfo=availinfo+" File below: OK "
+ dicci['xrdcp_below_time'] = str(elapsed_below)
+ else:
+ availinfo=availinfo+" File below: not tested."
+ if(file_above):
+ notes_text = notes_text + "File 'elsewhere': " + file_above
+ (err_above,dump_above,elapsed_above) = xrdcp_test(redirector, file_above)
+ if err_above :
+ #We've changed availability from number to string so this below won't work; Marian commented out on 2015-11-06
+ #availability = availability * 0.8 # less important if some remote site is failing..
+ availinfo=availinfo+" Error above redirector "+err_above
+ # sanitize the raw output in order to not trigger XML errors.. in a comment.
+ dump_sane = re.sub('---*','__',dump_above)
+ c = "Detailed output for file ABOVE "+redirector+":"+file_above+" "+err_above+" "+dump_sane
+ #dicci = {**dicci, **{'comment': c}}
+ #serviceUpdate.appendChild(c)
+ #need_xml_link=1
+ else:
+ availinfo=availinfo+" File above: OK "
+ #nValue = doc.createElement("numericvalue")
+ #nValue.setAttribute("name", "xrdcp_above_time")
+ #nValue.setAttribute("desc", "Time to copy a file elsewhere in the federation")
+ #nValue.appendChild(doc.createTextNode(str(elapsed_above)))
+ dicci['xrdcp_above_time'] = str(elapsed_above)
+ #data.appendChild(nValue)
+ else:
+ availinfo=availinfo+" File above: not tested."
+
+ # save functional test info to XML
+ #if need_xml_link:
+ # myhostname = socket.gethostname()
+    #    notes_text = notes_text + "Details for failed test: http://" + myhostname + "/aaa-probe/" + servicename + ".xml
\n" + "Details for recently failed test : http://vocms039.cern.ch/aaa-probe/err/
\n"
+ availinfo = availinfo + " " + notes_text
+ dicci['status']= str(availability)
+ if availability == 'unavailable' or availability == 'degraded':
+ dicci ['availInfo'] = availinfo
+ dicci ['Comment'] = c
+ else:
+ dicci['Version'] = version
+ #dicci['availabilityinfo']=availinfo
+ with open(html_dir +'KIBANA_PROBES.json', 'a') as f:
+ json.dump(dicci, f)
+ f.write('\n')
+
+
+def main():
+ debug = 0
+ atexit.register(clear_lock)
+ if len(sys.argv) > 1:
+ if sys.argv[1] == '-d':
+ debug=1
+ if not try_lock():
+ sys.exit(1)
+ if not os.path.exists(html_dir):
+ os.makedirs(html_dir)
+ env_setup()
+ # get a proxy cert
+ # get_proxy()
+
+ timeout_sec = 10 * 60 # limit overall runtime to 10min
+ signal.signal(signal.SIGALRM, alarm_handler)
+
+ ATLASLINK="%BR%Monitoring:%BR%\n http://atl-prod07.slac.stanford.edu:8080/display?page=xrd_report/aggregated/total_xrootd_lgn %BR%\n http://dashb-atlas-xrootd-transfers.cern.ch/ui %BR%\nhttp://dashb-atlas-ssb.cern.ch/dashboard/request.py/siteview#currentView=FAX+redirectors&highlight=false %BR%\n"
+ CMSLINK="%BR%Monitoring:%BR%\n http://xrootd.t2.ucsd.edu/dashboard/ %BR%\n http://dashb-cms-xrootd-transfers.cern.ch/ui %BR%\n"
+ FILEABOVE="/store/mc/SAM/GenericTTbar/AODSIM/CMSSW_9_2_6_91X_mcRun1_realistic_v2-v1/00000/A64CCCF2-5C76-E711-B359-0CC47A78A3F8.root"
+ FILEBELOW="/store/mc/SAM/GenericTTbar/AODSIM/CMSSW_9_2_6_91X_mcRun1_realistic_v2-v1/00000/A64CCCF2-5C76-E711-B359-0CC47A78A3F8.root"
+
+ services = {
+ "XRDFED_CMS-GLOBAL01-NEW":{'redirector':'cms-xrd-global01.cern.ch:1094',
+ 'file_below': FILEABOVE,
+ 'file_above': FILEBELOW,
+ 'extra_notes':CMSLINK},
+
+ "XRDFED_CMS-GLOBAL02-NEW":{'redirector':'cms-xrd-global02.cern.ch:1094',
+ 'file_below': FILEABOVE,
+ 'file_above': FILEBELOW,
+ 'extra_notes':CMSLINK},
+
+ "XRDFED_CMS-US-FNAL":{'redirector':'cmsxrootd2.fnal.gov:1094',
+ 'file_below': FILEABOVE,
+ 'file_above': FILEBELOW,
+ 'extra_notes':CMSLINK},
+
+ "XRDFED_CMS-US-UNL":{'redirector':'xrootd.unl.edu:1094',
+ 'file_below': FILEABOVE,
+ 'file_above': FILEBELOW,
+ 'extra_notes':CMSLINK},
+
+ "XRDFED_CMS-EU-BARI":{'redirector':'xrootd.ba.infn.it:1094',
+ 'file_below': FILEBELOW,
+ 'file_above': FILEABOVE,
+ 'extra_notes':CMSLINK},
+
+ "XRDFED_CMS-EU-LLR":{'redirector':'llrxrd-redir.in2p3.fr:1094',
+ 'file_below': FILEBELOW,
+ 'file_above': FILEABOVE,
+ 'extra_notes':CMSLINK},
+
+ "XRDFED_CMS-EU-PISA":{'redirector':'xrootd-redic.pi.infn.it:1094',
+ 'file_below': FILEBELOW,
+ 'file_above': FILEABOVE,
+ 'extra_notes':CMSLINK},
+
+ "XRDFED_CMS-GLOBAL":{'redirector':'cms-xrd-global.cern.ch:1094',
+ 'file_below': FILEABOVE,
+ 'file_above': FILEBELOW,
+ 'extra_notes':CMSLINK},
+
+ "XRDFED_CMS-US":{'redirector':'cmsxrootd.fnal.gov:1094',
+ 'file_below': FILEABOVE,
+ 'file_above': FILEBELOW,
+ 'extra_notes':CMSLINK},
+
+ "XRDFED_CMS-EU":{'redirector':'xrootd-cms.infn.it:1094',
+ 'file_below': FILEBELOW,
+ 'file_above': FILEABOVE,
+ 'extra_notes':CMSLINK},
+
+ "XRDFED_CMS-EU-IPv6":{ 'redirector':'xrootd-cms-redir-01.cr.cnaf.infn.it:1094',
+ 'file_below': FILEBELOW,
+ 'file_above': FILEABOVE,
+ 'extra_notes':CMSLINK},
+ "XRDFED_CMS-TRANSIT":{'redirector':'cms-xrd-transit.cern.ch:1094',
+ 'file_below': FILEBELOW,
+ 'file_above': FILEABOVE,
+ 'extra_notes':CMSLINK},
+
+ "XRDFED_CMS-TRANSIT01":{'redirector':'vocms031.cern.ch:1094',
+ 'file_below': FILEBELOW,
+ 'file_above': FILEABOVE,
+ 'extra_notes':CMSLINK},
+
+ "XRDFED_CMS-TRANSIT02":{'redirector':'vocms032.cern.ch:1094',
+ 'file_below': FILEBELOW,
+ 'file_above': FILEABOVE,
+ 'extra_notes':CMSLINK},
+
+ }
+ signal.alarm(timeout_sec)
+ #shutil.copyfile(html_dir+'KIBANA_PROBES.json', html_dir+'KIBANA_PROBES_2.json')
+ os.remove(html_dir+'KIBANA_PROBES.json')
+ #shutil.copyfile(html_dir+'KIBANA_PROBES.json', html_dir+'KIBANA_PROBES_2.json')
+ try:
+
+ for xrd in services:
+ services[xrd].update(servicename=xrd)
+ if debug:
+ test_redirector(** services[xrd])
+ else:
+ t = threading.Thread(target=test_redirector, kwargs = services[xrd]) # read: "run a thread with the test function and all the parameters above as arguments"
+ t.start()
+ except Alarm:
+ print "ERROR: caught overall timeout after "+str(timeout_sec)+"s\n"
+ clear_lock()
+ sys.exit(2)
+ signal.alarm(0)
+ #shutil.copyfile(html_dir+'KIBANA_PROBES.json', html_dir+'KIBANA_PROBES_2.json')
+ # not cleaning up the proxy files (are shared via the ENV, and we don't want an extra thread to just remove that file, or wait for the individual tests to finish...
+
+if __name__ == '__main__':
+ main()
+
diff --git a/AAAOps/XfedKibana/XRDFED-kibana-probe_to_JSON.py b/AAAOps/XfedKibana/XRDFED-kibana-probe_to_JSON.py
new file mode 100755
index 0000000..55dd8f8
--- /dev/null
+++ b/AAAOps/XfedKibana/XRDFED-kibana-probe_to_JSON.py
@@ -0,0 +1,466 @@
+#!/usr/bin/python
+# functional probe and SLS extractor for the "federation" xroot services
+# highlights:
+# - stateless (i.e. run from cron whenever needed)
+# - will try to prevent parallel runs via lockfile
+# - multithreaded, one thread per service to be tested
+# - overall runtime cap at 10min
+# - could extract some statistics from xroot directly, but these are ever-increasing counters
+# Problems:
+# - need to update the code whenever a service is addded/deleted/changed
+# - uses "random" files on various Xroot services all over the world, these are (for now) the same as used by the experiments but these might change..
+
+import xml.dom.minidom
+import subprocess
+import os
+import sys
+import signal
+import re
+import time
+import Lemon.XMLAPI
+import socket
+import atexit
+import threading
+import tempfile
+import json
+
+html_dir = '/var/www/html/aaa-probe/' # will create per-service xml files here
+
+#CERN_eosfile_rucio='/atlas/rucio/user/ivukotic:user.ivukotic.xrootd.cern-prod-1M'
+
+LOCKFILE='/var/lock/subsys/xrdfed-kibana-probe'
+
+class Alarm(Exception):
+ pass
+
+def alarm_handler(signum, frame):
+ print "ERROR: caught overall timeout after "+str(timeout_sec)+"s\n"
+ clear_lock()
+ sys.exit(2)
+ raise Alarm # should not reach this..
+
+def clear_lock():
+ try:
+ os.unlink(LOCKFILE)
+ except Exception,e:
+ print "could not remove lockfile:"+str(e)
+
+def env_setup():
+ # GSI
+ os.environ['X509_USER_CERT']='/root/.globus/slsprobe-cert.pem'
+ os.environ['X509_USER_KEY']='/root/.globus/slsprobe-key.pem'
+ #os.environ['X509_USER_CERT']='/etc/grid-security/hostcert.pem'
+ #os.environ['X509_USER_KEY']='/etc/grid-security/hostkey.pem'
+ #set this only if get_proxy() is not used and proxy is refreshed by other mechanism e.g. from cronttab
+ os.environ['X509_USER_PROXY']='/root/.globus/slsprobe.proxy'
+ # these work for SSL
+ #os.environ['XrdSecSSLUSERKEY']='/etc/grid-security/hostkey.pem '
+ #os.environ['XrdSecSSLUSERCERT']='/etc/grid-security/hostcert.pem '
+ # specifically tell it to *not* use Kerberos (in case we run this interactively)
+ os.environ['KRB5CCNAME']='FILE:/dev/null'
+ os.environ['PATH']=os.environ['PATH']+":/opt/globus/bin/"
+
+
+def get_proxy():
+ dev_null = open('/dev/null', 'rw')
+ (proxyfd,proxy)=tempfile.mkstemp(prefix='x509_xrdfed_',suffix='.pem')
+ os.close(proxyfd)
+ os.environ['X509_USER_PROXY']=proxy
+
+ ret = subprocess.call(['grid-proxy-init','-pwstdin'],
+ stdin=dev_null,
+ ) # most of the proxy details will get printed in "xrdcp" output anyway
+ if ret > 0:
+ raise Exception("Cannot get X509 proxy")
+ dev_null.close()
+
+
+def cleanup_proxy():
+ try:
+ os.unlink(os.environ['X509_USER_PROXY'])
+ except Exception,e:
+ print "could not remove proxy file:"+str(e)
+
+def try_lock():
+ # use "lockfile" directly since wayy easier
+ ret = subprocess.call(['lockfile','-5','-r2',LOCKFILE])
+ if ret > 0:
+ print "could not create lockfile"
+ return False
+ return True
+
+"""
+
+def prepare_xml(servicename):
+ doc = xml.dom.minidom.Document()
+ serviceUpdate = doc.createElementNS("http://sls.cern.ch/SLS/XML/update","serviceupdate")
+ serviceUpdate.setAttribute("xmlns", "http://sls.cern.ch/SLS/XML/update")
+ doc.appendChild(serviceUpdate)
+ id = doc.createElement("id")
+ id.appendChild(doc.createTextNode(servicename))
+ serviceUpdate.appendChild(id)
+ data = doc.createElement("data")
+ serviceUpdate.appendChild(data)
+ timeStampTmp = time.strftime("%Y-%m-%dT%H:%M:%S%z")
+ timeStamp=timeStampTmp[0:-2]+':'+timeStampTmp[-2:] # xml needs a colon, nobody else..
+ tValue = doc.createElement("timestamp")
+ tValue.appendChild(doc.createTextNode(timeStamp))
+ serviceUpdate.appendChild(tValue)
+ return (doc,data, serviceUpdate)
+
+"""
+
+def prepare_dictionary(servicename):
+ timeStampTmp = time.strftime("%Y-%m-%dT%H:%M:%S%z")
+ timeStamp=timeStampTmp[0:-2]+':'+timeStampTmp[-2:]
+ dic={'serviceupdate':{
+ 'xmlns': "http://sls.cern.ch/SLS/XML/update"
+ 'data': '',
+ 'id':servicename,
+ 'timestamp': timeStamp
+ }
+
+
+ }
+ return dic
+def dnsalias_to_nodes(redirector):
+ (host,port) = redirector.split(':')
+ all_hosts = []
+ data=socket.getaddrinfo(host,port,0, 0, socket.SOL_TCP )
+ for addr in data:
+ (family, socktype, proto, canonname, sockaddr) = addr
+ (hostname, aliaslist, ipaddrlist) = socket.gethostbyaddr(sockaddr[0])
+ if not hostname in all_hosts:
+ all_hosts.append(hostname)
+ return all_hosts
+
+def xrdcp_test(redirector,file):
+ (errtext,out,err,elapsed) = run_xrd_commands("xrdcp",
+ ["-d","2",
+ "-f",
+ "-DIReadCacheSize","0",
+ "-DIRedirCntTimeout","180",
+ "root://"+redirector+'/'+file,
+ '/dev/null'])
+ return (errtext,err,elapsed)
+
+def xrd_info(redirector):
+ version = "(unknown)"
+ (errtext,out,err,elapsed) = run_xrd_commands("xrd",
+ [redirector,
+ "query","1", # 1:kXR_QStats
+ "a"]) # a_ll stats
+ if not errtext:
+ try:
+ dom = xml.dom.minidom.parseString(out)
+ root_node = dom.documentElement
+ if root_node.tagName == 'statistics':
+ v_attr = root_node.getAttributeNode('ver')
+ version = v_attr.nodeValue
+ except Exception,e:
+ errtext = "ERROR: cannot parse answer:"+str(e)
+ return (errtext,version,out)
+
+def run_xrd_commands(cmd,args):
+ dev_null = open('/dev/null', 'r')
+ errtxt = ''
+ elapsed = -1.0
+ xrd_args = [ 'perl','-e',"alarm 180; exec @ARGV", cmd, # one-line wrapper that *actually* kills the command
+ "-DIConnectTimeout","30",
+ "-DITransactionTimeout","60",
+ "-DIRequestTimeout","60" ] + args
+ try:
+ start = time.time()
+ proc = subprocess.Popen(xrd_args,
+ stdin=dev_null,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ (out, err) = proc.communicate()
+ ret = proc.returncode
+ elapsed = (time.time() - start)
+ err_redir_index = err.rfind('Received redirection to')
+ err_index3010 = err.rfind('(error code: 3010') # (permission denied) may be sort-of-OK - we are talking to final storage already - UK
+ err_index3005 = err.rfind('(error code: 3005') # (no user mapping) - INFN
+ if err_redir_index >= 0 and (err_index3010 >= 0 or err_index3005 >= 0):
+ errtxt = ''
+ else:
+ if(ret > 0):
+ errtxt = "client-side error - exit code "+str(ret)+"\n"
+ err_index = err.rfind('Last server error')
+ if err_index >= 0:
+ err_end_index=err.find("\n",err_index)
+ errtxt = errtxt + err[err_index:err_end_index]
+ except Exception,e:
+        errtxt = errtxt + "Exception: "+str(e)
+ dev_null.close()
+
+ return (errtxt,out,err,elapsed)
+
+
+
+def test_redirector(servicename, redirector, file_below=None, file_above=None, extra_notes=""):
+ servicename=servicename.upper()
+ #availability = 100.0
+ availability = 'available'
+ availinfo = ''
+ need_xml_link=0
+
+ # prepare the dictionary.
+ dicci = prepare_dictionary(servicename)
+ #(doc, data, serviceUpdate) = prepare_xml(servicename)
+    notes_text = "Redirector:"+redirector+"
"
+
+ # run the functional tests - first some simple check to get the version, if OK look for files
+ (err_info,version,dump_info) = xrd_info(redirector)
+ if(err_info):
+ #availability = 0.0
+ availability = 'unavailable'
+ availinfo=availinfo+"
Error getting info from redirector
"+err_info
+ c="Detailed output for INFO for "+redirector+"\n"+
+ err_info+"\n"
+ +dump_info
+ #serviceUpdate.appendChild(c)
+ dicci = {**dicci, **{'comment': c}}
+ numericvalue = {
+
+ 'name': "xrdcp_below_time",
+ 'desc' : "Time to copy a file below redirector",
+ 'value': '0.000'
+ }
+ #nValue.setAttribute("name", "xrdcp_below_time")
+ #nValue.setAttribute("desc", "Time to copy a file below redirector")
+ #nValue.appendChild(doc.createTextNode("0.000"))
+ dicci['serviceupdate']['data'] = numericvalue
+ #data.appendChild(nValue)
+
+ else:
+ availinfo="Version check: "+version+"\n"
+ if (file_below):
+ notes_text = notes_text + "File 'below': " + file_below + "
"
+ (err_below,dump_below,elapsed_below) = xrdcp_test(redirector, file_below)
+ if err_below :
+ #availability = 66.0
+ availability = 'degraded'
+ availinfo=availinfo+"
Error below redirector
\n"+err_below
+ # sanitize the raw output in order to not trigger XML errors..
+ dump_sane = re.sub('---*','__',dump_below)
+ c = "Detailed output for file BELOW "+redirector+":"+file_below+"\n"+
+ err_below+"\n"
+ +dump_sane
+ dicci = {**dicci, **{'comment': c}}
+ need_xml_link=1
+ else:
+ availinfo=availinfo+"
File below: OK
"
+ numericvalue = {
+ 'name': "xrdcp_below_time",
+ "desc": "Time to copy a file below redirector",
+ "elapsed_below": str(elapsed_below)
+ }
+ #nValue = doc.createElement("numericvalue")
+ #nValue.setAttribute("name", "xrdcp_below_time")
+ #nValue.setAttribute("desc", "Time to copy a file below redirector")
+ #nValue.appendChild(doc.createTextNode(str(elapsed_below)))
+ dicci['serviceupdate']['data'] = numericvalue
+ #data.appendChild(nValue)
+ else:
+ availinfo=availinfo+"
File below: not tested."
+ if(file_above):
+ notes_text = notes_text + "File 'elsewhere': " + file_above + "
"
+ (err_above,dump_above,elapsed_above) = xrdcp_test(redirector, file_above)
+ if err_above :
+ #We've changed availability from number to string so this below won't work; Marian commented out on 2015-11-06
+ #availability = availability * 0.8 # less important if some remote site is failing..
+ availinfo=availinfo+"
Error above redirector
"+err_above
+ # sanitize the raw output in order to not trigger XML errors.. in a comment.
+ dump_sane = re.sub('---*','__',dump_above)
+ c = "Detailed output for file ABOVE "+redirector+":"+file_above+"\n"+
+ err_above+"\n"
+ +dump_sane
+ dicci = {**dicci, **{'comment': c}}
+ #serviceUpdate.appendChild(c)
+ need_xml_link=1
+ else:
+ availinfo=availinfo+"
File above: OK
"
+ numericvalue = {
+ 'name': "xrdcp_below_time",
+ "desc": "Time to copy a file below redirector",
+ "elapsed_below": str(elapsed_below)
+ }
+ #nValue = doc.createElement("numericvalue")
+ #nValue.setAttribute("name", "xrdcp_above_time")
+ #nValue.setAttribute("desc", "Time to copy a file elsewhere in the federation")
+ #nValue.appendChild(doc.createTextNode(str(elapsed_above)))
+ dicci['serviceupdate']['data'] = numericvalue
+ #data.appendChild(nValue)
+ else:
+ availinfo=availinfo+"
File above: not tested."
+
+ # save functional test info to XML
+ if need_xml_link:
+ myhostname = socket.gethostname()
+ notes_text = notes_text + "Details for failed test: http://" + myhostname + "/aaa-probe/" + servicename + ".xml
\n" + "Details for recently failed test : http://vocms039.cern.ch/aaa-probe/err/
\n"
+ availinfo = availinfo + "
" + notes_text
+ dicci = {**dicci, **{'status': str(availability)}}
+ dicci = {**dicci, **{'status': availinfo}}
+ #availabilityF = doc.createElement("status")
+ #availabilityF.appendChild(doc.createTextNode(str(availability)))
+ #serviceUpdate.appendChild(availabilityF)
+ #availabilityInfo = doc.createElement("availabilityinfo")
+ #availabilityInfo.appendChild(doc.createTextNode(availinfo))
+ #serviceUpdate.appendChild(availabilityInfo)
+
+ # commented out by Engin
+ #notes = doc.createElement("notes")
+ #notes.appendChild(doc.createTextNode(notes_text + extra_notes))
+ #serviceUpdate.appendChild(notes)
+
+ # collect LEMON stuff, save as numericvalues in XML
+ # Problem: need to add up for all the hosts in the alias
+ # 2015-11-09: commented out by MarianZ; note I removed functions getLemonMetric and collect_lemon
+ #collect_lemon(redirector=redirector,doc=doc,data=data)
+
+ # try to get some "readable XML" that still is accepted by SLS:
+ #uglyXml = doc.toprettyxml(indent=" ",encoding="utf-8")
+ #text_re = re.compile('>\n\s+([^<>\s].*?)\n\s+', re.DOTALL)
+ #prettyXml = text_re.sub('>\g<1>', uglyXml)
+
+ # write XML to file
+ #try:
+ #xmlFileH = open(xmlFile, 'w')
+ #xmlFileH.write(prettyXml);
+ #xmlFileH.close()
+ #except Exception, e:
+ #print "cannot write new availability file "+xmlFile+":",e
+ jsonFile = html_dir + '/' + servicename +'.json'
+ with open(jsonFile, 'w') as fp:
+ json.dump(dicci, fp)
+
+
+def main():
+ # Entry point: register lock cleanup, take the run lock, make sure the
+ # output directory exists, then launch one probe per configured
+ # federation service (threaded by default, sequential with '-d').
+ # A SIGALRM-based cap limits the overall runtime to 10 minutes.
+ debug = 0
+ atexit.register(clear_lock)
+ if len(sys.argv) > 1:
+ if sys.argv[1] == '-d':
+ debug=1
+ if not try_lock():
+ sys.exit(1)
+ if not os.path.exists(html_dir):
+ os.makedirs(html_dir)
+ env_setup()
+ # get a proxy cert
+ # get_proxy()
+
+ timeout_sec = 10 * 60 # limit overall runtime to 10min
+ signal.signal(signal.SIGALRM, alarm_handler)
+
+ ATLASLINK="%BR%Monitoring:%BR%\n http://atl-prod07.slac.stanford.edu:8080/display?page=xrd_report/aggregated/total_xrootd_lgn %BR%\n http://dashb-atlas-xrootd-transfers.cern.ch/ui %BR%\nhttp://dashb-atlas-ssb.cern.ch/dashboard/request.py/siteview#currentView=FAX+redirectors&highlight=false %BR%\n"
+ CMSLINK="%BR%Monitoring:%BR%\n http://xrootd.t2.ucsd.edu/dashboard/ %BR%\n http://dashb-cms-xrootd-transfers.cern.ch/ui %BR%\n"
+ #FILEABOVE="/store/test/xrootd/T2_US_Nebraska/store/mc/SAM/GenericTTbar/GEN-SIM-RECO/CMSSW_5_3_1_START53_V5-v1/0013/CE4D66EB-5AAE-E111-96D6-003048D37524.root"
+ #FILEBELOW="/store/test/xrootd/T2_CH_CERN/store/mc/SAM/GenericTTbar/GEN-SIM-RECO/CMSSW_5_3_1_START53_V5-v1/0013/CE4D66EB-5AAE-E111-96D6-003048D37524.root"
+ #FILEABOVE="/store/mc/SAM/GenericTTbar/GEN-SIM-RECO/CMSSW_5_3_1_START53_V5-v1/0013/CE4D66EB-5AAE-E111-96D6-003048D37524.root"
+ #FILEBELOW="/store/mc/SAM/GenericTTbar/GEN-SIM-RECO/CMSSW_5_3_1_START53_V5-v1/0013/CE4D66EB-5AAE-E111-96D6-003048D37524.root"
+ # NOTE(review): FILEABOVE and FILEBELOW are currently the same LFN, so
+ # the "below"/"above" distinction is effectively void in this revision.
+ FILEABOVE="/store/mc/SAM/GenericTTbar/AODSIM/CMSSW_9_2_6_91X_mcRun1_realistic_v2-v1/00000/A64CCCF2-5C76-E711-B359-0CC47A78A3F8.root"
+ FILEBELOW="/store/mc/SAM/GenericTTbar/AODSIM/CMSSW_9_2_6_91X_mcRun1_realistic_v2-v1/00000/A64CCCF2-5C76-E711-B359-0CC47A78A3F8.root"
+
+ # Per-service probe configuration; the dict key becomes the servicename
+ # (and hence the JSON output file name) for test_redirector().
+ services = {
+ "XRDFED_CMS-GLOBAL01-NEW":{'redirector':'cms-xrd-global01.cern.ch:1094',
+ 'file_below': FILEABOVE,
+ 'file_above': FILEBELOW,
+ 'extra_notes':CMSLINK},
+
+ "XRDFED_CMS-GLOBAL02-NEW":{'redirector':'cms-xrd-global02.cern.ch:1094',
+ 'file_below': FILEABOVE,
+ 'file_above': FILEBELOW,
+ 'extra_notes':CMSLINK},
+
+# "XRDFED_CMS-GLOBAL01":{'redirector':'xrdcmsglobal01.cern.ch:1094',
+# 'file_below': FILEABOVE,
+# 'file_above': FILEBELOW,
+# 'extra_notes':CMSLINK},
+
+# "XRDFED_CMS-GLOBAL02":{'redirector':'xrdcmsglobal02.cern.ch:1094',
+# 'file_below': FILEABOVE,
+# 'file_above': FILEBELOW,
+# 'extra_notes':CMSLINK},
+
+ "XRDFED_CMS-US-FNAL":{'redirector':'cmsxrootd2.fnal.gov:1094',
+ 'file_below': FILEABOVE,
+ 'file_above': FILEBELOW,
+ 'extra_notes':CMSLINK},
+
+ "XRDFED_CMS-US-UNL":{'redirector':'xrootd.unl.edu:1094',
+ 'file_below': FILEABOVE,
+ 'file_above': FILEBELOW,
+ 'extra_notes':CMSLINK},
+
+ "XRDFED_CMS-EU-BARI":{'redirector':'xrootd.ba.infn.it:1094',
+ 'file_below': FILEBELOW,
+ 'file_above': FILEABOVE,
+ 'extra_notes':CMSLINK},
+
+ "XRDFED_CMS-EU-LLR":{'redirector':'llrxrd-redir.in2p3.fr:1094',
+ 'file_below': FILEBELOW,
+ 'file_above': FILEABOVE,
+ 'extra_notes':CMSLINK},
+
+ "XRDFED_CMS-EU-PISA":{'redirector':'xrootd-redic.pi.infn.it:1094',
+ 'file_below': FILEBELOW,
+ 'file_above': FILEABOVE,
+ 'extra_notes':CMSLINK},
+
+ "XRDFED_CMS-GLOBAL":{'redirector':'cms-xrd-global.cern.ch:1094',
+ 'file_below': FILEABOVE,
+ 'file_above': FILEBELOW,
+ 'extra_notes':CMSLINK},
+
+ "XRDFED_CMS-US":{'redirector':'cmsxrootd.fnal.gov:1094',
+ 'file_below': FILEABOVE,
+ 'file_above': FILEBELOW,
+ 'extra_notes':CMSLINK},
+
+ "XRDFED_CMS-EU":{'redirector':'xrootd-cms.infn.it:1094',
+ 'file_below': FILEBELOW,
+ 'file_above': FILEABOVE,
+ 'extra_notes':CMSLINK},
+
+ "XRDFED_CMS-EU-IPv6":{ 'redirector':'xrootd-cms-redir-01.cr.cnaf.infn.it:1094',
+ 'file_below': FILEBELOW,
+ 'file_above': FILEABOVE,
+ 'extra_notes':CMSLINK},
+ "XRDFED_CMS-TRANSIT":{'redirector':'cms-xrd-transit.cern.ch:1094',
+ 'file_below': FILEBELOW,
+ 'file_above': FILEABOVE,
+ 'extra_notes':CMSLINK},
+
+ "XRDFED_CMS-TRANSIT01":{'redirector':'vocms031.cern.ch:1094',
+ 'file_below': FILEBELOW,
+ 'file_above': FILEABOVE,
+ 'extra_notes':CMSLINK},
+
+ "XRDFED_CMS-TRANSIT02":{'redirector':'vocms032.cern.ch:1094',
+ 'file_below': FILEBELOW,
+ 'file_above': FILEABOVE,
+ 'extra_notes':CMSLINK},
+
+ }
+ # NOTE(review): worker threads are never join()ed - the alarm only caps
+ # thread *creation*, and the atexit lock cleanup can run while probes
+ # are still writing their JSON files.  Confirm this is intended.
+ signal.alarm(timeout_sec)
+ try:
+ for xrd in services:
+ services[xrd].update(servicename=xrd) # would like to get the servicename= parameter for the function
+ if debug:
+ test_redirector(** services[xrd]) # sequentially
+ else:
+ t = threading.Thread(target=test_redirector,
+ kwargs = services[xrd]) # read: "run a thread with the test function and all the parameters above as arguments"
+ t.start()
+
+
+ except Alarm:
+ print "ERROR: caught overall timeout after "+str(timeout_sec)+"s\n"
+ clear_lock()
+ sys.exit(2)
+ signal.alarm(0)
+ # not cleaning up the proxy files (are shared via the ENV, and we don't want an extra thread to just remove that file, or wait for the individual tests to finish...
+
+if __name__ == '__main__':
+ main()
diff --git a/consistency_check/Check_dbs_invalidations_file.py b/consistency_check/Check_dbs_invalidations_file.py
new file mode 100644
index 0000000..4b4d818
--- /dev/null
+++ b/consistency_check/Check_dbs_invalidations_file.py
@@ -0,0 +1,33 @@
+import os
+import warnings
+warnings.filterwarnings('ignore')
+
def file_read(fname):
    """Return every line of *fname* as a list (trailing newlines kept)."""
    with open(fname) as handle:
        return [entry for entry in handle]
+
def which_to_invalidate_in_dbs(fname):
    """Return the LFNs from *fname* that xrootd cannot locate anywhere.

    Each line of *fname* is one LFN (with trailing newline).  The global
    redirector is asked to locate the file; an empty answer means no
    replica was found, so the file is a candidate for DBS invalidation.
    """
    import subprocess  # local import: keep the module's import block untouched
    invalidate = []
    for line_ in file_read(fname):
        lfn = line_[:-1]  # strip the trailing newline
        # SECURITY/BUGFIX: previously the LFN was interpolated into an
        # os.system() shell string (shell-injection risk for odd file
        # names) and round-tripped through an "aux.txt" temp file.  Use an
        # argument list (no shell) and read stdout directly instead.
        try:
            located = subprocess.check_output(
                ["xrdfs", "cms-xrd-global.cern.ch", "locate", "-d", "-m", lfn])
        except subprocess.CalledProcessError as exc:
            # xrdfs failing to locate still means "no replica found here".
            located = exc.output or b""
        if not located.strip():
            invalidate.append(lfn)
    return invalidate
+
def main(list_of_files_):
    """Write the given file names to Invalidate_in_DBS.txt, one per line."""
    lines = ["%s\n" % entry for entry in list_of_files_]
    with open('Invalidate_in_DBS.txt', 'w') as sink:
        sink.writelines(lines)
+
+if __name__ == '__main__':
+
+ # Entry point: read candidate LFNs from check_in_dbs.txt (produced by
+ # File_mismatch_WS.py), keep those with no xrootd replica anywhere, and
+ # write them to Invalidate_in_DBS.txt for the operator.
+ list_of_files = which_to_invalidate_in_dbs('check_in_dbs.txt')
+ main(list_of_files)
+ print(list_of_files)
diff --git a/consistency_check/Check_files_in_rucio.py b/consistency_check/Check_files_in_rucio.py
new file mode 100644
index 0000000..cbc68aa
--- /dev/null
+++ b/consistency_check/Check_files_in_rucio.py
@@ -0,0 +1,130 @@
+import urllib.request as request
+import urllib
+import json
+import requests
+import os
+import pandas
+from pandas.io.json import json_normalize
+from bs4 import BeautifulSoup as soup
+#import urllib.request.urlopen as uReq
+import warnings
+warnings.filterwarnings('ignore')
+
+class x509RESTSession(object):
+ # HTTPS session authenticated with the user's X.509 grid certificate.
+ # Wraps the PhEDEx data service and the DBS reader REST endpoints and
+ # returns their answers flattened into pandas DataFrames.
+ datasvc = "https://cmsweb.cern.ch/phedex/datasvc/json/prod"
+ datasvc_xml = "https://cmsweb.cern.ch/phedex/datasvc/xml/prod"
+
+ def __init__(self):
+ self._session = requests.Session()
+ home = os.path.expanduser('~/')
+ #os.system("openssl rsa -in "+home+".globus/userkey.pem -out "+home+".globus/userkey2.pem; chmod 0600 "+home+".globus/userkey2.pem")
+ # userkey2.pem is presumably a passphrase-free copy of the grid key
+ # produced by the commented openssl command above - TODO confirm.
+ self._session.cert = (home+'.globus/usercert.pem', home+'.globus/userkey2.pem')
+ self._session.verify = os.getenv('X509_CERT_DIR')
+
+ def data(self, dataset):
+ # Query PhEDEx for *dataset* and flatten the dbs/dataset/block/file
+ # hierarchy into one row per file (Dataset, File_name, File_checksum).
+ res = self._session.get("%s/data" % self.datasvc, params={'dataset': dataset})
+ resjson = json.loads(res.content)
+ out = []
+ for _instance in resjson["phedex"]["dbs"]:
+ for _dataset in _instance["dataset"]:
+ for _block in _dataset["block"]:
+ for _file in _block["file"]:
+ out.append(
+ {
+ "Dataset": _dataset["name"],
+ "File_name": _file["lfn"],
+ "File_checksum": _file["checksum"]
+
+ }
+ )
+ # NOTE(review): pandas.io.json.json_normalize is deprecated in newer
+ # pandas (use pandas.json_normalize) - confirm the pinned version.
+ df = pandas.io.json.json_normalize(out)
+ return df
+ #format_dates(df, ["Time_file_was_created", "Time_block_was_created"])
+
+ def dbsinfo(self, dataset):
+ # Query the DBS reader for per-file detail of *dataset*: validity
+ # flag, LFN, checksum and the account that last modified the record.
+
+ res = self._session.get("https://cmsweb.cern.ch/dbs/prod/global/DBSReader/files?detail=1&dataset="+dataset)
+ resjson = json.loads(res.content)
+ out = []
+ for _instance in resjson:
+ out.append(
+ {
+ "Dataset": _instance["dataset"],
+ "Is_valid": _instance["is_file_valid"],
+ "File_name": _instance["logical_file_name"],
+ "File_checksum": _instance["check_sum"],
+ "last_modified_by": _instance ["last_modified_by"]
+
+ }
+ )
+
+ df = pandas.io.json.json_normalize(out)
+ return df
+
+ def load_html(self, url):
+ # Fetch *url* and return it parsed as a BeautifulSoup document.
+ uClient = request.urlopen(url)
+ web_site = uClient.read()
+ uClient.close()
+ page = soup(web_site, "html.parser")
+ return page
+
+ def jsonmethod(self, method, **params):
+ # NOTE(review): neither self.jsonurl nor self.getjson is defined in
+ # this class, so calling this raises AttributeError - it appears to
+ # be dead leftover code.
+ return self.getjson(url=self.jsonurl.join(method), params=params)
+
def get_datasets(web):
    """Collect dataset names from every table of the parsed page.

    The first row of each table is a header and is skipped.  If a row's
    first cell contains a hyperlink, the dataset name lives in the second
    cell; otherwise it is the first cell itself.
    """
    datasets = []
    for table in web.findAll('table'):
        rows = table.findAll('tr')
        for row in rows[1:]:
            cells = row.findAll('td')
            if cells[0].findAll('a'):
                datasets.append(str(cells[1].text))
            else:
                datasets.append(str(cells[0].text))
    return datasets
+
+def main(sesion, web):
+ # Scan the Unified "assistance" page for datasets, then collect NANOAOD
+ # files that were invalidated in DBS by user 'ogarzonm' and write the
+ # resulting work lists to text files in the current directory.
+ datasets = get_datasets(web)
+ print(datasets)
+ invalidate_in_phedex = []
+ # NOTE(review): invalidate_in_dbs, dataset_empty_dbs and
+ # dataset_empty_phedex are never appended to in this variant, so three
+ # of the four output files below are always written empty.
+ invalidate_in_dbs = []
+ dataset_empty_dbs = []
+ dataset_empty_phedex = []
+ for _dataset in datasets:
+ # NOTE(review): the PhEDEx answer is fetched but never used here.
+ phedex = sesion.data(dataset = _dataset)
+ dbs = sesion.dbsinfo(dataset = _dataset)
+ array = []
+ if dbs.empty:
+ pass
+ else:
+
+ invalidated_in_dbs_by_unified = dbs.loc[dbs['last_modified_by'].str.contains('ogarzonm')]
+ # NOTE(review): this .loc applies a boolean mask built on the full
+ # `dbs` frame to the already-filtered frame - pandas aligns it by
+ # index, but filtering on the subset itself would be more robust.
+ invalidated_in_dbs_by_unified = invalidated_in_dbs_by_unified.loc[dbs['File_name'].str.contains('NANOAOD')]
+ invalidated_in_dbs_by_unified = invalidated_in_dbs_by_unified.loc[invalidated_in_dbs_by_unified['Is_valid'] == 0]
+ array = invalidated_in_dbs_by_unified[['File_name']].to_numpy()
+ print(array)
+ for i in range(len(array)):
+ invalidate_in_phedex.append(array[i])
+
+
+
+
+
+ # Dump the four work lists, one entry per line.
+ with open('check_in_dbs.txt', 'w') as f:
+ for item in invalidate_in_dbs:
+ f.write("%s\n" % item)
+ with open('invalidate_in_phedex.txt', 'w') as f:
+ for item in invalidate_in_phedex:
+ f.write("%s\n" % item)
+ with open('dataset_empty_dbs.txt', 'w') as f:
+ for item in dataset_empty_dbs:
+ f.write("%s\n" % item)
+ with open('dataset_empty_phedex.txt', 'w') as f:
+ for item in dataset_empty_phedex:
+ f.write("%s\n" % item)
+
+if __name__ == '__main__':
+ # Build the authenticated session, fetch the assistance page and run.
+ sesion = x509RESTSession()
+ web_info = sesion.load_html(url='https://cms-unified.web.cern.ch/cms-unified/assistance.html')
+ main(sesion, web_info)
diff --git a/consistency_check/File_mismatch_WS.py b/consistency_check/File_mismatch_WS.py
new file mode 100644
index 0000000..4d4621e
--- /dev/null
+++ b/consistency_check/File_mismatch_WS.py
@@ -0,0 +1,134 @@
+import urllib.request as request
+import urllib
+import json
+import requests
+import os
+import pandas
+from pandas.io.json import json_normalize
+from bs4 import BeautifulSoup as soup
+#import urllib.request.urlopen as uReq
+import warnings
+warnings.filterwarnings('ignore')
+
+class x509RESTSession(object):
+ # HTTPS session authenticated with the user's X.509 grid certificate.
+ # Wraps the PhEDEx data service and the DBS reader REST endpoints and
+ # returns their answers flattened into pandas DataFrames.
+ # NOTE(review): this class is duplicated verbatim in
+ # Check_files_in_rucio.py - consider moving it to a shared module.
+ datasvc = "https://cmsweb.cern.ch/phedex/datasvc/json/prod"
+ datasvc_xml = "https://cmsweb.cern.ch/phedex/datasvc/xml/prod"
+
+ def __init__(self):
+ self._session = requests.Session()
+ home = os.path.expanduser('~/')
+ #os.system("openssl rsa -in "+home+".globus/userkey.pem -out "+home+".globus/userkey2.pem; chmod 0600 "+home+".globus/userkey2.pem")
+ # userkey2.pem is presumably a passphrase-free copy of the grid key
+ # produced by the commented openssl command above - TODO confirm.
+ self._session.cert = (home+'.globus/usercert.pem', home+'.globus/userkey2.pem')
+ self._session.verify = os.getenv('X509_CERT_DIR')
+
+ def data(self, dataset):
+ # Query PhEDEx for *dataset* and flatten the dbs/dataset/block/file
+ # hierarchy into one row per file (Dataset, File_name, File_checksum).
+ res = self._session.get("%s/data" % self.datasvc, params={'dataset': dataset})
+ resjson = json.loads(res.content)
+ out = []
+ for _instance in resjson["phedex"]["dbs"]:
+ for _dataset in _instance["dataset"]:
+ for _block in _dataset["block"]:
+ for _file in _block["file"]:
+ out.append(
+ {
+ "Dataset": _dataset["name"],
+ "File_name": _file["lfn"],
+ "File_checksum": _file["checksum"]
+
+ }
+ )
+ # NOTE(review): pandas.io.json.json_normalize is deprecated in newer
+ # pandas (use pandas.json_normalize) - confirm the pinned version.
+ df = pandas.io.json.json_normalize(out)
+ return df
+ #format_dates(df, ["Time_file_was_created", "Time_block_was_created"])
+
+ def dbsinfo(self, dataset):
+ # Query the DBS reader for per-file detail of *dataset*: validity
+ # flag, LFN, checksum and the account that last modified the record.
+
+ res = self._session.get("https://cmsweb.cern.ch/dbs/prod/global/DBSReader/files?detail=1&dataset="+dataset)
+ resjson = json.loads(res.content)
+ out = []
+ for _instance in resjson:
+ out.append(
+ {
+ "Dataset": _instance["dataset"],
+ "Is_valid": _instance["is_file_valid"],
+ "File_name": _instance["logical_file_name"],
+ "File_checksum": _instance["check_sum"],
+ "last_modified_by": _instance ["last_modified_by"]
+
+ }
+ )
+
+ df = pandas.io.json.json_normalize(out)
+ return df
+
+ def load_html(self, url):
+ # Fetch *url* and return it parsed as a BeautifulSoup document.
+ uClient = request.urlopen(url)
+ web_site = uClient.read()
+ uClient.close()
+ page = soup(web_site, "html.parser")
+ return page
+
+ def jsonmethod(self, method, **params):
+ # NOTE(review): neither self.jsonurl nor self.getjson is defined in
+ # this class, so calling this raises AttributeError - it appears to
+ # be dead leftover code.
+ return self.getjson(url=self.jsonurl.join(method), params=params)
+
def get_datasets(web):
    """Extract dataset names from all HTML tables of the parsed page.

    Row 0 of each table is treated as a header.  When a row's first cell
    carries a hyperlink, the name is taken from the second cell, else
    from the first one.
    """
    names = []
    for table in web.findAll('table'):
        for idx, row in enumerate(table.findAll('tr')):
            if idx == 0:
                continue  # skip the header row
            cells = row.findAll('td')
            source = cells[1] if len(cells[0].findAll('a')) > 0 else cells[0]
            names.append(str(source.text))
    return names
+
+def main(sesion, web):
+ # For every dataset on the Unified assistance page, compare the file
+ # lists known to PhEDEx and to DBS (valid files only) and write four
+ # work lists: files to reconcile on either side, and datasets that are
+ # empty in one of the two catalogues.
+ datasets = get_datasets(web)
+ print(datasets)
+ invalidate_in_phedex = []
+ # NOTE(review): despite its name, this list is dumped to
+ # check_in_dbs.txt below (the file consumed by
+ # Check_dbs_invalidations_file.py).
+ invalidate_in_dbs = []
+ dataset_empty_dbs = []
+ dataset_empty_phedex = []
+ for _dataset in datasets:
+ phedex = sesion.data(dataset = _dataset)
+ dbs = sesion.dbsinfo(dataset = _dataset)
+ if dbs.empty or phedex.empty:
+ if dbs.empty:
+ dataset_empty_dbs.append(_dataset)
+ if phedex.empty:
+ dataset_empty_phedex.append(_dataset)
+ else:
+ dbs_valid = dbs.loc[dbs['Is_valid'] == 1]
+ # Same file count on both sides: nothing to reconcile.
+ if len(phedex['File_name']) == len(dbs_valid['File_name']):
+ pass
+ # PhEDEx has more: flag files Unified already invalidated in DBS.
+ elif len(phedex['File_name']) > len(dbs_valid['File_name']):
+ invalidated_in_dbs_by_unified = dbs.loc[dbs['last_modified_by'].str.contains('unified')]
+ invalidated_in_dbs_by_unified = invalidated_in_dbs_by_unified.loc[invalidated_in_dbs_by_unified['Is_valid'] == 0]
+ phedex['invalidated_by_unified_in_dbs'] = phedex["File_name"].isin(invalidated_in_dbs_by_unified["File_name"])
+ array = phedex.loc[phedex["invalidated_by_unified_in_dbs"] == True, 'File_name'].to_numpy()
+ for i in range(len(array)):
+ invalidate_in_phedex.append(array[i])
+ # DBS has more valid files: flag DBS files missing from PhEDEx.
+ elif len(phedex['File_name']) < len(dbs_valid['File_name']):
+ # NOTE(review): assigning a new column to dbs_valid (a .loc
+ # slice of dbs) normally triggers pandas' SettingWithCopyWarning,
+ # hidden here by the warnings filter at the top of the file.
+ dbs_valid["in_phedex"] = dbs_valid["File_name"].isin(phedex["File_name"])
+ array = dbs_valid.loc[dbs_valid["in_phedex"] == False, 'File_name'].to_numpy()
+ for i in range(len(array)):
+ invalidate_in_dbs.append(array[i])
+ # Dump the four work lists, one entry per line.
+ with open('check_in_dbs.txt', 'w') as f:
+ for item in invalidate_in_dbs:
+ f.write("%s\n" % item)
+ with open('invalidate_in_phedex.txt', 'w') as f:
+ for item in invalidate_in_phedex:
+ f.write("%s\n" % item)
+ with open('dataset_empty_dbs.txt', 'w') as f:
+ for item in dataset_empty_dbs:
+ f.write("%s\n" % item)
+ with open('dataset_empty_phedex.txt', 'w') as f:
+ for item in dataset_empty_phedex:
+ f.write("%s\n" % item)
+
+if __name__ == '__main__':
+ # Build the authenticated session, fetch the assistance page and run.
+ sesion = x509RESTSession()
+ web_info = sesion.load_html(url='https://cms-unified.web.cern.ch/cms-unified/assistance.html')
+ main(sesion, web_info)
diff --git a/consistency_check/README.md b/consistency_check/README.md
index 9804fa2..6c910c2 100644
--- a/consistency_check/README.md
+++ b/consistency_check/README.md
@@ -18,3 +18,15 @@ twiki: https://twiki.cern.ch/twiki/bin/view/CMSPublic/CompOpsTransferTeamConsist
```
~/TransferTeam/consistency_check/BDV/BDVParser.sh --verbose --db ~/param/DBParam:Prod/Meric --node T2_CH_CERN --day 20 --output output_dir
```
+#### How to run File_mismatch_WS.py
+
+* Log in lxplus7 vm as usual and create a proxy (See https://github.com/CMSCompOps/TransferTeam/blob/master/scripts/setup.sh).
+* Create and activate a python3 environment. Update the pandas library and then run the script:
+```
+$ python3 -m venv env
+$ source ./env/bin/activate
+$ pip3 install --upgrade pandas
+$ python3 File_mismatch_WS.py
+```
+
+* You will get four files: check_in_dbs.txt, invalidate_in_phedex.txt, dataset_empty_dbs.txt and dataset_empty_phedex.txt. Proceed as necessary.
diff --git a/consistency_check/env_set_up.sh b/consistency_check/env_set_up.sh
new file mode 100644
index 0000000..5eea30e
--- /dev/null
+++ b/consistency_check/env_set_up.sh
@@ -0,0 +1,13 @@
# Set up the consistency-check environment, run both check scripts, and
# cross-check the DBS-invalidation candidates against rucio replicas.
source ~/TransferTeam/scripts/setup.sh
cd /afs/cern.ch/user/o/ogarzonm/Fork_TransferTeam/TransferTeam/consistency_check/
python3 -m venv env
source ./env/bin/activate
# BUGFIX: "pip3 install update pandas" installed an unrelated PyPI package
# literally named "update"; the intent was to upgrade pandas.  Also install
# beautifulsoup4, which the check scripts import as bs4 but which was never
# installed here.
pip3 install --upgrade pandas requests beautifulsoup4
python3 File_mismatch_WS.py
python Check_dbs_invalidations_file.py
source ~/TransferTeam/scripts/setup_in_rucio.sh
# List rucio replicas for every candidate LFN, cut out the LFN column,
# strip spaces, and keep the candidates rucio does not know about yet.
awk '{system("rucio list-file-replicas cms:"$1)}' check_in_dbs.txt > files_in_rucio_test.txt
cut -f3 -d '|' files_in_rucio_test.txt > files_in_rucio.txt
sed -i 's/ //g' files_in_rucio.txt
diff check_in_dbs.txt files_in_rucio.txt | grep /store/ > files_not_yet_in_rucio.txt