Package Ganga :: Package Core :: Package JobRepository :: Module PerformanceTest
[hide private]
[frames] | [no frames]

Source Code for Module Ganga.Core.JobRepository.PerformanceTest

  1  ##!/usr/bin/env python 
  2  ################################################################################ 
  3  # Ganga Project. http://cern.ch/ganga 
  4  # 
  5  # $Id: PerformanceTest.py,v 1.1 2008-07-17 16:40:50 moscicki Exp $ 
  6  ################################################################################ 
  7   
  8  import sys, os 
  9  import time 
 10  import re 
 11  import tempfile 
 12   
 13  import PerformanceTest 
 14  _thisDir = os.path.dirname(PerformanceTest.__file__) 
 15  if not _thisDir: 
 16      _thisDir = os.getcwd() 
 17  _root = os.path.dirname(os.path.dirname(os.path.dirname(_thisDir))) 
 18  sys.path.append(_root) 
 19   
 20   
 21   
 22  from Ganga.GPIDev.Lib.Job.Job  import Job 
 23  from Ganga.Lib.Executable.Executable import Executable 
 24  from Ganga.Core.JobRepository.ARDA import repositoryFactory 
 25  from Ganga.GPIDev.Streamers.SimpleStreamer import SimpleJobStreamer 
 26  import Ganga.Runtime.plugins 
 27   
 28  #DEBUG = False 
 29  DEBUG = True 
 30   
 31  ################################################################################ 
 32  # memory testing 
def getmem():
    """Return the memory used by the calling process in kB, or None.

    Runs ``pmap <pid> | tail -1`` and returns the first ``<digits>K`` value
    on that last line (the process total on typical Linux pmap output).

    Returns None when the command prints nothing (e.g. pmap is unavailable)
    or the line contains no ``<digits>K`` field.
    """
    pipe = os.popen('pmap %d | tail -1' % os.getpid(), 'r')
    try:
        summary = pipe.read()
    finally:
        # Reap the shell pipeline explicitly rather than relying on
        # garbage collection of the pipe object.
        pipe.close()
    if not summary:
        return None
    # '+' rather than '*': with '*' a 'K' that has no digits before it
    # matches with an empty group, and int('') raises ValueError.
    match = re.search(r'([0-9]+)K', summary)
    if match is None:
        return None
    return int(match.group(1))
def writerep(n, m, bunch = 100):
    """writerep(n, m, bunch = 100) --> (memory increase in kb, creation time)

    Build one list of 'bunch' GPIDev jobs and register that same list with
    the repository n times. The first application argument of job i is
    m 'x' characters followed by str(i).

    Returns None if getmem() cannot measure memory. If the first
    measurement fails, nothing is registered at all.
    """
    before = getmem()
    if before is None:
        return None
    elapsed = _writerep(n, m, bunch)
    after = getmem()
    if after is None:
        return None
    return (after - before, elapsed)
def _writerep(n, m, bunch = 100):
    """Time n registerJobs calls on one list of 'bunch' new jobs.

    The list is built before the clock starts. Returns the elapsed
    wall-clock seconds of the registration loop only.
    """
    batch = []
    for idx in range(bunch):
        job = Job()
        job.application.args = [m * 'x' + str(idx)]
        batch.append(job)
    from Ganga.GPI import jobs
    repository = jobs._impl.repository
    started = time.time()
    for _ in range(n):
        repository.registerJobs(batch)
    finished = time.time()
    return finished - started
def readrep(bunch = 100):
    """readrep(bunch = 100) --> (memory increase in kb, retrieval time)

    Check out every job in the repository as GPIDev objects, passing at most
    'bunch' job ids to each checkoutJobs call.

    Returns None if getmem() cannot measure memory. If the first
    measurement fails, nothing is checked out.
    """
    before = getmem()
    if before is None:
        return None
    elapsed = _readrep(bunch)
    after = getmem()
    if after is None:
        return None
    return (after - before, elapsed)
def _readrep(bunch = 100):
    """Time checking out all repository job ids, 'bunch' ids per call.

    The objects returned by each checkoutJobs call are not kept, so only
    one bunch is alive at a time. Returns elapsed wall-clock seconds.
    """
    from Ganga.GPI import jobs
    repository = jobs._impl.repository
    all_ids = repository.getJobIds({})
    offset = 0
    chunk = all_ids[offset:offset + bunch]
    started = time.time()
    while chunk:
        repository.checkoutJobs(chunk)
        offset += bunch
        chunk = all_ids[offset:offset + bunch]
    finished = time.time()
    return finished - started
def readallrep():
    """readallrep() --> (memory increase in kb, retrieval time)

    Check out all jobs in the repository as GPIDev objects with a single
    checkoutJobs call.

    Returns None if getmem() cannot measure memory. If the first
    measurement fails, nothing is checked out.
    """
    before = getmem()
    if before is None:
        return None
    elapsed = _readallrep()
    after = getmem()
    if after is None:
        return None
    return (after - before, elapsed)
def _readallrep():
    """Time one checkoutJobs({}) call; return elapsed wall-clock seconds."""
    from Ganga.GPI import jobs
    repository = jobs._impl.repository
    started = time.time()
    checked_out = repository.checkoutJobs({})
    finished = time.time()
    # Drop the checked-out objects only after the clock has stopped, so
    # their deallocation is not part of the measured time.
    del checked_out
    return finished - started
def delrep(n, bunch = 100):
    """delrep(n, bunch = 100) --> (memory increase in kb, delete time)

    Delete up to n*bunch jobs (the first ids returned by the repository),
    passing at most 'bunch' job ids to each deleteJobs call.

    Returns None if getmem() cannot measure memory. If the first
    measurement fails, nothing is deleted.
    """
    before = getmem()
    if before is None:
        return None
    elapsed = _delrep(n, bunch)
    after = getmem()
    if after is None:
        return None
    return (after - before, elapsed)
def _delrep(n, bunch = 100):
    """Time deleting the first n*bunch repository job ids, 'bunch' per call.

    Returns elapsed wall-clock seconds of the deletion loop.
    """
    from Ganga.GPI import jobs
    repository = jobs._impl.repository
    victims = repository.getJobIds({})[:n * bunch]
    offset = 0
    chunk = victims[:bunch]
    started = time.time()
    while chunk:
        repository.deleteJobs(chunk)
        offset += bunch
        chunk = victims[offset:offset + bunch]
    finished = time.time()
    return finished - started
################################################################################

# Repetition count for the filler strings put into job arguments
# (getSplitJob and runTest). Raising it makes every stored job larger.
ARG_LEN = 100
151 -def _startText(ff, txt):
152 ff.write(txt) 153 t1 = time.time() 154 ff.write('operation started at %s \n' % time.ctime(t1)) 155 return t1
156
157 -def _endText(ff, t1):
158 t2 = time.time() 159 ff.write('operation finished at %s \n' % time.ctime(t2)) 160 ff.write('time used: %f seconds \n' % (t2 - t1)) 161 ff.write('-s->%f<-s-\n\n\n'% (t2 - t1))
162 163
def getSplitJob(LEN = 10):
    """Return a master Job whose subjobs list holds LEN new child Jobs.

    Child idx gets exe '/bin/myexe<idx>' and a single argument made of '/',
    ARG_LEN copies of 'abc' and idx. Each child's parent is set to the master.
    """
    master = Job()
    children = []
    for idx in range(LEN):
        child = Job()
        suffix = str(idx)
        child.application.exe = '/bin/myexe' + suffix
        child.application.args = ['/' + ARG_LEN * 'abc' + suffix]
        child._setParent(master)
        children.append(child)
    master.subjobs = children
    return master
def registerJobs(jobList, repository):
    """Register the jobs in jobList with repository, one call per job.

    Calls repository.registerJobs([job]) for each job in list order,
    rather than making one batched call.
    """
    for job in jobList:
        single = [job]
        repository.registerJobs(single)
def _makeRepository(rep_type, rootDir):
    """Create the repository under test for rep_type "Remote" or "Local"."""
    if rep_type == "Remote":
        return repositoryFactory(repositoryType = rep_type,
                                 root_dir = rootDir,
                                 streamer = SimpleJobStreamer(),
                                 host = 'gangamd.cern.ch',
                                 port = 8822,
                                 login = os.getlogin(),
                                 keepalive = True)
    return repositoryFactory(repositoryType = rep_type,
                             root_dir = rootDir,
                             streamer = SimpleJobStreamer(),
                             local_root = os.path.join('/afs/cern.ch/sw/ganga/workdir',
                                                       os.getlogin(), 'gangadir/repository'))


def _openReport(output_dir):
    """Create a new uniquely named '*.test' file in output_dir, open for writing.

    mkstemp creates the file atomically, so the concurrently spawned runTest
    processes sharing output_dir can never pick the same name. tempfile.mktemp
    only proposed a name, and one that was unique in the default temp dir
    only. Note that mkstemp creates the file readable by its owner only.
    """
    fd, path = tempfile.mkstemp(suffix = '.test', dir = output_dir)
    return os.fdopen(fd, 'w')


def _runStep(ff, txt, name, action, describe = None):
    """Run and time one repository operation, logging it to report file ff.

    txt is both the report heading and, when DEBUG is set, the console
    message. action() performs the operation. If it raises, the error is
    printed as 'EXCEPTION in <name> <error>' and swallowed so that later
    steps still run. On success, describe(result) supplies the DEBUG status
    line when given. Returns action()'s result, or None if it raised.
    """
    t1 = _startText(ff, txt)
    if DEBUG:
        print(txt)
    try:
        result = action()
    except Exception:
        # sys.exc_info() instead of 'except ... as e' keeps pre-2.6 support.
        print("EXCEPTION in %s %s" % (name, sys.exc_info()[1]))
        result = None
        if DEBUG:
            print("--->command status FAIL \n")
    else:
        if DEBUG:
            if describe is None:
                print("--->command status OK \n")
            else:
                print(describe(result))
    _endText(ff, t1)
    return result


def runTest(NTEST, LEN, rootDir, output_dir, rep_type):
    """Run one simulated user's register/checkout/commit/delete cycle.

    Builds NTEST split jobs with LEN subjobs each. It then times four
    repository operations on them (register, check out all, commit after
    modifying, delete), writing each timing to a fresh '*.test' report file
    in output_dir.

    rep_type must be "Local" or "Remote". Any other value prints a message
    and returns without doing anything. A failing operation is reported
    and does not stop the remaining steps.
    """
    if DEBUG:
        print('from runTest: rootDir %s, output_dir %s' % (rootDir, output_dir))
    if rep_type not in ("Remote", "Local"):
        print("Wrong type of repository...")
        print("Exiting ...")
        return
    repository = _makeRepository(rep_type, rootDir)
    ff = _openReport(output_dir)
    try:
        jj = []
        for i in range(NTEST):
            j = getSplitJob(LEN)
            j.name = "MyJob" + str(i)
            j.application.args = ['/' + ARG_LEN * 'abcd']
            jj.append(j)

        _runStep(ff, 'registering %d jobs...' % NTEST, "registerJobs",
                 lambda: repository.registerJobs(jj))

        _runStep(ff, 'retrieving info about ALL jobs', "checkoutJobs",
                 lambda: repository.checkoutJobs({}),
                 lambda rjj: "--->checkout jobs %s %s \n" % (len(rjj), [r.id for r in rjj]))

        # Modify every job so that the commit below has changes to write.
        for j in jj:
            j.name = j.name + 'Changed'
            for sj in j.subjobs:
                sj.application.exe = '/bin/ls'

        _runStep(ff, 'commiting %d jobs...' % NTEST, "commitJobs",
                 lambda: repository.commitJobs(jj))

        _runStep(ff, 'deleting %d jobs...' % NTEST, "deleteJobs",
                 lambda: repository.deleteJobs([job.id for job in jj]))
    finally:
        ff.close()
286 287 ################################################################################
def NormPath(path):
    """Return path unchanged, except on win32 where directories are shortened.

    On win32 each directory component is reduced to the text before its
    first whitespace. If that text is longer than 8 characters, or the
    component contained whitespace, it becomes its first 6 characters plus
    '~1'. This approximates DOS 8.3 short names and assumes the '~1'
    variant is the right one (unverified). The final file name is kept
    as is.
    """
    if sys.platform != 'win32':
        return path
    head, fname = os.path.split(path)
    drive, rest = os.path.splitdrive(head)
    result = drive + os.sep
    for part in rest.split(os.sep):
        if not part:
            continue
        words = part.split()
        short = words[0]
        if len(short) > 8 or len(words) > 1:
            short = short[:6] + '~1'
        result = os.path.join(result, short)
    return os.path.join(result, fname)
################################################################################
if __name__ == '__main__':
    # Interactive driver: ask for the test size, then launch NUSERS
    # independent Python processes. Each runs runTest() against the chosen
    # repository type and writes its own report into the output directory.
    NTEST = int(raw_input('Enter a number of job to test --->'))
    LEN = int(raw_input('Enter a number of subjobs per job --->'))
    NUSERS = int(raw_input('Enter a number of "users" to test --->'))
    OUTPUT = raw_input('Enter a name of output dir --->')
    REPTYPE = raw_input('Enter type of repository (Local[1] or Remote[2]) --->')
    if REPTYPE == '1':
        REPTYPE = 'Local'
    elif REPTYPE == '2':
        REPTYPE = 'Remote'
    else:
        print("Unknown type of repository. Exiting")
        sys.exit(1)
    dname = 'users_' + str(NUSERS) + '__jobs_' + str(NTEST) + '__subjobs_' + str(LEN)
    output_dir = os.path.join(os.getcwd(), OUTPUT, REPTYPE, dname)
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    print("output dir is  %s" % output_dir)

    python_path = NormPath(sys.executable)
    rootDir = '/users/testframework'
    # %r renders each string as a valid Python literal. Paths containing
    # backslashes (Windows) or quotes therefore reach the child unchanged,
    # instead of being reinterpreted as escape sequences such as '\t'.
    cmd = ('"import sys\nsys.path.append(%r)\nfrom PerformanceTest import runTest\n'
           'runTest(%d, %d, %r, %r, %r)"'
           % (_thisDir, NTEST, LEN, rootDir, output_dir, REPTYPE))
    # The surrounding double quotes are kept only for Python 2.3.0-2.3.3,
    # presumably an os.spawnl argument-quoting quirk there (unverified).
    # They are stripped for every other version.
    if sys.version_info[:3] < (2, 3, 0) or sys.version_info[:3] >= (2, 3, 4):
        cmd = cmd[1:-1]
    for _ in range(NUSERS):
        if DEBUG:
            print(cmd)
        # P_NOWAIT: all "users" run concurrently; the driver does not wait.
        pid = os.spawnl(os.P_NOWAIT, python_path, python_path, "-c", cmd)
        if DEBUG:
            print("new user process started %d" % pid)