Package Ganga :: Package Lib :: Package MonitoringServices :: Package Octopus :: Module Octopus'
[hide private]
[frames | no frames]

Source Code for Module Ganga.Lib.MonitoringServices.Octopus.Octopus'

  1  #!/usr/bin/env python 
  2  # 
  3  # $Id: Octopus.py,v 1.1 2008-07-17 16:40:59 moscicki Exp $ 
  4  # 
  5   
  6  import socket 
  7  import random 
  8  import sys 
  9  import time 
 10   
 11  DEBUG = True  # When true, Octopus.connect(), open() and send() print a trace line to stdout.
 12   
class ProtocolException(Exception):
    """Raised when the protocol fails.

    Attributes:
        errorCode: numeric code identifying the failure.
        msg: human-readable description of the failure.
    """

    def __init__(self, errorCode, msg):
        """Store the error code and message on the exception.

        The values are also forwarded to ``Exception.__init__`` so that
        ``args`` is populated; without that, ``repr()``, pickling and
        generic logging of the exception would lose both values.
        """
        Exception.__init__(self, errorCode, msg)
        self.errorCode = errorCode
        self.msg = msg

    def __str__(self):
        """Return ``"<errorCode repr> - <msg repr>"``."""
        return repr(self.errorCode) + ' - ' + repr(self.msg)
22 23
class Octopus:
    """Client side of a simple text protocol spoken over a TCP socket.

    A session is opened by sending one of the commands ``create <channel>``,
    ``join <channel>`` or ``channel <channel>`` followed by a blank line.
    The server answers with a header terminated by a blank line that
    contains either ``OK`` or ``ERROR <code>``.  After a successful
    handshake the socket is switched to non-blocking mode and payload is
    exchanged with send() and read().  The character ``'\\004'`` (EOT)
    marks the end of a stream.

    Failures are reported as ProtocolException with these local codes:
    -1 socket error while connecting or during the handshake, -3 malformed
    server header, -4 server closed the connection during the handshake,
    -5 send failed.  Codes announced by the server in an ``ERROR`` header
    are passed through unchanged.

    Attributes:
        connected: True while a TCP connection is believed to be open.
        host, port: address of the server.
        buffer: payload bytes received together with the handshake header
            and not yet returned by read().
        eotFound: set to True once read() has seen an EOT character.
        s: the underlying socket, or None before the first connect().

    The code sticks to constructs that are valid on both Python 2 and
    Python 3 (``sys.exc_info()`` to fetch the active exception, ``print``
    called with a single pre-formatted string).
    """

    def __init__(self, host, port=8882):
        """Remember the server address; no network activity happens here."""
        self.connected = False
        self.host = host
        self.port = port
        self.buffer = ''
        self.eotFound = False
        self.s = None

    def connect(self):
        """Open the TCP connection to (host, port) with TCP_NODELAY set.

        Raises:
            socket.error: if the connection cannot be established.  The
                half-created socket is closed first so it does not leak.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
            if DEBUG:
                print('Connecting to  %s %s' % (self.host, self.port))
            sock.connect((self.host, self.port))
        except socket.error:
            sock.close()
            raise
        self.s = sock
        self.connected = True
        self.buffer = ''

    def open(self, channel, requestNew):
        """Connect if necessary and perform the channel handshake.

        Args:
            channel: channel identifier, formatted into the command with %s.
            requestNew: 1 sends ``create``, -1 sends ``join``, any other
                value sends ``channel``.

        Raises:
            ProtocolException: -1 on a socket error, -4 if the server
                closes the connection, -3 if the header contains neither
                ``ERROR`` nor ``OK``, or the server supplied code when the
                header contains ``ERROR <code>``.
        """
        if DEBUG:
            print('Opening channel  %s  create is  %s' % (channel, requestNew))
        try:
            if not self.connected:
                self.connect()
            # A previous open() leaves the socket non-blocking; the
            # handshake loop below needs blocking reads.
            self.s.setblocking(1)
            if requestNew == 1:
                command = 'create %s\n\n' % channel
            elif requestNew == -1:
                command = 'join %s \n\n' % channel
            else:
                command = 'channel %s\n\n' % channel
            self.s.send(command)

            # Collect the header up to the terminating blank line.
            # NOTE: there is no upper bound on the header size.
            response = ''
            while response.find("\n\n") < 0:
                chunk = self.s.recv(1024)
                if not chunk:
                    raise ProtocolException(-4, "Server disconnect")
                response = response + chunk

            pos = response.find("ERROR ")
            if pos > -1:
                errorText = response[pos:]
                raise ProtocolException(self._parseErrorCode(errorText),
                                        "Server error: " + errorText)
            pos = response.find("OK\n\n")
            if pos < 0:
                raise ProtocolException(-3, "Illegal server header")
            # Whatever followed the header is already payload.
            self.buffer = response[pos + 4:]
            self.s.setblocking(0)
        except socket.error:
            e = sys.exc_info()[1]
            raise ProtocolException(-1, "Could not connect to server "
                                    + str(e))

    def close(self, sendEOT=True):
        """Close the connection, optionally sending EOT first.

        The socket is closed and ``connected`` is cleared even when
        sending EOT fails, so that a later open() reconnects instead of
        reusing a dead socket.  Does nothing if no socket was ever created.

        Raises:
            ProtocolException: -5 if sending the EOT character fails.
        """
        if self.s is None:
            return
        try:
            if sendEOT:
                self.send('\004')
        finally:
            self.s.close()
            self.connected = False

    def create(self, channel=0):
        """Open ``channel`` with the ``create`` command.

        A channel of 0 means "pick a random positive channel number".
        """
        if channel == 0:
            channel = self._randomChannel()
        return self.open(channel, 1)

    def join(self, channel=0):
        """Open ``channel`` with requestNew=0 (the ``channel`` command).

        A channel of 0 means "pick a random positive channel number".
        """
        # NOTE(review): open() only sends the 'join' command for
        # requestNew == -1; the 0 passed here selects the 'channel'
        # command.  Confirm which one the server expects.
        if channel == 0:
            channel = self._randomChannel()
        return self.open(channel, 0)

    def send(self, message):
        """Send ``message`` over the socket.

        Raises:
            ProtocolException: -5 if the socket reports an error.
        """
        if DEBUG:
            print('Sending  %s' % (message,))
        try:
            self.s.send(message)
        except socket.error:
            e = sys.exc_info()[1]
            raise ProtocolException(-5, "Send failed: " + str(e))

    def read(self, length=-1):
        """Return buffered payload plus whatever one recv() call delivers.

        Args:
            length: -1 performs a non-blocking recv of up to 1024 bytes;
                any other value performs a blocking recv of up to
                ``length`` bytes.  Buffered payload is prepended in both
                cases, so the result may be longer than ``length``.

        Returns:
            The received data.  ``eotFound`` is set when it contains EOT.

        Raises:
            socket.error: if recv() fails.  The buffered payload is kept
                for the next call.  The one exception is a non-blocking
                read that merely has no new data while payload is
                buffered: that returns the buffered payload.
        """
        import errno

        pending = self.buffer
        self.buffer = ''
        try:
            if length == -1:
                self.s.setblocking(0)
                data = self.s.recv(1024)
            else:
                self.s.setblocking(1)
                data = self.s.recv(length)
        except socket.error:
            e = sys.exc_info()[1]
            noDataYet = (length == -1 and e.args
                         and e.args[0] in (errno.EAGAIN, errno.EWOULDBLOCK))
            if pending and noDataYet:
                data = ''
            else:
                # Do not lose payload that was already received.
                self.buffer = pending
                raise
        result = pending + data
        if result.find('\004') > -1:
            self.eotFound = True
        return result

    def _randomChannel(self):
        """Return a random channel number between 1 and the largest plain int."""
        # sys.maxint only exists on Python 2; fall back to sys.maxsize.
        upper = getattr(sys, 'maxint', None) or sys.maxsize
        return random.randint(1, upper)

    def _parseErrorCode(self, errorText):
        """Extract the integer code from text starting with ``ERROR <code>``.

        Returns -3 (illegal header) when no integer follows the keyword.
        """
        fields = errorText.split()
        try:
            return int(fields[1])
        except (IndexError, ValueError):
            return -3
120