1 """Dashboard Backend Monitoring Service plugin
2
3 N.B. This code is under development and should not generally be used or relied upon.
4
5 """
6
7
8
9
10 from Ganga.Lib.MonitoringServices.Dashboard import CommonUtil
11
12
13 from Ganga.Lib.MonitoringServices.Dashboard.DashboardMS import DashboardMS
15 """Dashboard Backend Monitoring Service based on MSG."""
16
def __init__(self, job_info, config_info):
    """Set up the service and load the backend-specific utility module.

    DashboardMS.__init__ is called first with the same arguments. The
    backend name is then taken from the class name of
    ``self.job_info.backend``; when that attribute lookup raises
    AttributeError it is read from ``self.job_info['BACKEND']`` instead.
    Finally the module
    ``Ganga.Lib.MonitoringServices.Dashboard.<backend_name>Util`` is
    imported and stored on ``self.dynamic_util``.
    """
    import sys

    DashboardMS.__init__(self, job_info, config_info)

    try:
        self.backend_name = self.job_info.backend.__class__.__name__
    except AttributeError:
        self.backend_name = self.job_info['BACKEND']

    util_module_name = "Ganga.Lib.MonitoringServices.Dashboard.%sUtil" % self.backend_name
    # __import__ with a dotted name returns the top-level package, so the
    # leaf module itself is looked up in sys.modules afterwards.
    __import__(util_module_name)
    self.dynamic_util = sys.modules[util_module_name]
29
47
49 """Create job_info from Job object."""
50 j = self.job_info
51
52 from Ganga.Utility.Config import getConfig
53 batch = getConfig('Configuration')['Batch']
54 host = getConfig('System')['GANGA_HOSTNAME']
55
56 jj = self.job_info
57 if jj.master:
58 jj = jj.master
59
60 ji = {
61 'fqid': j.fqid,
62 'EXECUTION_BACKEND': self.dynamic_util.cl_execution_backend(j),
63 'OWNERDN': self.dynamic_util.cl_ownerdn(),
64 'JOB_ID_INSIDE_THE_TASK': self.dynamic_util.cl_job_id_inside_the_task(j),
65 'TASKNAME': self.dynamic_util.cl_task_name(j),
66 'UNIQUEJOBID': self.dynamic_util.cl_unique_job_id(j),
67 'BACKEND' : self.job_info.backend.__class__.__name__,
68 'BATCH' : batch,
69 'GANGA_HOSTNAME' : host,
70 'JOB_UUID' : jj.info.uuid,
71 'JOB_NAME' : jj.name
72 }
73 return ji
74
75
76
78 j = self.job_info
79 self._log('debug', 'submitting %s' % j.fqid)
80
82 j = self.job_info
83 self._log('debug', 'prepare %s' % j.fqid)
84
86 """Log submit event on client."""
87 j = self.job_info
88 self._log('debug', 'submit %s' % j.fqid)
89
90 if j.subjobs:
91 self._log('debug', 'Not sending unwanted message on submit for master wrapper job %s.' % j.fqid)
92 return
93
94 message = self._cl_job_status_message('submitted', 'Ganga', CommonUtil.utcnow())
95 if message['GRIDJOBID'] is None:
96
97
98
99 self._log('debug', 'Not sending redundant message on submit without grid_job_id for job %s.' % j.fqid)
100 else:
101 self._send(self.config_info['destination_job_status'], message)
102
103 if j.master:
104 j = j.master
105
106 from Ganga.GPIDev import Credentials
107 proxy = Credentials.getCredential('GridProxy')
108 ownerdn = proxy.info('-subject')
109
110 user = 'unknown'
111
112 if ownerdn.find('ERROR') == -1:
113 if ownerdn.rfind('CN=') > -1:
114 subownerdn = ownerdn[0:ownerdn.rfind('CN=')-1]
115 user = subownerdn[subownerdn.rfind('CN=')+3:].replace(' ','')
116
117 task_name = 'ganga:%s:%s' % (j.info.uuid, j.name,)
118 task_mon_link = "http://dashb-atlas-jobdev.cern.ch/dashboard/templates/index.html#user=%s&from=&till=&timeRange=lastDay&refresh=0&tid=%s&p=1&uparam[]=all" % (user, task_name)
119
120 if j.backend.__class__.__name__ == 'Panda' and len(j.backend.buildjobs) > 0 and j.backend.buildjobs[0].url is not None:
121 j.info.monitoring_links = [(task_mon_link,'dashboard'), (j.backend.buildjobs[0].url, 'panda')]
122 else:
123 j.info.monitoring_links = [(task_mon_link,'dashboard')]
124
def start(self, **opts):
    """Log start event on worker node.

    Writes a debug log line containing the job's 'fqid' entry, builds a
    worker-node status message with state 'running' and source 'Ganga'
    stamped with CommonUtil.utcnow(), and sends it to the destination
    configured under 'destination_job_status'. Keyword arguments are
    accepted but not used here.
    """
    self._log('debug', 'start %s' % self.job_info['fqid'])

    running_message = self._wn_job_status_message(
        'running', 'Ganga', CommonUtil.utcnow())
    destination = self.config_info['destination_job_status']
    self._send(destination, running_message)
132
def stop(self, exitcode, **opts):
    """Log stop event on worker node.

    Writes a debug log line containing the job's 'fqid' entry, then sends
    a worker-node status message to the destination configured under
    'destination_job_status'. The state is 'completed' when ``exitcode``
    equals 0 and 'failed' otherwise; the message carries the exit code in
    'JOBEXITCODE' and None in 'JOBEXITREASON'. Additional keyword
    arguments are accepted but not used here.
    """
    self._log('debug', 'stop %s' % self.job_info['fqid'])

    # Assume failure unless the exit code says otherwise.
    final_state = 'failed'
    if exitcode == 0:
        final_state = 'completed'

    stop_message = self._wn_job_status_message(
        final_state, 'Ganga', CommonUtil.utcnow())
    stop_message['JOBEXITCODE'] = exitcode
    stop_message['JOBEXITREASON'] = None
    destination = self.config_info['destination_job_status']
    self._send(destination, stop_message)
146
148 """Log complete event on client."""
149 j = self.job_info
150 self._log('debug', 'complete %s' % j.fqid)
151
152 if j.subjobs:
153 self._log('debug', 'Not sending unwanted message on complete for master wrapper job %s.' % j.fqid)
154 return
155
156 message = self._cl_job_status_message('completed', 'LB', CommonUtil.utcnow())
157 message['GRIDEXITCODE'] = self.dynamic_util.cl_grid_exit_code(j)
158 message['GRIDEXITREASON'] = self.dynamic_util.cl_grid_exit_reason(j)
159 self._send(self.config_info['destination_job_status'], message)
160
161 - def fail(self, **opts):
174
def kill(self, **opts):
    """Log kill event on client.

    Writes a debug log line with the job's fqid. When the job has
    subjobs, only a second debug line is logged and nothing is sent.
    Otherwise a client status message with state 'Cancelled', source 'LB'
    and no start time is sent to the destination configured under
    'destination_job_status'. Keyword arguments are accepted but not
    used here.
    """
    job = self.job_info
    self._log('debug', 'kill %s' % job.fqid)

    if job.subjobs:
        self._log('debug', 'Not sending unwanted message on kill for master wrapper job %s.' % job.fqid)
    else:
        cancelled_message = self._cl_job_status_message('Cancelled', 'LB', None)
        self._send(self.config_info['destination_job_status'], cancelled_message)
186
188 j = self.job_info
189 self._log('debug', 'rollback %s' % j.fqid)
190
191
192
194
195 j = self.job_info
196 msg = {
197 'DESTCE': self.dynamic_util.cl_dest_ce(j),
198 'DESTSITE': None,
199 'DESTWN': None,
200 'EXECUTION_BACKEND': self.dynamic_util.cl_execution_backend(j),
201 'GRIDEXITCODE': None,
202 'GRIDEXITREASON': None,
203 'GRIDJOBID': self.dynamic_util.cl_grid_job_id(j),
204 'JOBEXITCODE': None,
205 'JOBEXITREASON': None,
206 'JOB_ID_INSIDE_THE_TASK': self.dynamic_util.cl_job_id_inside_the_task(j),
207 'OWNERDN': self.dynamic_util.cl_ownerdn(),
208 'REPORTER': 'ToolUI',
209 'REPORTTIME': CommonUtil.utcnow(),
210 'STATENAME': status,
211 'STATESOURCE': status_source,
212 'STATESTARTTIME': status_start_time,
213 'TASKNAME': self.dynamic_util.cl_task_name(j),
214 'UNIQUEJOBID': self.dynamic_util.cl_unique_job_id(j),
215 '___fqid' : j.fqid,
216 }
217 return msg
218
220
221 ji = self.job_info
222 msg = {
223 'DESTCE': self.dynamic_util.wn_dest_ce(ji),
224 'DESTSITE': self.dynamic_util.wn_dest_site(ji),
225 'DESTWN': self.dynamic_util.wn_dest_wn(),
226 'EXECUTION_BACKEND': ji['EXECUTION_BACKEND'],
227 'GRIDEXITCODE': None,
228 'GRIDEXITREASON': None,
229 'GRIDJOBID': self.dynamic_util.wn_grid_job_id(ji),
230 'JOBEXITCODE': None,
231 'JOBEXITREASON': None,
232 'JOB_ID_INSIDE_THE_TASK': ji['JOB_ID_INSIDE_THE_TASK'],
233 'OWNERDN': ji['OWNERDN'],
234 'REPORTER': 'JobWN',
235 'REPORTTIME': CommonUtil.utcnow(),
236 'STATENAME': status,
237 'STATESOURCE': status_source,
238 'STATESTARTTIME': status_start_time,
239 'TASKNAME': ji['TASKNAME'],
240 'UNIQUEJOBID': ji['UNIQUEJOBID'],
241 '___fqid' : ji['fqid'],
242 }
243 return msg
244