Package Ganga :: Package Lib :: Package MonitoringServices :: Package Dashboard :: Module BackendMS
[hide private]
[frames] | no frames]

Source Code for Module Ganga.Lib.MonitoringServices.Dashboard.BackendMS

  1  """Dashboard Backend Monitoring Service plugin 
  2   
  3  N.B. This code is under development and should not generally be used or relied upon. 
  4   
  5  """ 
  6   
  7  #TODO: report LB state change times 
  8   
  9   
 10  from Ganga.Lib.MonitoringServices.Dashboard import CommonUtil 
 11   
 12   
 13  from Ganga.Lib.MonitoringServices.Dashboard.DashboardMS import DashboardMS 
class BackendMS(DashboardMS):
    """Dashboard Backend Monitoring Service based on MSG.

    The same class is used on the client and on the worker node.  On the
    client ``self.job_info`` is a Job object; on the worker node it is a
    dictionary (presumably the one produced by getJobInfo -- the keys read by
    the worker-node methods match the keys written there).

    Backend-specific values are delegated to ``self.dynamic_util``, the
    ``<backend name>Util`` module loaded in the constructor.
    """

    def __init__(self, job_info, config_info):
        """Construct the Dashboard Backend Monitoring Service.

        Determines the backend name from job_info and loads the module
        Ganga.Lib.MonitoringServices.Dashboard.<backend name>Util, which is
        stored as self.dynamic_util.
        """
        DashboardMS.__init__(self, job_info, config_info)
        try:
            # job_info is a Job object: use the name of its backend class
            self.backend_name = self.job_info.backend.__class__.__name__
        except AttributeError:
            # job_info has no 'backend' attribute: read the name stored
            # under the 'BACKEND' key instead
            self.backend_name = self.job_info['BACKEND']

        path_to_module = "Ganga.Lib.MonitoringServices.Dashboard.%sUtil" % self.backend_name
        # __import__ with a dotted name returns the top-level package, so the
        # leaf module is looked up in sys.modules after the import.
        __import__(path_to_module)
        import sys
        self.dynamic_util = sys.modules[path_to_module]

    def getSandboxModules(self):
        """Return list of module dependencies.

        The list is the DashboardMS dependencies followed by CommonUtil,
        BackendMS, BatchUtil (only for the LSF, PBS and SGE backends) and the
        dynamically loaded backend util module, in that order.
        """
        import Ganga.Lib.MonitoringServices.Dashboard

        dashboard = Ganga.Lib.MonitoringServices.Dashboard
        modules = DashboardMS.getSandboxModules(self) + [
            dashboard.CommonUtil,
            dashboard.BackendMS,
        ]
        if self.backend_name in ['LSF', 'PBS', 'SGE']:
            modules.append(dashboard.BatchUtil)
        modules.append(self.dynamic_util)
        return modules

    def getJobInfo(self):
        """Create job_info from Job object.

        Returns a dictionary of job identification values.  'JOB_UUID' and
        'JOB_NAME' are taken from the master job when the job has one, all
        other job-related values from the job itself.
        """
        j = self.job_info  # called on client, so job_info is Job object

        from Ganga.Utility.Config import getConfig
        batch = getConfig('Configuration')['Batch']
        host = getConfig('System')['GANGA_HOSTNAME']

        # uuid and name are reported for the master job if there is one
        top_job = j
        if top_job.master:
            top_job = top_job.master

        ji = {
            'fqid': j.fqid,
            'EXECUTION_BACKEND': self.dynamic_util.cl_execution_backend(j),
            'OWNERDN': self.dynamic_util.cl_ownerdn(),
            'JOB_ID_INSIDE_THE_TASK': self.dynamic_util.cl_job_id_inside_the_task(j),
            'TASKNAME': self.dynamic_util.cl_task_name(j),
            'UNIQUEJOBID': self.dynamic_util.cl_unique_job_id(j),
            'BACKEND': j.backend.__class__.__name__,
            'BATCH': batch,
            'GANGA_HOSTNAME': host,
            'JOB_UUID': top_job.info.uuid,
            'JOB_NAME': top_job.name,
        }
        return ji

    #----- event call-backs -----

    def submitting(self, **opts):
        """Log submitting event on client (no message is sent)."""
        j = self.job_info  # called on client, so job_info is Job object
        self._log('debug', 'submitting %s' % j.fqid)

    def prepare(self, **opts):
        """Log prepare event on client (no message is sent)."""
        j = self.job_info  # called on client, so job_info is Job object
        self._log('debug', 'prepare %s' % j.fqid)

    def submit(self, **opts):
        """Log submit event on client.

        Sends a 'submitted' job-status message (unless the job is a master
        wrapper job or has no grid job id yet) and stores the monitoring
        links on the job, or on its master job if it has one.
        """
        j = self.job_info  # called on client, so job_info is Job object
        self._log('debug', 'submit %s' % j.fqid)
        # ignore master wrapper jobs
        if j.subjobs:
            self._log('debug', 'Not sending unwanted message on submit for master wrapper job %s.' % j.fqid)
            return
        # send Ganga submitted job-status message
        message = self._cl_job_status_message('submitted', 'Ganga', CommonUtil.utcnow())
        if message['GRIDJOBID'] is None:
            # This is to handle the temporary workaround in
            # IBackend.master_bulk_updateMonitoringInformation() which results in two
            # submit messages being sent, one without a grid_job_id.
            self._log('debug', 'Not sending redundant message on submit without grid_job_id for job %s.' % j.fqid)
        else:
            self._send(self.config_info['destination_job_status'], message)

        # the monitoring links are attached to the master job if there is one
        if j.master:
            j = j.master

        from Ganga.GPIDev import Credentials
        proxy = Credentials.getCredential('GridProxy')
        ownerdn = proxy.info('-subject')
        user = self._user_from_ownerdn(ownerdn)

        task_name = 'ganga:%s:%s' % (j.info.uuid, j.name,)
        task_mon_link = "http://dashb-atlas-jobdev.cern.ch/dashboard/templates/index.html#user=%s&from=&till=&timeRange=lastDay&refresh=0&tid=%s&p=1&uparam[]=all" % (user, task_name)

        if j.backend.__class__.__name__ == 'Panda' and len(j.backend.buildjobs) > 0 and j.backend.buildjobs[0].url is not None:
            # Panda jobs with a build job additionally link to its url
            j.info.monitoring_links = [(task_mon_link, 'dashboard'), (j.backend.buildjobs[0].url, 'panda')]
        else:
            j.info.monitoring_links = [(task_mon_link, 'dashboard')]

    def start(self, **opts):
        """Log start event on worker node."""
        ji = self.job_info  # called on worker node, so job_info is dictionary
        self._log('debug', 'start %s' % ji['fqid'])
        # send Ganga running job-status message
        message = self._wn_job_status_message('running', 'Ganga', CommonUtil.utcnow())
        self._send(self.config_info['destination_job_status'], message)

    def stop(self, exitcode, **opts):
        """Log stop event on worker node.

        The reported status is 'completed' when exitcode is 0 and 'failed'
        otherwise; exitcode is sent as 'JOBEXITCODE'.
        """
        ji = self.job_info  # called on worker node, so job_info is dictionary
        self._log('debug', 'stop %s' % ji['fqid'])
        if exitcode == 0:
            status = 'completed'
        else:
            status = 'failed'
        # send Ganga completed or failed job-status message
        message = self._wn_job_status_message(status, 'Ganga', CommonUtil.utcnow())
        message['JOBEXITCODE'] = exitcode
        message['JOBEXITREASON'] = None  #TODO: how can we know this?
        self._send(self.config_info['destination_job_status'], message)

    def complete(self, **opts):
        """Log complete event on client."""
        j = self.job_info  # called on client, so job_info is Job object
        self._log('debug', 'complete %s' % j.fqid)
        # ignore master wrapper jobs
        if j.subjobs:
            self._log('debug', 'Not sending unwanted message on complete for master wrapper job %s.' % j.fqid)
            return
        # send LB Done job-status message
        message = self._cl_job_status_message('completed', 'LB', CommonUtil.utcnow())
        message['GRIDEXITCODE'] = self.dynamic_util.cl_grid_exit_code(j)
        message['GRIDEXITREASON'] = self.dynamic_util.cl_grid_exit_reason(j)
        self._send(self.config_info['destination_job_status'], message)

    def fail(self, **opts):
        """Log fail event on client."""
        j = self.job_info  # called on client, so job_info is Job object
        self._log('debug', 'fail %s' % j.fqid)
        # ignore master wrapper jobs
        if j.subjobs:
            self._log('debug', 'Not sending unwanted message on fail for master wrapper job %s.' % j.fqid)
            return
        # send LB Done or Aborted job-status message
        message = self._cl_job_status_message('failed', 'LB', CommonUtil.utcnow())
        message['GRIDEXITCODE'] = self.dynamic_util.cl_grid_exit_code(j)
        message['GRIDEXITREASON'] = self.dynamic_util.cl_grid_exit_reason(j)
        self._send(self.config_info['destination_job_status'], message)

    def kill(self, **opts):
        """Log kill event on client."""
        j = self.job_info  # called on client, so job_info is Job object
        self._log('debug', 'kill %s' % j.fqid)
        # ignore master wrapper jobs
        if j.subjobs:
            self._log('debug', 'Not sending unwanted message on kill for master wrapper job %s.' % j.fqid)
            return
        # send LB Cancelled job-status message (no state start time is reported)
        message = self._cl_job_status_message('Cancelled', 'LB', None)
        self._send(self.config_info['destination_job_status'], message)

    def rollback(self, **opts):
        """Log rollback event on client (no message is sent)."""
        j = self.job_info  # called on client, so job_info is Job object
        self._log('debug', 'rollback %s' % j.fqid)

    #----- helpers -----

    @staticmethod
    def _user_from_ownerdn(ownerdn):
        """Return the second CN value from the right of ownerdn, spaces removed.

        Returns 'unknown' when ownerdn contains 'ERROR' or holds fewer than
        two 'CN=' components, so that no arbitrary slice of the DN is ever
        reported as the user name.
        """
        if 'ERROR' in ownerdn:
            return 'unknown'
        last_cn = ownerdn.rfind('CN=')
        if last_cn < 1:
            # no CN at all, or the only CN starts the string
            return 'unknown'
        # drop the last CN component together with the separator before it
        remainder = ownerdn[:last_cn - 1]
        previous_cn = remainder.rfind('CN=')
        if previous_cn == -1:
            return 'unknown'
        return remainder[previous_cn + 3:].replace(' ', '')

    #----- message builders -----

    def _cl_job_status_message(self, status, status_source, status_start_time=None):
        """Build the job-status message dictionary on the client.

        status is sent as 'STATENAME', status_source as 'STATESOURCE' and
        status_start_time as 'STATESTARTTIME'.
        """
        # Not null: EXECUTION_BACKEND, GRIDJOBID, JOB_ID_INSIDE_THE_TASK, TASKNAME, UNIQUEJOBID
        j = self.job_info  # called on client, so job_info is Job object
        msg = {
            'DESTCE': self.dynamic_util.cl_dest_ce(j),  # Actual CE. e.g. ce-3-fzk.gridka.de:2119/jobmanager-pbspro-atlasXS
            'DESTSITE': None,  # Actual site. e.g. FZK-LCG2
            'DESTWN': None,  # Actual worker node hostname. e.g. c01-102-103.gridka.de
            'EXECUTION_BACKEND': self.dynamic_util.cl_execution_backend(j),  # Backend. e.g. LCG
            'GRIDEXITCODE': None,  # e.g. 0
            'GRIDEXITREASON': None,  # e.g. Job terminated successfully
            'GRIDJOBID': self.dynamic_util.cl_grid_job_id(j),  # e.g. https://grid-lb0.desy.de:9000/moqY5njFGurEuoDkkJmtBA
            'JOBEXITCODE': None,  # e.g. 0
            'JOBEXITREASON': None,
            'JOB_ID_INSIDE_THE_TASK': self.dynamic_util.cl_job_id_inside_the_task(j),  # subjob id e.g. 0
            'OWNERDN': self.dynamic_util.cl_ownerdn(),  # Grid certificate. e.g. /DC=ch/DC=cern/OU=Organic Units/OU=Users/CN=dtuckett/CN=671431/CN=David Tuckett/CN=proxy
            'REPORTER': 'ToolUI',  # e.g. ToolUI, JobWN
            'REPORTTIME': CommonUtil.utcnow(),  # e.g. 2009-11-25T14:59:24.754249Z
            'STATENAME': status,  # e.g. submitted, Done (Success)
            'STATESOURCE': status_source,  # e.g. Ganga, LB
            'STATESTARTTIME': status_start_time,  # e.g. 2009-11-25T14:32:51.428988Z
            'TASKNAME': self.dynamic_util.cl_task_name(j),  # e.g. ganga:6702b50a-8a31-4476-8189-62ea5b8e00b3:TrigStudy
            'UNIQUEJOBID': self.dynamic_util.cl_unique_job_id(j),  # Ganga uuid e.g. 1c08ff3b-904f-4f77-a481-d6fa765813cb
            '___fqid': j.fqid,
        }
        return msg

    def _wn_job_status_message(self, status, status_source, status_start_time):
        """Build the job-status message dictionary on the worker node.

        Same keys as the client message; the identification values are read
        from the job_info dictionary rather than from a Job object.
        """
        # Not null: EXECUTION_BACKEND, GRIDJOBID, JOB_ID_INSIDE_THE_TASK, TASKNAME, UNIQUEJOBID
        ji = self.job_info  # called on worker node, so job_info is dictionary
        msg = {
            'DESTCE': self.dynamic_util.wn_dest_ce(ji),
            'DESTSITE': self.dynamic_util.wn_dest_site(ji),
            'DESTWN': self.dynamic_util.wn_dest_wn(),
            'EXECUTION_BACKEND': ji['EXECUTION_BACKEND'],
            'GRIDEXITCODE': None,
            'GRIDEXITREASON': None,
            'GRIDJOBID': self.dynamic_util.wn_grid_job_id(ji),
            'JOBEXITCODE': None,
            'JOBEXITREASON': None,
            'JOB_ID_INSIDE_THE_TASK': ji['JOB_ID_INSIDE_THE_TASK'],
            'OWNERDN': ji['OWNERDN'],
            'REPORTER': 'JobWN',
            'REPORTTIME': CommonUtil.utcnow(),
            'STATENAME': status,
            'STATESOURCE': status_source,
            'STATESTARTTIME': status_start_time,
            'TASKNAME': ji['TASKNAME'],
            'UNIQUEJOBID': ji['UNIQUEJOBID'],
            '___fqid': ji['fqid'],
        }
        return msg
244