Package Ganga :: Package Utility :: Package external :: Package ARDAMDClient :: Module mdclient
[hide private]
[frames] | no frames]

Source Code for Module Ganga.Utility.external.ARDAMDClient.mdclient

  1  ################################################################################ 
  2  # Ganga Project. http://cern.ch/ganga 
  3  # 
  4  # $Id: mdclient.py,v 1.1 2008-07-17 16:41:02 moscicki Exp $ 
  5  ################################################################################ 
  6  import socket 
  7  import mdinterface 
  8  import time 
  9  from mdinterface import CommandException, MDInterface 
 10   
 11  DEBUG = False 
 12   
 13  try: 
 14      import tlslite 
 15      USE_TLSLITE = True 
 16      from tlslite.api import * 
 17      if DEBUG: print "Using tlslite" 
 18  except ImportError, e: 
 19      USE_TLSLITE = False 
 20   
 21   
22 -class MDClient(MDInterface):
23 - def __init__(self, host, port, 24 login='anonymous', password = '', 25 keepalive = True):
26 27 self.connected = 0 28 self.host = host 29 self.port = port 30 self.login = login 31 self.password = password 32 self.keepalive = keepalive 33 self.buffer = '' 34 self.reqSSL = False 35 self.sslSock = 0 36 self.sslOptions = None 37 self.sessionID = 0 38 self.session = None 39 self.greetings = '' 40 self.protocolVersion = 0 41 self.currentCommand = "" 42 self.siteCache = {} 43 self.idCache = {}
44 45
46 - def requireSSL(self, key=None, cert=None):
47 self.reqSSL=True 48 self.keyFile=key 49 self.certFile=cert 50 if USE_TLSLITE: 51 self.sslOptions = tlslite.HandshakeSettings.HandshakeSettings 52 self.sslOptions.minVersion = (3,0) 53 self.sslOptions.maxVersion = (3,1)
54 55
56 - def __doSSLHandshake(self, session=None):
57 if USE_TLSLITE: 58 self.sslSock=TLSConnection(self.s) 59 if session: 60 if DEBUG: print "Doing SSL resuming session using TLSLite" 61 self.sslSock.handshakeClientCert(session=session) 62 return 63 if DEBUG: print "Doing SSL handshake using TLSLite" 64 cert = None 65 key = None 66 if self.certFile: 67 s = open(self.certFile).read() 68 x509 = X509() 69 x509.parse(s) 70 cert = X509CertChain([x509]) 71 if self.keyFile: 72 s = open(self.keyFile).read() 73 key = parsePEMKey(s, private=True) 74 self.sslSock.handshakeClientCert(cert, key, 75 self.session, None, None, False) 76 self.sslSock.closeSocket = False 77 else: 78 if DEBUG: print "Doing SSL handshake using builtin SSL" 79 self.sslSock=socket.ssl(self.s, self.keyFile, self.certFile)
80 81
82 - def connect(self):
83 self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 84 TCP_NODELAY = 1 85 self.s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) 86 self.s.connect((self.host, self.port)) 87 88 self.greetings = "" 89 #Expect 3 lines of greetings: server version, protocol, auth-methods 90 while self.greetings.count('\n') < 3: 91 line=self.s.recv(1024) 92 if not line: 93 raise CommandException(-1, "Cannot connect") 94 self.greetings += line 95 96 pos = self.greetings.find('\nProtocol ') 97 if pos > -1: 98 self.protocolVersion = int(self.greetings[pos+10]) 99 100 if self.sessionID: 101 if DEBUG: print "Trying to resume session ", self.sessionID 102 # Do reconnect 103 if self.reqSSL: 104 self.s.send('resumeSSL%u\n\n' % self.sessionID) 105 line = self.s.recv(1024) # OK from server 106 self.__doSSLHandshake(self.session) 107 else: 108 self.s.send('resume%d\n\n' % self.sessionID) 109 self.connected=True 110 self.buffer='' 111 self.attr=0 112 return 0 113 else: 114 if self.reqSSL: 115 self.s.send('ssl\n\n') 116 line = self.s.recv(1024) # OK from server 117 self.__doSSLHandshake() 118 else: 119 self.s.send('plain\n\n') 120 line = self.s.recv(1024) # OK from server 121 122 # Send login information if not doing reconnect 123 context = '0 '+self.login 124 if len(self.password): 125 context = context + '\n5 ' + self.password 126 context = context + '\n\n' 127 128 if not self.sessionID: 129 if self.sslSock: 130 self.sslSock.write(context) 131 else: 132 self.s.send(context) 133 134 self.connected=True 135 self.buffer='' 136 self.attr=0
137 138
139 - def disconnect(self, saveSession = False):
140 if saveSession and self.sslSock: 141 if USE_TLSLITE: 142 self.session = self.sslSock.session 143 else: 144 self.session = None 145 if self.connected: 146 if self.sslSock: 147 if USE_TLSLITE: 148 self.sslSock.close() 149 else: 150 del self.sslSock 151 self.s.close() 152 self.connected = False
153 154
155 - def __sendCommand(self, command):
156 if DEBUG: print "Sending > ", command, "<" 157 if self.sslSock: 158 self.sslSock.write(command + '\n') 159 else: 160 self.s.send(command + '\n')
161 162
163 - def __encodeCommand(self, command):
164 command.replace('//', '////'); 165 command.replace('\n', '//n'); 166 return command
167
168 - def execute(self, command):
169 """ Returns silently if the command executes successfully, 170 throws an exception otherwise 171 """ 172 command = self.__encodeCommand(command) 173 self.currentCommand = command 174 self.buffer = "" 175 if not self.keepalive: 176 self.disconnect() 177 if not self.connected: 178 self.connect() 179 self.__sendCommand(command) 180 return self.retrieveResult()
181 182
183 - def retrieveResult(self):
184 self.EOT = 0 185 self.buffer = ""; 186 line = self.__fetchRow() 187 if line == None: 188 raise IOError("Server sent empty response") 189 pos=line.find(' ') 190 msg = "" 191 if(pos>-1): 192 retValue, msg=int(line[:pos]), line[pos+1:] 193 else: 194 retValue = int(line) 195 if retValue != 0: 196 # The command did not execute properly. Clear 197 # the input buffer and raise an exception 198 while not self.EOT: 199 if self.__fetchData()<0: 200 break 201 self.buffer = "" 202 msg = msg + ". Command was: " + self.currentCommand 203 raise CommandException(retValue, msg)
204 205
206 - def executeNoWait(self, command):
207 """ Returns silently if the command executes, 208 throws an exception if an error occours immediately 209 Does not wait for any return condition of the remote 210 command 211 """ 212 command = self.__encodeCommand(command) 213 self.currentCommand = command 214 self.buffer = "" 215 if not self.keepalive: 216 self.disconnect() 217 if not self.connected: 218 self.connect() 219 self.__sendCommand(command) 220 if(self.__dataArrived()): 221 return self.retrieveResult()
222 223
224 - def fetchRow(self):
225 return self.__fetchRow()
226 227
228 - def __fetchRow(self):
229 pos=self.buffer.find('\n') 230 if pos>-1: 231 line=self.buffer[:pos] 232 self.buffer=self.buffer[pos+1:] 233 if not len(self.buffer) and not self.EOT: 234 if self.__fetchData() < 0: return None 235 return line 236 if self.EOT: 237 return None 238 if self.__fetchData() <=0: 239 return None 240 return self.__fetchRow()
241 242
243 - def __fetchData(self):
244 while 1: 245 # Look for end of transmission 246 pos = self.buffer.find('\004') 247 if pos >-1: 248 # Need to find two EOT chars if protocol > 1 249 while self.protocolVersion >1 and self.buffer.find('\004', pos + 1) <0: 250 if self.sslSock: 251 line=self.sslSock.read(1024) 252 else: 253 line=self.s.recv(1024) 254 if not line: break 255 self.buffer += line 256 break 257 # Look for newline 258 pos = self.buffer.find('\n') 259 if pos>-1 and pos<len(self.buffer)-1: break 260 261 # Do read to find newline 262 if self.sslSock: 263 line=self.sslSock.read(1024) 264 else: 265 line=self.s.recv(1024) 266 if not line: break 267 self.buffer += line 268 pos=self.buffer.find('\004') 269 if pos>-1: 270 self.sessionID = 0 271 if pos < len(self.buffer)-8 and self.buffer[pos+1:pos+8] == 'session': 272 pos2 = self.buffer.find('\004', pos+1) 273 self.sessionID = long(self.buffer[pos+8:pos2]) 274 self.disconnect(True) 275 if DEBUG: print "Session ID", self.sessionID 276 self.buffer=self.buffer[:pos] 277 self.EOT=1 278 if not line: 279 return -1 280 return len(line)
281 282
283 - def __dataArrived(self):
284 gotSomething=1 285 self.s.setblocking(0) 286 try: 287 self.s.recv(1, MSG_PEEK) 288 except Exception, e: 289 gotSomething=0 290 self.s.setblocking(1) 291 return gotSomething
292 293
294 - def __quoteValue(self, value):
295 if type(value) == type(str()): 296 value = value.replace('\'', '\\\'') 297 value = "'" + value + "'" 298 return value
299 300
301 - def eot(self):
302 if len(self.buffer): return 0 303 if self.EOT: return 1 304 if self.__fetchData() <=0: return 1 305 return not len(self.buffer)
306 307
308 - def getattr(self, file, attributes):
309 command='getattr '+file 310 for i in attributes: 311 command+=' '+i 312 self.nattrs=len(attributes) 313 self.execute(command)
314 315
316 - def getEntry(self):
317 file = self.__fetchRow() 318 attributes=[] 319 for i in range(0, self.nattrs): 320 attribute=self.__fetchRow() 321 attributes.append(attribute) 322 return file, attributes
323 324
325 - def setAttr(self, file, keys, values):
326 command='setattr '+file 327 for i in range(len(keys)): 328 command+=' '+keys[i] 329 values[i] = self.__quoteValue(values[i]) 330 command+=' '+str(values[i]) 331 self.execute(command)
332 333
334 - def addEntry(self, file, keys, values):
335 command='addentry '+file 336 for i in range(len(keys)): 337 command += ' '+keys[i] 338 values[i] = self.__quoteValue(str(values[i])) 339 command += ' ' +values[i] 340 self.execute(command)
341 342
343 - def addEntries(self, entries):
344 command='addentries' 345 for e in entries: 346 command+=' '+ e 347 self.execute(command)
348 349
350 - def addAttr(self, file, name, t):
351 command='addattr '+file+' ' + name + ' ' +t 352 self.execute(command)
353 354
355 - def removeAttr(self, file, name):
356 command='removeattr '+file+' ' + name 357 self.execute(command)
358 359
360 - def clearAttr(self, file, name):
361 command='clearattr '+file+' ' + name 362 self.execute(command)
363 364
365 - def listEntries(self, pattern):
366 command='dir '+pattern 367 self.execute(command) 368 self.nattrs=1
369 370
371 - def pwd(self):
372 self.execute('pwd') 373 return self.__fetchRow()
374 375
376 - def listAttr(self, file):
377 command='listattr ' + file 378 self.execute(command) 379 attributes=[] 380 types=[] 381 while not self.eot(): 382 attribute = self.__fetchRow() 383 attributes.append(attribute) 384 t=self.__fetchRow() 385 types.append(t) 386 return attributes, types
387 388
389 - def createDir(self, dir):
390 command='createdir ' + dir 391 self.execute(command)
392 393
394 - def createPlainDir(self, dir):
395 command='createdir ' + dir + ' plain' 396 self.execute(command)
397 398
399 - def removeDir(self, dir):
400 command='rmdir ' + dir 401 self.execute(command)
402 403
404 - def rm(self, path):
405 command='rm ' + path 406 self.execute(command)
407 408
409 - def find(self, pattern, query):
410 command='find ' 411 command+=' '+ pattern 412 command+=' ' + self.__quoteValue(query) 413 self.nattrs=1 414 self.execute(command)
415 416
417 - def selectAttr(self, attributes, query):
418 command='selectattr ' 419 for i in attributes: 420 command+=' '+i 421 self.nattrs=len(attributes) 422 command+=' ' + self.__quoteValue(query) 423 self.execute(command)
424 425
426 - def getSelectAttrEntry(self):
427 attributes=[] 428 for i in range(0, self.nattrs): 429 attribute=self.__fetchRow() 430 attributes.append(attribute) 431 return attributes
432 433
434 - def updateAttr(self, pattern, updateExpr, condition):
435 command = 'updateattr ' + pattern 436 for i in updateExpr: 437 var, exp = self.splitUpdateClause(i) 438 command += ' ' + var + ' ' + self.__quoteValue(exp) 439 command += ' ' + self.__quoteValue(condition) 440 self.execute(command)
441 442
443 - def update(self, pattern, updateExpr, condition):
444 command = 'update ' + pattern 445 for i in updateExpr: 446 var, exp = self.splitUpdateClause(i) 447 command += ' ' + var + ' ' + self.__quoteValue(exp) 448 command += ' ' + self.__quoteValue(condition) 449 self.execute(command)
450 451
452 - def upload(self, collection, attributes):
453 command = 'upload ' + collection 454 for i in attributes: 455 command+=' '+i 456 self.nattrs=len(attributes) 457 self.executeNoWait(command)
458 459
460 - def put(self, file, values):
461 command='put ' + file 462 if(len(values)!=self.nattrs): 463 raise CommandException(3, "Illegal command") 464 for i in values: 465 command+=' '+i 466 self.executeNoWait(command)
467 468
469 - def abort(self):
470 command='abort' 471 self.execute(command)
472 473
474 - def commit(self):
475 command='commit' 476 self.execute(command)
477 478
479 - def sequenceCreate(self, name, directory, increment=1, start=1):
480 command='sequence_create ' + name + " " + directory + " " 481 command+= str(increment) + " " + str(start) 482 self.execute(command)
483 484
485 - def sequenceNext(self, name):
486 command='sequence_next ' + name 487 self.execute(command) 488 return self.__fetchRow()
489 490
491 - def sequenceRemove(self, name):
492 command='sequence_remove ' + name 493 self.execute(command)
494 495
496 - def cd(self, dir):
497 command='cd ' + dir 498 self.execute(command)
499 500
501 - def constraintAddNotNull(self,directory,attribute,name):
502 command='constraint_add_not_null '+directory+" "+attribute+" "+name 503 self.execute(command)
504 505
506 - def constraintAddUnique(self, directory, attribute, name):
507 command='constraint_add_unique '+directory+" "+attribute+" "+name 508 self.execute(command)
509 510
511 - def constraintAddReference(self, directory, attribute, reffered_attr, name):
512 command='constraint_add_reference '+directory+' '+attribute+' '+reffered_attr 513 command=command + ' ' + name 514 self.execute(command)
515 516
517 - def constraintAddCheck(self, directory, check, name):
518 command='constraint_add_check '+directory+' ' + self.__quoteValue(check)+' '+name 519 self.execute(command)
520 521
522 - def constraintDrop(self, directory, name):
523 command='constraint_drop '+directory+' '+name 524 self.execute(command)
525 526
527 - def constraintList(self, directory):
528 command='constraint_list '+directory 529 self.execute(command) 530 constraints=[] 531 while not self.eot(): 532 constraint = self.__fetchRow() 533 constraints.append(constraint) 534 return constraints
535 536
537 - def transaction(self):
538 command='transaction' 539 self.execute(command)
540 541
542 - def remoteExecute(self, remoteCommand):
543 command='execute ' + remoteCommand 544 self.execute(command) 545 result = '' 546 while not self.eot(): 547 result = result + self.__fetchRow() +'\n' 548 return result
549 550
551 - def siteAdd(self, site, server=''):
552 command = 'site_add ' + site 553 if len(server): 554 command = command + ' ' + server 555 self.execute(command)
556 557
558 - def siteRemove(self, site):
559 command = 'site_remove ' + name 560 self.execute(command)
561 562
563 - def replicaRegister(self, guidSiteList, create = True):
564 command = 'replica_register' 565 if create: 566 command = command + ' -c' 567 for g in guidSiteList: 568 (guid, site) = g 569 command += ' ' 570 if self.siteCache.has_key(site): 571 command = command + guid + ' ' + str(self.siteCache[site]) 572 else: 573 command = command + guid + ' ' + site 574 self.execute(command)
575 576
577 - def replicaUnregister(self, guid, site, delete = False):
578 command = 'replica_unregister ' 579 if delete: 580 command = command + '-d ' 581 command = command + guid + ' '+ site 582 self.execute(command)
583 584
585 - def replicaAdd(self, guid):
586 command = 'replica_add '
587 588
589 - def siteList(self):
590 command = 'site_list' 591 self.execute(command) 592 self.siteCache = {} 593 self.idCache = {} 594 while not self.eot(): 595 l = self.__fetchRow() 596 (id, name, config) = l.split() 597 self.siteCache[name] = int(id) 598 self.idCache[int(id)] = name 599 return self.siteCache
600 601
602 - def replicaList(self, files, resolveSites = True, isLFN = False):
603 if resolveSites and not len(self.idCache): 604 self.siteList() 605 command = 'replica_list' 606 if isLFN: 607 command += ' -l' 608 for f in files: 609 command += ' ' + f 610 self.execute(command) 611 res = {} 612 while not self.eot(): 613 l = self.__fetchRow() 614 try: 615 (file, s) = l.split(' ', 1) 616 except ValueError: 617 file = l 618 s = '' 619 sites = map(int, s.split()) 620 if resolveSites: 621 sites = map(self.idCache.get, sites) 622 res[file] = sites 623 return res
624