Package Ganga :: Package Core :: Package InternalServices :: Module Coordinator
[hide private]
[frames] | no frames]

Source Code for Module Ganga.Core.InternalServices.Coordinator

  1  ################################################################################ 
  2  # Ganga - a computational task management tool for easy access to Grid resources 
  3  # http://cern.ch/ganga 
  4  # 
  5  # $Id: Coordinator.py,v 1.1.4.2 2009-07-14 14:44:17 ebke Exp $ 
  6  # 
  7  # Copyright (C) 2003-2007 The Ganga Project 
  8  # 
  9  # This file is part of Ganga.  
 10  # 
 11  # Ganga is free software; you can redistribute it and/or modify 
 12  # it under the terms of the GNU General Public License as published by 
 13  # the Free Software Foundation; either version 2 of the License, or 
 14  # (at your option) any later version. 
 15  # 
 16  # Ganga is distributed in the hope that it will be useful, 
 17  # but WITHOUT ANY WARRANTY; without even the implied warranty of 
 18  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 19  # GNU General Public License for more details. 
 20   
 21  # You should have received a copy of the GNU General Public License 
 22  # along with this program; if not, write to the Free Software 
 23  # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA 
 24  ################################################################################ 
 25   
 26  """ 
 27   Internal services coordinator : 
 28    takes care of conditional enabling/disabling of internal services (job monitoring loop, job registry, job 
 29    repository/workspace) when credentials become invalid preventing normal functioning of these services. 
 30    E.g: invalid grid proxy triggers the monitor-loop stop 
 31  """ 
 32  from Ganga.Utility.Config import getConfig 
 33  from Ganga.Utility.logging import getLogger 
 34   
 35  log = getLogger() 
 36   
 37  #the overall state of Ganga internal services 
 38  servicesEnabled = True 
 39   
40 -def isCredentialRequired (credObj):
41 """ 42 The logic to decide if a given invalid credential 43 should trigger the deactivation of Ganga internal services. 44 """ 45 46 from Ganga.Runtime import Workspace_runtime 47 from Ganga.Runtime import Repository_runtime 48 49 if credObj.__class__.__name__ == 'AfsToken': 50 return Workspace_runtime.requiresAfsToken() or Repository_runtime.requiresAfsToken() 51 52 if credObj.__class__.__name__ == 'GridProxy': 53 if Repository_runtime.requiresGridProxy() or Workspace_runtime.requiresGridProxy(): 54 return True 55 from Ganga.GPI import jobs,typename 56 return bool([j for j in jobs if typename(j.backend)=='LCG' and j.status in ['submitted','running','completing']]) 57 58 log.warning("Unknown credential object : %s" % credObj)
59
60 -def notifyInvalidCredential (credObj):
61 """ 62 The Core is notified when one of the monitored credentials is invalid 63 @see ICredential.create() 64 """ 65 66 #ignore this notification if the internal services are already stopped 67 if not servicesEnabled: 68 log.debug ("One of the monitored credential [%s] is invalid BUT the internal services are already disabled." % credObj._name) 69 return 70 71 if isCredentialRequired(credObj): 72 log.debug ("One of the required credential for the internal services is invalid: [%s]." 73 "Disabling internal services ..." % credObj._name) 74 _tl = credObj.timeleft() 75 if _tl == "-1": 76 log.error('%s has been destroyed! Could not shutdown internal services.' % credObj._name) 77 return 78 disableInternalServices() 79 log.warning('%s is about to expire! ' 80 'To protect against possible write errors all internal services has been disabled.' 81 'If you believe the problem has been solved type "reactivate()" to re-enable ' 82 'interactions within this session.' % credObj._name) 83 else: 84 log.debug ("One of the monitored credential [%s] is invalid BUT it is not required by the internal services" % credObj._name)
85 86
87 -def _diskSpaceChecker():
88 """ 89 the callback function used internally by Monitoring Component 90 Reads and calls the checking function provided in the configuration. 91 If this checking function returns "False" the internal services are disabled making Ganga read-only: 92 e.g: 93 [PollThread] 94 DiskSpaceChecker = 95 import commands 96 diskusage = commands.getoutput('df -l -P %s/workspace' % config['Configuration']['gangadir']) 97 used = diskusage.splitlines()[1].split()[4] # get disk usage (in %) 98 return int(used[:-1])<70 99 """ 100 log.debug("Checking disk space") 101 try: 102 config = getConfig('PollThread') 103 104 if config['DiskSpaceChecker']: 105 _checker = lambda : True 106 try: 107 #create the checker 108 from Ganga.Runtime import _prog 109 import new 110 ns={} 111 code = "def check():" 112 for line in config['DiskSpaceChecker'].splitlines(): 113 code +="\t%s\n" % line 114 exec code in ns 115 _checker = new.function(ns["check"].func_code, _prog.local_ns, 'check' ) 116 except Exception,e: 117 log.warning('Syntax errors in disk space checking code: %s. See [PollThread]DiskSpaceChecker' % e) 118 return False 119 120 #call the checker 121 if _checker() is False: 122 disableInternalServices() 123 log.warning('You are running out of disk space! ' 124 'To protect against possible write errors all internal services has been disabled.' 125 'If you believe the problem has been solved type "reactivate()" to re-enable ' 126 'interactions within this session.') 127 except Exception, msg: 128 log.warning('Exception in free disk space checking code: %s. See [PollThread]DiskSpaceChecker' % msg) 129 return False 130 return True
131 132
133 -def disableInternalServices():
134 """ 135 Deactivates all the internal services : 136 * monitoring loop 137 * registry/repository and workspace (or GPI entierly) 138 Currently this method is called whenever: 139 * one of the managed credentials (AFS token or Grid Proxy) is detected as beeing *invalid* by the monitoring component 140 * the user is running out of space 141 """ 142 143 global servicesEnabled 144 log.debug("Disabling the internal services") 145 #disable the mon loop 146 from Ganga.Core import monitoring_component 147 monitoring_component.disableMonitoring() 148 #flush the registries 149 from Ganga.Runtime import Repository_runtime 150 Repository_runtime.shutdown() 151 #this will disable any interactions with the registries (implicitly with the GPI) 152 servicesEnabled = False
153
154 -def enableInternalServices():
155 """ 156 activates the internal services previously disabled due to expired credentials 157 """ 158 global servicesEnabled 159 #make sure all required credentials are valid 160 missing_cred = getMissingCredentials() 161 if missing_cred: 162 log.error("The following credentials are still required: %s." 163 "Make sure you renew them before reactivating this session" % ','.join(missing_cred)) 164 return 165 #startup the registries 166 from Ganga.Runtime import Repository_runtime 167 Repository_runtime.bootstrap() 168 169 log.debug("Enabling the internal services") 170 # reenable the monitoring loop if *autostart* is set 171 from Ganga.Core import monitoring_component 172 from Ganga.Core.MonitoringComponent.Local_GangaMC_Service import config 173 if config['autostart']: 174 monitoring_component.enableMonitoring() 175 176 servicesEnabled = True 177 log.info('Internal services reactivated successfuly')
178
179 -def checkInternalServices(errMsg='Internal services disabled. Job registry is read-only.'):
180 """ 181 Check the state of internal services and return a ReadOnlyObjectError exception 182 in case the state is disabled. 183 """ 184 185 global servicesEnabled 186 from Ganga.GPIDev.Base import ReadOnlyObjectError 187 188 if not servicesEnabled: 189 raise ReadOnlyObjectError(errMsg)
190
191 -def getMissingCredentials():
192 """ 193 get a list of missing credentials 194 i.e: invalid credentials that are needed by the internal services to run 195 """ 196 from Ganga.GPIDev.Credentials import _allCredentials as availableCreds 197 return [name for name in availableCreds \ 198 if not availableCreds[name].isValid() and \ 199 isCredentialRequired(availableCreds[name])]
200
201 -def bootstrap():
202 203 global servicesEnabled 204 servicesEnabled = True 205 206 #export to GPI 207 from Ganga.Runtime.GPIexport import exportToGPI 208 exportToGPI('reactivate',enableInternalServices,'Functions')
209 210 211 212 # 213 #$Log: not supported by cvs2svn $ 214 #Revision 1.1.4.1 2009/07/08 11:18:21 ebke 215 #Initial commit of all - mostly small - modifications due to the new GangaRepository. 216 #No interface visible to the user is changed 217 # 218 #Revision 1.1 2008/07/17 16:40:50 moscicki 219 #migration of 5.0.2 to HEAD 220 # 221 #the doc and release/tools have been taken from HEAD 222 # 223 #Revision 1.3.6.4 2008/03/11 15:22:42 moscicki 224 #merge from Ganga-5-0-restructure-config-branch 225 # 226 #Revision 1.3.6.3.2.1 2008/03/07 13:36:07 moscicki 227 #removal of [DefaultJobRepository] and [FileWorkspace] 228 #new options in [Configuration] user, gangadir, repositorytype, workspacetype 229 # 230 #Revision 1.3.6.3 2008/02/12 09:25:52 amuraru 231 #fixed repositories shutdown 232 # 233 #Revision 1.3.6.2 2008/02/05 12:33:23 amuraru 234 #fixed DiskSpaceChecker alignment 235 # 236 #Revision 1.3.6.1 2007/12/10 19:24:42 amuraru 237 #merged changes from Ganga 4.4.4 238 # 239 #Revision 1.7 2007/12/05 12:42:54 amuraru 240 #Ganga/Core/InternalServices/Coordinator.py 241 # 242 #Revision 1.6 2007/12/04 12:59:03 amuraru 243 #*** empty log message *** 244 # 245 #Revision 1.5 2007/11/26 14:04:30 amuraru 246 #allow indentation using \t tab char in DiskSpaceChecker 247 # 248 #Revision 1.4 2007/10/29 14:06:00 amuraru 249 # added free disk space checker to monitoring loop 250 # 251 #Revision 1.3 2007/07/27 18:02:34 amuraru 252 #updated to comply with the latest requirement for GPI free functions docstrings 253 # 254 #Revision 1.2 2007/07/27 14:31:56 moscicki 255 #credential and clean shutdown updates from Adrian (from Ganga-4-4-0-dev-branch) 256 # 257 #Revision 1.1.2.1 2007/07/27 13:04:00 amuraru 258 #*** empty log message *** 259 # 260 # 261