Package Ganga :: Package Lib :: Package MonitoringServices :: Package ARDADashboard :: Package LCG :: Module ARDADashboardLCG
[hide private]
[frames] | no frames]

Source Code for Module Ganga.Lib.MonitoringServices.ARDADashboard.LCG.ARDADashboardLCG

  1  # benjamin.gaidioz@cern.ch 
  2  # the DashboardAPI module is loaded without any URL for the monalisa 
  3  # host, configuration is the default one. This is a bit bad. 
  4   
  5  import os 
  6  import popen2 
  7   
  8  from Ganga.GPIDev.Adapters.IMonitoringService import IMonitoringService 
  9  from types import DictionaryType, IntType 
 10  from Ganga.Lib.MonitoringServices.ARDADashboard.DashboardAPI import DashboardAPI 
 11   
 12  #out = open(os.path.join(os.getcwd(), 'dashboard.log'),'w') 
13 -def printInfo(s):
14 pass
15 # out.write(str(s) + os.linesep) 16 # out.flush() 17 18
19 -def printError(s):
20 pass
21 # out.write(str(s) + os.linesep) 22 # out.flush() 23 24
25 -def safe_getenv(varname):
26 try: 27 return os.environ[varname] 28 except: 29 return 'unknown'
30
31 -def get_output(commands):
32 for command in commands: 33 printInfo("running %s" % command) 34 p = popen2.Popen3(command) 35 retcode = p.wait() 36 if retcode == 0: 37 lines = p.fromchild.readlines() 38 s = '' 39 for l in lines: 40 if s != '': 41 s += ' ' 42 s += l.split('\n')[0] 43 return s 44 else: 45 lines = p.fromchild.readlines() 46 printInfo("get_output: %s" % lines) 47 return 'unknown'
48 49
50 -class ARDADashboardLCG(IMonitoringService):
51 52 gridJobId = None 53 gangaJobId = None 54 gangaTaskId = None 55 dashboard = None 56 gridBackend = None 57 gridCertificate = None 58 VO = None 59 taskPrefix = 'ganga' 60 _complete = False 61 62 _logger = None 63
64 - def __init__(self, job_info):
65 IMonitoringService.__init__(self, job_info) 66 67 if type(job_info) is DictionaryType: 68 # we are on the worker node. We just need 69 # to get values from the dictionary 70 71 try: 72 self.gangaJobId = job_info['gangaJobId'] 73 self.gangaTaskId = job_info['gangaTaskId'] 74 self.gridBackend = job_info['gridBackend'] 75 self.gridCertificate = job_info['gridCertificate'] 76 self.VO = job_info['VO'] 77 self._complete = True 78 except KeyError,msg: 79 # too bad, we will not monitor the job 80 return 81 # on the WN, we get the job ID from envar 82 self.gridJobId = safe_getenv('EDG_WL_JOBID') 83 if self.gridJobId == 'unknown': 84 self.gridJobId = safe_getenv('GLITE_WL_JOBID') 85 if self.gridJobId == 'unknown': 86 self._complete = False 87 88 89 else: 90 # we are in the client session. job_info is a Job() 91 92 from Ganga.Utility.logging import getLogger 93 self._logger = getLogger() 94 95 job = job_info 96 97 self.gridBackend = getattr(job,'backend')._name 98 if self.gridBackend not in ['LCG']: 99 self._logger.debug('not sending monitoring because not in LCG') 100 return 101 102 self._logger.debug(job.backend) 103 self._logger.debug(job.backend.id) 104 self.gridJobId = job.backend.id 105 106 # we compute the "jobID" and "taskID" 107 # (which is the gangaJobId followed by the user@repository 108 109 # the repository unique ID: 110 from Ganga.Utility.Config import getConfig, ConfigError 111 config = getConfig('Configuration') 112 rep_type = config['repositorytype'] 113 rep_login = config['user'] 114 if 'Local' in rep_type: 115 from Ganga.Runtime import Repository_runtime 116 rep_dir = Repository_runtime.getLocalRoot() 117 sys_config = getConfig('System') 118 rep_hostname = sys_config['GANGA_HOSTNAME'] 119 rep_location = rep_hostname + ':' + rep_dir 120 elif 'Remote' in rep_type: 121 remote_config = getConfig(rep_type+"_Repository") 122 rep_host = remote_config['host'] 123 rep_port = remote_config['port'] 124 rep_location = rep_host + ':' + rep_port 125 else: 126 return 127 repository_id = rep_login + '@' + rep_location 128 129 master_job = job.master 130 131 if master_job is not None: 132 master_id = master_job.id 133 self._logger.debug('found master: %d' % master_id) 134 self.gangaTaskId = self.taskPrefix + '_' + str(master_id) + '_' + repository_id 135 self.gangaJobId = str(job.id) 136 else: 137 self.gangaTaskId = self.taskPrefix + '_' + str(job.id) + '_' + repository_id 138 self.gangaJobId = '0' 139 140 self._logger.debug('task_id = %s' % self.gangaTaskId) 141 self._logger.debug('task_job_id = %s' % self.gangaJobId) 142 143 backendConfig = getConfig(self.gridBackend) 144 try: 145 self.VO = backendConfig['VirtualOrganisation'] 146 except KeyError: 147 self._logger.debug('VirtualOrganisation not configured') 148 # we need it, it's too dangerous if we are not sure 149 return 150 151 from Ganga.GPIDev.Credentials import getCredential 152 153 proxy = getCredential('GridProxy') 154 self.gridCertificate = proxy.info('-subject') 155 if self.gridCertificate is None: 156 self._logger.debug('error: grid certificate not known') 157 return 158 159 if self.gridJobId is None: 160 self._logger.debug('normal: grid job ID is None') 161 162 self._logger.debug('job is complete') 163 self._complete = True 164 165 # we can now initialize the dashboard communication thing 166 if self.gridJobId is not None: 167 try: 168 self.dashboard = DashboardAPI(self.gangaTaskId, self.gangaJobId + '_' + self.gridJobId) 169 except TypeError: 170 self.dashboard = DashboardAPI(self.gangaTaskId, self.gangaJobId + '_' + '_'.join(self.gridJobId))
171 172 173
174 - def getJobInfo(self):
175 176 # the method which returns useful info to have on the WN. We send basically 177 # everything, just in case. 178 if self._complete: 179 dict = { 180 'gangaJobId':self.gangaJobId, 181 'gangaTaskId':self.gangaTaskId, 182 'gridBackend':self.gridBackend, 183 'gridCertificate':self.gridCertificate, 184 'VO':self.VO 185 } 186 self._logger.debug(dict) 187 return dict 188 else: 189 return {}
190
191 - def getSandboxModules(self):
192 193 # it would be nice if this would be more readable. 194 195 import Ganga.Lib.MonitoringServices.ARDADashboard.LCG 196 import ApMon 197 import ApMon.apmon 198 import ApMon.Logger 199 return IMonitoringService.getSandboxModules(self) + [Ganga, 200 Ganga.Lib, 201 Ganga.Lib.MonitoringServices, 202 Ganga.Lib.MonitoringServices.ARDADashboard, 203 Ganga.Lib.MonitoringServices.ARDADashboard.DashboardAPI, 204 Ganga.GPIDev, 205 Ganga.GPIDev.Adapters, 206 Ganga.GPIDev.Adapters.IMonitoringService, 207 ApMon, 208 ApMon.apmon, 209 ApMon.Logger, 210 ApMon.ProcInfo, 211 Ganga.Lib.MonitoringServices.ARDADashboard.LCG, 212 Ganga.Lib.MonitoringServices.ARDADashboard.LCG.ARDADashboardLCG, 213 ]
214
215 - def submit(self,**opts):
216 raise Exception('not implemented')
217
218 - def start(self, **opts):
219 # we are in principle in the WN. We need the CE (we try both possibilities) 220 printInfo("monitor start event") 221 if self._complete: 222 hostqueue = get_output(['edg-brokerinfo getCE','glite-brokerinfo getCE']) 223 224 if hostqueue == 'unknown': 225 printInfo("brokerinfo command returns nothing") 226 hostqueue = safe_getenv('GANGA_LCG_CE') 227 self.dashboard.publish(SyncCE=hostqueue) 228 229 #FIXED: A TYPO? replaced executable by application 230 # optional 231 if self.application is not None: 232 self.dashboard.publish(ExeStart=self.application)
233
234 - def progress(self, **opts):
235 # we are in the WN. But it's application dependent 236 return
237
238 - def stop(self,exitcode,**opts):
239 if self._complete: 240 if type(exitcode) is IntType: 241 self.dashboard.publish(JobExitCode=exitcode)
242