Package Ganga :: Package Core :: Package JobRepository :: Module ARDATest
[hide private]
[frames] | no frames]

Source Code for Module Ganga.Core.JobRepository.ARDATest

  1  ##!/usr/bin/env python 
  2  ################################################################################ 
  3  # Ganga Project. http://cern.ch/ganga 
  4  # 
  5  # $Id: ARDATest.py,v 1.1 2008-07-17 16:40:50 moscicki Exp $ 
  6  ################################################################################ 
  7   
  8  import sys, os 
  9  import time 
 10  import tempfile 
 11   
 12  import ARDATest 
 13  _thisDir = os.path.dirname(ARDATest.__file__) 
 14  if not _thisDir: 
 15      _thisDir = os.getcwd() 
 16  _root = os.path.dirname(os.path.dirname(os.path.dirname(_thisDir))) 
 17  sys.path.append(_root) 
 18   
 19   
 20   
 21  from Ganga.GPIDev.Lib.Job.Job  import Job 
 22  #from GangaLHCb.Lib.Gaudi.Gaudi import Gaudi 
 23  from Ganga.Lib.Executable.Executable import Executable 
 24  from Ganga.Core.JobRepository.ARDA import repositoryFactory 
 25  from Ganga.GPIDev.Streamers.SimpleStreamer import SimpleJobStreamer 
 26  import Ganga.Runtime.plugins 
 27   
 28  #DEBUG = False 
 29  DEBUG = True 
 30   
 31  ################################################################################     
def _startText(ff, txt):
    """Write the opening record of a timed operation and return its start time.

    ff  -- writable file-like object receiving the timing log
    txt -- description of the operation; written verbatim, with no separator
           added before the timestamp text

    Returns the start time as a float (seconds since the epoch), to be passed
    to _endText once the operation has finished.
    """
    ff.write(txt)
    t1 = time.time()
    ff.write('operation started at %s \n' % time.ctime(t1))
    return t1

def _endText(ff, t1):
    """Write the closing record of a timed operation.

    ff -- writable file-like object receiving the timing log
    t1 -- start time previously returned by _startText

    Three lines are written: the finish timestamp, the elapsed time in
    human-readable form, and the same elapsed time wrapped in the
    '-s->...<-s-' marker, followed by two blank lines.
    """
    t2 = time.time()
    elapsed = t2 - t1
    ff.write('operation finished at %s \n' % time.ctime(t2))
    ff.write('time used: %f seconds \n' % elapsed)
    ff.write('-s->%f<-s-\n\n\n' % elapsed)


def testSplitting(repository, LEN):
    """Check that a master job with LEN subjobs survives a repository round trip.

    repository -- job repository object under test
    LEN        -- number of subjobs to attach to the master job; the status
                  checks below address subjobs 1, 2 and 3, so at least four
                  subjobs are needed

    The master job is registered, checked out, modified, committed, checked
    out again, has the status of three subjobs set and read back, and is
    finally deleted.  Any failed check raises AssertionError.
    """
    # top level splitting
    mj = Job()
    jj = []
    for i in range(LEN):
        sj = Job()
        sj._setParent(mj)
        sj.application.exe = '/bin/myexe' + str(i)
        sj.application.args = 1000*['/ab' + str(i)]
        jj.append(sj)
    mj.subjobs = jj

    # check registration: every subjob must point at the master and have an id
    repository.registerJobs([mj])
    for s in mj.subjobs:
        assert s.master is mj
        assert s.id is not None

    # check ci/co
    mid = mj.id
    j = repository.checkoutJobs([mid])[0]
    assert len(j.subjobs) == LEN

    # another ci/co check: changes to master and subjob must both persist
    j.subjobs[1].application.exe = '/bin/ls'
    j.application.exe = '/bin/pwd'
    repository.commitJobs([j])
    j = repository.checkoutJobs([mid])[0]

    assert j.subjobs[1].application.exe == '/bin/ls'
    assert j.application.exe == '/bin/pwd'

    # check set status; subjobs are addressed by (master id, subjob id)
    status_list = [((mid, j.subjobs[1].id), 'running'),
                   ((mid, j.subjobs[2].id), 'running'),
                   ((mid, j.subjobs[3].id), 'running')]
    repository.setJobsStatus(status_list)

    # check get status
    md = repository.getJobsStatus([entry[0] for entry in status_list])
    for i in range(len(md)):
        assert md[i][0] == status_list[i][0]
        assert md[i][1] == status_list[i][1]

    # check getting job status in another way (query by subjob table path);
    # only entries whose id lines up with status_list are compared
    ttt = {'table_path': repository._getSubJobPath((mid,)), 'attributes': {}}
    md = repository.getJobsStatus(ttt)
    for i in range(len(status_list)):
        if md[i][0] == status_list[i][0]:
            assert md[i][1] == status_list[i][1]

    # check delete job
    repository.deleteJobs([mj.id])


def _timedStep(ff, title, opName, action, onSuccess = None):
    """Run one repository operation, timing it in ff and reporting on stdout.

    ff        -- open timing log (see _startText / _endText)
    title     -- description written to the log and, when DEBUG is set,
                 echoed to stdout
    opName    -- operation name used in the 'EXCEPTION in ...' message
    action    -- zero-argument callable performing the operation
    onSuccess -- optional callable turning the result of action into the text
                 printed on success; by default the generic OK line is printed

    Exceptions raised by action are reported and swallowed so that the
    remaining steps still run.  The single-argument print(...) form is used
    throughout because it produces the same output as the equivalent
    multi-item Python 2 print statement.
    """
    t1 = _startText(ff, title)
    if DEBUG:
        print(title)
    try:
        result = action()
    except Exception:
        # sys.exc_info() is used instead of "except Exception, e" so the
        # handler does not depend on version-specific except syntax.
        print("EXCEPTION in %s %s" % (opName, str(sys.exc_info()[1])))
        if DEBUG:
            print("--->command status FAIL \n")
    else:
        if DEBUG:
            if onSuccess is None:
                print("--->command status OK \n")
            else:
                print(onSuccess(result))
    _endText(ff, t1)


def runTest(NTEST, rootDir, output_dir, rep_type):
    """Time a sequence of repository operations on NTEST freshly created jobs.

    NTEST      -- number of jobs to create and push through the repository
    rootDir    -- root directory passed to the repository factory
    output_dir -- existing directory in which the timing log is created
    rep_type   -- "Remote" or "Local"; any other value prints a message and
                  returns without doing anything

    The steps are: register the jobs, run testSplitting NTEST times, check out
    the first ten jobs, check out all jobs, switch every job to an Executable
    application, commit, set and read back the status, query ids by attribute,
    list the status of the jobs created here, and delete them.  Timings go to
    a uniquely named '*.test' file in output_dir.
    """
    if DEBUG:
        print('from runTest: rootDir %s, output_dir %s' % (rootDir, output_dir))
    if rep_type == "Remote":
        repository = repositoryFactory(repositoryType = rep_type,
                                       root_dir = rootDir,
                                       streamer = SimpleJobStreamer(),
                                       host = 'lxgate41.cern.ch',
                                       port = 8822,
                                       login = os.getlogin(),
                                       keepalive = True)
    elif rep_type == "Local":
        repository = repositoryFactory(repositoryType = rep_type,
                                       root_dir = rootDir,
                                       streamer = SimpleJobStreamer(),
                                       local_root = os.path.expanduser('~'))
    else:
        print("Wrong type of repository...")
        print("Exiting ...")
        return

    # Only the random base name produced by mktemp is used; the file itself is
    # created in output_dir.
    # NOTE(review): mktemp checks uniqueness in the system temp directory, not
    # in output_dir, so concurrent runs could in principle pick the same name.
    nn = tempfile.mktemp(suffix = '.test')
    nn = os.path.join(output_dir, os.path.basename(nn))
    ff = open(nn, 'w')
    try:
        jj = []
        for i in range(NTEST):
            j = Job()
            #j.application = Gaudi()
            j.name = "MyJob" + str(i)
            j.application.args = 1000*['/abc']
            jj.append(j)

        def splitAll():
            for i in range(NTEST):
                testSplitting(repository, LEN = 10)

        def describeCheckout(found):
            return "--->checkout jobs %s \n" % ([job.id for job in found],)

        def describeFullCheckout(found):
            return "--->checkout jobs %s %s \n" % (len(found),
                                                    [job.id for job in found])

        def describeOutput(output):
            return "--->command output %s %s \n" % (len(output), output)

        #----------------------------------------------------
        _timedStep(ff, 'registering %d jobs...' % NTEST, 'registerJobs',
                   lambda: repository.registerJobs(jj))

        #----------------------------------------------------
        _timedStep(ff, 'testing splitting of %d jobs...' % NTEST,
                   'testSplitting', splitAll)

        #----------------------------------------------------
        _timedStep(ff, 'retrieving info about first 10 jobs...', 'checkoutJobs',
                   lambda: repository.checkoutJobs([job.id for job in jj[:10]]),
                   describeCheckout)

        #----------------------------------------------------
        _timedStep(ff, 'retrieving info about ALL jobs', 'checkoutJobs',
                   lambda: repository.checkoutJobs({}),
                   describeFullCheckout)

        for j in jj:
            j.application = Executable()
            try:
                j.updateStatus('submitting')
            except Exception:
                # Best effort: a job that refuses the status change is still
                # committed below.  Narrowed from a bare except so that
                # interpreter-exit requests are not swallowed.
                pass

        #----------------------------------------------------
        _timedStep(ff, 'commiting %d jobs...' % NTEST, 'commitJobs',
                   lambda: repository.commitJobs(jj))

        #----------------------------------------------------
        _timedStep(ff, 'setting status for %d jobs...' % NTEST, 'setJobsStatus',
                   lambda: repository.setJobsStatus(
                       [(job.id, 'submitted') for job in jj]))

        #----------------------------------------------------
        _timedStep(ff, 'getting status of first 10 jobs...', 'getJobsStatus',
                   lambda: repository.getJobsStatus([job.id for job in jj[:10]]),
                   describeOutput)

        #----------------------------------------------------
        _timedStep(ff, 'getting id of jobs with particular attributes...',
                   'getJobIds',
                   lambda: repository.getJobIds({'status':'submitted',
                                                 'application':'Executable'}),
                   describeOutput)

        #----------------------------------------------------
        # Unlike the other steps this checkout is not guarded: a failure here
        # propagates to the caller (after the log file has been closed).
        t1 = _startText(ff, 'retrieving info about ALL jobs')
        rjj = repository.checkoutJobs({})
        if DEBUG:
            print('retrieving info about ALL jobs')
            # NOTE(review): the indentation of the original was lost; the
            # status summary is assumed to belong to the DEBUG branch like
            # every other status line in this function -- confirm.
            jj_id = [job.id for job in jj]
            st_lst = [(job.id, job.status) for job in rjj if job.id in jj_id]
            print("--->command output %s %s \n" % (len(st_lst), st_lst))
        _endText(ff, t1)

        #----------------------------------------------------
        _timedStep(ff, 'deleting %d jobs...' % NTEST, 'deleteJobs',
                   lambda: repository.deleteJobs([job.id for job in jj]))

    finally:
        ff.close()


################################################################################
def NormPath(path):
    """Return path with its directory components shortened to 8.3-style names.

    On platforms other than win32 the path is returned unchanged.  On win32
    every directory component that contains whitespace or is longer than
    eight characters is replaced by the first six characters of its first
    word followed by '~1'.  This is a textual approximation: the file system
    is not asked for the real short name.  The final component (the file
    name) is kept as is.

    Empty and whitespace-only components are skipped, and a relative path
    stays relative.
    """
    if sys.platform != 'win32':
        return path

    directory, fileName = os.path.split(path)
    drive, tail = os.path.splitdrive(directory)

    # Re-create the leading separator only when the directory part is rooted,
    # so that 'bin\\python.exe' is not turned into '\\bin\\python.exe'.
    if tail.startswith(os.sep):
        shortPath = drive + os.sep
    else:
        shortPath = drive

    for elem in tail.split(os.sep):
        words = elem.split()
        if not words:
            # empty (doubled or leading separator) or whitespace-only component
            continue
        elem = words[0]
        if len(elem) > 8 or len(words) > 1:
            elem = elem[:6] + '~1'
        shortPath = os.path.join(shortPath, elem)

    return os.path.join(shortPath, fileName)

################################################################################
if __name__ == '__main__':
    # Interactive driver: asks for the test size, then starts one Python
    # process per simulated user, each of which calls runTest.
    NTEST = int(raw_input('Enter a number of job to test --->'))
    # NOTE(review): the original wrote this prompt as three adjacent literals
    # ('...of ' "users" ' to...'), which concatenate to the text below without
    # quotes around users; quoting the word may have been intended.
    NUSERS = int(raw_input('Enter a number of users to test --->'))
    OUTPUT = raw_input('Enter a name of output dir --->')
    REPTYPE = raw_input('Enter type of repository (Local[1] or Remote[2]) --->')
    if REPTYPE == '1':
        REPTYPE = 'Local'
    elif REPTYPE == '2':
        REPTYPE = 'Remote'
    else:
        print("Unknown type of repository. Exiting")
        sys.exit(1)
    dname = 'users_' + str(NUSERS) + '__jobs_' + str(NTEST)
    output_dir = os.path.join(os.getcwd(), OUTPUT, REPTYPE, dname)
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    # Two spaces after "is" reproduce the output of the original two-item
    # print statement.
    print("output dir is  %s" % output_dir)

    python_path = NormPath(sys.executable)

    # NOTE(review): every spawned process receives the same rootDir -- confirm
    # that sharing one repository root between the "users" is intended.
    rootDir = '/testdir/GangaTest/user'

    # Program text handed to "python -c".  %r is used for the string values so
    # that backslashes and quotes in the paths are escaped in the generated
    # source; for plain paths the text is identical to quoting with '%s'.
    cmd = ('"import sys\n'
           'sys.path.append(%r)\n'
           'from ARDATest import runTest\n'
           'runTest(%d, %r,%r,%r)"') % (_thisDir, NTEST, rootDir,
                                       output_dir, REPTYPE)
    # The surrounding double quotes are kept only for Python 2.3.0 - 2.3.3.
    if sys.version_info[:3] < (2,3,0) or sys.version_info[:3] >= (2,3,4):
        cmd = cmd[1:-1]

    for i in range(NUSERS):
        if DEBUG:
            print(cmd)
        # P_NOWAIT: do not wait for the child, so all users run concurrently.
        pid = os.spawnl(os.P_NOWAIT, python_path, python_path, "-c", cmd)
        if DEBUG:
            print("new user process started %d" % pid)