Package Ganga :: Package Utility :: Package AMGAServerTools :: Module mdclient
[hide private]
[frames] | no frames]

Source Code for Module Ganga.Utility.AMGAServerTools.mdclient

  1  # 
  2  # $Id: mdclient.py,v 1.1 2008-07-17 16:41:01 moscicki Exp $ 
  3  # 
  4  import socket 
  5  import mdinterface 
  6  import time 
  7  from mdinterface import CommandException, MDInterface 
  8   
  9  DEBUG = False 
 10   
 11  try: 
 12      import tlslite 
 13      USE_TLSLITE = True 
 14      from tlslite.api import * 
 15      if DEBUG: print "Using tlslite" 
 16  except ImportError, e: 
 17      USE_TLSLITE = False 
 18   
 19   
20 -class MDClient(MDInterface):
21 - def __init__(self, host, port, 22 login='anonymous', password = '', 23 keepalive = True):
24 self.connected = 0 25 self.host = host 26 self.port = port 27 self.login = login 28 self.password = password 29 self.keepalive = keepalive 30 self.buffer = '' 31 self.reqSSL = False 32 self.sslSock = 0 33 self.sslOptions = None 34 self.sessionID = 0 35 self.session = None 36 self.greetings = '' 37 self.protocolVersion = 0 38 self.currentCommand = ""
39
40 - def requireSSL(self, key=None, cert=None):
41 self.reqSSL=True 42 self.keyFile=key 43 self.certFile=cert 44 if USE_TLSLITE: 45 self.sslOptions = tlslite.HandshakeSettings.HandshakeSettings 46 self.sslOptions.minVersion = (3,0) 47 self.sslOptions.maxVersion = (3,1)
48 49
50 - def __doSSLHandshake(self, session=None):
51 if USE_TLSLITE: 52 self.sslSock=TLSConnection(self.s) 53 if session: 54 if DEBUG: print "Doing SSL resuming session using TLSLite" 55 self.sslSock.handshakeClientCert(session=session) 56 return 57 58 if DEBUG: print "Doing SSL handshake using TLSLite" 59 cert = None 60 key = None 61 if self.certFile: 62 s = open(self.certFile).read() 63 x509 = X509() 64 x509.parse(s) 65 cert = X509CertChain([x509]) 66 if self.keyFile: 67 s = open(self.keyFile).read() 68 key = parsePEMKey(s, private=True) 69 self.sslSock.handshakeClientCert(cert, key, 70 self.session, None, None, False) 71 self.sslSock.closeSocket = False 72 else: 73 if DEBUG: print "Doing SSL handshake using builtin SSL" 74 self.sslSock=socket.ssl(self.s, self.keyFile, self.certFile)
75 76
77 - def connect(self):
78 self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 79 TCP_NODELAY = 1 80 self.s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) 81 self.s.connect((self.host, self.port)) 82 83 self.greetings = "" 84 #Expect 3 lines of greetings: server version, protocol, auth-methods 85 while self.greetings.count('\n') < 3: 86 line=self.s.recv(1024) 87 if not line: 88 raise CommandException(-1, "Cannot connect") 89 self.greetings += line 90 91 pos = self.greetings.find('\nProtocol ') 92 if pos > -1: 93 self.protocolVersion = int(self.greetings[pos+10]) 94 95 if self.sessionID: 96 if DEBUG: print "Trying to resume session ", self.sessionID 97 # Do reconnect 98 if self.reqSSL: 99 self.s.send('resumeSSL%u\n\n' % self.sessionID) 100 line = self.s.recv(1024) # OK from server 101 self.__doSSLHandshake(self.session) 102 else: 103 self.s.send('resume%d\n\n' % self.sessionID) 104 self.connected=True 105 self.buffer='' 106 self.attr=0 107 return 0 108 else: 109 if self.reqSSL: 110 self.s.send('ssl\n\n') 111 line = self.s.recv(1024) # OK from server 112 self.__doSSLHandshake() 113 else: 114 self.s.send('plain\n\n') 115 line = self.s.recv(1024) # OK from server 116 117 # Send login information if not doing reconnect 118 context = '0 '+self.login + '\n5 ' + self.password + '\n\n' 119 if not self.sessionID: 120 if self.sslSock: 121 self.sslSock.write(context) 122 else: 123 self.s.send(context) 124 125 self.connected=True 126 self.buffer='' 127 self.attr=0
128 129
130 - def disconnect(self, saveSession = False):
131 if saveSession and self.sslSock: 132 if USE_TLSLITE: 133 self.session = self.sslSock.session 134 else: 135 self.session = None 136 137 if self.connected: 138 if self.sslSock and USE_TLSLITE: 139 self.sslSock.close() 140 self.s.close() 141 self.connected = False
142 143
144 - def __sendCommand(self, command):
145 if DEBUG: print "Sending > ", command, "<" 146 if self.sslSock: 147 self.sslSock.write(command + '\n') 148 else: 149 self.s.send(command + '\n')
150
151 - def __encodeCommand(self, command):
152 command.replace('//', '////'); 153 command.replace('\n', '//n'); 154 return command
155 156
157 - def execute(self, command):
158 """ Returns silently if the command executes successfully, 159 throws an exception otherwise 160 """ 161 command = self.__encodeCommand(command) 162 self.currentCommand = command 163 164 self.buffer = "" 165 if not self.keepalive: 166 self.disconnect() 167 if not self.connected: 168 self.connect() 169 self.__sendCommand(command) 170 return self.retrieveResult()
171 172
173 - def retrieveResult(self):
174 self.EOT = 0 175 self.buffer = ""; 176 line = self.__fetchRow() 177 if line == None: 178 raise IOError("Server sent empty response") 179 pos=line.find(' ') 180 msg = "" 181 if(pos>-1): 182 retValue, msg=int(line[:pos]), line[pos+1:] 183 else: 184 retValue = int(line) 185 if retValue != 0: 186 # The command did not execute properly. Clear 187 # the input buffer and raise an exception 188 while not self.EOT: 189 if self.__fetchData()<0: 190 break 191 self.buffer = "" 192 msg = msg + ". Command was: " + self.currentCommand 193 raise CommandException(retValue, msg)
194 195
196 - def executeNoWait(self, command):
197 """ Returns silently if the command executes, 198 throws an exception if an error occours immediately 199 Does not wait for any return condition of the remote 200 command 201 """ 202 203 command = self.__encodeCommand(command) 204 self.currentCommand = command 205 206 self.buffer = "" 207 if not self.keepalive: 208 self.disconnect() 209 if not self.connected: 210 self.connect() 211 self.__sendCommand(command) 212 if(self.__dataArrived()): 213 return self.retrieveResult()
214 215
216 - def fetchRow(self):
217 return self.__fetchRow()
218
219 - def __fetchRow(self):
220 # print "buffer: >>>>>>>>>>>", self.buffer, "<<<<<<<<<<<<" 221 pos=self.buffer.find('\n') 222 if pos>-1: 223 line=self.buffer[:pos] 224 self.buffer=self.buffer[pos+1:] 225 if not len(self.buffer) and not self.EOT: 226 if self.__fetchData() < 0: return None 227 return line 228 if self.EOT: 229 return None 230 if self.__fetchData() <=0: 231 return None 232 return self.__fetchRow()
233 234
235 - def __fetchData(self):
236 while 1: 237 # Look for end of transmission 238 pos = self.buffer.find('\004') 239 if pos >-1: 240 # Need to find two EOT chars if protocol > 1 241 while self.protocolVersion >1 and self.buffer.find('\004', pos + 1) <0: 242 if self.sslSock: 243 line=self.sslSock.read(1024) 244 else: 245 line=self.s.recv(1024) 246 if not line: break 247 self.buffer += line 248 break 249 250 # Look for newline 251 pos = self.buffer.find('\n') 252 if pos>-1 and pos<len(self.buffer)-1: break 253 254 # Do read to find newline 255 if self.sslSock: 256 line=self.sslSock.read(1024) 257 else: 258 line=self.s.recv(1024) 259 if not line: break 260 self.buffer += line 261 262 pos=self.buffer.find('\004') 263 if pos>-1: 264 self.sessionID = 0 265 if pos < len(self.buffer)-8 and self.buffer[pos+1:pos+8] == 'session': 266 pos2 = self.buffer.find('\004', pos+1) 267 self.sessionID = long(self.buffer[pos+8:pos2]) 268 self.disconnect(True) 269 if DEBUG: print "Session ID", self.sessionID 270 self.buffer=self.buffer[:pos] 271 self.EOT=1 272 if not line: 273 return -1 274 return len(line)
275 276
277 - def __dataArrived(self):
278 gotSomething=1 279 self.s.setblocking(0) 280 try: 281 self.s.recv(1, MSG_PEEK) 282 except Exception, e: 283 gotSomething=0 284 self.s.setblocking(1) 285 return gotSomething
286 287
288 - def __quoteValue(self, value):
289 if type(value) == type(str()): 290 value = value.replace('\'', '\\\'') 291 value = "'" + value + "'" 292 return value
293 294
295 - def eot(self):
296 if len(self.buffer): return 0 297 if self.EOT: return 1 298 if self.__fetchData() <=0: return 1 299 return not len(self.buffer)
300 301
302 - def getattr(self, file, attributes):
303 command='getattr '+file 304 for i in attributes: 305 command+=' '+i 306 self.nattrs=len(attributes) 307 self.execute(command)
308 309
310 - def getEntry(self):
311 file = self.__fetchRow() 312 attributes=[] 313 for i in range(0, self.nattrs): 314 attribute=self.__fetchRow() 315 attributes.append(attribute) 316 return file, attributes
317 318
319 - def setAttr(self, file, keys, values):
320 command='setattr '+file 321 for i in range(len(keys)): 322 command+=' '+keys[i] 323 values[i] = self.__quoteValue(values[i]) 324 command+=' '+str(values[i]) 325 self.execute(command)
326 327
328 - def addEntry(self, file, keys, values):
329 command='addentry '+file 330 for i in range(len(keys)): 331 command += ' '+keys[i] 332 values[i] = self.__quoteValue(str(values[i])) 333 command += ' ' +values[i] 334 self.execute(command)
335 336
337 - def addEntries(self, entries):
338 command='addentries' 339 for e in entries: 340 command+=' '+ e 341 self.execute(command)
342 343
344 - def addAttr(self, file, name, t):
345 command='addattr '+file+' ' + name + ' ' +t 346 self.execute(command)
347 348
349 - def removeAttr(self, file, name):
350 command='removeattr '+file+' ' + name 351 self.execute(command)
352 353
354 - def clearAttr(self, file, name):
355 command='clearattr '+file+' ' + name 356 self.execute(command)
357 358
359 - def listEntries(self, pattern):
360 command='dir '+pattern 361 self.execute(command) 362 self.nattrs=1
363 364
365 - def pwd(self):
366 self.execute('pwd') 367 return self.__fetchRow()
368 369
370 - def listAttr(self, file):
371 command='listattr ' + file 372 self.execute(command) 373 attributes=[] 374 types=[] 375 while not self.eot(): 376 attribute = self.__fetchRow() 377 attributes.append(attribute) 378 t=self.__fetchRow() 379 types.append(t) 380 return attributes, types
381 382
383 - def createDir(self, dir):
384 command='createdir ' + dir 385 self.execute(command)
386
387 - def createPlainDir(self, dir):
388 command='createdir ' + dir + ' plain' 389 self.execute(command)
390
391 - def removeDir(self, dir):
392 command='rmdir ' + dir 393 self.execute(command)
394 395
396 - def rm(self, path):
397 command='rm ' + path 398 self.execute(command)
399 400
401 - def find(self, pattern, query):
402 command='find ' 403 command+=' '+ pattern 404 command+=' ' + self.__quoteValue(query) 405 self.execute(command)
406 407
408 - def selectAttr(self, attributes, query):
409 command='selectattr ' 410 for i in attributes: 411 command+=' '+i 412 self.nattrs=len(attributes) 413 command+=' ' + self.__quoteValue(query) 414 self.execute(command)
415 416
417 - def getSelectAttrEntry(self):
418 attributes=[] 419 for i in range(0, self.nattrs): 420 attribute=self.__fetchRow() 421 attributes.append(attribute) 422 return attributes
423 424
425 - def updateAttr(self, pattern, updateExpr, condition):
426 command = 'updateattr ' + pattern 427 for i in updateExpr: 428 var, exp = self.splitUpdateClause(i) 429 command += ' ' + var + ' ' + self.__quoteValue(exp) 430 command += ' ' + self.__quoteValue(condition) 431 self.execute(command)
432 433
434 - def upload(self, collection, attributes):
435 command = 'upload ' + collection 436 for i in attributes: 437 command+=' '+i 438 self.nattrs=len(attributes) 439 self.executeNoWait(command)
440 441
442 - def put(self, file, values):
443 command='put ' + file 444 if(len(values)!=self.nattrs): 445 raise CommandException(3, "Illegal command") 446 for i in values: 447 command+=' '+i 448 self.executeNoWait(command)
449 450
451 - def abort(self):
452 command='abort' 453 self.execute(command)
454 455
456 - def commit(self):
457 command='commit' 458 self.execute(command)
459 460
461 - def sequenceCreate(self, name, directory, increment=1, start=1):
462 command='sequence_create ' + name + " " + directory + " " 463 command+= str(increment) + " " + str(start) 464 self.execute(command)
465 466
467 - def sequenceNext(self, name):
468 command='sequence_next ' + name 469 self.execute(command) 470 return self.__fetchRow()
471 472
473 - def sequenceRemove(self, name):
474 command='sequence_remove ' + name 475 self.execute(command)
476 477
478 - def cd(self, dir):
479 command='cd ' + dir 480 self.execute(command)
481
482 - def constraintAddNotNull(self,directory,attribute,name):
483 command='constraint_add_not_null '+directory+" "+attribute+" "+name 484 self.execute(command)
485
486 - def constraintAddUnique(self, directory, attribute, name):
487 command='constraint_add_unique '+directory+" "+attribute+" "+name 488 self.execute(command)
489
490 - def constraintAddReference(self, directory, attribute, reffered_attr, name):
491 command='constraint_add_reference '+directory+' '+attribute+' '+reffered_attr 492 command=command + ' ' + name 493 self.execute(command)
494
495 - def constraintAddCheck(self, directory, check, name):
496 command='constraint_add_check '+directory+' ' + self.__quoteValue(check)+' '+name 497 self.execute(command)
498
499 - def constraintDrop(self, directory, name):
500 command='constraint_drop '+directory+' '+name 501 self.execute(command)
502
503 - def constraintList(self, directory):
504 command='constraint_list '+directory 505 self.execute(command) 506 constraints=[] 507 while not self.eot(): 508 constraint = self.__fetchRow() 509 constraints.append(constraint) 510 return constraints
511
512 - def transaction(self):
513 command='transaction' 514 self.execute(command)
515