Package Ganga :: Package Lib :: Package MonitoringServices :: Module Composite
[hide private]
[frames] | no frames]

Source Code for Module Ganga.Lib.MonitoringServices.Composite

  1  ################################################################################ 
  2  # Ganga - a computational task management tool for easy access to Grid resources 
  3  # http://cern.ch/ganga 
  4  # 
  5  # $Id: Composite.py,v 1.2 2009-06-09 14:31:42 moscicki Exp $ 
  6  # 
  7  # Copyright (C) 2003-2007 The Ganga Project 
  8  # 
  9  # This file is part of Ganga.  
 10  # 
 11  # Ganga is free software; you can redistribute it and/or modify 
 12  # it under the terms of the GNU General Public License as published by 
 13  # the Free Software Foundation; either version 2 of the License, or 
 14  # (at your option) any later version. 
 15  # 
 16  # Ganga is distributed in the hope that it will be useful, 
 17  # but WITHOUT ANY WARRANTY; without even the implied warranty of 
 18  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 19  # GNU General Public License for more details. 
 20   
 21  # You should have received a copy of the GNU General Public License 
 22  # along with this program; if not, write to the Free Software 
 23  # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA 
 24  ################################################################################ 
 25   
 26  import sys 
 27  from Ganga.GPIDev.Adapters.IMonitoringService import IMonitoringService 
 28   
29 -class CompositeMonitoringService(IMonitoringService):
30 """ IMonitoringService container: 31 Wrapper object containing a list of IMonitoringService(s) inside and delegating 32 the interface methods to each of them. (composite design pattern) 33 This object is used automatically to transparently wrap the list of monitoring services set in the configuration 34 """ 35
36 - def __init__(self, monClasses, jobInfos, configInfos):
37 """Create a new composite monitoring service based on the lists of 38 monitoring classes, jobs and configs (all the same length). 39 40 If this is called in the Ganga client, i.e. from Ganga/GPIDev/MonitoringServices, 41 then jobInfos is a list of Job (all the same), configInfos is a list of 42 Config (specific to each monitoring class). 43 44 If this is called on the worker node, i.e. from the text generated by 45 getWrapperScriptConstructorText(), the jobInfos are dictionaries (specific 46 to each monitoring class) and configInfos are dictionaries of effective 47 config options (specific to each monitoring class). 48 """ 49 50 if not (len(monClasses) == len(jobInfos) == len(configInfos)): 51 raise Exception("cannot create monitoring object, list of monitoring classes, jobs and configs are not the same length.") 52 53 IMonitoringService.__init__(self,jobInfos,configInfos) 54 55 #init the logger 56 try: 57 import Ganga.Utility.logging 58 self.logger = Ganga.Utility.logging.getLogger() 59 except ImportError: 60 #on the worker node we don't have access to Ganga logging facilities 61 #so we simple print out the log message 62 #@see self._log() 63 self.logger = None 64 65 #init the monitoring services 66 self.monMonServices = [] 67 for i in range(len(monClasses)): 68 try: 69 monClass = monClasses[i] 70 # allow for existing monitoring classes which do not take config_info in constructor 71 if configInfos[i] is None: 72 monService = monClass(jobInfos[i]) 73 else: 74 monService = monClass(jobInfos[i], configInfos[i]) 75 self.monMonServices.append(monService) 76 except Exception,e: 77 #discard errors in initialization of monitoring services 78 self._log(level="warning",msg="Failed to init %s monitoring service...discarding it" % str(monClass)) 79 from Ganga.Utility.logging import log_user_exception 80 log_user_exception(self.logger)
81 82
83 - def start(self, **opts):
84 """Application is about to start on the worker node. 85 Called by: job wrapper. 86 """ 87 ret = {} 88 for monService in self.monMonServices: 89 try: 90 monClass = str(monService.__class__) 91 ret[monClass] = monService.start(**opts) 92 except Exception,e: 93 #discard errors in initialization of monitoring services 94 self._log(level="warning",msg="%s monitoring service failed to *start*: %s" % (monClass, e)) 95 96 return ret
97
98 - def progress(self,**opts):
99 """Application execution is in progress (called periodically, several times a second). 100 Called by: job wrapper. """ 101 102 ret = {} 103 for monService in self.monMonServices: 104 try: 105 monClass = str(monService.__class__) 106 ret[monClass] = monService.progress(**opts) 107 except Exception,e: 108 #discard errors in initialization of monitoring services 109 self._log(level="warning",msg="%s monitoring service failed to *progress*: %s" % (monClass, e)) 110 111 return ret
112 113
114 - def stop(self,exitcode,**opts):
115 """Application execution finished. 116 Called by: job wrapper. """ 117 118 ret = {} 119 for monService in self.monMonServices: 120 try: 121 monClass = str(monService.__class__) 122 ret[monClass] = monService.stop(exitcode,**opts) 123 except Exception,e: 124 #discard errors in initialization of monitoring services 125 self._log(level="warning",msg="%s monitoring service failed to *stop*: %s" % (monClass, e)) 126 return ret
127
128 - def prepare(self,**opts):
129 """Preparation of a job. 130 Called by: ganga client. """ 131 132 ret = {} 133 for monService in self.monMonServices: 134 try: 135 monClass = str(monService.__class__) 136 ret[monClass] = monService.prepare(**opts) 137 except Exception,e: 138 #discard errors in initialization of monitoring services 139 self._log(level="warning",msg="%s monitoring service failed in job *prepare*" % monClass) 140 return ret
141
142 - def submitting(self,**opts):
143 """Submission of a job. 144 Called by: ganga client. """ 145 146 ret = {} 147 for monService in self.monMonServices: 148 try: 149 monClass = str(monService.__class__) 150 ret[monClass] = monService.submitting(**opts) 151 except Exception,e: 152 #discard errors in initialization of monitoring services 153 self._log(level="warning",msg="%s monitoring service failed in job *submitting*" % monClass) 154 return ret
155
156 - def submit(self,**opts):
157 """Submission of a job. 158 Called by: ganga client. """ 159 160 ret = {} 161 for monService in self.monMonServices: 162 try: 163 monClass = str(monService.__class__) 164 ret[monClass] = monService.submit(**opts) 165 except Exception,e: 166 #discard errors in initialization of monitoring services 167 self._log(level="warning",msg="%s monitoring service failed in job *submit*" % monClass) 168 from Ganga.Utility.logging import log_user_exception 169 log_user_exception(self.logger) 170 return ret
171
172 - def complete(self,**opts):
173 """Completion of a job. 174 Called by: ganga client. """ 175 176 ret = {} 177 for monService in self.monMonServices: 178 try: 179 monClass = str(monService.__class__) 180 ret[monClass] = monService.complete(**opts) 181 except Exception,e: 182 #discard errors in initialization of monitoring services 183 self._log(level="warning",msg="%s monitoring service failed in job *complete*" % monClass) 184 return ret
185
186 - def fail(self,**opts):
187 """Failure of a job. 188 Called by: ganga client. """ 189 190 ret = {} 191 for monService in self.monMonServices: 192 try: 193 monClass = str(monService.__class__) 194 ret[monClass] = monService.fail(**opts) 195 except Exception,e: 196 #discard errors in initialization of monitoring services 197 self._log(level="warning",msg="%s monitoring service failed in job *fail*" % monClass) 198 return ret
199
200 - def kill(self,**opts):
201 """Killing of a job. 202 Called by: ganga client. """ 203 204 ret = {} 205 for monService in self.monMonServices: 206 try: 207 monClass = str(monService.__class__) 208 ret[monClass] = monService.kill(**opts) 209 except Exception,e: 210 #discard errors in initialization of monitoring services 211 self._log(level="warning",msg="%s monitoring service failed in job *kill*" % monClass) 212 return ret
213
214 - def rollback(self,**opts):
215 """Rollback of a job to new state (caused by error during submission). 216 Called by: ganga client. """ 217 218 ret = {} 219 for monService in self.monMonServices: 220 try: 221 monClass = str(monService.__class__) 222 ret[monClass] = monService.rollback(**opts) 223 except Exception,e: 224 #discard errors in initialization of monitoring services 225 self._log(level="warning",msg="%s monitoring service failed in job *rollback*" % monClass) 226 return ret
227
228 - def getSandboxModules(self):
229 """ Get the list of module dependencies of this monitoring module. 230 Called by: ganga client. 231 """ 232 233 #modules required by this container itself 234 import Ganga.Lib.MonitoringServices 235 modules = IMonitoringService.getSandboxModules(self) + \ 236 [Ganga, Ganga.Lib, Ganga.Lib.MonitoringServices, Ganga.Lib.MonitoringServices.Composite] 237 238 for monService in self.monMonServices: 239 try: 240 monClass = str(monService.__class__) 241 #TODO: 242 # the list might contain duplicate elements. 243 # does this cause troubles on the upper levels? 244 modules.extend(monService.getSandboxModules()) 245 except Exception,e: 246 #discard errors in initialization of monitoring services 247 self._log(level="warning",msg="%s monitoring service failed in *getSandboxModules* ... ignoring it." % monClass) 248 return modules
249
250 - def getJobInfo(self):
251 """ Return a static info object which static information about the job 252 at submission time. Called by: ganga client. 253 254 The info object is passed to the contructor. Info 255 object may only contain the standard python types (such as lists, 256 dictionaries, int, strings). 257 Implementation details: 258 return the job info objects as a map for each compound Monitoring Service 259 @see getWrapperScriptConstructorText() method 260 """ 261 262 infos = {} 263 for monService in self.monMonServices: 264 try: 265 monClass = str(monService.__class__) 266 infos[monClass] = monService.getJobInfo() 267 except Exception,e: 268 #discard errors in initialization of monitoring services 269 self._log(level="warning",msg="%s monitoring service failed in *getJobInfo*: %s" % (monClass,e)) 270 return infos
271
273 """ Return a line of python source code which creates the instance of the monitoring service object 274 to be used in the job wrapper script. This method should not be overriden. 275 """ 276 277 importText = "from Ganga.Lib.MonitoringServices.Composite import CompositeMonitoringService;" 278 monClasses = "" 279 jobInfos = "" 280 configInfos = "" 281 for monService in self.monMonServices: 282 className = monService.__class__.__name__ 283 fqClassName = str(monService.__class__) 284 importText = "%s from %s import %s;" % (importText, monService._mod_name,className) 285 monClasses = "%s %s," % (monClasses,className) 286 jobInfos = "%s %s," % (jobInfos,monService.getJobInfo()) 287 config = monService.__class__.getConfig() 288 if config is None: 289 configInfo = None 290 else: 291 configInfo = config.getEffectiveOptions() 292 configInfos = "%s %s," % (configInfos,configInfo) 293 294 text = "def createMonitoringObject(): %s return CompositeMonitoringService([%s],[%s],[%s])\n" % \ 295 (importText,monClasses,jobInfos,configInfos) 296 297 return text
298
299 - def _log(self,level='info',msg=''):
300 301 if self.logger and hasattr(self.logger,level): 302 getattr(self.logger,level)(msg) 303 else: 304 #FIXME: this is used to log the monitoring actions in wrapper script 305 # and currently we log to stdout (the wrapper scripts does not provide yet 306 # a uniform interface to log Ganga specific messages to the wrapper script log 307 # (i.e __syslog__, __jobscript__.log,etc) 308 print >>sys.stderr, '[Ganga %s] %s' % (level,str(msg))
309 310 311 # 312