1
2
3
4
5
6
7
8 import sys, os
9 import time
10 import tempfile
11
12 import ARDATest
13 _thisDir = os.path.dirname(ARDATest.__file__)
14 if not _thisDir:
15 _thisDir = os.getcwd()
16 _root = os.path.dirname(os.path.dirname(os.path.dirname(_thisDir)))
17 sys.path.append(_root)
18
19
20
21 from Ganga.GPIDev.Lib.Job.Job import Job
22
23 from Ganga.Lib.Executable.Executable import Executable
24 from Ganga.Core.JobRepository.ARDA import repositoryFactory
25 from Ganga.GPIDev.Streamers.SimpleStreamer import SimpleJobStreamer
26 import Ganga.Runtime.plugins
27
28
29 DEBUG = True
30
31
32 -def _startText(ff, txt):
33 ff.write(txt)
34 t1 = time.time()
35 ff.write('operation started at %s \n' % time.ctime(t1))
36 return t1
37
38 -def _endText(ff, t1):
39 t2 = time.time()
40 ff.write('operation finished at %s \n' % time.ctime(t2))
41 ff.write('time used: %f seconds \n' % (t2 - t1))
42 ff.write('-s->%f<-s-\n\n\n'% (t2 - t1))
43
44
46
47 mj = Job()
48 jj = []
49 for i in range(LEN):
50 sj = Job()
51 sj._setParent(mj)
52 sj.application.exe = '/bin/myexe' + str(i)
53 sj.application.args = 1000*['/ab' + str(i)]
54 jj.append(sj)
55 mj.subjobs = jj
56
57
58 repository.registerJobs([mj])
59 for s in mj.subjobs:
60 assert(s.master is mj)
61 assert(s.id != None)
62
63
64
65 mid = mj.id
66 j = repository.checkoutJobs([mid])[0]
67 assert(len(j.subjobs) == LEN)
68
69
70 j.subjobs[1].application.exe = '/bin/ls'
71 j.application.exe = '/bin/pwd'
72 repository.commitJobs([j])
73 j = repository.checkoutJobs([mid])[0]
74
75 assert(j.subjobs[1].application.exe == '/bin/ls')
76 assert(j.application.exe == '/bin/pwd')
77
78
79 status_list = [((mid, j.subjobs[1].id),'running'),
80 ((mid, j.subjobs[2].id),'running'),
81 ((mid, j.subjobs[3].id),'running')]
82 repository.setJobsStatus(status_list)
83
84
85 md = repository.getJobsStatus(map(lambda x: x[0], status_list))
86 for i in range(len(md)):
87 assert (md[i][0] == status_list[i][0])
88 assert (md[i][1] == status_list[i][1])
89
90
91 ttt = {'table_path':repository._getSubJobPath((mid,)), 'attributes':{}}
92 md = repository.getJobsStatus(ttt)
93 for i in range(len(status_list)):
94 if md[i][0] == status_list[i][0]:
95 assert (md[i][1] == status_list[i][1])
96
97
98 repository.deleteJobs([mj.id])
99
100
101 -def runTest(NTEST, rootDir, output_dir, rep_type):
102 if DEBUG:
103 print 'from runTest: rootDir %s, output_dir %s'%(rootDir, output_dir)
104 if rep_type == "Remote":
105 repository = repositoryFactory(repositoryType = rep_type,
106 root_dir = rootDir,
107 streamer = SimpleJobStreamer(),
108 host = 'lxgate41.cern.ch',
109 port = 8822,
110 login = os.getlogin(),
111 keepalive = True)
112 elif rep_type == "Local":
113 repository = repositoryFactory(repositoryType = rep_type,
114 root_dir = rootDir,
115 streamer = SimpleJobStreamer(),
116 local_root = os.path.expanduser('~'))
117 else:
118 print "Wrong type of repository..."
119 print "Exiting ..."
120 return
121 nn = tempfile.mktemp(suffix = '.test')
122 nn = os.path.join(output_dir, os.path.basename(nn))
123 ff = file(nn, 'w')
124 try:
125 jj = []
126 for i in range(NTEST):
127 j = Job()
128
129 j.name = "MyJob" + str(i)
130 j.application.args = 1000*['/abc']
131 jj.append(j)
132
133
134 t1 = _startText(ff, 'registering %d jobs...' % NTEST)
135 if DEBUG:
136 print 'registering %d jobs...' % NTEST
137 try:
138 repository.registerJobs(jj)
139 except Exception, e:
140 print "EXCEPTION in registerJobs", str(e)
141 if DEBUG:
142 print "--->command status", "FAIL", "\n"
143 else:
144 if DEBUG:
145 print "--->command status", "OK", "\n"
146 _endText(ff, t1)
147
148
149 t1 = _startText(ff, 'testing splitting of %d jobs...' % NTEST)
150 if DEBUG:
151 print 'testing splitting of %d jobs...' % NTEST
152 try:
153 for i in range(NTEST):
154 testSplitting(repository, LEN = 10)
155 except Exception, e:
156 print "EXCEPTION in testSplitting", str(e)
157 if DEBUG:
158 print "--->command status", "FAIL", "\n"
159 else:
160 if DEBUG:
161 print "--->command status", "OK", "\n"
162 _endText(ff, t1)
163
164
165 t1 = _startText(ff, 'retrieving info about first 10 jobs...')
166 if DEBUG:
167 print 'retrieving info about first 10 jobs...'
168 try:
169 rjj = repository.checkoutJobs(map(lambda j: j.id, jj[:10]))
170 except Exception, e:
171 print "EXCEPTION in checkoutJobs", str(e)
172 if DEBUG:
173 print "--->command status", "FAIL", "\n"
174 else:
175 if DEBUG:
176 print "--->checkout jobs", map(lambda j: j.id, rjj), "\n"
177 _endText(ff, t1)
178
179
180 t1 = _startText(ff, 'retrieving info about ALL jobs')
181 if DEBUG:
182 print 'retrieving info about ALL jobs'
183 try:
184 rjj = repository.checkoutJobs({})
185 except Exception, e:
186 print "EXCEPTION in checkoutJobs", str(e)
187 if DEBUG:
188 print "--->command status", "FAIL", "\n"
189 else:
190 if DEBUG:
191 print "--->checkout jobs", len(rjj), map(lambda j: j.id, rjj), "\n"
192 _endText(ff, t1)
193
194 for j in jj:
195 j.application = Executable()
196 try:
197 j.updateStatus('submitting')
198 except:
199 pass
200
201
202 t1 = _startText(ff, 'commiting %d jobs...' % NTEST)
203 if DEBUG:
204 print 'commiting %d jobs...' % NTEST
205 try:
206 repository.commitJobs(jj)
207 except Exception, e:
208 print "EXCEPTION in commitJobs", str(e)
209 if DEBUG:
210 print "--->command status", "FAIL", "\n"
211 else:
212 if DEBUG:
213 print "--->command status", "OK", "\n"
214 _endText(ff, t1)
215
216
217 t1 = _startText(ff, 'setting status for %d jobs...' % NTEST)
218 if DEBUG:
219 print 'setting status for %d jobs...' % NTEST
220 try:
221 repository.setJobsStatus(map(lambda j: (j.id, 'submitted'), jj))
222 except Exception, e:
223 print "EXCEPTION in setJobsStatus", str(e)
224 if DEBUG:
225 print "--->command status", "FAIL", "\n"
226 else:
227 if DEBUG:
228 print "--->command status", "OK", "\n"
229 _endText(ff, t1)
230
231
232 t1 = _startText(ff, 'getting status of first 10 jobs...')
233 if DEBUG:
234 print 'getting status of first 10 jobs...'
235 try:
236 rjj = repository.getJobsStatus(map(lambda j: j.id, jj[:10]))
237 except Exception, e:
238 print "EXCEPTION in getJobsStatus", str(e)
239 if DEBUG:
240 print "--->command status", "FAIL", "\n"
241 else:
242 if DEBUG:
243 print "--->command output", len(rjj), rjj, "\n"
244 _endText(ff, t1)
245
246
247 t1 = _startText(ff, 'getting id of jobs with particular attributes...')
248 if DEBUG:
249 print 'getting id of jobs with particular attributes...'
250 try:
251 rjj = repository.getJobIds({'status':'submitted', 'application':'Executable'})
252 except Exception, e:
253 print "EXCEPTION in getJobIds", str(e)
254 if DEBUG:
255 print "--->command status", "FAIL", "\n"
256 else:
257 if DEBUG:
258 print "--->command output", len(rjj), rjj, "\n"
259 _endText(ff, t1)
260
261
262 t1 = _startText(ff, 'retrieving info about ALL jobs')
263 rjj = repository.checkoutJobs({})
264 if DEBUG:
265 print 'retrieving info about ALL jobs'
266 jj_id = map(lambda j: j.id, jj)
267 st_lst = []
268 for j in rjj:
269 if j.id in jj_id:
270 st_lst.append((j.id, j.status))
271 print "--->command output", len(st_lst), st_lst, "\n"
272 _endText(ff, t1)
273
274
275 t1 = _startText(ff, 'deleting %d jobs...' % NTEST)
276 if DEBUG:
277 print 'deleting %d jobs...' % NTEST
278 try:
279 repository.deleteJobs(map(lambda j: j.id, jj))
280 except Exception, e:
281 print "EXCEPTION in deleteJobs", str(e)
282 if DEBUG:
283 print "--->command status", "FAIL", "\n"
284 else:
285 if DEBUG:
286 print "--->command status", "OK", "\n"
287 _endText(ff, t1)
288
289 finally:
290 ff.close()
291
292
294 if sys.platform == 'win32':
295 directory, file = os.path.split(path)
296 drive, tail = os.path.splitdrive(directory)
297 path_elem = tail.split(os.sep)
298 path = drive + os.sep
299 for i in range(len(path_elem)):
300 elem = path_elem[i]
301 if elem:
302 sub_elem = elem.split()
303 elem = sub_elem[0]
304 if len(elem) > 8 or len(sub_elem) > 1:
305 elem = elem[:6] + '~1'
306 path = os.path.join(path, elem)
307
308 path = os.path.join(path, file)
309
310 return path
311
312
313 if __name__ == '__main__':
314 NTEST = int(raw_input('Enter a number of job to test --->'))
315 NUSERS = int(raw_input('Enter a number of '"users"' to test --->'))
316 OUTPUT = raw_input('Enter a name of output dir --->')
317 REPTYPE= raw_input('Enter type of repository (Local[1] or Remote[2]) --->')
318 if REPTYPE == '1':
319 REPTYPE = 'Local'
320 elif REPTYPE == '2':
321 REPTYPE = 'Remote'
322 else:
323 print "Unknown type of repository. Exiting"
324 sys.exit(1)
325 dname = 'users_' + str(NUSERS) + '__jobs_' + str(NTEST)
326 output_dir = os.path.join(os.getcwd(), OUTPUT, REPTYPE, dname)
327 if not os.path.exists(output_dir):
328 os.makedirs(output_dir)
329 print "output dir is ", output_dir
330
331 python_path = NormPath(sys.executable)
332 i = 0
333 while i < NUSERS:
334 rootDir = '/testdir/GangaTest/user'
335 cmd = '"import sys\nsys.path.append(\'%s\')\nfrom ARDATest import runTest\nrunTest(%d, \'%s\',\'%s\',\'%s\')"' % (_thisDir, NTEST, rootDir, output_dir, REPTYPE)
336 if sys.version_info[:3] < (2,3,0) or sys.version_info[:3] >=(2,3,4):
337 cmd = cmd[1:-1]
338 if DEBUG:
339 print cmd
340 pid = os.spawnl(os.P_NOWAIT, python_path, python_path, "-c", cmd)
341 if DEBUG:
342 print "new user process started %d" % pid
343 i+=1
344