1
2
3
4
5
6
7
8 import sys, os
9 import time
10 import re
11 import tempfile
12
13 import PerformanceTest
# Directory that holds this module. os.path.dirname() yields '' when
# __file__ is a bare file name (module loaded from the current directory),
# so fall back to the working directory in that case.
_thisDir = os.path.dirname(PerformanceTest.__file__)
if not _thisDir:
    _thisDir = os.getcwd()

# Three directory levels above this module; appended to sys.path so that the
# `Ganga.*` imports that follow can be resolved.
_root = os.path.dirname(os.path.dirname(os.path.dirname(_thisDir)))
sys.path.append(_root)
19
20
21
22 from Ganga.GPIDev.Lib.Job.Job import Job
23 from Ganga.Lib.Executable.Executable import Executable
24 from Ganga.Core.JobRepository.ARDA import repositoryFactory
25 from Ganga.GPIDev.Streamers.SimpleStreamer import SimpleJobStreamer
26 import Ganga.Runtime.plugins
27
28
# When true, progress and status messages are echoed to stdout in addition
# to what is written to the per-process timing log.
DEBUG = True
30
31
32
def getmem():
    """Return the memory used by the calling process, in kb.

    The value is parsed from the last line of ``pmap <pid>`` output, which
    is expected to contain the total as ``<digits>K``.

    Returns:
        The total as an int, or None when ``pmap`` produced no output or no
        ``<digits>K`` token could be found.
    """
    pipe = os.popen('pmap %d | tail -1' % os.getpid(), 'r')
    try:
        summary = pipe.read()
    finally:
        # Always reap the child and release the pipe.
        pipe.close()
    # Require at least one digit: '[0-9]*K' would also match a bare 'K' and
    # then fail in int('').
    match = re.search(r'([0-9]+)K', summary)
    if match:
        return int(match.group(1))
    return None
40
41
def writerep(n, m, bunch=100):
    """writerep(n, m, bunch = 100) --> (memory increase in kb, creation time)

    Measure the memory growth and time of registering jobs through
    ``_writerep(n, m, bunch)``.  According to the original description this
    registers n bunches of GPIDev jobs, each bunch containing 'bunch' jobs
    and each job having a name of 'm' characters.

    Returns:
        A tuple ``(memory increase in kb, time reported by _writerep)``, or
        None when the process memory cannot be measured.  If the initial
        measurement fails the jobs are not registered at all.
    """
    before = getmem()
    if before is None:
        return None
    dt = _writerep(n, m, bunch)
    after = getmem()
    if after is None:
        return None
    return (after - before, dt)
52
67
68
def readrep(bunch=100):
    """readrep(bunch = 100) --> (memory increase in kb, retrieval time)

    Measure the memory growth and time of checking out all jobs in the
    repository through ``_readrep(bunch)``, which works in bunches of no
    more than 'bunch' jobs.

    Returns:
        A tuple ``(memory increase in kb, time reported by _readrep)``, or
        None when the process memory cannot be measured.  If the initial
        measurement fails nothing is checked out.
    """
    before = getmem()
    if before is None:
        return None
    dt = _readrep(bunch)
    after = getmem()
    if after is None:
        return None
    return (after - before, dt)
79
80
# NOTE(review): the lines that precede this body in the original listing
# (listing lines 81-83) are missing from this copy. They presumably held the
# `def` header taking `bunch` and whatever defines `_co`; `_co` is not
# defined anywhere in the visible source -- confirm against the real file.
#
# Purpose of the visible body: walk over all job ids of the GPI job
# repository in consecutive slices of at most `bunch` ids, hand every slice
# to `_co`, and return the elapsed wall-clock time in seconds.
84 from Ganga.GPI import jobs
85 rep = jobs._impl.repository
# An empty selection dict is passed; the resulting ids are sliced below.
86 ids = rep.getJobIds({})
87 start = 0
88 ii = ids[start: start + bunch]
# Timing covers only the slice-processing loop, not the id lookup above.
89 t1 = time.time()
# An empty slice (past the end of `ids`) terminates the loop.
90 while ii:
91 _co(ii)
92 start += bunch
93 ii = ids[start: start + bunch]
94 t2 = time.time()
95 return t2-t1
96
97
def readallrep():
    """readallrep() --> (memory increase in kb, retrieval time)

    Measure the memory growth and time of checking out all jobs in the
    repository through ``_readallrep()``.

    Returns:
        A tuple ``(memory increase in kb, time reported by _readallrep)``,
        or None when the process memory cannot be measured.  If the initial
        measurement fails nothing is checked out.
    """
    before = getmem()
    if before is None:
        return None
    dt = _readallrep()
    after = getmem()
    if after is None:
        return None
    return (after - before, dt)
107
108
117
118
def delrep(n, bunch=100):
    """delrep(n, bunch = 100) --> (memory increase in kb, delete time)

    Measure the memory growth and time of deleting jobs through
    ``_delrep(n, bunch)``.  According to the original description this
    deletes n*bunch jobs from the repository, in bunches of no more than
    'bunch' jobs.

    Returns:
        A tuple ``(memory increase in kb, time reported by _delrep)``, or
        None when the process memory cannot be measured.  If the initial
        measurement fails nothing is deleted.
    """
    before = getmem()
    if before is None:
        return None
    dt = _delrep(n, bunch)
    after = getmem()
    if after is None:
        return None
    return (after - before, dt)
129
130
145
146
147
148
# Repetition count for the 'abcd' filler used by runTest to build the
# application argument of every test job ('/' + ARG_LEN * 'abcd').
ARG_LEN = 100
150
def _startText(ff, txt):
    """Write the opening log entry of a timed operation.

    Args:
        ff: writable file-like object receiving the log.
        txt: description of the operation; written as-is, with no newline
            appended.

    Returns:
        The start time (``time.time()``), to be handed to ``_endText``.
    """
    ff.write(txt)
    started = time.time()
    ff.write('operation started at %s \n' % time.ctime(started))
    return started
156
def _endText(ff, t1):
    """Write the closing log entry of a timed operation.

    Args:
        ff: writable file-like object receiving the log.
        t1: start time as returned by ``_startText``.

    The elapsed time is written twice: once in readable form and once
    between the ``-s->`` / ``<-s-`` markers.
    """
    finished = time.time()
    elapsed = finished - t1
    ff.write('operation finished at %s \n' % time.ctime(finished))
    ff.write('time used: %f seconds \n' % elapsed)
    ff.write('-s->%f<-s-\n\n\n' % elapsed)
162
163
176
180
181
def _timedPhase(ff, banner, opName, operation, report=None):
    """Run one repository operation and record its duration in log *ff*.

    Args:
        ff: open log file.
        banner: text written to the log (and echoed when DEBUG is set).
        opName: repository method name, used in the failure message.
        operation: zero-argument callable performing the repository call.
        report: optional callable turning the operation's result into the
            DEBUG success message; the generic OK status is printed when
            it is omitted.

    A failure of the operation is reported on stdout and does not stop the
    test; the elapsed time is logged in either case.
    """
    t1 = _startText(ff, banner)
    if DEBUG:
        print(banner)
    try:
        result = operation()
    except Exception:
        # sys.exc_info() avoids the version-specific "except X, e" /
        # "except X as e" spellings.
        print("EXCEPTION in %s %s" % (opName, str(sys.exc_info()[1])))
        if DEBUG:
            print("--->command status FAIL \n")
    else:
        if DEBUG:
            if report is None:
                print("--->command status OK \n")
            else:
                print(report(result))
    _endText(ff, t1)


def runTest(NTEST, LEN, rootDir, output_dir, rep_type):
    """Time register / checkout / commit / delete of NTEST jobs.

    Args:
        NTEST: number of jobs to create and push through the repository.
        LEN: value passed to ``getSplitJob`` for every job (the interactive
            driver asks for it as the number of subjobs per job).
        rootDir: ``root_dir`` handed to the repository factory.
        output_dir: existing directory in which the timing log is created.
        rep_type: "Remote" or "Local"; any other value prints a message and
            returns without doing anything.

    The timings are written to a new, uniquely named ``*.test`` file in
    *output_dir*.  A failing repository call is reported and the remaining
    phases are still attempted.
    """
    if DEBUG:
        print('from runTest: rootDir %s, output_dir %s' % (rootDir, output_dir))

    if rep_type == "Remote":
        repository = repositoryFactory(repositoryType=rep_type,
                                       root_dir=rootDir,
                                       streamer=SimpleJobStreamer(),
                                       host='gangamd.cern.ch',
                                       port=8822,
                                       login=os.getlogin(),
                                       keepalive=True)
    elif rep_type == "Local":
        repository = repositoryFactory(repositoryType=rep_type,
                                       root_dir=rootDir,
                                       streamer=SimpleJobStreamer(),
                                       local_root=os.path.join(
                                           '/afs/cern.ch/sw/ganga/workdir',
                                           os.getlogin(),
                                           'gangadir/repository'))
    else:
        print("Wrong type of repository...")
        print("Exiting ...")
        return

    # Several "user" processes log into the same directory concurrently.
    # mkstemp creates the uniquely named file atomically inside output_dir,
    # whereas a name from mktemp is only checked against the system temp
    # directory and is not reserved.
    fd, _logName = tempfile.mkstemp(suffix='.test', dir=output_dir)
    ff = os.fdopen(fd, 'w')
    try:
        jj = []
        for i in range(NTEST):
            j = getSplitJob(LEN)
            j.name = "MyJob" + str(i)
            j.application.args = ['/' + ARG_LEN * 'abcd']
            jj.append(j)

        _timedPhase(ff, 'registering %d jobs...' % NTEST, 'registerJobs',
                    lambda: repository.registerJobs(jj))

        _timedPhase(ff, 'retrieving info about ALL jobs', 'checkoutJobs',
                    lambda: repository.checkoutJobs({}),
                    lambda rjj: "--->checkout jobs %d %s \n"
                    % (len(rjj), [job.id for job in rjj]))

        # Modify every job and its subjobs so that the commit below has
        # changes to store.
        for j in jj:
            j.name = j.name + 'Changed'
            for sj in j.subjobs:
                sj.application.exe = '/bin/ls'

        _timedPhase(ff, 'commiting %d jobs...' % NTEST, 'commitJobs',
                    lambda: repository.commitJobs(jj))

        _timedPhase(ff, 'deleting %d jobs...' % NTEST, 'deleteJobs',
                    lambda: repository.deleteJobs([job.id for job in jj]))
    finally:
        ff.close()
286
287
def NormPath(path):
    """Return *path* with DOS-style short directory names on Windows.

    On any platform other than win32 the path is returned unchanged.  On
    win32 every directory element that contains whitespace, or whose first
    word is longer than 8 characters, is replaced by the first six
    characters of its first word followed by '~1'.  The drive and the final
    file name are kept as they are.

    NOTE(review): the '~1' suffix is a heuristic; the real short name of a
    directory may carry a different number -- confirm if collisions matter.
    """
    if sys.platform != 'win32':
        return path

    directory, filename = os.path.split(path)
    drive, tail = os.path.splitdrive(directory)
    short = drive + os.sep
    for elem in tail.split(os.sep):
        if not elem:
            # Empty pieces come from the leading / doubled separators.
            continue
        words = elem.split()
        if words:
            head = words[0]
            if len(head) > 8 or len(words) > 1:
                head = head[:6] + '~1'
            elem = head
        # A whitespace-only element cannot be shortened; keep it verbatim
        # instead of failing on words[0].
        short = os.path.join(short, elem)
    return os.path.join(short, filename)
306
307
if __name__ == '__main__':
    # Interactive driver: ask for the test dimensions, then start one
    # independent Python process per simulated user, each calling runTest.
    NTEST = int(raw_input('Enter a number of job to test --->'))
    LEN = int(raw_input('Enter a number of subjobs per job --->'))
    # The original literal relied on implicit string concatenation, which
    # silently dropped the quotes meant to surround "users".
    NUSERS = int(raw_input('Enter a number of "users" to test --->'))
    OUTPUT = raw_input('Enter a name of output dir --->')
    choice = raw_input('Enter type of repository (Local[1] or Remote[2]) --->')
    REPTYPE = {'1': 'Local', '2': 'Remote'}.get(choice)
    if REPTYPE is None:
        print("Unknown type of repository. Exiting")
        sys.exit(1)

    dname = 'users_' + str(NUSERS) + '__jobs_' + str(NTEST) + '__subjobs_' + str(LEN)
    output_dir = os.path.join(os.getcwd(), OUTPUT, REPTYPE, dname)
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    print("output dir is  %s" % output_dir)

    python_path = NormPath(sys.executable)
    rootDir = '/users/testframework'
    # %r emits correctly quoted and escaped string literals. Pasting the
    # raw paths between quotes would turn backslashes in Windows paths
    # (e.g. '\t', '\n') into escape sequences inside the generated code.
    cmd = ('"import sys\n'
           'sys.path.append(%r)\n'
           'from PerformanceTest import runTest\n'
           'runTest(%d, %d, %r, %r, %r)"'
           % (_thisDir, NTEST, LEN, rootDir, output_dir, REPTYPE))
    # Only interpreter versions 2.3.0 - 2.3.3 get the command wrapped in
    # literal double quotes; every other version receives it bare.
    if sys.version_info[:3] < (2, 3, 0) or sys.version_info[:3] >= (2, 3, 4):
        cmd = cmd[1:-1]

    for i in range(NUSERS):
        if DEBUG:
            print(cmd)
        # P_NOWAIT: the children run concurrently and are not waited for.
        pid = os.spawnl(os.P_NOWAIT, python_path, python_path, "-c", cmd)
        if DEBUG:
            print("new user process started %d" % pid)
340