Package Ganga :: Package Core :: Package GangaRepository :: Module GangaRepositorySQLite
[hide private]
[frames | no frames]

Source Code for Module Ganga.Core.GangaRepository.GangaRepositorySQLite

  1  # Note: The following must be considered in a GangaRepository: 
  2  # 
  3  # * lazy loading 
  4  # * locking 
  5   
  6  from GangaRepository import GangaRepository, PluginManagerError, EmptyGangaObject, RepositoryError, InaccessibleObjectError 
  7  from Ganga.Utility.Config import getConfig 
  8  import os, os.path, fcntl, time, errno 
  9   
 10  import sqlite 
 11   
 12  try: 
 13           import cPickle as pickle 
 14  except: 
 15           import pickle 
 16   
 17  import Ganga.Utility.logging 
 18  logger = Ganga.Utility.logging.getLogger() 
 19   
 20  from Ganga.GPIDev.Lib.GangaList.GangaList import makeGangaListByRef 
 21  from Ganga.GPIDev.Base.Objects import Node 
 22   
class GangaRepositorySQLite(GangaRepository):
    """GangaRepository that keeps all objects in one SQLite database file.

    Each object occupies one row of the ``objects`` table with the columns
    ``id`` (integer primary key), ``classname``, ``category``, ``idx`` (the
    pickled index cache) and ``data`` (the pickled object data).  Deleting an
    object keeps its row but sets every column except ``id`` to NULL.

    This backend does not implement locking: ``lock`` and ``unlock`` simply
    hand back the ids they were given.
    """

    def _sqlite_quote(self, text):
        """Return `text` escaped for use inside a single-quoted SQL literal."""
        return str(text).replace("'", "''")

    def _sqlite_pickle(self, value):
        """Pickle `value` and escape the result for a single-quoted SQL literal."""
        return self._sqlite_quote(pickle.dumps(value))

    def startup(self):
        """Open the database (creating directory and table if needed) and read the index.

        The database file is ``<registry.location>/0.1/<registry.name>/database.db``.

        Raises OSError if the repository directory cannot be created.
        """
        self._load_timestamp = {}
        self._cache_load_timestamp = {}
        self.known_bad_ids = []
        self.root = os.path.join(self.registry.location, "0.1", self.registry.name)
        try:
            os.makedirs(self.root)
        except OSError:
            # An already existing directory is the normal case; any other
            # failure (permissions, a file in the way, ...) must not be hidden.
            if not os.path.isdir(self.root):
                raise
        db_path = os.path.join(self.root, "database.db")
        self.con = sqlite.connect(db_path)
        logger.debug("Connected to %s" % db_path)
        self.cur = self.con.cursor()
        # Create the schema on first use of a fresh database file.
        self.cur.execute("SELECT name FROM sqlite_master WHERE type='table' and name LIKE 'objects'")
        if not self.cur.fetchall():
            self.cur.execute("CREATE TABLE objects (id INTEGER PRIMARY KEY, classname VARCHAR(30), category VARCHAR(20), idx VARCHAR(100), data VARCHAR(1000))")
            self.con.commit()
        self.update_index(verbose=True)

    def shutdown(self):
        """Close the cursor and the database connection.

        Flushing is done by the Registry before this is called.
        """
        try:
            self.cur.close()
        finally:
            # Release the connection even if closing the cursor failed.
            self.con.close()

    def update_index(self, id=None, verbose=False):
        """Refresh the index cache of every object stored in the database.

        Objects not yet known to this repository are created as empty
        objects; their data is only read later by load().

        Both parameters are accepted for interface compatibility but are
        currently ignored: the whole table is always scanned.
        """
        logger.debug("updating index...")
        self.cur.execute("SELECT id,classname,category,idx FROM objects")
        for row in self.cur:
            obj_id = int(row[0])
            classname, category, pickled_index = row[1], row[2], row[3]
            if classname is None:  # deleted object
                continue
            if obj_id in self.objects:
                obj = self.objects[obj_id]
            else:
                obj = self._make_empty_object_(obj_id, category, classname)
            # NOTE(review): unpickling trusts the contents of the database file.
            obj._index_cache = pickle.loads(pickled_index)
        logger.debug("updated index done")

    def add(self, objs, force_ids=None):
        """Store the given objects and return the list of their ids.

        objs      -- objects to insert
        force_ids -- optional list of ids, one per object; when omitted the
                     database assigns the ids.  The ids are assumed to be
                     already locked by the Registry.

        The returned list is always a new list with exactly one id per
        object; `force_ids` itself is never modified.

        Raises RepositoryError if `force_ids` and `objs` differ in length.
        """
        if force_ids is not None and len(objs) != len(force_ids):
            raise RepositoryError(self, "Internal Error: add with different number of objects and force_ids!")
        ids = []
        for pos, obj in enumerate(objs):
            obj._index_cache = self.registry.getIndexCache(obj)
            # NOTE(review): values are escaped by hand and spliced into the
            # statement; consider driver-side parameter binding instead.
            classname = self._sqlite_quote(obj._name)
            category = self._sqlite_quote(obj._category)
            idx = self._sqlite_pickle(obj._index_cache)
            data = self._sqlite_pickle(obj._data)
            if force_ids is None:
                self.cur.execute("INSERT INTO objects (id,classname,category,idx,data) VALUES (NULL,'%s','%s','%s','%s')" % (classname, category, idx, data))
                obj_id = self.cur.lastrowid
            else:
                obj_id = force_ids[pos]
                self.cur.execute("INSERT INTO objects (id,classname,category,idx,data) VALUES (%d,'%s','%s','%s','%s')" % (int(obj_id), classname, category, idx, data))
            ids.append(obj_id)
            self._internal_setitem__(obj_id, obj)
        self.con.commit()
        return ids

    def flush(self, ids):
        """Write the index cache and data of the given objects to the database.

        Objects that are still unloaded placeholders (EmptyGangaObject) are
        skipped.  All updates are committed together at the end.
        """
        for obj_id in ids:
            obj = self.objects[obj_id]
            if obj._name == "EmptyGangaObject":
                continue
            obj._index_cache = self.registry.getIndexCache(obj)
            idx = self._sqlite_pickle(obj._index_cache)
            data = self._sqlite_pickle(obj._data)
            self.cur.execute("UPDATE objects SET idx='%s',data='%s' WHERE id=%d" % (idx, data, int(obj_id)))
        self.con.commit()

    def load(self, ids):
        """Read the data of the objects with the given ids from the database.

        Objects whose data is already in memory are left untouched.  The
        caller's `ids` sequence is not modified.

        Raises KeyError (with the first missing id) if an id has no row or
        refers to a deleted object.
        """
        pending = list(ids)  # work on a copy so the caller's list survives
        if not pending:
            return
        self.cur.execute("SELECT id,classname,category,data FROM objects WHERE id IN (%s)" % (",".join(map(str, pending))))
        for row in self.cur:
            obj_id = int(row[0])
            classname, category, pickled_data = row[1], row[2], row[3]
            if classname is None:  # deleted object: stays in `pending`
                continue
            if obj_id in self.objects:
                obj = self.objects[obj_id]
            else:
                obj = self._make_empty_object_(obj_id, category, classname)
            if obj._data is None:
                # NOTE(review): unpickling trusts the contents of the database file.
                obj._data = pickle.loads(pickled_data)
                obj.__setstate__(obj.__dict__)
            pending.remove(obj_id)
        if pending:
            raise KeyError(pending[0])

    def delete(self, ids):
        """Mark the given objects as deleted and drop them from this repository.

        The rows are kept with all columns except ``id`` set to NULL, which
        is how update_index() and load() recognise deleted objects.
        """
        for obj_id in ids:
            self.cur.execute("UPDATE objects SET classname=NULL,category=NULL,idx=NULL,data=NULL WHERE id=%d" % int(obj_id))
            self._internal_del__(obj_id)
        self.con.commit()

    def lock(self, ids):
        """Pretend to lock `ids`; this backend has no locking, all ids are returned."""
        return ids

    def unlock(self, ids):
        """Pretend to unlock `ids`; this backend has no locking, all ids are returned."""
        return ids

    def get_lock_session(self, id):
        """get_lock_session(id)
        Intended to describe the session holding the lock on `id`.
        This backend keeps no lock information and always returns an empty string.
        """
        return ""

    def get_other_sessions(self):
        """get_other_sessions()
        Intended to describe the other active sessions.
        This backend does not track sessions and always returns an empty list.
        """
        return []

    def reap_locks(self):
        """reap_locks() --> False
        Intended to remotely clear all foreign locks.
        Not supported by this backend, so it always returns False.
        """
        return False

    def clean(self):
        """clean()
        Clear EVERYTHING in this repository: the database file is removed
        and a fresh, empty one is created by restarting the repository.
        WARNING: This is not nice.
        Nothing is returned; errors from shutdown, unlink or startup propagate.
        """
        self.shutdown()
        os.unlink(os.path.join(self.root, "database.db"))
        self.startup()
163