Package Ganga :: Package Lib :: Package MonitoringServices :: Package Octopus :: Module OctopusMS'
[hide private]
[frames | no frames]

Source Code for Module Ganga.Lib.MonitoringServices.Octopus.OctopusMS'

 1  from Ganga.GPIDev.Adapters.IMonitoringService import IMonitoringService 
 2  from Ganga.Lib.MonitoringServices.Octopus.Octopus import * 
 3  from urlparse import urlparse 
 4  import sys 
 5  import traceback 
 6   
7 -class OctopusMS(IMonitoringService):
8 """ A simple tool to send the stdout of a job continuously to an Octopus server so 9 that the user can immediately look at it."""
def __init__(self, job_info):
    """Initialise the service and, for dictionary input, the Octopus client.

    ``job_info`` is handed unchanged to ``IMonitoringService.__init__``.
    When it is a dictionary (the shape produced by ``getJobInfo``: keys
    ``'octopus_server'``, ``'octopus_port'`` and ``'channel'``), those
    values are stored and an ``Octopus`` client is created for the server.
    For any other kind of object no client is created.
    """
    IMonitoringService.__init__(self, job_info)

    # Bookkeeping is set unconditionally so that start/progress/stop can
    # always read DEBUG and the stream offsets, whatever job_info was.
    self.stdoutpos = 0
    self.stderrpos = 0
    self.DEBUG = False

    # isinstance() also accepts dict subclasses, which an exact type
    # comparison would silently reject.
    if isinstance(job_info, dict):
        print(job_info)
        # NOTE(review): self.job_info is presumably assigned by
        # IMonitoringService.__init__ -- confirm against the base class.
        self.server = self.job_info['octopus_server']
        self.port = self.job_info['octopus_port']
        self.client = Octopus(self.server, self.port)
        self.channel = self.job_info['channel']
21
def start(self, *other):
    """Print the Octopus channel and forward the output available so far.

    Any extra positional arguments are accepted and handed on to
    ``progress``.  Returns whatever ``progress`` returns.
    """
    if self.DEBUG:
        print('Start called')
    print('Octopus channel: %s' % (self.channel,))
    # Unpack the extras: passing ``other`` itself would give progress() a
    # single tuple argument instead of the original arguments.
    return self.progress(*other)
26
def progress(self, *other):
    """Send stdout content written since the previous call to the server.

    If the client is not connected yet, the channel is created first; a
    ``ProtocolException`` at that point is reported on stderr and the call
    returns without sending anything.  Otherwise the file ``stdout`` in the
    current directory is read from offset ``self.stdoutpos`` to its end,
    the data is passed to ``self.client.send`` and the offset is advanced
    by the amount sent.  A ``ProtocolException`` while sending is reported
    on stderr.  Extra positional arguments are ignored.
    """
    if self.DEBUG:
        print('Progress called')

    if not self.client.connected:
        try:
            self.client.create(self.channel)
        except ProtocolException:
            # sys.exc_info() avoids committing to either the
            # "except X, e" or the "except X as e" spelling.
            sys.stderr.write('Error connecting to octopus server:  %s\n'
                             % (sys.exc_info()[1],))
            return

    stdout_file = open('stdout', 'r')
    try:
        try:
            # Measure the current size, then go back to where we stopped.
            stdout_file.seek(0, 2)
            total_len = stdout_file.tell()
            stdout_file.seek(self.stdoutpos)
            if self.DEBUG:
                print('File length: %s' % (total_len,))
            while total_len > self.stdoutpos:
                chunk = stdout_file.read()
                if not chunk:
                    # Nothing readable although the size says otherwise
                    # (e.g. the file shrank meanwhile): stop instead of
                    # looping forever on an offset that cannot advance.
                    break
                if self.DEBUG:
                    print('Sending > %s <' % (chunk,))
                self.client.send(chunk)
                self.stdoutpos = self.stdoutpos + len(chunk)
            if self.DEBUG:
                print('Leaving progress')
        except ProtocolException:
            sys.stderr.write('Error sending to octopus server:  %s\n'
                             % (sys.exc_info()[1],))
    finally:
        # progress() is called repeatedly; close the handle every time so
        # file descriptors do not pile up.
        stdout_file.close()
49
def stop(self, exitcode, *other):
    """Close the Octopus client.

    ``exitcode`` and any further positional arguments are accepted but not
    used.  With ``self.DEBUG`` set, a short notice is printed first.
    """
    debug_enabled = self.DEBUG
    if debug_enabled:
        print('Octopus done!')
    octopus_client = self.client
    octopus_client.close()
53
def getJobInfo(self):
    """Build the dictionary describing where this job's output is sent.

    The server comes from the environment variable ``GANGA_OCTOPUS_SERVER``
    and falls back to ``Ganga.Utility.util.hostname()``.  The port comes
    from ``GANGA_OCTOPUS_PORT`` and falls back to 8882.  The channel number
    is derived from the hashes of the job's fully qualified id and of the
    server name.

    Returns a dict with the keys ``'jobid'``, ``'channel'``,
    ``'octopus_server'`` and ``'octopus_port'``.

    Raises ``ValueError`` if ``GANGA_OCTOPUS_PORT`` is set to something
    that is not an integer.
    """
    import os

    try:
        server = os.environ['GANGA_OCTOPUS_SERVER']
    except KeyError:
        # Imported lazily: only needed when no server is configured.
        from Ganga.Utility.util import hostname
        server = hostname()

    # Environment values are always strings; convert so the port has the
    # same (integer) type as the built-in default.
    try:
        port = int(os.environ['GANGA_OCTOPUS_PORT'])
    except KeyError:
        port = 8882

    fqid = self.job_info.getFQID('.')
    channel = long(hash(fqid))
    channel = channel + sys.maxint * (long(hash(server)) + sys.maxint)

    return {'jobid': fqid, 'channel': channel,
            'octopus_server': server, 'octopus_port': port}
72
73 - def getSandboxModules(self):
78