1 import os
2 import re
3 import tempfile
4 from types import *
5
6 from Ganga.GPIDev.Schema import *
7 from Ganga.GPIDev.Lib.File import *
8 from Ganga.GPIDev.Credentials import getCredential
9
10 from Ganga.Utility.Config import getConfig, ConfigError
11 from Ganga.Utility.logging import getLogger
12
13 from Ganga.Utility.GridShell import getShell
14
15 from Ganga.Lib.LCG.GridftpSandboxCache import GridftpFileIndex, GridftpSandboxCache
16
17 from Ganga.Lib.LCG.Utility import *
18
19
20 logger = getLogger()
21
23 '''Helper class to implement grid interaction'''
24
25
26 _attributes = ('middleware', 'credential', 'config', 'active', 'perusable')
27
54
60
62
63
64
65
66
67 prefix_hack = "${%s_LOCATION}/bin/" % self.middleware
68
69
70
71 if not binary:
72 prefix_hack = 'python '+prefix_hack
73
74 return prefix_hack
75
77
78
79
80 submit_option = ''
81
82 msg = 'use VO defined '
83
84 voms = self.__get_proxy_voname__()
85
86
87
88 if self.config['ConfigVO']:
89 if self.middleware == 'EDG':
90 submit_option = '--config-vo %s' % self.config['ConfigVO']
91 if not os.path.exists(self.config['ConfigVO']):
92
93 raise ConfigError('')
94 else:
95 msg += 'in %s.' % self.config['ConfigVO']
96 else:
97 logger.warning('ConfigVO configuration ignored by %s middleware. Set Config instead.' % self.middleware)
98
99 elif voms:
100 msg += 'in voms proxy: %s.' % voms
101
102 elif self.config['VirtualOrganisation']:
103 submit_option = '--vo %s' % self.config['VirtualOrganisation']
104 msg += 'in Ganga config: %s.' % self.config['VirtualOrganisation']
105
106 else:
107 logger.warning('No Virtual Organisation specified in the configuration. The plugin has been disabeled.')
108 return None
109
110
111
112
113 if self.config['Config']:
114 submit_option += ' --config %s' % self.config['Config']
115
116 submit_option = ' %s ' % submit_option
117
118 logger.debug(msg)
119
120 return submit_option
121
123 match_log = re.search(regxp_logfname,cmd_output)
124
125 logfile = None
126 if match_log:
127 logfile = match_log.group(1)
128 return logfile
129
138
154
164
166 '''Gets the LFC_HOST: from current shell or querying BDII on demand'''
167 lfc_host = None
168
169 if self.shell.env.has_key('LFC_HOST'):
170 lfc_host = self.shell.env['LFC_HOST']
171
172 if not lfc_host:
173 lfc_host = self.__get_default_lfc__()
174
175 return lfc_host
176
178 '''Gets the default lfc host from lcg-infosites'''
179
180 cmd = 'lcg-infosites'
181
182 logger.debug('%s lfc-infosites called ...' % self.middleware)
183
184 rc, output, m = self.shell.cmd1('%s --vo %s lfc' % (cmd,self.config['VirtualOrganisation']),allowed_exit=[0,255])
185
186 if rc != 0:
187
188 return None
189 else:
190 lfc_list = output.strip().split('\n')
191 return lfc_list[0]
192
194 '''Parsing the glite-wms-job-status log to get the glite jobs which have been removed from the WMS'''
195
196 logfile = self.__resolve_gridcmd_log_path__('(.*-job-status.*\.log)',cmd_output)
197
198 glite_ids = []
199
200 if logfile:
201
202 f = open(logfile,'r')
203 output = f.readlines()
204 f.close()
205
206 re_jid = re.compile('^Unable to retrieve the status for: (https:\/\/\S+:9000\/[0-9A-Za-z_\.\-]+)\s*$')
207 re_key = re.compile('^.*(no matching jobs found)\s*$')
208
209 m_jid = None
210 m_key = None
211 myjid = ''
212 for line in output:
213 m_jid = re_jid.match(line)
214 if m_jid:
215 myjid = m_jid.group(1)
216 m_jid = None
217
218 if myjid:
219 m_key = re_key.match(line)
220 if m_key:
221 glite_ids.append(myjid)
222 myjid = ''
223
224 return glite_ids
225
243
245 '''Returns a list of computing elements can run the job'''
246
247 re_ce = re.compile('^\s*\-\s*(\S+\:2119\/\S+)\s*$')
248
249 matched_ces = []
250
251 if self.middleware == 'EDG':
252 cmd = 'edg-job-list-match'
253 exec_bin = False
254 else:
255 cmd = 'glite-wms-job-list-match -a'
256 exec_bin = True
257
258 if not self.active:
259 logger.warning('LCG plugin not active.')
260 return
261
262 if not self.credential.isValid('01:00'):
263 logger.warning('GRID proxy lifetime shorter than 1 hour')
264 return
265
266 submit_opt = self.__set_submit_option__()
267
268 if not submit_opt:
269 return matched_ces
270 else:
271 cmd += submit_opt
272
273 cmd = '%s --noint %s' % (cmd, jdlpath)
274
275 logger.debug('job list match command: %s' % cmd)
276
277 rc, output, m = self.shell.cmd1('%s%s' % (self.__get_cmd_prefix_hack__(binary=exec_bin),cmd), allowed_exit=[0,255])
278
279 for l in output.split('\n'):
280
281 matches = re_ce.match(l)
282
283 if matches:
284 matched_ces.append(matches.group(1))
285
286 if ce:
287 if matched_ces.count(ce) > 0:
288 matched_ces = [ ce ]
289 else:
290 matched_ces = [ ]
291
292 logger.debug('== matched CEs ==')
293 for myce in matched_ces:
294 logger.debug(myce)
295 logger.debug('== matched CEs ==')
296
297 return matched_ces
298
299 - def submit(self, jdlpath, ce=None):
300 '''Submit a JDL file to LCG'''
301
302
303 if self.middleware == 'EDG':
304 cmd = 'edg-job-submit'
305 exec_bin = False
306 else:
307 cmd = 'glite-wms-job-submit -a'
308 exec_bin = True
309
310 if not self.active:
311 logger.warning('LCG plugin not active.')
312 return
313
314 if not self.credential.isValid('01:00'):
315 logger.warning('GRID proxy lifetime shorter than 1 hour')
316 return
317
318 submit_opt = self.__set_submit_option__()
319
320 if not submit_opt:
321 return
322 else:
323 cmd += submit_opt
324
325 if ce:
326 cmd = cmd + ' -r %s' % ce
327
328 cmd = '%s --nomsg %s < /dev/null' % (cmd,jdlpath)
329
330 logger.debug('job submit command: %s' % cmd)
331
332 rc, output, m = self.shell.cmd1('%s%s' % (self.__get_cmd_prefix_hack__(binary=exec_bin),cmd),
333 allowed_exit=[0,255],
334 timeout=self.config['SubmissionTimeout'])
335
336 if output: output = "%s" % output.strip()
337
338
339 match = re.search('.*(https:\/\/\S+:9000\/[0-9A-Za-z_\.\-]+)',output)
340
341 if match:
342 logger.debug('job id: %s' % match.group(1))
343 if self.middleware == 'GLITE' and self.perusable:
344 logger.info("Enabling perusal")
345 per_rc, per_out, per_m=self.shell.cmd1("glite-wms-job-perusal --set -f stdout %s" % match.group(1))
346
347
348 self.__clean_gridcmd_log__('(.*-job-submit.*\.log)',output)
349 return match.group(1)
350
351 else:
352 logger.warning('Job submission failed.')
353 self.__print_gridcmd_log__('(.*-job-submit.*\.log)',output)
354 return
355
357 '''Native bulk cancellation supported by GLITE middleware.'''
358
359 idsfile = tempfile.mktemp('.jids')
360 file(idsfile,'w').write('\n'.join(jobids)+'\n')
361
362 if self.middleware == 'EDG':
363 logger.warning('EDG middleware doesn\'t support bulk cancellation.')
364 return False
365 else:
366 cmd = 'glite-wms-job-cancel'
367 exec_bin = True
368
369 if not self.active:
370 logger.warning('LCG plugin not active.')
371 return False
372
373 if not self.credential.isValid('01:00'):
374 logger.warning('GRID proxy lifetime shorter than 1 hour')
375 return False
376
377 if not self.__set_submit_option__():
378 return False
379
380 cmd = '%s --noint -i %s' % (cmd, idsfile)
381
382 logger.debug('job cancel command: %s' % cmd)
383
384 rc, output, m = self.shell.cmd1('%s%s' % (self.__get_cmd_prefix_hack__(binary=exec_bin),cmd),allowed_exit=[0,255])
385
386 if rc != 0:
387 logger.warning('Job cancellation failed.')
388 self.__print_gridcmd_log__('(.*-job-cancel.*\.log)',output)
389 return False
390 else:
391
392 self.__clean_gridcmd_log__('(.*-job-cancel.*\.log)',output)
393 return True
394
395 - def status(self,jobids,is_collection=False):
396 '''Query the status of jobs on the grid'''
397
398 if not jobids: return ([],[])
399
400 idsfile = tempfile.mktemp('.jids')
401 file(idsfile,'w').write('\n'.join(jobids)+'\n')
402
403 if self.middleware == 'EDG':
404 cmd = 'edg-job-status'
405 exec_bin = False
406 else:
407 cmd = 'glite-wms-job-status'
408
409 exec_bin = True
410 if self.config['IgnoreGliteScriptHeader']:
411 exec_bin = False
412
413 if is_collection:
414 cmd = '%s -v 3' % cmd
415
416 if not self.active:
417 logger.warning('LCG plugin not active.')
418 return ([],[])
419 if not self.credential.isValid('01:00'):
420 logger.warning('GRID proxy lifetime shorter than 1 hour')
421 return ([],[])
422
423 cmd = '%s --noint -i %s' % (cmd,idsfile)
424 logger.debug('job status command: %s' % cmd)
425
426 rc, output, m = self.shell.cmd1('%s%s' % (self.__get_cmd_prefix_hack__(binary=exec_bin),cmd),
427 allowed_exit=[0,255],
428 timeout=self.config['StatusPollingTimeout'])
429 os.remove(idsfile)
430
431 missing_glite_jids = []
432 if rc != 0:
433 missing_glite_jids = self.__resolve_no_matching_jobs__(output)
434
435 if missing_glite_jids:
436 logger.info('some jobs removed from WMS, will set corresponding Ganga job to \'failed\' status')
437 logger.debug('jobs removed from WMS: %s' % repr(missing_glite_jids))
438 else:
439 self.__print_gridcmd_log__('(.*-job-status.*\.log)',output)
440
441
442 self.__clean_gridcmd_log__('(.*-job-status.*\.log)',output)
443
444 re_id = re.compile('^\s*Status info for the Job : (https://.*\S)\s*$')
445 re_status = re.compile('^\s*Current Status:\s+(.*\S)\s*$')
446
447
448
449
450 re_exit = re.compile('^\s*Exit code:\s+(.*\S)\s*$')
451 re_reason = re.compile('^\s*Status Reason:\s+(.*\S)\s*$')
452 re_dest = re.compile('^\s*Destination:\s+(.*\S)\s*$')
453
454
455 re_master = re.compile('^BOOKKEEPING INFORMATION:\s*$')
456 re_node = re.compile('^- Nodes information.*\s*$')
457
458
459 re_nodename = re.compile('^\s*NodeName\s*=\s*"(gsj_[0-9]+)";\s*$')
460
461 info = []
462 is_master = False
463 is_node = False
464
465 for line in output.split('\n'):
466
467 match = re_master.match(line)
468 if match:
469 is_master = True
470 is_node = False
471
472 continue
473
474 match = re_node.match(line)
475 if match:
476 is_master = False
477 is_node = True
478 continue
479
480 match = re_id.match(line)
481 if match:
482 info += [{ 'id' : match.group(1),
483 'name' : '',
484 'is_node': False,
485 'status' : '',
486 'exit' : '',
487 'reason' : '',
488 'destination' : '' }]
489 if is_node:
490 info[-1]['is_node'] = True
491
492
493
494 continue
495
496 match = re_nodename.match(line)
497 if match and is_node:
498 info[-1]['name'] = match.group(1)
499
500 continue
501
502 match = re_status.match(line)
503 if match:
504 info[-1]['status'] = match.group(1)
505 continue
506
507 match = re_exit.match(line)
508 if match:
509 info[-1]['exit'] = match.group(1)
510 continue
511
512 match = re_reason.match(line)
513 if match:
514 info[-1]['reason'] = match.group(1)
515 continue
516
517 match = re_dest.match(line)
518 if match:
519 info[-1]['destination'] = match.group(1)
520 continue
521
522 return (info, missing_glite_jids)
523
525 '''Fetch the logging info of the given job and save the output in the job's outputdir'''
526
527 idsfile = tempfile.mktemp('.jids')
528 file(idsfile,'w').write('\n'.join(jobids)+'\n')
529
530 if self.middleware == 'EDG':
531 cmd = 'edg-job-get-logging-info -v %d' % verbosity
532 exec_bin = False
533 else:
534 cmd = 'glite-wms-job-logging-info -v %d' % verbosity
535
536 exec_bin = True
537 if self.config['IgnoreGliteScriptHeader']:
538 exec_bin = False
539
540 if not self.active:
541 logger.warning('LCG plugin not active.')
542 return False
543 if not self.credential.isValid('01:00'):
544 logger.warning('GRID proxy lifetime shorter than 1 hour')
545 return False
546
547 log_output = directory+'/__jobloginfo__.log'
548
549 cmd = '%s --noint -o %s -i %s' % (cmd, log_output, idsfile)
550
551 logger.debug('job logging info command: %s' % cmd)
552
553 rc, output, m = self.shell.cmd1('%s%s' % (self.__get_cmd_prefix_hack__(binary=exec_bin),cmd),allowed_exit=[0,255])
554 os.remove(idsfile)
555
556 if rc != 0:
557 self.__print_gridcmd_log__('(.*-logging-info.*\.log)',output)
558 return False
559 else:
560
561 self.__clean_gridcmd_log__('(.*-logging-info.*\.log)',output)
562
563 return log_output
564
565 - def get_output(self,jobid,directory,wms_proxy=False):
566 '''Retrieve the output of a job on the grid'''
567
568 if self.middleware == 'EDG':
569 cmd = 'edg-job-get-output'
570 exec_bin = False
571 else:
572 cmd = 'glite-wms-job-output'
573 exec_bin = True
574
575 if self.config['Config']:
576 cmd += ' --config %s' % self.config['Config']
577
578 if not self.active:
579 logger.warning('LCG plugin is not active.')
580 return (False,None)
581 if not self.credential.isValid('01:00'):
582 logger.warning('GRID proxy lifetime shorter than 1 hour')
583 return (False,None)
584
585 cmd = '%s --noint --dir %s %s' % (cmd,directory,jobid)
586
587 logger.debug('job get output command: %s' % cmd)
588
589 rc, output, m = self.shell.cmd1('%s%s' % (self.__get_cmd_prefix_hack__(binary=exec_bin),cmd),allowed_exit=[0,255])
590
591 match = re.search('directory:\n\s*(\S+)\s*\n',output)
592
593 if not match:
594 logger.warning('Job output fetch failed.')
595 self.__print_gridcmd_log__('(.*-output.*\.log)',output)
596 return (False, 'cannot fetch job output')
597
598
599 self.__clean_gridcmd_log__('(.*-output.*\.log)',output)
600
601 outdir = match.group(1)
602
603
604
605
606 import urlparse
607 jid_hash = urlparse.urlparse(jobid)[2][1:]
608
609 if outdir.count(jid_hash):
610 if self.shell.system('mv %s/* %s' % (outdir,directory)) == 0:
611 try:
612 os.rmdir(outdir)
613 except Exception, msg:
614 logger.warning( "Error trying to remove the empty directory %s:\n%s" % ( outdir, msg ) )
615 else:
616 logger.warning( "Error moving output from %s to %s.\nOutput is left in %s." % (outdir,directory,outdir) )
617 else:
618 pass
619
620 return self.__get_app_exitcode__(directory)
621
623 '''Cancel multiple jobs in one LCG job cancellation call'''
624
625
626 if not jobids: return True
627
628 idsfile = tempfile.mktemp('.jids')
629 file(idsfile,'w').write('\n'.join(jobids)+'\n')
630
631
632 if self.middleware == 'EDG':
633 cmd = 'edg-job-cancel'
634 exec_bin = False
635 else:
636 cmd = 'glite-wms-job-cancel'
637 exec_bin = True
638
639 if not self.active:
640 logger.warning('LCG plugin is not active.')
641 return False
642 if not self.credential.isValid('01:00'):
643 logger.warning('GRID proxy lifetime shorter than 1 hour')
644 return False
645
646
647 cmd = '%s --noint -i %s' % (cmd,idsfile)
648
649 logger.debug('job cancel command: %s' % cmd)
650
651 rc, output, m = self.shell.cmd1('%s%s' % (self.__get_cmd_prefix_hack__(binary=exec_bin),cmd),allowed_exit=[0,255])
652
653 if rc == 0:
654
655 self.__clean_gridcmd_log__('(.*-job-cancel.*\.log)',output)
656 return True
657 else:
658 logger.warning( "Failed to cancel jobs.\n%s" % output )
659 self.__print_gridcmd_log__('(.*-job-cancel.*\.log)',output)
660 return False
661
663 '''Cancel a job'''
664
665 if self.middleware == 'EDG':
666 cmd = 'edg-job-cancel'
667 exec_bin = False
668 else:
669 cmd = 'glite-wms-job-cancel'
670 exec_bin = True
671
672 if not self.active:
673 logger.warning('LCG plugin is not active.')
674 return False
675 if not self.credential.isValid('01:00'):
676 logger.warning('GRID proxy lifetime shorter than 1 hour')
677 return False
678
679 cmd = '%s --noint %s' % (cmd,jobid)
680
681 logger.debug('job cancel command: %s' % cmd)
682
683 rc, output, m = self.shell.cmd1('%s%s' % (self.__get_cmd_prefix_hack__(binary=exec_bin),cmd),allowed_exit=[0,255])
684
685 if rc == 0:
686
687 self.__clean_gridcmd_log__('(.*-job-cancel.*\.log)',output)
688 return True
689 else:
690 logger.warning( "Failed to cancel job %s.\n%s" % ( jobid, output ) )
691 self.__print_gridcmd_log__('(.*-job-cancel.*\.log)',output)
692 return False
693
695 '''Parsing job status report from CREAM CE status query'''
696
697 jobInfoDict = {}
698
699 re_jid = re.compile('^\s+JobID\=\[(https:\/\/.*[:0-9]?\/CREAM.*)\]$')
700 re_log = re.compile('^\s+(\S+.*\S+)\s+\=\s+\[(.*)\]$')
701
702 re_jts = re.compile('^\s+Job status changes:$')
703 re_ts = re.compile('^\s+Status\s+\=\s+\[(.*)\]\s+\-\s+\[(.*)\]\s+\(([0-9]+)\)$')
704 re_cmd = re.compile('^\s+Issued Commands:$')
705
706
707 re_jnf = re.compile('^.*job not found.*$')
708
709 jid = None
710
711 for jlog in log.split('******')[1:]:
712
713 for l in jlog.split('\n'):
714 l.strip()
715
716 m = re_jid.match(l)
717
718 if m:
719 jid = m.group(1)
720 jobInfoDict[jid] = {}
721 continue
722
723 if re_jnf.match(l):
724 break
725
726 m = re_log.match(l)
727 if m:
728 att = m.group(1)
729 val = m.group(2)
730 jobInfoDict[jid][att] = val
731 continue
732
733 if re_jts.match(l):
734 jobInfoDict[jid]['Timestamps'] = {}
735 continue
736
737 m = re_ts.match(l)
738 if m:
739 s = m.group(1)
740 t = int(m.group(3))
741 jobInfoDict[jid]['Timestamps'][s] = t
742 continue
743
744 if re_cmd.match(l):
745 break
746
747 return jobInfoDict
748
750 '''checking if CREAM CE environment is set properly'''
751
752 if self.middleware.upper() != 'GLITE':
753 logger.warning('CREAM CE job operation not supported')
754 return False
755
756 if not self.active:
757 logger.warning('LCG plugin not active.')
758 return False
759
760 if not self.credential.isValid('01:00'):
761 logger.warning('GRID proxy lifetime shorter than 1 hour')
762 return False
763
764 return True
765
767 '''
768 Checking if the delegation id to given CE is still valid.
769 '''
770
771 id = None
772
773 try:
774 id, t_expire = self.proxy_id[ce]
775
776
777 now = time.time()
778
779
780 if t_expire - now <= 1800:
781 logger.debug('existing proxy going to expire in 1800 seconds.')
782 id = None
783 else:
784 logger.debug('reusing valid proxy %s on %s' % (id,ce))
785
786 except KeyError:
787 pass
788
789 return id
790
792 '''CREAM CE proxy delegation'''
793
794 if not self.__cream_ui_check__():
795 return
796
797 if not ce:
798 logger.warning('No CREAM CE endpoint specified')
799 return
800
801 mydelid = self.cream_check_delegated_proxy(ce)
802
803 if not mydelid:
804
805 logger.debug('making new proxy delegation to %s' % ce)
806
807 t_expire = 0
808
809 exec_bin = True
810
811 cmd = 'glite-ce-delegate-proxy'
812
813 cmd += ' -e %s' % ce.split('/cream')[0]
814
815 mydelid = '%s_%s' % (self.credential.identity(), get_uuid())
816
817 cmd = '%s %s' % (cmd, mydelid)
818
819 logger.debug('proxy delegation command: %s' % cmd)
820
821 rc, output, m = self.shell.cmd1('%s%s' % (self.__get_cmd_prefix_hack__(binary=exec_bin),cmd),
822 allowed_exit=[0,255],
823 timeout=self.config['SubmissionTimeout'])
824 if rc != 0:
825
826 logger.error('proxy delegation error: %s' % output)
827 mydelid = ''
828 else:
829
830
831 t_expire = time.time() + float( self.credential.timeleft(units="seconds", force_check=True) )
832
833 logger.debug('new proxy at %s valid until %s' % ( ce, time.ctime(t_expire) ) )
834
835 self.proxy_id[ce] = [mydelid, t_expire]
836
837 return mydelid
838
840 '''CREAM CE direct job submission'''
841
842 if not self.__cream_ui_check__():
843 return
844
845 if not ce:
846 logger.warning('No CREAM CE endpoint specified')
847 return
848
849 cmd = 'glite-ce-job-submit'
850 exec_bin = True
851
852 mydelid = self.cream_proxy_delegation(ce)
853
854 if mydelid:
855 cmd = cmd + ' -D %s' % mydelid
856 else:
857 cmd = cmd + ' -a'
858
859 cmd = cmd + ' -r %s' % ce
860
861 cmd = '%s --nomsg %s < /dev/null' % (cmd,jdlpath)
862
863 logger.debug('job submit command: %s' % cmd)
864
865 rc, output, m = self.shell.cmd1('%s%s' % (self.__get_cmd_prefix_hack__(binary=exec_bin),cmd),
866 allowed_exit=[0,255],
867 timeout=self.config['SubmissionTimeout'])
868
869 if output: output = "%s" % output.strip()
870
871 match = re.search('^(https:\/\/\S+:8443\/[0-9A-Za-z_\.\-]+)$',output)
872
873 if match:
874 logger.debug('job id: %s' % match.group(1))
875 return match.group(1)
876 else:
877 logger.warning('Job submission failed.')
878 return
879
881 '''CREAM CE job status query'''
882
883 if not self.__cream_ui_check__():
884 return ([],[])
885
886 if not jobids: return ([],[])
887
888 idsfile = tempfile.mktemp('.jids')
889 file(idsfile,'w').write('##CREAMJOBS##\n' + '\n'.join(jobids)+'\n')
890
891 cmd = 'glite-ce-job-status'
892 exec_bin = True
893
894 cmd = '%s -L 2 -n -i %s' % (cmd,idsfile)
895 logger.debug('job status command: %s' % cmd)
896
897 rc, output, m = self.shell.cmd1('%s%s' % (self.__get_cmd_prefix_hack__(binary=exec_bin),cmd),
898 allowed_exit=[0,255],
899 timeout=self.config['StatusPollingTimeout'])
900 jobInfoDict = {}
901 if rc == 0 and output:
902 jobInfoDict = self.__cream_parse_job_status__(output)
903
904 return jobInfoDict
905
907 '''CREAM CE job purging'''
908
909 if not self.__cream_ui_check__():
910 return False
911
912 idsfile = tempfile.mktemp('.jids')
913 file(idsfile,'w').write('##CREAMJOBS##\n' + '\n'.join(jobids)+'\n')
914
915 cmd = 'glite-ce-job-purge'
916 exec_bin = True
917
918 cmd = '%s -n -N -i %s' % (cmd,idsfile)
919
920 logger.debug('job purge command: %s' % cmd)
921
922 rc, output, m = self.shell.cmd1('%s%s' % (self.__get_cmd_prefix_hack__(binary=exec_bin),cmd),allowed_exit=[0,255])
923
924 logger.debug(output)
925
926 if rc == 0:
927 return True
928 else:
929 return False
930
932 '''CREAM CE job cancelling'''
933
934 if not self.__cream_ui_check__():
935 return False
936
937 idsfile = tempfile.mktemp('.jids')
938 file(idsfile,'w').write('##CREAMJOBS##\n' + '\n'.join(jobids)+'\n')
939
940 cmd = 'glite-ce-job-cancel'
941 exec_bin = True
942
943 cmd = '%s -n -N -i %s' % (cmd,idsfile)
944
945 logger.debug('job cancel command: %s' % cmd)
946
947 rc, output, m = self.shell.cmd1('%s%s' % (self.__get_cmd_prefix_hack__(binary=exec_bin),cmd),allowed_exit=[0,255])
948
949 logger.debug(output)
950
951 if rc == 0:
952 return True
953 else:
954 return False
955
974
976
977 import Ganga.Core.Sandbox as Sandbox
978
979 Sandbox.getPackedOutputSandbox(outputdir, outputdir)
980
981
982 app_exitcode = -1
983 runtime_log = os.path.join(outputdir, '__jobscript__.log')
984 pat = re.compile(r'.*exit code (\d+).')
985
986 if not os.path.exists(runtime_log):
987 logger.warning('job runtime log not found: %s' % runtime_log)
988 return (False, 'job runtime log not found: %s' % runtime_log)
989
990 f = open(runtime_log, 'r')
991 for line in f.readlines():
992 mat = pat.match(line)
993 if mat:
994 app_exitcode = eval(mat.groups()[0])
995 break
996 f.close()
997
998
999
1000 if app_exitcode != 0:
1001 logger.debug('job\'s executable returns non-zero exit code: %d' % app_exitcode)
1002 return (False, app_exitcode)
1003 else:
1004 return (True, 0)
1005
1007 '''Expand jdl items'''
1008
1009 text = ""
1010 for key, value in items.iteritems():
1011
1012 if key == 'Requirements':
1013 if value: text += 'Requirements = \n %s;\n' % ' &&\n '.join(value)
1014
1015 elif key in ['ShallowRetryCount','RetryCount','NodeNumber','ExpiryTime', 'PerusalTimeInterval']:
1016 try:
1017 value = int(value)
1018 if value<0: raise ValueError
1019 text+='%s = %d;\n' % (key,value)
1020 except ValueError:
1021 logger.warning('%s is not positive integer.' % key)
1022
1023 elif key == 'Environment':
1024 if value: text += 'Environment = {\n "%s"\n};\n' % '",\n "'.join(['%s=\'%s\'' % var for var in value.items()])
1025
1026 elif type(value) == ListType:
1027 if value: text += '%s = {\n "%s"\n};\n' % (key,'",\n "'.join(value))
1028
1029 elif key == 'Rank':
1030 text += 'Rank = ( %s );\n' % value
1031
1032 elif key == 'Nodes':
1033 text += 'Nodes = %s;\n' % value
1034
1035 elif key in ['PerusalFileEnable', 'AllowZippedISB']:
1036 text += '%s = %s;\n' % (key, value)
1037
1038 elif key in ['DataRequirements']:
1039 text += 'DataRequirements = { %s };\n' % value
1040 else:
1041 text += '%s = "%s";\n' % (key,value)
1042
1043 return text
1044
1045 expandjdl=staticmethod(expandjdl)
1046