
Source Code for Module Ganga.Lib.Remote.Remote

"""Module containing class for handling job submission to Remote backend"""

__author__  = "Mark Slater <mws@hep.ph.bham.ac.uk>"
__date__    = "10 June 2008"
__version__ = "1.0"

from Ganga.Core import Sandbox
from Ganga.GPIDev.Adapters.IBackend import IBackend
from Ganga.GPIDev.Lib.File.FileBuffer import FileBuffer
from Ganga.GPIDev.Schema import ComponentItem, Schema, SimpleItem, Version
from Ganga.Utility.ColourText import Foreground, Effects

import Ganga.Utility.Config
import Ganga.Utility.logging
logger = Ganga.Utility.logging.getLogger()

import commands
import inspect
import os
import time
from Ganga.Lib.Root import randomString

def shutdown_transport( tr ):
    """Shutdown the transport cleanly - otherwise python 2.5 throws a wobble"""
    if (tr != None):
        tr.close()
        tr = None

class Remote( IBackend ):
    """Remote backend - submit jobs to a Remote pool.

    The remote backend works as an SSH tunnel to a remote site
    where a ganga session is opened and the job submitted there
    using the specified remote_backend. It is (in theory!)
    transparent to the user and should allow submission of any jobs
    to any backends that are already possible in Ganga.

    NOTE: Due to the file transfers required, there can be some slow
    down during submission and monitoring


    E.g. 1 - Hello World example submitted to local backend:

    j = Job(application=Executable(exe='/bin/echo',args=['Hello World']), backend="Remote")
    j.backend.host = "bluebear.bham.ac.uk"                         # Host name
    j.backend.username = "slatermw"                                # User name
    j.backend.ganga_cmd = "/bb/projects/Ganga/runGanga"            # Ganga command line on remote site
    j.backend.ganga_dir = "/bb/phy/slatermw/gangadir/remote_jobs"  # Where to store the jobs
    j.backend.remote_backend = Local()
    j.submit()


    E.g. 2 - Root example submitted to PBS backend:

    r = Root()
    r.version = '5.14.00'
    r.script = 'gengaus.C'

    j = Job(application=r,backend="Remote")
    j.backend.host = "bluebear.bham.ac.uk"
    j.backend.username = "slatermw"
    j.backend.ganga_cmd = "/bb/projects/Ganga/runGanga"
    j.backend.ganga_dir = "/bb/phy/slatermw/gangadir/remote_jobs"
    j.outputsandbox = ['gaus.txt']
    j.backend.remote_backend = PBS()
    j.submit()


    E.g. 3 - Athena example submitted to LCG backend
    NOTE: you don't need a grid certificate (or UI) available on the local machine,
    just the remote machine

    j = Job()
    j.name='Ex3_2_1'
    j.application=Athena()
    j.application.prepare(athena_compile=False)
    j.application.option_file='/disk/f8b/home/mws/athena/testarea/13.0.40/PhysicsAnalysis/AnalysisCommon/UserAnalysis/run/AthExHelloWorld_jobOptions.py'

    j.backend = Remote()
    j.backend.host = "bluebear.bham.ac.uk"
    j.backend.username = "slatermw"
    j.backend.ganga_cmd = "/bb/projects/Ganga/runGanga"
    j.backend.ganga_dir = "/bb/phy/slatermw/gangadir/remote_jobs"
    j.backend.environment = {'ATLAS_VERSION' : '13.0.40'}   # Additional environment variables
    j.backend.remote_backend = LCG()
    j.backend.remote_backend.CE = 'epgce2.ph.bham.ac.uk:2119/jobmanager-lcgpbs-short'

    j.submit()

    E.g. 4 - Hello World submitted at CERN on LSF using atlas startup

    j = Job()
    j.backend = Remote()
    j.backend.host = "lxplus.cern.ch"
    j.backend.username = "mslater"
    j.backend.ganga_cmd = "ganga"
    j.backend.ganga_dir = "/afs/cern.ch/user/m/mslater/gangadir/remote_jobs"
    j.backend.pre_script = ['source /afs/cern.ch/sw/ganga/install/etc/setup-atlas.csh'] # source the atlas setup script before running ganga
    j.backend.remote_backend = LSF()
    j.submit()

    """

    _schema = Schema( Version( 1, 0 ), {
        "remote_backend": ComponentItem('backends', doc='specification of the resources to be used (e.g. batch system)'),
        "host": SimpleItem( defvalue="", doc="The remote host and port number ('host:port') to use. Default port is 22." ),
        "ssh_key": SimpleItem( defvalue="", doc="Set to the location of the ssh key to use for authentication, e.g. /home/mws/.ssh/id_rsa. Note, you should make sure 'key_type' is also set correctly." ),
        "key_type": SimpleItem( defvalue="RSA", doc="Set to the type of ssh key to use (if required). Possible values are 'RSA' and 'DSS'." ),
        "username": SimpleItem( defvalue="", doc="The username at the remote host" ),
        "ganga_dir": SimpleItem( defvalue="", doc="The directory to use for the remote workspace, repository, etc." ),
        "ganga_cmd": SimpleItem( defvalue="", doc="Command line to start ganga on the remote host" ),
        "environment": SimpleItem( defvalue={}, doc="Overrides any environment variables set in the job" ),
        "pre_script": SimpleItem( defvalue=[''], doc="Sequence of commands to execute before running Ganga on the remote site" ),
        'remote_job_id': SimpleItem( defvalue=0, protected=1, copyable=0, doc='Remote job id.' ),
        'exitcode': SimpleItem( defvalue=0, protected=1, copyable=0, doc='Application exit code' ),
        'actualCE': SimpleItem( defvalue=0, protected=1, copyable=0, doc='Computing Element where the job actually runs.' )
    } )

    _category = "backends"
    _name = "Remote"
    #_hidden = False # KUBA: temporarily disabled from the public
    _port = 22
    _transport = None
    _sftp = None
    _code = randomString()
    _transportarray = None
    _key = {}

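    # A further configuration sketch, along the lines of the docstring
    # examples but using key-based authentication; the host, username, key
    # location and paths below are placeholder values:
    #
    #   j = Job(application=Executable(exe='/bin/echo', args=['Hello World']),
    #           backend="Remote")
    #   j.backend.host = "remote.example.org:22"
    #   j.backend.username = "someuser"
    #   j.backend.ssh_key = "/home/someuser/.ssh/id_rsa"
    #   j.backend.key_type = "RSA"
    #   j.backend.ganga_cmd = "ganga"
    #   j.backend.ganga_dir = "/home/someuser/gangadir/remote_jobs"
    #   j.backend.remote_backend = Local()
    #   j.submit()
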
    def __init__( self ):
        super( Remote, self ).__init__()

    def __del__( self ):
        if (self._transport != None):
            self._transport.close()
            self._transport = None

    def setup( self ): # KUBA: generic setup hook
        job = self.getJobObject()
        if job.status in [ 'submitted', 'running', 'completing' ]:

            # Send a script over to the remote site that updates this job's
            # info with the info of the remote job
            import os

            # Create a ganga script that updates the job info from the remote site
            script = """#!/usr/bin/env python
#-----------------------------------------------------
# This is a setup script for a remote job. It
# does very little
#-----------------------------------------------------

# print a finished token
print "***_FINISHED_***"
"""

            # check for the connection
            if (self.opentransport() == False):
                return False

            # send the script
            #script_name = '/__setupscript__%s.py' % self._code
            #self._sftp.open(self.ganga_dir + script_name, 'w').write(script)

            # run the script
            #stdout, stderr = self.run_remote_script( script_name, self.pre_script )

            # remove the script
            #self._sftp.remove(self.ganga_dir + script_name)

        return True

    def opentransport( self ):

        import paramiko
        import getpass
        import atexit

        if (self._transport != None):
            # transport is open
            return

        # check for a usable transport for this username and host
        if Remote._transportarray != None:
            for t in Remote._transportarray:
                if (t != None) and (t[0] == self.username) and (t[1] == self.host):

                    # check for too many retries on the same host
                    if t[2] == None or t[3] == None:
                        logger.warning("Too many retries for remote host " + self.username + "@" + self.host + ". Restart Ganga to have another go.")
                        return False

                    self._transport = t[2]
                    self._sftp = t[3]

                    # ensure that the remote dir is still there - it will crash if the dir structure
                    # changes with the sftp still open
                    channel = self._transport.open_session()
                    channel.exec_command( 'mkdir -p ' + self.ganga_dir )
                    bufout = ""
                    while not channel.exit_status_ready():
                        if channel.recv_ready():
                            bufout = channel.recv(1024)

                    return

        # Ask user for password - give three tries
        num_try = 0
        password = ""
        while num_try < 3:

            try:
                temp_host = self.host
                temp_port = self._port
                if self.host.find(":") != -1:
                    # user specified port
                    temp_port = eval( self.host[ self.host.find(":")+1 : ] )
                    temp_host = self.host[ : self.host.find(":") ]

                self._transport = paramiko.Transport((temp_host, temp_port))

                # avoid hang on exit by daemonising the thread
                self._transport.setDaemon(True)

                # register for proper shutdown
                atexit.register(shutdown_transport, self._transport)

                if self.ssh_key != "" and os.path.exists(os.path.expanduser( os.path.expandvars( self.ssh_key ) ) ):
                    privatekeyfile = os.path.expanduser( os.path.expandvars( self.ssh_key ) )

                    if not Remote._key.has_key(self.ssh_key):

                        if self.key_type == "RSA":
                            password = getpass.getpass('Enter passphrase for key \'%s\': ' % (self.ssh_key))
                            Remote._key[self.ssh_key] = paramiko.RSAKey.from_private_key_file(privatekeyfile, password=password)
                        elif self.key_type == "DSS":
                            password = getpass.getpass('Enter passphrase for key \'%s\': ' % (self.ssh_key))
                            Remote._key[self.ssh_key] = paramiko.DSSKey.from_private_key_file(privatekeyfile, password=password)
                        else:
                            logger.error("Unknown ssh key_type '%s'. Unable to connect." % self.key_type)
                            return False

                    self._transport.connect(username = self.username, pkey = Remote._key[self.ssh_key])
                else:
                    password = getpass.getpass('Password for %s@%s: ' % (self.username, self.host))
                    self._transport.connect(username=self.username, password=password)

                # blank the password just in case
                password = " "

                channel = self._transport.open_session()
                channel.exec_command( 'mkdir -p ' + self.ganga_dir )
                self._sftp = paramiko.SFTPClient.from_transport(self._transport)

                # Add to the transport array
                Remote._transportarray = [Remote._transportarray,
                                          [self.username, self.host, self._transport, self._sftp]]
                num_try = 1000

            except:
                logger.warning("Error when communicating with remote host. Retrying...")
                self._transport = None
                self._sftp = None
                if Remote._key.has_key( self.ssh_key ):
                    del Remote._key[self.ssh_key]

            num_try = num_try + 1

        if num_try == 3:
            logger.error("Could not logon to remote host " + self.username + "@" + self.host + " after three attempts. Restart Ganga to have another go.")
            Remote._transportarray = [Remote._transportarray,
                                      [self.username, self.host, None, None]]
            return False

        return True

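    # In the simplest key-based case the connection logic above reduces to
    # roughly the following paramiko calls (host, port, username, key path
    # and remote directory are placeholder values):
    #
    #   import paramiko
    #   transport = paramiko.Transport(("remote.example.org", 22))
    #   key = paramiko.RSAKey.from_private_key_file("/home/someuser/.ssh/id_rsa",
    #                                                password="passphrase")
    #   transport.connect(username="someuser", pkey=key)
    #   sftp = paramiko.SFTPClient.from_transport(transport)
    #   channel = transport.open_session()
    #   channel.exec_command("mkdir -p /home/someuser/gangadir/remote_jobs")
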
    def run_remote_script( self, script_name, pre_script ):
        """Run a ganga script on the remote site"""

        import getpass

        # Set up a command file to source. This gets around a silly alias problem
        cmd_str = ""
        for c in pre_script:
            cmd_str += c + '\n'

        cmd_str += self.ganga_cmd + " -o\'[Configuration]gangadir=" + self.ganga_dir + "\' "
        cmd_str += self.ganga_dir + script_name + '\n'
        cmd_file = os.path.join(self.ganga_dir, "__gangacmd__" + randomString())
        self._sftp.open( cmd_file, 'w').write(cmd_str)

        # run ganga command
        channel = self._transport.open_session()
        channel.exec_command("source " + cmd_file)

        # Read the output after command
        stdout = bufout = ""
        stderr = buferr = ""
        grid_ok = False

        while not channel.exit_status_ready():

            if channel.recv_ready():
                bufout = channel.recv(1024)
                stdout += bufout

            if channel.recv_stderr_ready():
                buferr = channel.recv_stderr(1024)
                stderr += buferr

            if stdout.find("***_FINISHED_***") != -1:
                break

            if (bufout.find("GRID pass") != -1 or buferr.find("GRID pass") != -1):
                grid_ok = True
                password = getpass.getpass('Enter GRID pass phrase: ')
                channel.send( password + "\n" )
                password = ""

            bufout = buferr = ""

        self._sftp.remove( cmd_file )

        return stdout, stderr

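    # The command file staged by run_remote_script is a small shell snippet
    # that is sourced on the remote host. With placeholder pre_script,
    # ganga_cmd and ganga_dir values (and <code> standing for the per-session
    # random string) it would contain something like:
    #
    #   source /afs/cern.ch/sw/ganga/install/etc/setup-atlas.csh
    #   ganga -o'[Configuration]gangadir=/home/someuser/gangadir/remote_jobs' /home/someuser/gangadir/remote_jobs/__jobscript_run__<code>.py
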
    def submit( self, jobconfig, master_input_sandbox ):
        """Submit the job to the remote backend.

        Return value: True if job is submitted successfully,
                      or False otherwise"""

        import os
        import getpass

        # First some sanity checks...
        fail = 0
        if self.remote_backend == None:
            logger.error("No backend specified for remote host.")
            fail = 1
        if self.host == "":
            logger.error("No remote host specified.")
            fail = 1
        if self.username == "":
            logger.error("No username specified.")
            fail = 1
        if self.ganga_dir == "":
            logger.error("No remote ganga directory specified.")
            fail = 1
        if self.ganga_cmd == "":
            logger.error("No ganga command specified.")
            fail = 1

        if fail:
            return 0

        # initiate the connection
        if self.opentransport() == False:
            return 0

        # Tar up the input sandbox and copy to the remote cluster
        job = self.getJobObject()
        subjob_input_sandbox = job.createPackedInputSandbox(jobconfig.getSandboxFiles())
        input_sandbox = subjob_input_sandbox + master_input_sandbox

        # send the sandbox
        sbx_name = '/__subjob_input_sbx__%s' % self._code
        self._sftp.put(subjob_input_sandbox[0], self.ganga_dir + sbx_name)
        sbx_name = '/__master_input_sbx__%s' % self._code
        self._sftp.put(master_input_sandbox[0], self.ganga_dir + sbx_name)

        # run the submit script on the remote cluster
        scriptpath = self.preparejob(jobconfig, master_input_sandbox)

        # send the script
        data = open(scriptpath, 'r').read()
        script_name = '/__jobscript_run__%s.py' % self._code
        self._sftp.open(self.ganga_dir + script_name, 'w').write(data)

        # run the script
        stdout, stderr = self.run_remote_script( script_name, self.pre_script )

        # delete the jobscript
        self._sftp.remove(self.ganga_dir + script_name)

        # Copy the job object
        if stdout.find("***_FINISHED_***") != -1:
            status, outputdir, id, be = self.grabremoteinfo(stdout)

            self.remote_job_id = id
            if hasattr(self.remote_backend, 'exitcode'):
                self.exitcode = be.exitcode
            if hasattr(self.remote_backend, 'actualCE'):
                self.actualCE = be.actualCE

            # copy each variable in the schema
            # Please can someone tell me why I can't just do self.remote_backend = be?
            for o in be._schema.allItems():
                exec("self.remote_backend." + o[0] + " = be." + o[0])

            return 1
        else:
            logger.error("Problem submitting the job on the remote site.")
            logger.error("<last 1536 bytes of stderr>")
            cut = stderr[ len(stderr) - 1536 : ]

            for ln in cut.splitlines():
                logger.error(ln)

            logger.error("<end of last 1536 bytes of stderr>")

        return 0

    def kill( self ):
        """Kill running job.

        No arguments other than self

        Return value: True if job killed successfully,
                      or False otherwise"""

        script = """#!/usr/bin/env python
#-----------------------------------------------------
# This is a kill script for a remote job. It
# attempts to kill the given job and returns
#-----------------------------------------------------
import os,os.path,shutil,tempfile
import sys,popen2,time,traceback

############################################################################################

###INLINEMODULES###

############################################################################################

code = ###CODE###
jid = ###JOBID###

j = jobs( jid )
j.kill()

# Start pickle token
print "***_START_PICKLE_***"

# pickle the job
import pickle
print j.outputdir
print pickle.dumps(j._impl)
print j

# print a finished token
print "***_END_PICKLE_***"
print "***_FINISHED_***"
"""

        script = script.replace('###CODE###', repr(self._code))
        script = script.replace('###JOBID###', str(self.remote_job_id))

        # check for the connection
        if (self.opentransport() == False):
            return 0

        # send the script
        script_name = '/__jobscript_kill__%s.py' % self._code
        self._sftp.open(self.ganga_dir + script_name, 'w').write(script)

        # run the script
        stdout, stderr = self.run_remote_script( script_name, self.pre_script )

        # Copy the job object
        if stdout.find("***_FINISHED_***") != -1:
            status, outputdir, id, be = self.grabremoteinfo(stdout)

            if status == 'killed':
                return True

        return False

    def remove( self ):
        """Remove the selected job from the remote site

        No arguments other than self

        Return value: True if job removed successfully,
                      or False otherwise"""

        script = """#!/usr/bin/env python
#-----------------------------------------------------
# This is a remove script for a remote job. It
# attempts to remove the given job and returns
#-----------------------------------------------------
import os,os.path,shutil,tempfile
import sys,popen2,time,traceback

############################################################################################

###INLINEMODULES###

############################################################################################

code = ###CODE###
jid = ###JOBID###

j = jobs( jid )
j.remove()

jobs( jid )

# print a finished token
print "***_FINISHED_***"
"""

        script = script.replace('###CODE###', repr(self._code))
        script = script.replace('###JOBID###', str(self.remote_job_id))

        # check for the connection
        if (self.opentransport() == False):
            return 0

        # send the script
        script_name = '/__jobscript_remove__%s.py' % self._code
        self._sftp.open(self.ganga_dir + script_name, 'w').write(script)

        # run the script
        stdout, stderr = self.run_remote_script( script_name, self.pre_script )

        # Copy the job object
        if stdout.find("***_FINISHED_***") != -1:
            return True

        return False

    def resubmit( self ):
        """Resubmit the job.

        No arguments other than self

        Return value: 1 if job was resubmitted,
                      or 0 otherwise"""

        script = """#!/usr/bin/env python
#-----------------------------------------------------
# This is a resubmit script for a remote job. It
# attempts to resubmit the given job and returns
#-----------------------------------------------------
import os,os.path,shutil,tempfile
import sys,popen2,time,traceback

############################################################################################

###INLINEMODULES###

############################################################################################

code = ###CODE###
jid = ###JOBID###

j = jobs( jid )
j.resubmit()

# Start pickle token
print "***_START_PICKLE_***"

# pickle the job
import pickle
print j.outputdir
print pickle.dumps(j._impl)
print j

# print a finished token
print "***_END_PICKLE_***"
print "***_FINISHED_***"
"""

        script = script.replace('###CODE###', repr(self._code))
        script = script.replace('###JOBID###', str(self.remote_job_id))

        # check for the connection
        if (self.opentransport() == False):
            return 0

        # send the script
        script_name = '/__jobscript_resubmit__%s.py' % self._code
        self._sftp.open(self.ganga_dir + script_name, 'w').write(script)

        # run the script
        stdout, stderr = self.run_remote_script( script_name, self.pre_script )

        # Copy the job object
        if stdout.find("***_FINISHED_***") != -1:
            status, outputdir, id, be = self.grabremoteinfo(stdout)

            if status == 'submitted' or status == 'running':
                return 1

        return 0

    def grabremoteinfo( self, out ):

        from string import strip
        import pickle

        # Find the start and end of the pickle
        start = out.find("***_START_PICKLE_***") + len("***_START_PICKLE_***")
        stop = out.find("***_END_PICKLE_***")
        outputdir = out[start + 1:out.find("\n", start + 1) - 1]
        pickle_str = out[out.find("\n", start + 1) + 1:stop]

        # Now unpickle and recreate the job
        j = pickle.loads(pickle_str)

        return j.status, outputdir, j.id, j.backend

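    # The remote scripts frame their output so that grabremoteinfo can parse
    # it: the line straight after the start token is the remote output
    # directory, everything up to the end token is the pickled job, and the
    # finished token tells run_remote_script to stop reading. Schematically
    # (the path is illustrative):
    #
    #   ***_START_PICKLE_***
    #   /remote/path/to/job/output/
    #   <output of pickle.dumps(j._impl)>
    #   ***_END_PICKLE_***
    #   ***_FINISHED_***
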
    def preparejob( self, jobconfig, master_input_sandbox ):
        """Prepare the script to create the job on the remote host"""

        from Ganga.Utility import tempfile

        workdir = tempfile.mkdtemp()
        job = self.getJobObject()

        script = """#!/usr/bin/env python
#-----------------------------------------------------
# This job wrapper script is automatically created by
# GANGA Remote backend handler.
#
# It controls:
# 1. unpack input sandbox
# 2. create the new job
# 3. submit it
#-----------------------------------------------------
import os,os.path,shutil,tempfile
import sys,popen2,time,traceback
import tarfile

############################################################################################

###INLINEMODULES###

############################################################################################

j = Job()

output_sandbox = ###OUTPUTSANDBOX###
input_sandbox = ###INPUTSANDBOX###
appexec = ###APPLICATIONEXEC###
appargs = ###APPLICATIONARGS###
back_end = ###BACKEND###
ganga_dir = ###GANGADIR###
code = ###CODE###
environment = ###ENVIRONMENT###
user_env = ###USERENV###

if user_env != None:
    for env_var in user_env:
        environment[env_var] = user_env[env_var]

j.outputsandbox = output_sandbox
j.backend = back_end

# Unpack the input sandboxes
shutil.move(os.path.expanduser(ganga_dir + "/__subjob_input_sbx__" + code), j.inputdir+"/__subjob_input_sbx__")
shutil.move(os.path.expanduser(ganga_dir + "/__master_input_sbx__" + code), j.inputdir+"/__master_input_sbx__")

# Add the files in the sandbox to the job
inputsbx = []
fullsbxlist = []
try:
    tar = tarfile.open(j.inputdir+"/__master_input_sbx__")
    filelist = tar.getnames()
    print filelist

    for f in filelist:
        fullsbxlist.append( f )
        inputsbx.append( j.inputdir + "/" + f )

except:
    print "Unable to open master input sandbox"

try:
    tar = tarfile.open(j.inputdir+"/__subjob_input_sbx__")
    filelist = tar.getnames()

    for f in filelist:
        fullsbxlist.append( f )
        inputsbx.append( j.inputdir + "/" + f )

except:
    print "Unable to open subjob input sandbox"

# sort out the path of the exe
if appexec in fullsbxlist:
    j.application = Executable( exe = File(os.path.join(j.inputdir, appexec)), args = appargs, env = environment )
    print "Script found: %s" % appexec
else:
    j.application = Executable( exe = appexec, args = appargs, env = environment )


j.inputsandbox = inputsbx

getPackedInputSandbox(j.inputdir+"/__subjob_input_sbx__", j.inputdir + "/.")
getPackedInputSandbox(j.inputdir+"/__master_input_sbx__", j.inputdir + "/.")

# submit the job
j.submit()

# Start pickle token
print "***_START_PICKLE_***"

# pickle the job
import pickle
print j.outputdir
print pickle.dumps(j._impl)

# print a finished token
print "***_END_PICKLE_***"
print "***_FINISHED_***"
"""
        import inspect
        import Ganga.Core.Sandbox as Sandbox
        script = script.replace('###ENVIRONMENT###', repr(jobconfig.env))
        script = script.replace('###USERENV###', repr(self.environment))
        script = script.replace('###INLINEMODULES###', inspect.getsource(Sandbox.WNSandbox))
        script = script.replace('###OUTPUTSANDBOX###', repr(jobconfig.outputbox))
        script = script.replace('###APPLICATIONEXEC###', repr(os.path.basename(jobconfig.getExeString())))
        script = script.replace('###APPLICATIONARGS###', repr(jobconfig.getArgStrings()))

        # get a string describing the required backend
        import StringIO
        be_out = StringIO.StringIO()
        job.backend.remote_backend.printTree( be_out, "copyable" )
        be_str = be_out.getvalue()
        script = script.replace('###BACKEND###', be_str)

        script = script.replace('###GANGADIR###', repr(self.ganga_dir))
        script = script.replace('###CODE###', repr(self._code))

        sandbox_list = jobconfig.getSandboxFiles()

        str_list = "[ "
        for fname in sandbox_list:
            str_list += "j.inputdir + '/' + " + repr(os.path.basename( fname.name ))
            str_list += ", "

        str_list += "j.inputdir + '/__master_input_sbx__' ]"

        script = script.replace('###INPUTSANDBOX###', str_list)
        return job.getInputWorkspace().writefile(FileBuffer('__jobscript__.py', script), executable=0)

    def updateMonitoringInformation( jobs ):

        # Send a script over to the remote site that updates each job's
        # info with the info of the remote job
        import os
        import getpass
        from string import strip

        # first, loop over the jobs and sort by host, username, gangadir and pre_script
        jobs_sort = {}
        for j in jobs:
            host_str = j.backend.username + "@" + j.backend.host + ":" + j.backend.ganga_dir + "+" + ';'.join(j.backend.pre_script)
            if not jobs_sort.has_key(host_str):
                jobs_sort[host_str] = []

            jobs_sort[host_str].append(j)

        for host_str in jobs_sort:
            # Create a ganga script that updates the job info for all jobs at this remote site
            script = """#!/usr/bin/env python
#-----------------------------------------------------
# This is a monitoring script for a remote job. It
# outputs some useful job info and exits
#-----------------------------------------------------
import os,os.path,shutil,tempfile
import sys,popen2,time,traceback

############################################################################################

###INLINEMODULES###

############################################################################################

code = ###CODE###
jids = ###JOBID###

runMonitoring()

import pickle

for jid in jids:

    j = jobs( jid )

    # Start pickle token
    print "***_START_PICKLE_***"

    # pickle the job
    print j.outputdir
    print pickle.dumps(j._impl)
    print j

    # print a finished token
    print "***_END_PICKLE_***"

print "***_FINISHED_***"
"""

            mj = jobs_sort[host_str][0]
            script = script.replace('###CODE###', repr(mj.backend._code))
            rem_ids = []
            for j in jobs_sort[host_str]:
                rem_ids.append(j.backend.remote_job_id)
            script = script.replace('###JOBID###', str(rem_ids))

            # check for the connection
            if (mj.backend.opentransport() == False):
                return 0

            # send the script
            script_name = '/__jobscript__%s.py' % mj.backend._code
            mj.backend._sftp.open(mj.backend.ganga_dir + script_name, 'w').write(script)

            # run the script
            stdout, stderr = mj.backend.run_remote_script( script_name, mj.backend.pre_script )

            # Copy the job object
            if stdout.find("***_FINISHED_***") != -1:

                start_pos = stdout.find("***_START_PICKLE_***")
                end_pos = stdout.find("***_END_PICKLE_***") + len("***_END_PICKLE_***")

                while start_pos != -1 and end_pos != -1:
                    pickle_str = stdout[start_pos:end_pos + 1]

                    status, outputdir, id, be = mj.backend.grabremoteinfo( pickle_str )

                    # find the job and update it
                    found = False
                    for j in jobs_sort[host_str]:

                        if (id == j.backend.remote_job_id):
                            found = True
                            if status != j.status:
                                j.updateStatus(status)

                            if hasattr(j.backend.remote_backend, 'exitcode'):
                                j.backend.exitcode = be.exitcode
                            if hasattr(j.backend.remote_backend, 'actualCE'):
                                j.backend.actualCE = be.actualCE

                            for o in be._schema.allItems():
                                exec("j.backend.remote_backend." + o[0] + " = be." + o[0])

                            # check for completed or failed and pull the output if required
                            if j.status == 'completed' or j.status == 'failed':

                                # we should have output, so get the file list first
                                filelist = j.backend._sftp.listdir(outputdir)

                                # go through and sftp them back
                                for fname in filelist:
                                    data = j.backend._sftp.open(outputdir + '/' + fname, 'r').read()
                                    open(j.outputdir + '/' + os.path.basename(fname), 'w').write(data)

                    if not found:
                        logger.warning("Couldn't match remote id %d with monitored job. Serious problems in Remote monitoring." % id)

                    start_pos = stdout.find("***_START_PICKLE_***", end_pos)
                    end_pos = stdout.find("***_END_PICKLE_***", end_pos) + len("***_END_PICKLE_***")

            # remove the script
            j.backend._sftp.remove(j.backend.ganga_dir + script_name)

        return None

    updateMonitoringInformation = staticmethod( updateMonitoringInformation )

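    # Monitoring groups jobs by a composite key of username, host, remote
    # gangadir and pre_script, so a single remote Ganga session is polled per
    # group; with placeholder values the grouping key looks like:
    #
    #   "someuser@remote.example.org:/home/someuser/gangadir/remote_jobs+source setup.sh"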