1 """Module containing class for handling job submission to Remote backend"""
2
3 __author__ = "Mark Slater <mws@hep.ph.bham.ac.uk>"
4 __date__ = "10 June 2008"
5 __version__ = "1.0"
6
7 from Ganga.Core import Sandbox
8 from Ganga.GPIDev.Adapters.IBackend import IBackend
9 from Ganga.GPIDev.Lib.File.FileBuffer import FileBuffer
10 from Ganga.GPIDev.Schema import ComponentItem, Schema, SimpleItem, Version
11 from Ganga.Utility.ColourText import Foreground, Effects
12
13 import Ganga.Utility.Config
14 import Ganga.Utility.logging
15 logger = Ganga.Utility.logging.getLogger()
16
17 import commands
18 import inspect
19 import os
20 import time
21 from Ganga.Lib.Root import randomString
22
23 logger = Ganga.Utility.logging.getLogger()
24
26 """Shutdown the transport cleanly - otherwise python 2.5 throws a wobble"""
27 if (tr != None):
28 tr.close()
29 tr = None
30
32 """Remote backend - submit jobs to a Remote pool.
33
34 The remote backend works as an SSH tunnel to a remote site
35 where a ganga session is opened and the job submitted there
36 using the specified remote_backend. It is (in theory!)
37 transparent to the user and should allow submission of any jobs
38 to any backends that are already possible in Ganga.
39
40 NOTE: Due to the file transfers required, there can be some slow
41 down during submission and monitoring
42
43
44 E.g. 1 - Hello World example submitted to local backend:
45
46 j = Job(application=Executable(exe='/bin/echo',args=['Hello World']), backend="Remote")
47 j.backend.host = "bluebear.bham.ac.uk" # Host name
48 j.backend.username = "slatermw" # User name
49 j.backend.ganga_cmd = "/bb/projects/Ganga/runGanga" # Ganga Command line on remote site
50 j.backend.ganga_dir = "/bb/phy/slatermw/gangadir/remote_jobs" # Where to store the jobs
51 j.backend.remote_backend = Local()
52 j.submit()
53
54
55 E.g. 2 - Root example submitted to PBS backend:
56
57 r = Root()
58 r.version = '5.14.00'
59 r.script = 'gengaus.C'
60
61 j = Job(application=r,backend="Remote")
62 j.backend.host = "bluebear.bham.ac.uk"
63 j.backend.username = "slatermw"
64 j.backend.ganga_cmd = "/bb/projects/Ganga/runGanga"
65 j.backend.ganga_dir = "/bb/phy/slatermw/gangadir/remote_jobs"
66 j.outputsandbox = ['gaus.txt']
67 j.backend.remote_backend = PBS()
68 j.submit()
69
70
71 E.g. 3 - Athena example submitted to LCG backend
72 NOTE: you don't need a grid certificate (or UI) available on the local machine,
73 just the remote machine
74
75 j = Job()
76 j.name='Ex3_2_1'
77 j.application=Athena()
78 j.application.prepare(athena_compile=False)
79 j.application.option_file='/disk/f8b/home/mws/athena/testarea/13.0.40/PhysicsAnalysis/AnalysisCommon/UserAnalysis/run/AthExHelloWorld_jobOptions.py'
80
81 j.backend = Remote()
82 j.backend.host = "bluebear.bham.ac.uk"
83 j.backend.username = "slatermw"
84 j.backend.ganga_cmd = "/bb/projects/Ganga/runGanga"
85 j.backend.ganga_dir = "/bb/phy/slatermw/gangadir/remote_jobs"
86 j.backend.environment = {'ATLAS_VERSION' : '13.0.40'} # Additional environment variables
87 j.backend.remote_backend = LCG()
88 j.backend.remote_backend.CE = 'epgce2.ph.bham.ac.uk:2119/jobmanager-lcgpbs-short'
89
90 j.submit()
91
92 E.g. 4 - Hello World submitted at CERN on LSF using atlas startup
93
94 j = Job()
95 j.backend = Remote()
96 j.backend.host = "lxplus.cern.ch"
97 j.backend.username = "mslater"
98 j.backend.ganga_cmd = "ganga"
99 j.backend.ganga_dir = "/afs/cern.ch/user/m/mslater/gangadir/remote_jobs"
100 j.backend.pre_script = ['source /afs/cern.ch/sw/ganga/install/etc/setup-atlas.csh'] # source the atlas setup script before running ganga
101 j.backend.remote_backend = LSF()
102 j.submit()
103
104 """
105
106 _schema = Schema( Version( 1, 0 ), {\
107 "remote_backend": ComponentItem('backends',doc='specification of the resources to be used (e.g. batch system)'),
108 "host" : SimpleItem( defvalue="", doc="The remote host and port number ('host:port') to use. Default port is 22." ),
109 "ssh_key" : SimpleItem( defvalue="", doc="Set to true to the location of the the ssh key to use for authentication, e.g. /home/mws/.ssh/id_rsa. Note, you should make sure 'key_type' is also set correctly." ),
110 "key_type" : SimpleItem( defvalue="RSA", doc="Set to the type of ssh key to use (if required). Possible values are 'RSA' and 'DSS'."),
111 "username" : SimpleItem( defvalue="", doc="The username at the remote host" ),
112 "ganga_dir" : SimpleItem( defvalue="", doc="The directory to use for the remote workspace, repository, etc." ),
113 "ganga_cmd" : SimpleItem( defvalue="", doc="Command line to start ganga on the remote host" ),
114 "environment" : SimpleItem( defvalue={}, doc="Overides any environment variables set in the job" ),
115 "pre_script" : SimpleItem( defvalue=[''], doc="Sequence of commands to execute before running Ganga on the remote site" ),
116 'remote_job_id' : SimpleItem(defvalue=0,protected=1,copyable=0,doc='Remote job id.'),
117 'exitcode' : SimpleItem(defvalue=0,protected=1,copyable=0,doc='Application exit code'),
118 'actualCE' : SimpleItem(defvalue=0,protected=1,copyable=0,doc='Computing Element where the job actually runs.')
119 } )
120
121 _category = "backends"
122 _name = "Remote"
123
124 _port = 22
125 _transport = None
126 _sftp = None
127 _code = randomString()
128 _transportarray = None
129 _key = {}
130
133
138
140 job = self.getJobObject()
141 if job.status in [ 'submitted', 'running', 'completing' ]:
142
143
144
145 import os
146
147
148 script = """#!/usr/bin/env python
149 #-----------------------------------------------------
150 # This is a setup script for a remote job. It
151 # does very litte
152 #-----------------------------------------------------
153
154 # print a finished token
155 print "***_FINISHED_***"
156 """
157
158
159 if (self.opentransport() == False):
160 return False
161
162
163
164
165
166
167
168
169
170
171
172 return True
173
175
176 import paramiko
177 import getpass
178 import atexit
179
180 if (self._transport != None):
181
182 return
183
184
185 if Remote._transportarray != None:
186 for t in Remote._transportarray:
187 if (t != None) and (t[0] == self.username) and (t[1] == self.host):
188
189
190 if t[2] == None or t[3] == None:
191 logger.warning("Too many retries for remote host " + self.username + "@" + self.host + ". Restart Ganga to have another go.")
192 return False
193
194 self._transport = t[2]
195 self._sftp = t[3]
196
197
198
199 channel = self._transport.open_session()
200 channel.exec_command( 'mkdir -p ' + self.ganga_dir )
201 bufout = ""
202 while not channel.exit_status_ready():
203 if channel.recv_ready():
204 bufout = channel.recv(1024)
205
206 return
207
208
209 num_try = 0
210 password = ""
211 while num_try < 3:
212
213 try:
214 temp_host = self.host
215 temp_port = self._port
216 if self.host.find(":") != -1:
217
218 temp_port = eval( self.host[ self.host.find(":")+1 : ] )
219 temp_host = self.host[ : self.host.find(":") ]
220
221 self._transport = paramiko.Transport((temp_host, temp_port))
222
223
224 self._transport.setDaemon(True)
225
226
227 atexit.register(shutdown_transport, self._transport)
228
229 if self.ssh_key != "" and os.path.exists(os.path.expanduser( os.path.expandvars( self.ssh_key ) ) ):
230 privatekeyfile = os.path.expanduser( os.path.expandvars( self.ssh_key ) )
231
232 if not Remote._key.has_key(self.ssh_key):
233
234 if self.key_type == "RSA":
235 password = getpass.getpass('Enter passphrase for key \'%s\': ' % (self.ssh_key))
236 Remote._key[self.ssh_key] = paramiko.RSAKey.from_private_key_file(privatekeyfile,password=password )
237 elif self.key_type == "DSS":
238 password = getpass.getpass('Enter passphrase for key \'%s\': ' % (self.ssh_key))
239 Remote._key[self.ssh_key] = paramiko.DSSKey.from_private_key_file(privatekeyfile,password=password )
240 else:
241 logger.error("Unknown ssh key_type '%s'. Unable to connect." % self.key_type)
242 return False
243
244 self._transport.connect(username = self.username, pkey = Remote._key[self.ssh_key])
245 else:
246 password = getpass.getpass('Password for %s@%s: ' % (self.username, self.host))
247 self._transport.connect(username=self.username, password=password)
248
249
250
251 password = " "
252
253 channel = self._transport.open_session()
254 channel.exec_command( 'mkdir -p ' + self.ganga_dir )
255 self._sftp = paramiko.SFTPClient.from_transport(self._transport)
256
257
258 Remote._transportarray = [Remote._transportarray,
259 [self.username, self.host, self._transport, self._sftp]]
260 num_try = 1000
261
262 except:
263 logger.warning("Error when comunicating with remote host. Retrying...")
264 self._transport = None
265 self._sftp = None
266 if Remote._key.has_key( self.ssh_key ):
267 del Remote._key[self.ssh_key]
268
269 num_try = num_try + 1
270
271 if num_try == 3:
272 logger.error("Could not logon to remote host " + self.username + "@" + self.host + " after three attempts. Restart Ganga to have another go.")
273 Remote._transportarray = [Remote._transportarray,
274 [self.username, self.host, None, None]]
275 return False
276
277 return True
278
280 """Run a ganga script on the remote site"""
281
282 import getpass
283
284
285 cmd_str = ""
286 for c in pre_script:
287 cmd_str += c + '\n'
288
289 cmd_str += self.ganga_cmd + " -o\'[Configuration]gangadir=" + self.ganga_dir + "\' "
290 cmd_str += self.ganga_dir + script_name + '\n'
291 cmd_file = os.path.join(self.ganga_dir, "__gangacmd__" + randomString())
292 self._sftp.open( cmd_file, 'w').write(cmd_str)
293
294
295 channel = self._transport.open_session()
296 channel.exec_command("source " + cmd_file)
297
298
299 stdout = bufout = ""
300 stderr = buferr = ""
301 grid_ok = False
302
303 while not channel.exit_status_ready():
304
305 if channel.recv_ready():
306 bufout = channel.recv(1024)
307 stdout += bufout
308
309 if channel.recv_stderr_ready():
310 buferr = channel.recv_stderr(1024)
311 stderr += buferr
312
313 if stdout.find("***_FINISHED_***") != -1:
314 break
315
316 if (bufout.find("GRID pass") != -1 or buferr.find("GRID pass") != -1) :
317 grid_ok = True
318 password = getpass.getpass('Enter GRID pass phrase: ')
319 channel.send( password + "\n" )
320 password = ""
321
322 bufout = buferr = ""
323
324 self._sftp.remove( cmd_file )
325
326 return stdout, stderr
327
def submit(self, jobconfig, master_input_sandbox):
    """Submit the job to the remote backend.

    The packed subjob and master input sandboxes are uploaded over SFTP
    into ``ganga_dir``, a wrapper script is generated with ``preparejob``
    and uploaded, and the script is then executed on the remote host with
    ``run_remote_script``.  On success the remote job id and the state of
    the remote backend object are copied back onto this backend.

    Arguments:
      jobconfig            - job configuration; its sandbox files are packed
                             into the subjob input sandbox
      master_input_sandbox - sequence whose first element is the path of the
                             packed master input sandbox

    Return value: 1 if job is submitted successfully,
    or 0 otherwise"""

    # Report every missing mandatory setting before giving up, so the user
    # can fix them all in one go.
    checks = [
        (self.remote_backend is None, "No backend specified for remote host."),
        (self.host == "", "No remote host specified."),
        (self.username == "", "No username specified."),
        (self.ganga_dir == "", "No remote ganga directory specified."),
        (self.ganga_cmd == "", "No ganga command specified."),
    ]
    problems = [message for failed, message in checks if failed]
    for message in problems:
        logger.error(message)
    if problems:
        return 0

    # Compare against False explicitly: opentransport() also has bare
    # 'return' paths (None) which must not be treated as a failure.
    if self.opentransport() == False:
        return 0

    # Pack the subjob sandbox locally
    job = self.getJobObject()
    subjob_input_sandbox = job.createPackedInputSandbox(jobconfig.getSandboxFiles())

    # Upload both packed sandboxes, tagged with this backend's unique code
    self._sftp.put(subjob_input_sandbox[0],
                   self.ganga_dir + '/__subjob_input_sbx__%s' % self._code)
    self._sftp.put(master_input_sandbox[0],
                   self.ganga_dir + '/__master_input_sbx__%s' % self._code)

    # Generate the wrapper script that creates and submits the remote job
    scriptpath = self.preparejob(jobconfig, master_input_sandbox)

    # Read the generated script, closing the local file deterministically
    local_script = open(scriptpath, 'r')
    try:
        data = local_script.read()
    finally:
        local_script.close()

    # Upload it; the remote handle is closed explicitly so the contents are
    # on the remote side before the script is executed.
    script_name = '/__jobscript_run__%s.py' % self._code
    remote_script = self._sftp.open(self.ganga_dir + script_name, 'w')
    try:
        remote_script.write(data)
    finally:
        remote_script.close()

    # Run the script remotely, then tidy up the uploaded copy
    stdout, stderr = self.run_remote_script(script_name, self.pre_script)
    self._sftp.remove(self.ganga_dir + script_name)

    # Without the finished token the remote run did not complete
    if stdout.find("***_FINISHED_***") == -1:
        logger.error("Problem submitting the job on the remote site.")
        logger.error("<last 1536 bytes of stderr>")
        for ln in stderr[-1536:].splitlines():
            logger.error(ln)
        logger.error("<end of last 1536 bytes of stderr>")
        return 0

    status, outputdir, remote_id, be = self.grabremoteinfo(stdout)

    self.remote_job_id = remote_id
    if hasattr(self.remote_backend, 'exitcode'):
        self.exitcode = be.exitcode
    if hasattr(self.remote_backend, 'actualCE'):
        self.actualCE = be.actualCE

    # Mirror every schema item of the returned backend object onto the
    # local remote_backend (plain attribute copy, no exec needed).
    for item in be._schema.allItems():
        setattr(self.remote_backend, item[0], getattr(be, item[0]))

    return 1
414
415
417 """Kill running job.
418
419 No arguments other than self
420
421 Return value: True if job killed successfully,
422 or False otherwise"""
423
424 script = """#!/usr/bin/env python
425 #-----------------------------------------------------
426 # This is a kill script for a remote job. It
427 # attempts to kill the given job and returns
428 #-----------------------------------------------------
429 import os,os.path,shutil,tempfile
430 import sys,popen2,time,traceback
431
432 ############################################################################################
433
434 ###INLINEMODULES###
435
436 ############################################################################################
437
438 code = ###CODE###
439 jid = ###JOBID###
440
441 j = jobs( jid )
442 j.kill()
443
444 # Start pickle token
445 print "***_START_PICKLE_***"
446
447 # pickle the job
448 import pickle
449 print j.outputdir
450 print pickle.dumps(j._impl)
451 print j
452
453 # print a finished token
454 print "***_END_PICKLE_***"
455 print "***_FINISHED_***"
456 """
457
458 script = script.replace('###CODE###', repr(self._code))
459 script = script.replace('###JOBID###', str(self.remote_job_id))
460
461
462 if (self.opentransport() == False):
463 return 0
464
465
466 script_name = '/__jobscript_kill__%s.py' % self._code
467 self._sftp.open(self.ganga_dir + script_name, 'w').write(script)
468
469
470 stdout, stderr = self.run_remote_script( script_name, self.pre_script )
471
472
473 if stdout.find("***_FINISHED_***") != -1:
474 status, outputdir, id, be = self.grabremoteinfo(stdout)
475
476 if status == 'killed':
477 return True
478
479 return False
480
482 """Remove the selected job from the remote site
483
484 No arguments other than self
485
486 Return value: True if job removed successfully,
487 or False otherwise"""
488
489 script = """#!/usr/bin/env python
490 #-----------------------------------------------------
491 # This is a remove script for a remote job. It
492 # attempts to kill the given job and returns
493 #-----------------------------------------------------
494 import os,os.path,shutil,tempfile
495 import sys,popen2,time,traceback
496
497 ############################################################################################
498
499 ###INLINEMODULES###
500
501 ############################################################################################
502
503 code = ###CODE###
504 jid = ###JOBID###
505
506 j = jobs( jid )
507 j.remove()
508
509 jobs( jid )
510
511 # print a finished token
512 print "***_FINISHED_***"
513 """
514
515 script = script.replace('###CODE###', repr(self._code))
516 script = script.replace('###JOBID###', str(self.remote_job_id))
517
518
519 if (self.opentransport() == False):
520 return 0
521
522
523 script_name = '/__jobscript_remove__%s.py' % self._code
524 self._sftp.open(self.ganga_dir + script_name, 'w').write(script)
525
526
527 stdout, stderr = self.run_remote_script( script_name, self.pre_script )
528
529
530 if stdout.find("***_FINISHED_***") != -1:
531 return True
532
533 return False
534
536 """Resubmit the job.
537
538 No arguments other than self
539
540 Return value: 1 if job was resubmitted,
541 or 0 otherwise"""
542
543 script = """#!/usr/bin/env python
544 #-----------------------------------------------------
545 # This is a resubmit script for a remote job. It
546 # attempts to kill the given job and returns
547 #-----------------------------------------------------
548 import os,os.path,shutil,tempfile
549 import sys,popen2,time,traceback
550
551 ############################################################################################
552
553 ###INLINEMODULES###
554
555 ############################################################################################
556
557 code = ###CODE###
558 jid = ###JOBID###
559
560 j = jobs( jid )
561 j.resubmit()
562
563 # Start pickle token
564 print "***_START_PICKLE_***"
565
566 # pickle the job
567 import pickle
568 print j.outputdir
569 print pickle.dumps(j._impl)
570 print j
571
572 # print a finished token
573 print "***_END_PICKLE_***"
574 print "***_FINISHED_***"
575 """
576
577 script = script.replace('###CODE###', repr(self._code))
578 script = script.replace('###JOBID###', str(self.remote_job_id))
579
580
581 if (self.opentransport() == False):
582 return 0
583
584
585 script_name = '/__jobscript_resubmit__%s.py' % self._code
586 self._sftp.open(self.ganga_dir + script_name, 'w').write(script)
587
588
589 stdout, stderr = self.run_remote_script( script_name, self.pre_script )
590
591
592 if stdout.find("***_FINISHED_***") != -1:
593 status, outputdir, id, be = self.grabremoteinfo(stdout)
594
595 if status == 'submitted' or status == 'running':
596 return 1
597
598 return 0
599
615
def preparejob(self, jobconfig, master_input_sandbox):
    """Prepare the script to create the job on the remote host.

    A Python wrapper script is built from a template by substituting the
    ###...### placeholders with values taken from ``jobconfig`` and from
    this backend, and is written to the job's input workspace as
    '__jobscript__.py'.

    Arguments:
      jobconfig            - supplies the environment, output sandbox,
                             executable, arguments and sandbox files
      master_input_sandbox - not used here; kept for interface compatibility

    Return value: the result of writing the script to the input workspace
    (submit() opens it as the path of the script file)."""

    import inspect
    import StringIO
    import Ganga.Core.Sandbox as Sandbox

    job = self.getJobObject()

    script = """#!/usr/bin/env python
#-----------------------------------------------------
# This job wrapper script is automatically created by
# GANGA Remote backend handler.
#
# It controls:
# 1. unpack input sandbox
# 2. create the new job
# 3. submit it
#-----------------------------------------------------
import os,os.path,shutil,tempfile
import sys,popen2,time,traceback
import tarfile

############################################################################################

###INLINEMODULES###

############################################################################################

j = Job()

output_sandbox = ###OUTPUTSANDBOX###
input_sandbox = ###INPUTSANDBOX###
appexec = ###APPLICATIONEXEC###
appargs = ###APPLICATIONARGS###
back_end = ###BACKEND###
ganga_dir = ###GANGADIR###
code = ###CODE###
environment = ###ENVIRONMENT###
user_env = ###USERENV###

if user_env != None:
    for env_var in user_env:
        environment[env_var] = user_env[env_var]

j.outputsandbox = output_sandbox
j.backend = back_end

# Unpack the input sandboxes
shutil.move(os.path.expanduser(ganga_dir + "/__subjob_input_sbx__" + code), j.inputdir+"/__subjob_input_sbx__")
shutil.move(os.path.expanduser(ganga_dir + "/__master_input_sbx__" + code), j.inputdir+"/__master_input_sbx__")

# Add the files in the sandbox to the job
inputsbx = []
fullsbxlist = []
try:
    tar = tarfile.open(j.inputdir+"/__master_input_sbx__")
    filelist = tar.getnames()
    print filelist

    for f in filelist:
        fullsbxlist.append( f )
        inputsbx.append( j.inputdir + "/" + f )

except:
    print "Unable to open master input sandbox"

try:
    tar = tarfile.open(j.inputdir+"/__subjob_input_sbx__")
    filelist = tar.getnames()

    for f in filelist:
        fullsbxlist.append( f )
        inputsbx.append( j.inputdir + "/" + f )

except:
    print "Unable to open subjob input sandbox"

# sort out the path of the exe
if appexec in fullsbxlist:
    j.application = Executable ( exe = File(os.path.join(j.inputdir, appexec)), args = appargs, env = environment )
    print "Script found: %s" % appexec
else:
    j.application = Executable ( exe = appexec, args = appargs, env = environment )


j.inputsandbox = inputsbx

getPackedInputSandbox(j.inputdir+"/__subjob_input_sbx__", j.inputdir + "/.")
getPackedInputSandbox(j.inputdir+"/__master_input_sbx__", j.inputdir + "/.")

# submit the job
j.submit()

# Start pickle token
print "***_START_PICKLE_***"

# pickle the job
import pickle
print j.outputdir
print pickle.dumps(j._impl)

# print a finished token
print "***_END_PICKLE_***"
print "***_FINISHED_***"
"""

    # Fill in the template.  The substitution order is kept fixed because a
    # substituted value could itself contain text resembling a later
    # placeholder.
    script = script.replace('###ENVIRONMENT###', repr(jobconfig.env))
    script = script.replace('###USERENV###', repr(self.environment))
    script = script.replace('###INLINEMODULES###', inspect.getsource(Sandbox.WNSandbox))
    script = script.replace('###OUTPUTSANDBOX###', repr(jobconfig.outputbox))
    script = script.replace('###APPLICATIONEXEC###', repr(os.path.basename(jobconfig.getExeString())))
    script = script.replace('###APPLICATIONARGS###', repr(jobconfig.getArgStrings()))

    # Render the remote backend's copyable settings as text to inline into
    # the script.
    be_out = StringIO.StringIO()
    job.backend.remote_backend.printTree(be_out, "copyable")
    script = script.replace('###BACKEND###', be_out.getvalue())

    script = script.replace('###GANGADIR###', repr(self.ganga_dir))
    script = script.replace('###CODE###', repr(self._code))

    # Source text of a list expression naming every sandbox file (relative
    # to the remote job's inputdir) followed by the master sandbox.
    entries = ["j.inputdir + '/' + " + repr(os.path.basename(fname.name))
               for fname in jobconfig.getSandboxFiles()]
    entries.append("j.inputdir + '/__master_input_sbx__'")
    script = script.replace('###INPUTSANDBOX###', "[ " + ", ".join(entries) + " ]")

    return job.getInputWorkspace().writefile(FileBuffer('__jobscript__.py', script), executable=0)
751
877
878 updateMonitoringInformation = \
879 staticmethod( updateMonitoringInformation )
880