@@ -270,6 +270,91 @@ def capa_gwmsmon_usage():
270270
271271
272272
273+ def capa_startd_usage ():
274+ # ##################################################### #
275+ # return dictionary of max cores used during last month #
276+ # ##################################################### #
277+ URL_GRAFANA = "https://monit-grafana.cern.ch/api/datasources/proxy/8332/_msearch?filter_path=responses.aggregations.cpus_per_site.buckets.key,responses.aggregations.cpus_per_site.buckets.max_cpus_a.value,responses.aggregations.cpus_per_site.buckets.max_cpus_b.value"
278+ HDR_GRAFANA = {'Authorization' : "Bearer eyJrIjoiWGdESVczR28ySGVVNFJMMHpRQ0FiM25EM0dKQm5HNTEiLCJuIjoiZnRzX2NsaSIsImlkIjoyNX0=" , 'Content-Type' : "application/json; charset=UTF-8" }
279+ #
280+ siteRegex = re .compile (r"T\d_[A-Z]{2,2}_\w+" )
281+ #
282+ logging .info ("Fetching max core usage count of startds via Grafana" )
283+
284+ # prepare Lucene ElasticSearch query:
285+ # ===================================
286+ today = int ( time .time () / 86400 )
287+ startTIS = ( today - 30 ) * 86400
288+ limitTIS = today * 86400
289+ queryString = ("{\" search_type\" :\" query_then_fetch\" ,\" index\" :[\" monit" +
290+ "_prod_cms_raw_si_condor_*\" ],\" ignore_unavailable\" :true" +
291+ "}\n {\" query\" :{\" bool\" :{\" must\" :[{\" match_phrase\" :{\" " +
292+ "metadata.topic\" :\" cms_raw_si_condor_startd\" }}],\" must_" +
293+ "not\" :[{\" match_phrase\" :{\" data.payload.SlotType\" :\" Dy" +
294+ "namic\" }}],\" filter\" :{\" range\" :{\" metadata.timestamp\" " +
295+ ":{\" gte\" :%d,\" lt\" :%d,\" format\" :\" epoch_second\" }}}}}," +
296+ "\" size\" :0,\" aggs\" :{\" cpus_per_site\" :{\" terms\" :{\" fie" +
297+ "ld\" :\" data.payload.GLIDEIN_CMSSite\" ,\" size\" :512},\" ag" +
298+ "gs\" :{\" cpus_per_report_a\" :{\" date_histogram\" :{\" field" +
299+ "\" :\" metadata.timestamp\" ,\" interval\" :\" 360s\" ,\" offset" +
300+ "\" :\" 0s\" },\" aggs\" :{\" cpus\" :{\" sum\" :{\" field\" :\" data" +
301+ ".payload.TotalSlotCpus\" }}}},\" cpus_per_report_b\" :{\" da" +
302+ "te_histogram\" :{\" field\" :\" metadata.timestamp\" ,\" inter" +
303+ "val\" :\" 360s\" ,\" offset\" :\" 180s\" },\" aggs\" :{\" cpus\" :{" +
304+ "\" sum\" :{\" field\" :\" data.payload.TotalSlotCpus\" }}}},\" " +
305+ "max_cpus_a\" :{\" max_bucket\" :{\" buckets_path\" :\" cpus_pe" +
306+ "r_report_a>cpus\" }},\" max_cpus_b\" :{\" max_bucket\" :{\" bu" +
307+ "ckets_path\" :\" cpus_per_report_b>cpus\" }}}}}}\n " ) % \
308+ (startTIS , limitTIS )
309+
310+ # fetch startd max core usage count from ElasticSearch:
311+ # =====================================================
312+ try :
313+ requestObj = urllib .request .Request (URL_GRAFANA ,
314+ data = queryString .encode ("utf-8" ),
315+ headers = HDR_GRAFANA , method = "POST" )
316+ with urllib .request .urlopen ( requestObj , timeout = 90 ) as responseObj :
317+ urlCharset = responseObj .headers .get_content_charset ()
318+ if urlCharset is None :
319+ urlCharset = "utf-8"
320+ myData = responseObj .read ().decode ( urlCharset )
321+ del urlCharset
322+ #
323+ # sanity check:
324+ if ( len (myData ) < 2048 ):
325+ raise ValueError ("Startd max core usage data failed sanity check" )
326+ #
327+ # decode JSON:
328+ myJson = json .loads ( myData )
329+ del myData
330+ #
331+ # loop over entries and add max one month usage to dictionary:
332+ usageDict = {}
333+ for myRspns in myJson ['responses' ]:
334+ for myBuckt in myRspns ['aggregations' ]['cpus_per_site' ]['buckets' ]:
335+ try :
336+ mySite = myBuckt ['key' ]
337+ if ( siteRegex .match ( mySite ) is None ):
338+ continue
339+ myUsage = int ( max ( myBuckt ['max_cpus_a' ]['value' ],
340+ myBuckt ['max_cpus_b' ]['value' ] ) )
341+ except KeyError :
342+ logging .warning ("Missing key in ES bucket, skipping, %s" %
343+ str (excptn ))
344+ continue
345+ logging .debug ("Core Usage for \" %s\" : %d" % (mySite , myUsage ))
346+ usageDict [ mySite ] = myUsage
347+ del myJson
348+ except urllib .error .URLError as excptn :
349+ logging .error ("Failed to query ElasticSearch via Grafana, %s" %
350+ str (excptn ))
351+ return {}
352+ #
353+ return usageDict
354+ # ########################################################################### #
355+
356+
357+
273358def capa_dynamo_quota ():
274359 # ##################################################################### #
275360 # return dictionary of current experiment disk quota settings in Dynamo #
@@ -975,10 +1060,10 @@ def capa_compare_metrics(metricDict, capacityList):
9751060 action = "store_true" ,
9761061 help = "fetch/update experiment disk quota fr" +
9771062 "om Dynamo/DDM" )
978- parserObj .add_argument ("-g " , dest = "usage" , default = False ,
1063+ parserObj .add_argument ("-u " , dest = "usage" , default = False ,
9791064 action = "store_true" ,
980- help = "fetch/update max core usage from gWMS " +
981- "mon/Grafana " )
1065+ help = "fetch/update max core usage from Elas " +
1066+ "ticSearch/startd data " )
9821067 parserObj .add_argument ("-f" , dest = "file" , default = None , const = "" ,
9831068 action = "store" , nargs = "?" ,
9841069 metavar = "filepath" ,
@@ -1041,7 +1126,7 @@ def capa_compare_metrics(metricDict, capacityList):
10411126 # fetch maximum number of cores provided by sites if requested:
10421127 usageDict = None
10431128 if ( argStruct .usage ):
1044- usageDict = capa_gwmsmon_usage ()
1129+ usageDict = capa_startd_usage ()
10451130
10461131
10471132
0 commit comments