Package Ganga :: Package Lib :: Package MonitoringServices :: Package Dashboard :: Module LCGAthenaMS
[hide private]
[frames | no frames]

Source Code for Module Ganga.Lib.MonitoringServices.Dashboard.LCGAthenaMS

  1  """Dashboard LCG/Athena Monitoring Service plugin 
  2   
  3  N.B. This code is under development and should not generally be used or relied upon. 
  4   
  5  """ 
  6   
  7  #TODO: disable if backend is not LCG/Athena 
  8   
  9   
 10  from Ganga.Lib.MonitoringServices.Dashboard import CommonUtil 
 11  from Ganga.Lib.MonitoringServices.Dashboard import LCGUtil 
 12  from Ganga.Lib.MonitoringServices.Dashboard import LCGAthenaUtil 
 13   
 14   
 15  from Ganga.Lib.MonitoringServices.Dashboard.LCGMS import LCGMS 
class LCGAthenaMS(LCGMS):
    """Dashboard LCG/Athena Monitoring Service based on MSG.

    Extends LCGMS with Athena-specific Dashboard messages:
      * task-meta and job-meta messages, sent from the client on the
        submit/complete/fail/kill events;
      * a job-processing-attributes message, sent from the worker node when
        the job stops with exit code 0.
    """

    def __init__(self, job_info, config_info):
        """Construct the Dashboard LCG/Athena Monitoring Service."""
        LCGMS.__init__(self, job_info, config_info)

    def getSandboxModules(self):
        """Return list of module dependencies.

        The LCGMS list is extended with this module and LCGAthenaUtil.
        Presumably this makes them available where the worker-node callbacks
        run (e.g. stop(), which uses LCGAthenaUtil.wn_load_athena_stats) —
        confirm against LCGMS.getSandboxModules.
        """
        import Ganga.Lib.MonitoringServices.Dashboard
        return LCGMS.getSandboxModules(self) + [
            Ganga.Lib.MonitoringServices.Dashboard.LCGAthenaMS,
            Ganga.Lib.MonitoringServices.Dashboard.LCGAthenaUtil,
        ]

    #----- event call-backs -----

    def submit(self, **opts):
        """Log submit event on client."""
        # Meta messages are sent before delegating to LCGMS.submit.
        self._cl_send_meta_messages()
        LCGMS.submit(self, **opts)

    def stop(self, exitcode, **opts):
        """Log stop event on worker node."""
        LCGMS.stop(self, exitcode, **opts)
        # send job-processing-attributes message if job successful
        if exitcode == 0:
            message = self._wn_job_processing_attributes_message()
            self._send(self.config_info['destination_job_processing_attributes'], message)

    def complete(self, **opts):
        """Log complete event on client."""
        LCGMS.complete(self, **opts)
        self._cl_send_meta_messages()

    def fail(self, **opts):
        """Log fail event on client."""
        LCGMS.fail(self, **opts)
        self._cl_send_meta_messages()

    def kill(self, **opts):
        """Log kill event on client."""
        LCGMS.kill(self, **opts)
        self._cl_send_meta_messages()

    def _cl_send_meta_messages(self):
        """Send task_meta and job_meta messages on client.

        A task-meta message is sent for master or single jobs (no master);
        a job-meta message is sent for subjobs or single jobs (no subjobs).
        """
        j = self.job_info # called on client, so job_info is Job object
        # send task-meta message if this is a master or single job
        if j.master is None:
            message = self._cl_task_meta_message()
            self._send(self.config_info['destination_task_meta'], message)
        # send job-meta message if this is a sub or single job
        if not j.subjobs:
            message = self._cl_job_meta_message()
            if message['GRIDJOBID'] is None:
                # This is to handle the temporary workaround in
                # LCG.master_bulk_updateMonitoringInformation() which results in two
                # submit messages being sent, one without a grid_job_id.
                self._log('debug', 'Not sending redundant message without grid_job_id for job %s.' % j.fqid)
            else:
                self._send(self.config_info['destination_job_meta'], message)

    #----- message builders -----

    def _cl_task_meta_message(self):
        """Build the task-meta message dict on the client.

        Returns:
            dict of Dashboard task-meta fields for the job in self.job_info,
            plus the internal '___fqid' key.
        """
        j = self.job_info # called on client, so job_info is Job object
        msg = {
            'APPLICATION': LCGAthenaUtil.cl_application(j), # e.g. ATHENA
            'APPLICATIONVERSION': LCGAthenaUtil.cl_application_version(j), # e.g. 15.5.1
            'INPUTDATASET': LCGAthenaUtil.cl_input_dataset(j), # e.g. fdr08_run2.0052283.physics_Muon.merge.AOD.o3_f8_m10
            'JSTOOL': 'Ganga', # e.g. Ganga, Panda
            'JSTOOLUI': LCGAthenaUtil.cl_jstoolui(), # hostname of client. e.g. lxplus246.cern.ch
            'OUTPUTDATASET': LCGAthenaUtil.cl_output_dataset(j),# Unknown at submission. e.g. user09.DavidTuckett.ganga.420.20091125.FZK-LCG2_SCRATCHDISK
            'OUTPUTSE': LCGAthenaUtil.cl_output_se(j), # Unknown at submission. e.g. FZK-LCG2_SCRATCHDISK
            'OWNERDN': LCGUtil.cl_ownerdn(), # Grid certificate. e.g. /DC=ch/DC=cern/OU=Organic Units/OU=Users/CN=dtuckett/CN=671431/CN=David Tuckett/CN=proxy
            'REPORTER': 'ToolUI', # e.g. ToolUI, JobWN
            'REPORTTIME': CommonUtil.utcnow(), # e.g. 2009-11-25T14:59:24.754249Z
            'SUBMISSIONTYPE': 'direct',
            'TARGET': LCGAthenaUtil.cl_target(j), # e.g. CE_xxx,SITE_CSCS-LCG2_DATADISK,SITE_DESY-ZN_DATADISK
            'TASKNAME': LCGUtil.cl_task_name(j), # e.g. ganga:6702b50a-8a31-4476-8189-62ea5b8e00b3:TrigStudy
            'TASKTYPE': LCGAthenaUtil.cl_task_type(self.config_info), # e.g. analysis, production, hammercloud etc.
            '___fqid' : j.fqid,
        }
        return msg

    def _cl_job_meta_message(self):
        """Build the job-meta message dict on the client.

        Returns:
            dict of Dashboard job-meta fields for the job in self.job_info,
            plus the internal '___fqid' key. 'GRIDJOBID' may be None (see
            _cl_send_meta_messages).
        """
        j = self.job_info # called on client, so job_info is Job object
        msg = {
            'GRIDJOBID': LCGUtil.cl_grid_job_id(j), # e.g. https://grid-lb0.desy.de:9000/moqY5njFGurEuoDkkJmtBA
            'INPUTDATASET': LCGAthenaUtil.cl_input_dataset(j), # e.g. fdr08_run2.0052283.physics_Muon.merge.AOD.o3_f8_m10
            'JOB_ID_INSIDE_THE_TASK': LCGUtil.cl_job_id_inside_the_task(j), # subjob id e.g. 0
            'NEVENTSREQUESTED': LCGAthenaUtil.cl_nevents_requested(j), # None or non-negative number e.g. 100
            'OUTPUTDATASET': LCGAthenaUtil.cl_output_dataset(j),# e.g. user09.DavidTuckett.ganga.420.20091125.FZK-LCG2_SCRATCHDISK
            'OUTPUTSE': LCGAthenaUtil.cl_output_se(j), # Unknown at submission. e.g. FZK-LCG2_SCRATCHDISK
            'PILOT': 0, # 0 = not pilot, 1 = pilot
            'PILOTNAME': None,
            'REPORTER': 'ToolUI', # e.g. ToolUI, JobWN
            'REPORTTIME': CommonUtil.utcnow(), # e.g. 2009-11-25T14:59:24.754249Z
            'TARGET': LCGAthenaUtil.cl_target(j), # e.g. CE_xxx,SITE_CSCS-LCG2_DATADISK,SITE_DESY-ZN_DATADISK
            'TASKNAME': LCGUtil.cl_task_name(j), # e.g. ganga:6702b50a-8a31-4476-8189-62ea5b8e00b3:TrigStudy
            'UNIQUEJOBID': LCGUtil.cl_unique_job_id(j), # Ganga uuid e.g. 1c08ff3b-904f-4f77-a481-d6fa765813cb
            '___fqid' : j.fqid,
        }
        return msg

    def _wn_job_processing_attributes_message(self):
        """Build the job-processing-attributes message dict on the worker node.

        Called from stop() when the job exits with code 0. Statistics come
        from LCGAthenaUtil.wn_load_athena_stats(); missing statistics are
        reported as None (via dict.get).

        Returns:
            dict of Dashboard job-processing-attribute fields plus the
            internal '___fqid' key.
        """
        ji = self.job_info # called on worker node, so job_info is dictionary
        athena_stats = LCGAthenaUtil.wn_load_athena_stats()
        msg = {
            'GRIDJOBID': LCGUtil.wn_grid_job_id(ji), # e.g. https://grid-lb0.desy.de:9000/moqY5njFGurEuoDkkJmtBA
            'JOB_ID_INSIDE_THE_TASK': ji['JOB_ID_INSIDE_THE_TASK'], # subjob id e.g. 0
            'NEVENTSPROCESSED': athena_stats.get('totalevents'), # number of events processed. e.g. 100
            'NFILESPROCESSED': athena_stats.get('numfiles'), # number of files processed. e.g. 2
            'REPORTER': 'JobWN', # e.g. ToolUI, JobWN
            'REPORTTIME': CommonUtil.utcnow(), # e.g. 2009-11-25T14:59:24.754249Z
            'SYSTEMTIME': athena_stats.get('systemtime'), # system cpu time in seconds. e.g. 38.45
            'TASKNAME': ji['TASKNAME'], # e.g. ganga_420_dtuckett@lxplus246.cern.ch:/afs/cern.ch/user/d/dtuckett/gangadir/repository/dtuckett/LocalAMGA
            'UNIQUEJOBID': ji['UNIQUEJOBID'], # Ganga uuid e.g. 1c08ff3b-904f-4f77-a481-d6fa765813cb
            'USERTIME': athena_stats.get('usertime'), # user cpu time in seconds. e.g. 479.0
            'WALLCLOCK': athena_stats.get('wallclock'), # wallclock time in seconds. e.g. 1040
            '___fqid' : ji['fqid'],
        }
        return msg
140