Package Ganga :: Package Lib :: Package MonitoringServices :: Package Dashboard :: Module AthenaMS
[hide private]
[frames] | [no frames]

Source Code for Module Ganga.Lib.MonitoringServices.Dashboard.AthenaMS

  1  """Dashboard Backend/Athena Monitoring Service plugin 
  2   
  3  N.B. This code is under development and should not generally be used or relied upon. 
  4   
  5  """ 
  6   
  7   
  8  from Ganga.Lib.MonitoringServices.Dashboard import CommonUtil 
  9  from Ganga.Lib.MonitoringServices.Dashboard import AthenaUtil 
 10   
 11   
 12  from Ganga.Lib.MonitoringServices.Dashboard.BackendMS import BackendMS 
class AthenaMS(BackendMS):
    """Dashboard Backend/Athena Monitoring Service based on MSG.

    Extends BackendMS with Athena-specific Dashboard messages:
      * on the client, task-meta and job-meta messages are sent on the
        submit, complete, fail and kill events;
      * on the worker node, a job-processing-attributes message is sent
        when the job stops with exit code 0.
    """

    def __init__(self, job_info, config_info):
        """Construct the Dashboard Backend/Athena Monitoring Service.

        Both arguments are passed unchanged to BackendMS.__init__. Methods
        in this class use job_info as a Job object on the client and as a
        dictionary on the worker node.
        """
        BackendMS.__init__(self, job_info, config_info)

    def getSandboxModules(self):
        """Return list of module dependencies.

        The list is the BackendMS dependencies plus this module (AthenaMS)
        and AthenaUtil.
        """
        import Ganga.Lib.MonitoringServices.Dashboard
        return BackendMS.getSandboxModules(self) + [
            Ganga.Lib.MonitoringServices.Dashboard.AthenaMS,
            Ganga.Lib.MonitoringServices.Dashboard.AthenaUtil,
        ]

    #----- event call-backs -----

    def submit(self, **opts):
        """Log submit event on client.

        The meta messages are sent before the BackendMS submit event.
        """
        self._cl_send_meta_messages()
        BackendMS.submit(self, **opts)

    def stop(self, exitcode, **opts):
        """Log stop event on worker node.

        After the BackendMS stop event, a job-processing-attributes message
        is sent to config_info['destination_job_processing_attributes'] if
        exitcode == 0.
        """
        BackendMS.stop(self, exitcode, **opts)
        # send job-processing-attributes message if job successful
        if exitcode == 0:
            message = self._wn_job_processing_attributes_message()
            self._send(self.config_info['destination_job_processing_attributes'], message)

    def complete(self, **opts):
        """Log complete event on client, then send the meta messages."""
        BackendMS.complete(self, **opts)
        self._cl_send_meta_messages()

    def fail(self, **opts):
        """Log fail event on client, then send the meta messages."""
        BackendMS.fail(self, **opts)
        self._cl_send_meta_messages()

    def kill(self, **opts):
        """Log kill event on client, then send the meta messages."""
        BackendMS.kill(self, **opts)
        self._cl_send_meta_messages()

    def _cl_send_meta_messages(self):
        """Send task_meta and job_meta messages on client.

        - A task-meta message is sent when the job has no master, i.e. it
          is a master job or a single job.
        - A job-meta message is sent when the job has no subjobs, i.e. it
          is a subjob or a single job.
        - The job-meta message is skipped, with a debug log entry, when its
          GRIDJOBID is None.
        """
        j = self.job_info # called on client, so job_info is Job object
        # send task-meta message if this is a master or single job
        if j.master is None:
            message = self._cl_task_meta_message()
            self._send(self.config_info['destination_task_meta'], message)
        # send job-meta message if this is a sub or single job
        if not j.subjobs:
            message = self._cl_job_meta_message()
            if message['GRIDJOBID'] is None:
                # This is to handle the temporary workaround in
                # Backend.master_bulk_updateMonitoringInformation() which results in two
                # submit messages being sent, one without a grid_job_id.
                self._log('debug', 'Not sending redundant message without grid_job_id for job %s.' % j.fqid)
            else:
                self._send(self.config_info['destination_job_meta'], message)

    #----- message builders -----

    def _cl_task_meta_message(self):
        """Build the task-meta message on the client.

        Returns a dict of Dashboard task attributes for self.job_info (a Job
        object). REPORTER is 'ToolUI', JSTOOL is 'Ganga' and SUBMISSIONTYPE
        is 'direct'.
        """
        j = self.job_info # called on client, so job_info is Job object
        msg = {
            'APPLICATION': AthenaUtil.cl_application(j), # e.g. ATHENA
            'APPLICATIONVERSION': AthenaUtil.cl_application_version(j), # e.g. 15.5.1
            'INPUTDATASET': AthenaUtil.cl_input_dataset(j), # e.g. fdr08_run2.0052283.physics_Muon.merge.AOD.o3_f8_m10
            'JSTOOL': 'Ganga', # e.g. Ganga, Panda
            'JSTOOLUI': AthenaUtil.cl_jstoolui(), # hostname of client. e.g. lxplus246.cern.ch
            'OUTPUTDATASET': AthenaUtil.cl_output_dataset(j),# Unknown at submission. e.g. user09.DavidTuckett.ganga.420.20091125.FZK-LCG2_SCRATCHDISK
            'OUTPUTSE': AthenaUtil.cl_output_se(j), # Unknown at submission. e.g. FZK-LCG2_SCRATCHDISK
            'OWNERDN': self.dynamic_util.cl_ownerdn(), # Grid certificate. e.g. /DC=ch/DC=cern/OU=Organic Units/OU=Users/CN=dtuckett/CN=671431/CN=David Tuckett/CN=proxy
            'REPORTER': 'ToolUI', # e.g. ToolUI, JobWN
            'REPORTTIME': CommonUtil.utcnow(), # e.g. 2009-11-25T14:59:24.754249Z
            'SUBMISSIONTYPE': 'direct',
            'TARGET': AthenaUtil.cl_target(j), # e.g. CE_xxx,SITE_CSCS-LCG2_DATADISK,SITE_DESY-ZN_DATADISK
            'TASKNAME': self.dynamic_util.cl_task_name(j), # e.g. ganga:6702b50a-8a31-4476-8189-62ea5b8e00b3:TrigStudy
            'TASKTYPE': AthenaUtil.cl_task_type(self.config_info), # e.g. analysis, production, hammercloud etc.
            '___fqid' : j.fqid,
            }
        return msg

    def _cl_job_meta_message(self):
        """Build the job-meta message on the client.

        Returns a dict of Dashboard job attributes for self.job_info (a Job
        object). GRIDJOBID may be None; _cl_send_meta_messages checks for
        that and skips sending. PILOT is 0 and PILOTNAME is None.
        """
        j = self.job_info # called on client, so job_info is Job object
        msg = {
            'GRIDJOBID': self.dynamic_util.cl_grid_job_id(j), # e.g. https://grid-lb0.desy.de:9000/moqY5njFGurEuoDkkJmtBA
            'INPUTDATASET': AthenaUtil.cl_input_dataset(j), # e.g. fdr08_run2.0052283.physics_Muon.merge.AOD.o3_f8_m10
            'JOB_ID_INSIDE_THE_TASK': self.dynamic_util.cl_job_id_inside_the_task(j), # subjob id e.g. 0
            'NEVENTSREQUESTED': AthenaUtil.cl_nevents_requested(j), # None or non-negative number e.g. 100
            'OUTPUTDATASET': AthenaUtil.cl_output_dataset(j),# e.g. user09.DavidTuckett.ganga.420.20091125.FZK-LCG2_SCRATCHDISK
            'OUTPUTSE': AthenaUtil.cl_output_se(j), # Unknown at submission. e.g. FZK-LCG2_SCRATCHDISK
            'PILOT': 0, # 0 = not pilot, 1 = pilot
            'PILOTNAME': None,
            'REPORTER': 'ToolUI', # e.g. ToolUI, JobWN
            'REPORTTIME': CommonUtil.utcnow(), # e.g. 2009-11-25T14:59:24.754249Z
            'TARGET': AthenaUtil.cl_target(j), # e.g. CE_xxx,SITE_CSCS-LCG2_DATADISK,SITE_DESY-ZN_DATADISK
            'TASKNAME': self.dynamic_util.cl_task_name(j), # e.g. ganga:6702b50a-8a31-4476-8189-62ea5b8e00b3:TrigStudy
            'UNIQUEJOBID': self.dynamic_util.cl_unique_job_id(j), # Ganga uuid e.g. 1c08ff3b-904f-4f77-a481-d6fa765813cb
            '___fqid' : j.fqid,
            }
        return msg

    def _wn_job_processing_attributes_message(self):
        """Build the job-processing-attributes message on the worker node.

        self.job_info is a dictionary here. It must contain the keys
        'JOB_ID_INSIDE_THE_TASK', 'TASKNAME', 'UNIQUEJOBID' and 'fqid';
        a missing key raises KeyError. The processing statistics come from
        AthenaUtil.wn_load_athena_stats() and are read with .get(), so a
        missing statistic is reported as None.
        """
        ji = self.job_info # called on worker node, so job_info is dictionary
        athena_stats = AthenaUtil.wn_load_athena_stats()
        msg = {
            'GRIDJOBID': self.dynamic_util.wn_grid_job_id(ji), # e.g. https://grid-lb0.desy.de:9000/moqY5njFGurEuoDkkJmtBA
            'JOB_ID_INSIDE_THE_TASK': ji['JOB_ID_INSIDE_THE_TASK'], # subjob id e.g. 0
            'NEVENTSPROCESSED': athena_stats.get('totalevents'), # number of events processed. e.g. 100
            'NFILESPROCESSED': athena_stats.get('numfiles'), # number of files processed. e.g. 2
            'REPORTER': 'JobWN', # e.g. ToolUI, JobWN
            'REPORTTIME': CommonUtil.utcnow(), # e.g. 2009-11-25T14:59:24.754249Z
            'SYSTEMTIME': athena_stats.get('systemtime'), # system cpu time in seconds. e.g. 38.45
            'TASKNAME': ji['TASKNAME'], # e.g. ganga_420_dtuckett@lxplus246.cern.ch:/afs/cern.ch/user/d/dtuckett/gangadir/repository/dtuckett/LocalAMGA
            'UNIQUEJOBID': ji['UNIQUEJOBID'], # Ganga uuid e.g. 1c08ff3b-904f-4f77-a481-d6fa765813cb
            'USERTIME': athena_stats.get('usertime'), # user cpu time in seconds. e.g. 479.0
            'WALLCLOCK': athena_stats.get('wallclock'), # wallclock time in seconds. e.g. 1040
            '___fqid' : ji['fqid'],
            }
        return msg
137