1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 import sys
27 from Ganga.GPIDev.Adapters.IMonitoringService import IMonitoringService
28
30 """ IMonitoringService container:
31 Wrapper object containing a list of IMonitoringService(s) inside and delegating
32 the interface methods to each of them. (composite design pattern)
33 This object is used automatically to transparently wrap the list of monitoring services set in the configuration
34 """
35
def __init__(self, monClasses, jobInfos, configInfos):
    """Build a composite monitoring service from three parallel lists.

    monClasses, jobInfos and configInfos must all have the same length;
    entry i of each list describes the i-th wrapped monitoring service.

    In the Ganga client (Ganga/GPIDev/MonitoringServices), jobInfos is a
    list of Job objects (all the same) and configInfos is a list of Config
    objects, one per monitoring class.

    On the worker node (code generated by
    getWrapperScriptConstructorText()), jobInfos and configInfos are lists
    of plain dictionaries, one per monitoring class; the config dictionaries
    hold the effective config options.

    A monitoring class whose constructor raises is logged and discarded;
    the remaining services are still created.

    Raises:
        Exception: if the three lists differ in length.
    """
    if not (len(monClasses) == len(jobInfos) == len(configInfos)):
        raise Exception("cannot create monitoring object, list of monitoring classes, jobs and configs are not the same length.")

    IMonitoringService.__init__(self, jobInfos, configInfos)

    # The Ganga logging package may not be importable (presumably on the
    # worker node); _log() falls back to stderr when self.logger is None.
    try:
        import Ganga.Utility.logging
        self.logger = Ganga.Utility.logging.getLogger()
    except ImportError:
        self.logger = None

    self.monMonServices = []
    for monClass, jobInfo, configInfo in zip(monClasses, jobInfos, configInfos):
        try:
            # A None config means the class is constructed from the job info alone.
            if configInfo is None:
                monService = monClass(jobInfo)
            else:
                monService = monClass(jobInfo, configInfo)
            self.monMonServices.append(monService)
        except Exception:
            self._log(level="warning", msg="Failed to init %s monitoring service...discarding it" % str(monClass))
            if self.logger is not None:
                from Ganga.Utility.logging import log_user_exception
                log_user_exception(self.logger)
            else:
                # Ganga logging could not be imported above, so importing
                # log_user_exception here would raise and abort construction;
                # dump the traceback to stderr instead.
                import traceback
                traceback.print_exc(file=sys.stderr)
81
82
def start(self, **opts):
    """Application is about to start on the worker node.

    Called by: job wrapper.

    Forwards the call to every wrapped monitoring service. A service that
    raises is reported as a warning and skipped; the others still run.

    Returns:
        dict mapping str(service class) to that service's start() result,
        for the services that did not raise.
    """
    results = {}
    for service in self.monMonServices:
        try:
            label = str(service.__class__)
            results[label] = service.start(**opts)
        except Exception:
            # sys.exc_info() keeps this valid on both Python 2 and Python 3.
            failure = sys.exc_info()[1]
            self._log(level="warning", msg="%s monitoring service failed to *start*: %s" % (label, failure))

    return results
97
def progress(self, **opts):
    """Application execution is in progress.

    Called by: job wrapper, periodically (several times a second).

    Forwards the call to every wrapped monitoring service. A service that
    raises is reported as a warning and skipped; the others still run.

    Returns:
        dict mapping str(service class) to that service's progress()
        result, for the services that did not raise.
    """
    results = {}
    for service in self.monMonServices:
        try:
            label = str(service.__class__)
            results[label] = service.progress(**opts)
        except Exception:
            # sys.exc_info() keeps this valid on both Python 2 and Python 3.
            failure = sys.exc_info()[1]
            self._log(level="warning", msg="%s monitoring service failed to *progress*: %s" % (label, failure))

    return results
112
113
def stop(self, exitcode, **opts):
    """Application execution finished.

    Called by: job wrapper.

    Passes exitcode and the keyword options to stop() on every wrapped
    monitoring service; a service that raises is logged as a warning and
    left out of the result.

    Returns:
        dict keyed by str(service class) holding each stop() result.
    """
    outcome = {}
    for service in self.monMonServices:
        try:
            label = str(service.__class__)
            outcome[label] = service.stop(exitcode, **opts)
        except Exception:
            problem = sys.exc_info()[1]
            self._log(level="warning", msg="%s monitoring service failed to *stop*: %s" % (label, problem))
    return outcome
127
def prepare(self, **opts):
    """Preparation of a job.

    Called by: ganga client.

    Forwards the call to every wrapped monitoring service. A service that
    raises is reported as a warning and skipped; the others still run.

    Returns:
        dict mapping str(service class) to that service's prepare() result,
        for the services that did not raise.
    """
    results = {}
    for service in self.monMonServices:
        try:
            label = str(service.__class__)
            results[label] = service.prepare(**opts)
        except Exception:
            self._log(level="warning", msg="%s monitoring service failed in job *prepare*" % label)
    return results
141
def submitting(self, **opts):
    """Submission of a job (the submitting() hook of each service).

    Called by: ganga client.

    Forwards the call to every wrapped monitoring service. A service that
    raises is reported as a warning and skipped; the others still run.

    Returns:
        dict mapping str(service class) to that service's submitting()
        result, for the services that did not raise.
    """
    results = {}
    for service in self.monMonServices:
        try:
            label = str(service.__class__)
            results[label] = service.submitting(**opts)
        except Exception:
            self._log(level="warning", msg="%s monitoring service failed in job *submitting*" % label)
    return results
155
def submit(self, **opts):
    """Submission of a job (the submit() hook of each service).

    Called by: ganga client.

    Forwards the call to every wrapped monitoring service. A service that
    raises is reported as a warning, its traceback is logged, and it is
    skipped; the others still run.

    Returns:
        dict mapping str(service class) to that service's submit() result,
        for the services that did not raise.
    """
    results = {}
    for service in self.monMonServices:
        try:
            label = str(service.__class__)
            results[label] = service.submit(**opts)
        except Exception:
            self._log(level="warning", msg="%s monitoring service failed in job *submit*" % label)
            if self.logger is not None:
                from Ganga.Utility.logging import log_user_exception
                log_user_exception(self.logger)
            else:
                # No Ganga logger is available, so importing
                # log_user_exception could itself fail and abort the loop;
                # dump the traceback to stderr instead.
                import traceback
                traceback.print_exc(file=sys.stderr)
    return results
171
def complete(self, **opts):
    """Completion of a job.

    Called by: ganga client.

    Forwards the call to every wrapped monitoring service. A service that
    raises is reported as a warning and skipped; the others still run.

    Returns:
        dict mapping str(service class) to that service's complete()
        result, for the services that did not raise.
    """
    results = {}
    for service in self.monMonServices:
        try:
            label = str(service.__class__)
            results[label] = service.complete(**opts)
        except Exception:
            self._log(level="warning", msg="%s monitoring service failed in job *complete*" % label)
    return results
185
def fail(self, **opts):
    """Failure of a job.

    Called by: ganga client.

    Relays the keyword options to fail() on every wrapped monitoring
    service; a service that raises is logged as a warning and left out of
    the result.

    Returns:
        dict keyed by str(service class) holding each fail() result.
    """
    outcome = {}
    for service in self.monMonServices:
        try:
            label = str(service.__class__)
            outcome[label] = service.fail(**opts)
        except Exception:
            self._log(level="warning", msg="%s monitoring service failed in job *fail*" % label)
    return outcome
199
def kill(self, **opts):
    """Killing of a job.

    Called by: ganga client.

    Relays the keyword options to kill() on every wrapped monitoring
    service; a service that raises is logged as a warning and left out of
    the result.

    Returns:
        dict keyed by str(service class) holding each kill() result.
    """
    outcome = {}
    for service in self.monMonServices:
        try:
            label = str(service.__class__)
            outcome[label] = service.kill(**opts)
        except Exception:
            self._log(level="warning", msg="%s monitoring service failed in job *kill*" % label)
    return outcome
213
def rollback(self, **opts):
    """Rollback of a job to the new state (caused by an error during submission).

    Called by: ganga client.

    Forwards the call to every wrapped monitoring service. A service that
    raises is reported as a warning and skipped; the others still run.

    Returns:
        dict mapping str(service class) to that service's rollback()
        result, for the services that did not raise.
    """
    results = {}
    for service in self.monMonServices:
        try:
            label = str(service.__class__)
            results[label] = service.rollback(**opts)
        except Exception:
            self._log(level="warning", msg="%s monitoring service failed in job *rollback*" % label)
    return results
227
249
def getJobInfo(self):
    """Return a static info object holding information about the job at
    submission time.

    Called by: ganga client.

    The info object is later passed to the constructor. It may only contain
    standard python types (such as lists, dictionaries, ints, strings).

    Implementation details: the result is a map holding the job info object
    of each wrapped monitoring service, keyed by str(service class). A
    service whose getJobInfo() raises is reported as a warning and left out.
    See also getWrapperScriptConstructorText().
    """
    infos = {}
    for service in self.monMonServices:
        try:
            label = str(service.__class__)
            infos[label] = service.getJobInfo()
        except Exception:
            # sys.exc_info() keeps this valid on both Python 2 and Python 3.
            failure = sys.exc_info()[1]
            self._log(level="warning", msg="%s monitoring service failed in *getJobInfo*: %s" % (label, failure))
    return infos
271
def getWrapperScriptConstructorText(self):
    """Return a line of python source code which creates the instance of
    the monitoring service object to be used in the job wrapper script.

    This method should not be overridden.

    The generated line defines createMonitoringObject(), which imports
    CompositeMonitoringService plus the class of every wrapped service and
    returns a CompositeMonitoringService built from three bracketed lists:
    the class names, the job infos and the effective config options (None
    for a class whose getConfig() returns None). Values are embedded with
    "%s" formatting, so they need a textual form that is valid python source.
    """
    import_parts = ["from Ganga.Lib.MonitoringServices.Composite import CompositeMonitoringService;"]
    class_parts = []
    job_parts = []
    config_parts = []
    for service in self.monMonServices:
        service_class = service.__class__
        class_name = service_class.__name__
        import_parts.append("from %s import %s;" % (service._mod_name, class_name))
        class_parts.append(" %s," % class_name)
        # One-element tuples keep "%s" safe when the value is itself a tuple.
        job_parts.append(" %s," % (service.getJobInfo(),))
        config = service_class.getConfig()
        if config is None:
            effective_options = None
        else:
            effective_options = config.getEffectiveOptions()
        config_parts.append(" %s," % (effective_options,))

    text = "def createMonitoringObject(): %s return CompositeMonitoringService([%s],[%s],[%s])\n" % (
        " ".join(import_parts),
        "".join(class_parts),
        "".join(job_parts),
        "".join(config_parts),
    )

    return text
298
def _log(self, level='info', msg=''):
    """Emit msg at the given level.

    Uses the method of self.logger named by level when a logger is set and
    has such a method; otherwise writes '[Ganga <level>] <msg>' as one line
    to stderr.
    """
    logger = self.logger
    if not (logger and hasattr(logger, level)):
        # No usable logger: fall back to a plain stderr line.
        sys.stderr.write('[Ganga %s] %s\n' % (level, str(msg)))
        return
    getattr(logger, level)(msg)
309
310
311
312