__version__ = "2.2"

import os
import shutil
import binascii
import types
import threading
import time
import Ganga.Utility.external.ARDAMDClient.mdclient
import Ganga.Utility.external.ARDAMDClient.mdstandalone
import Ganga.Utility.external.ARDAMDClient.mdparser
import Ganga.Utility.Config
from Base import JobRepository
from Separator import Parser
from Ganga.Core.exceptions import RepositoryError, BulkOperationRepositoryError
from Ganga.Utility.external.ARDAMDClient.mdclient import MDClient
from Ganga.Utility.external.ARDAMDClient.mdstandalone import MDStandalone
from Ganga.Utility.external.ARDAMDClient.mdinterface import CommandException
from Ganga.Utility.external.ARDAMDClient.guid import newGuid
from Ganga.Utility.files import expandfilename

import Ganga.Utility.logging
logger = Ganga.Utility.logging.getLogger(modulename=1)

logging_config = Ganga.Utility.Config.getConfig('Logging')

arda_logger_name = 'Ganga.Utility.external.ARDAMDClient'
arda_logger = Ganga.Utility.logging.getLogger(name=arda_logger_name)

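# NOTE: the definition of switch_debug was elided from this listing; the
# sketch below is a hedged reconstruction, not the original code. It assumes
# the bundled ARDAMDClient modules expose a module-level DEBUG flag, which
# this handler keeps in sync with the logging level configured for the
# package.
def switch_debug(name, level):
    if name == arda_logger_name:
        debug = (level == 'DEBUG')
        Ganga.Utility.external.ARDAMDClient.mdclient.DEBUG = debug
        Ganga.Utility.external.ARDAMDClient.mdstandalone.DEBUG = debug
        Ganga.Utility.external.ARDAMDClient.mdparser.DEBUG = debug
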
logging_config.attachUserHandler(None, switch_debug)
logging_config.attachSessionHandler(None, switch_debug)

switch_debug(arda_logger_name, Ganga.Utility.logging.getLevelName(arda_logger.getEffectiveLevel()))

all_configs = {}
all_configs['LocalAMGA'] = Ganga.Utility.Config.makeConfig('LocalAMGA_Repository', 'Settings for the local AMGA job repository')
all_configs['LocalAMGA'].addOption('blocklength', 1000, 'maximum number of jobs stored in a block of the local repository')
all_configs['LocalAMGA'].addOption('cache_size', 3, 'maximum amount of memory (in blocks) that the local repository can use for job caching')
all_configs['LocalAMGA'].addOption('tries_limit', 200, 'maximum number of attempts to write/move a file or to acquire the table lock in the local repository')
all_configs['LocalAMGA'].addOption('lock_timeout', 60, 'maximum time in seconds for which a lock on the local repository remains valid')

all_configs['RemoteAMGA'] = Ganga.Utility.Config.makeConfig('RemoteAMGA_Repository', 'Settings for the remote AMGA job repository')
all_configs['RemoteAMGA'].addOption('host', 'gangamd.cern.ch', 'location of the AMGA metadata server used by the remote repository')
all_configs['RemoteAMGA'].addOption('port', 8822, 'port for secure connection to the remote repository')
all_configs['RemoteAMGA'].addOption('reqSSL', 1, 'flag for secure connection to the remote repository')
all_configs['RemoteAMGA'].addOption('login', '', 'login name to connect to the remote repository')

all_configs['Test'] = all_configs['LocalAMGA']

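# Layout switches; the descriptions are inferred from how the flags are used
# further down in this module:
# USE_ORACLE_AS_REMOTE_BACKEND selects the Oracle-backed remote repository class,
# USE_FOLDERS_FOR_SUBJOBS keeps subjobs in per-job '<id>.subjobs' folders,
# USE_COUNTERS_FOR_SUBJOBS draws subjob ids from a per-master counter attribute,
# USE_COMPRESSED_BLOBS zlib-compresses job blobs before hex encoding.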
USE_ORACLE_AS_REMOTE_BACKEND = True
USE_FOLDERS_FOR_SUBJOBS = False
USE_COUNTERS_FOR_SUBJOBS = True
USE_COMPRESSED_BLOBS = True

if not USE_FOLDERS_FOR_SUBJOBS:
    USE_COUNTERS_FOR_SUBJOBS = True

schema = [('id', 'int'),
          ('name', 'varchar(254)'),
          ('status', 'varchar(254)'),
          ('application', 'varchar(254)'),
          ('backend', 'varchar(254)')]

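# Class header reconstructed: the original line was elided from this listing.
# The name and base class follow from the JobRepository.__init__ call in
# __init__ below and from the ARDARepositoryMixIn.__init__ calls in the
# concrete repository classes further down.
class ARDARepositoryMixIn(JobRepository):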
    _counterName = 'jobseq'

    _jobsTreeFldr = 'jobstree'
    _jobsTreeAttr = ('folders', 'text')

    _lock = ('lock', 'varchar(254)')
    _blob = ('blob', 'text')
    _id = ('id', 'int')
    _counter = ('counter', 'int')
    _subjobs = ('subjobs', 'text')
    _isasubjob = ('isasubjob', 'varchar(254)')
    _compressed = ('compressed', 'varchar(254)')
    _istate = ('istate', 'varchar(254)')
    _time = ('time', 'varchar(254)')

    def __init__(self, schema, role, streamer, tree_streamer, root_dir, init_schema):

        JobRepository.__init__(self, schema, role, streamer, tree_streamer)
        if not os.path.isabs(root_dir):
            root_dir = os.path.join(os.sep, root_dir)
        self.root_dir = os.path.normpath(root_dir)

        self._rep_lock = threading.RLock()

        self._initAll(init_schema)

    def _initAll(self, init_schema = True):

    def _initDir(self, path, schema = None, create_sequence = True):

        self._rep_lock.acquire(1)
        try:
            try:
                self.rm(os.path.join(path, '*'))
            except CommandException, e:
                if e.errorCode == 1:
                    pass
        finally:
            self._rep_lock.release()

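    # Placeholder name: the original def line was elided from this listing.
    # Judging by its use in _initDir, error code 1 means "no such entry".
    def _isNotFoundError(self, e):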
        return e.errorCode == 1

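    # Placeholder name; the original def line was elided (error code 11).
    def _isNotADirectoryError(self, e):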
        return e.errorCode == 11

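    # Placeholder name; the original def line was elided (error code 17,
    # apparently "entry already exists").
    def _isExistsError(self, e):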
        return e.errorCode == 17

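    # The fragment below is the surviving tail of an elided helper that
    # normalizes a DB id string to an int where possible; its def line was
    # lost from this listing.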
        try:
            return int(dbid)
        except:
            logger.warning("Wrong DB entry %s" % dbid)
            return dbid

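    # Reconstructed def line (elided): the name follows from the call
    # self._getJobFolderName(fqid) in the Oracle subclass below.
    def _getJobFolderName(self, fqid):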
        self._rep_lock.acquire(1)
        try:
            assert(type(fqid) in [types.TupleType, types.ListType])
            if USE_FOLDERS_FOR_SUBJOBS:
                pp = map(lambda x: '.'.join([str(x), "subjobs"]), fqid[:-1])
                pp.insert(0, self.root_dir)
                return os.sep.join(pp)
            else:
                return self.root_dir
        finally:
            self._rep_lock.release()

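    # Reconstructed def line (elided): the name follows from the calls
    # self._getSubJobPath(fqid) in _commitJobs and deleteJobs below.
    def _getSubJobPath(self, fqid):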
        self._rep_lock.acquire(1)
        try:
            assert(type(fqid) in [types.TupleType, types.ListType])
            if USE_FOLDERS_FOR_SUBJOBS:
                pp = map(lambda x: '.'.join([str(x), "subjobs"]), fqid)
                pp.insert(0, self.root_dir)
                return os.sep.join(pp)
            else:
                return ''
        finally:
            self._rep_lock.release()

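    # Placeholder name; the original def line was elided. This helper builds
    # the 'lock' attribute condition used for conditional (non-forced)
    # updates of a job entry.
    def _getLockCondition(self, fqid, forced_action = False):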
        self._rep_lock.acquire(1)
        try:
            assert(type(fqid) in [types.TupleType, types.ListType])
            if forced_action:
                condition = ''
            else:
                if fqid in self._job_cache:
                    guid = self._job_cache[fqid]
                else:
                    guid = self.guid
                path = os.path.dirname(self._getJobFileName(fqid))
                condition = ':'.join([path, self._lock[0] + '="%s"' % guid])
            return condition
        finally:
            self._rep_lock.release()

    def _text2pstr(self, text):
        import repr
        logger.debug('_text2pstr: %s', repr.repr(text))
        return binascii.unhexlify(text)

    def _pstr2text(self, pstr):
        import repr
        logger.debug('_pstr2text: %s', repr.repr(pstr))
        return binascii.hexlify(pstr)

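    # Reconstructed def line (elided). Pickled job text is hex-encoded via
    # binascii above so that arbitrary bytes survive storage in a 'text'
    # attribute; compression, when enabled, happens before that encoding.
    def _compress(self, v):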
        import zlib
        return zlib.compress(v)

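    # Reconstructed def line (elided).
    def _decompress(self, v):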
        import zlib
        return zlib.decompress(v)

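    # _getValues flattens a job (and, if deep, its subjobs) into the
    # attribute dictionary stored in AMGA: the pickled job dict goes into
    # 'blob' (optionally zlib-compressed, always hex-encoded), while the
    # bookkeeping attributes ('lock', 'istate', 'time', 'subjobs', ...) are
    # filled in here.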
    def _getValues(self, job, timestamp, deep = True):
        self._rep_lock.acquire(1)
        try:
            def extractMD(r, mfqid):
                fqid = list(mfqid)
                jid = r[0]['data']['id']['data']
                fqid.append(jid)
                fqid = tuple(fqid)
                res = {'fqid': fqid, 'metadata': {}}
                for k, t in self.schema:
                    if k == self._blob[0]:
                        v = repr(r[0])
                        if USE_COMPRESSED_BLOBS:
                            v = self._compress(v)
                    elif k == self._lock[0]:
                        v = self.guid
                    elif k == self._istate[0]:
                        v = '_'
                    elif k == self._time[0]:
                        v = timestamp
                    elif k == self._subjobs[0]:
                        if r[1]:
                            v = map(lambda rr: rr[0]['data']['id']['data'], r[1])
                        else:
                            v = []
                        v = repr(v)
                    elif k == self._isasubjob[0]:
                        if len(fqid) > 1:
                            v = 'Y'
                        else:
                            v = 'N'
                    elif k == self._counter[0]:
                        v = '1'
                    elif k == self._compressed[0]:
                        if USE_COMPRESSED_BLOBS:
                            v = 'Y'
                        else:
                            v = 'N'
                    else:
                        v = r[0]['data'][k]
                        if v['simple']:
                            v = str(v['data'])
                        else:
                            v = v['name']
                    if t == 'text':
                        v = self._pstr2text(v)
                    if v == '':
                        v = 'None'
                    res['metadata'][k] = v
                if deep:
                    return (res, map(lambda sr: extractMD(sr, fqid), r[1]))
                else:
                    return (res, [])

            mfqid = self._getFQID(job)[:-1]
            jtree = Parser.extractSubJobs(self._streamer._getDictFromJob(job))
            return extractMD(jtree, mfqid)

        finally:
            self._rep_lock.release()

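    # _commitJobs both registers new jobs (register=True: allocate ids via
    # sequences or per-master counters, then add entries) and commits updates
    # to existing ones; per-job failures are collected and re-raised in bulk
    # as a BulkOperationRepositoryError.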
    def _commitJobs(self, jobs, forced_action = False, deep = True, register = False, get_ids = True):
        self._rep_lock.acquire(1)
        try:
            if register:
                sch = self.schema
            else:
                sch = self._commit_schema
            attrs = map(lambda x: x[0], sch)
            details = {}
            msgs = []

            def commit_visitor(md, path):
                try:
                    fqid = md[0]['fqid']
                    if fqid not in self._job_cache:
                        if not register:
                            msg = "Job %s is not found in the job cache. Committing this job will most likely fail." % str(fqid)
                            raise RepositoryError(msg = msg)
                    vals_dict = md[0]['metadata']
                    vals = map(lambda x: vals_dict[x], attrs)
                    updatecond = (not register) or (deep and USE_COUNTERS_FOR_SUBJOBS and len(md[1]) > 0)
                    if updatecond:
                        self._generic_updateAttr(fqid, attrs, vals, forced_action)
                    else:
                        self._generic_addEntry(fqid, attrs, vals)
                except (CommandException, RepositoryError), e:
                    msg = "_commitJobs() command called while committing job %s raised an exception: %s" % (str(fqid), str(e))
                    msgs.append(msg)
                    details[fqid] = e
                else:
                    job_cache[fqid] = self.guid
                    if deep:
                        sj_path = self._getSubJobPath(fqid)
                        for smd in md[1]:
                            commit_visitor(smd, sj_path)

            def register_visitor(j, path, reserve):
                try:
                    if j.master and USE_COUNTERS_FOR_SUBJOBS:
                        def get_id(reserve):
                            return int(self._counterNext(self._getFQID(j.master), reserve)) - 1
                    else:
                        def get_id(reserve):
                            return int(self._generic_sequenceNext(self._getSequenceName(path), reserve)) - 1
                    if get_ids:
                        j.id = get_id(reserve)
                    else:
                        while get_id(0) < j.id:
                            continue
                except CommandException, e:
                    msg = "sequenceNext() command called while registering jobs raised an exception: %s" % str(e)
                    raise RepositoryError(e = e, msg = msg)
                else:
                    if deep:
                        sj_reserve = len(j.subjobs)
                        if sj_reserve > 0:
                            fqid = self._getFQID(j)
                            sj_path = self._getSubJobPath(fqid)
                            if USE_FOLDERS_FOR_SUBJOBS:
                                self._initDir(sj_path)
                            if USE_COUNTERS_FOR_SUBJOBS:
                                self._initCounter(fqid)
                            for sj in j.subjobs:
                                register_visitor(sj, sj_path, sj_reserve)

            job_categs = self._sortJobsByJobFolders(jobs)

            for path in job_categs:
                jobs = job_categs[path]
                if register:
                    jobs_to_commit = []
                    j_reserve = len(jobs)
                    if j_reserve > 0:
                        if path != self.root_dir:
                            self._initDir(path)
                        for j in jobs:
                            try:
                                register_visitor(j, path, j_reserve)
                            except Exception, e:
                                msg = str(e)
                                details[path] = RepositoryError(e = e, msg = msg)
                                msgs.append(msg)
                            else:
                                jobs_to_commit.append(j)
                else:
                    jobs_to_commit = jobs

                job_cache = {}
                timestamp = repr(time.time())
                try:
                    self._initCommand()
                    try:
                        for j in jobs_to_commit:
                            commit_visitor(self._getValues(j, timestamp, deep), path)
                    finally:
                        self._finalizeCommand()
                except Exception, e:
                    msg = str(e)
                    details[path] = RepositoryError(e = e, msg = msg)
                    msgs.append(msg)
                else:
                    self._job_cache.update(job_cache)

            if details:
                raise BulkOperationRepositoryError(msg = '\n'.join(msgs), details = details)

        finally:
            self._rep_lock.release()

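    # Reconstructed def line (elided): the name follows from the calls
    # self._fqnConverter(jid) elsewhere in this class. It normalizes a plain
    # integer id into a one-element fully-qualified id tuple.
    def _fqnConverter(self, i):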
        assert(type(i) in [types.TupleType, types.ListType, types.IntType])
        if type(i) == types.IntType:
            return (i,)
        else:
            return i

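    # Reconstructed def line (elided): the name follows from the call
    # self._getSelectionAndPath(ids_or_attributes) in the metadata reader
    # below.
    def _getSelectionAndPath(self, selection):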
        self._rep_lock.acquire(1)
        try:
            assert(type(selection) == types.DictionaryType)
            if selection.has_key('table_path'):
                s, p = (selection['attributes'], selection['table_path'])
            else:
                s, p = (selection, self.root_dir)
            if not USE_FOLDERS_FOR_SUBJOBS:
                if not s.has_key(self._isasubjob[0]):
                    s[self._isasubjob[0]] = 'N'
            return (s, p)
        finally:
            self._rep_lock.release()

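    # Placeholder name; the original def line was elided. This helper quotes
    # an attribute value for use in an AMGA query string.
    def _quoteNameValue(self, name, value):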
        value = value.replace('\'', '\\\'')
        return '%s \'"%s"\'' % (name, value)

    def _getExtendedAttrList(self, attr_list):
        lock = self._lock[0]
        istate = self._istate[0]
        lock_time = self._time[0]

        extended_attr_list = attr_list[:]
        extra_attr = [lock, istate, lock_time]
        for a in extra_attr:
            if a not in extended_attr_list:
                extended_attr_list.append(a)
        return (extended_attr_list, map(lambda a: extended_attr_list.index(a)+1, extra_attr))

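    # The lines below are the surviving tail of a metadata reader (most
    # likely _getMetaData, called throughout this class): the elided prelude
    # acquired self._rep_lock and set up md_list, extended_attr_list and the
    # update_cache helper used here.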
            if type(ids_or_attributes) in [types.TupleType, types.ListType]:
                self._initBulkGetAttr()
                for jid in ids_or_attributes:
                    fqid = self._fqnConverter(jid)
                    fqn = self._getJobFileName(fqid)
                    try:
                        self._generic_getattr(fqid, extended_attr_list)
                    except CommandException, e:
                        msg = "ARDA interface command getattr() called for job %s raised an exception: %s" % (str(fqid), str(e))
                        logger.error(msg)
                    else:
                        while not self._generic_eot():
                            try:
                                f, md = self._generic_getEntry()
                                assert(os.path.basename(fqn) == f)
                            except Exception, e:
                                msg = "ARDA interface command getEntry() called for job %s raised an exception: %s" % (str(fqid), str(e))
                                logger.error(msg)
                            else:
                                md.insert(0, fqid)
                                md_list.append(md)
                                update_cache(fqid)
                self._finalizeBulkGetAttr()
            else:
                selection, path = self._getSelectionAndPath(ids_or_attributes)
                try:
                    self._generic_selectAttr(selection, path, extended_attr_list)
                except CommandException, e:
                    msg = "ARDA interface command selectAttr() raised an exception: %s" % str(e)
                    logger.error(msg)
                else:
                    while not self._generic_eot():
                        try:
                            md = self._generic_getSelectAttrEntry()
                        except Exception, e:
                            msg = "ARDA interface command getSelectAttrEntry() raised an exception: %s" % str(e)
                            logger.error(msg)
                        else:
                            f = os.path.join(path, md[0])
                            fqid = self._getFQIDfromName(f)
                            md[0] = fqid
                            md_list.append(md)
                            update_cache(fqid)
            return md_list
        finally:
            self._rep_lock.release()

    def _setLock(self, ids, istate, forced_action = False):

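    # Reconstructed def line (elided): the name follows from the call
    # self._counterNext(self._getFQID(j.master), reserve) in _commitJobs.
    def _counterNext(self, fqid, reserve = 1):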
        self._rep_lock.acquire(1)
        try:
            def next(fqid, reserve):
                metadata = self._getLockedMetaData([fqid], [self._counter[0]], istate = '_counter')
                if metadata:
                    md = metadata[0]
                    assert(md[0] == fqid)
                    now = md[1]
                    newval = str(int(now) + reserve)
                    attrs = [self._counter[0], self._lock[0], self._istate[0], self._time[0]]
                    values = [newval, self.guid, '_', repr(time.time())]
                    self._setMetaData([fqid], attrs, values)
                    return (now, newval)
                else:
                    raise RepositoryError(msg = 'Can not lock job %s' % str(fqid))

            if reserve > 1:
                if not hasattr(self, 'sjid_reserve'):
                    self.sjid_reserve = {}
                try:
                    if fqid in self.sjid_reserve:
                        now, reserved = self.sjid_reserve[fqid]
                    else:
                        now, reserved = map(int, next(fqid, reserve))
                        self.sjid_reserve[fqid] = [now, reserved]
                    newval = now + 1
                    if newval >= reserved:
                        del self.sjid_reserve[fqid]
                    else:
                        self.sjid_reserve[fqid][0] = newval
                    return str(now)
                except Exception, e:
                    raise RepositoryError(e = e, msg = "getNextSubjobId error: " + str(e))
            else:
                return next(fqid, reserve)[0]
        finally:
            self._rep_lock.release()

    def _generic_addEntry(self, fqid, attrs, values):

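    # Reconstructed def line (elided): the name follows from its use in the
    # metadata reader above.
    def _generic_getEntry(self):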
        return self.getEntry()

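    # Reconstructed def line (elided): the name follows from its use in the
    # metadata reader above.
    def _generic_selectAttr(self, selection, path, attr_list):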
        query = ' and '.join(map(lambda x: ':'.join([path, '%s="%s"' % x]), selection.items()))
        attr = map(lambda x: ':'.join([path, x]), attr_list)
        attr.insert(0, ':'.join([path, 'FILE']))
        self.selectAttr(attr, query)

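    # Reconstructed def line (elided): the name follows from its use in the
    # metadata reader above.
    def _generic_getSelectAttrEntry(self):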
        return self.getSelectAttrEntry()

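    # Reconstructed def line (elided): this is the public registration entry
    # point; the signature is an assumption based on the body below.
    def registerJobs(self, jobs, masterJob = None):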
        self._rep_lock.acquire(1)
        try:
            if masterJob:
                for j in jobs:
                    if j.master:
                        assert(j.master is masterJob)
                    else:
                        j._setParent(masterJob)
                self._commitJobs(jobs, forced_action = True, register = True)
                self._commitJobs([masterJob], deep = False)
            else:
                self._commitJobs(jobs, forced_action = True, register = True)
        finally:
            self._rep_lock.release()

    def commitJobs(self, jobs, forced_action = False, deep = True):

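    # Reconstructed def line (elided): the name is an assumption; the body
    # below rebuilds job objects from the stored blobs.
    def checkoutJobs(self, ids_or_attributes, deep = True):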
        self._rep_lock.acquire(1)
        try:
            jobs = []
            attr_list = [self._subjobs[0], self._compressed[0], self._blob[0]]

            def sorter(x, y):
                idx, idy = map(lambda x: x[0]['data']['id']['data'], (x, y))
                if idx > idy:
                    return 1
                elif idx < idy:
                    return -1
                else:
                    return 0

            def visitor(md):
                try:
                    sjobs = eval(self._text2pstr(md[1]))
                    pstr = self._text2pstr(md[3])
                    if md[2] == 'Y':
                        pstr = self._decompress(pstr)
                    jdict = eval(pstr)
                    rr = []
                    if deep and sjobs:
                        if USE_FOLDERS_FOR_SUBJOBS:
                            path = self._getSubJobPath(md[0])
                            dd = {'table_path': path, 'attributes': {}}
                        else:
                            dd = []
                            mfqid = list(md[0])
                            for s in sjobs:
                                sfqid = mfqid[:]
                                sfqid.append(s)
                                dd.append(tuple(sfqid))
                        metadata = self._getMetaData(dd, attr_list)
                        for md in metadata:
                            rr.append(visitor(md))
                        if USE_FOLDERS_FOR_SUBJOBS:
                            rr.sort(sorter)
                except Exception, e:
                    msg = 'Dictionary of job %s cannot be evaluated because of the error: %s. The job is most likely corrupted and will not be imported.' % (str(md[0]), str(e))
                    raise RepositoryError(e = e, msg = msg)
                else:
                    return [jdict, rr]

            metadata = self._getMetaData(ids_or_attributes, attr_list)
            for md in metadata:
                try:
                    jdict = Parser.insertSubJobs(visitor(md))
                    job = self._streamer._getJobFromDict(jdict)
                except Exception, e:
                    msg = 'Exception: %s while constructing job object from a dictionary' % (str(e))
                    logger.error(msg)
                else:
                    jobs.append(job)
            return jobs
        finally:
            self._rep_lock.release()

    def deleteJobs(self, ids, forced_action = False):
        self._rep_lock.acquire(1)
        try:
            details = {}
            msgs = []

            def getSubjobs(ids, deep = True):

                sjlist = self._getLockedMetaData(ids, [self._subjobs[0]], '_deleting', forced_action)

                sj_len = len(sjlist)
                ids_len = len(ids)
                if sj_len != ids_len:
                    for i in range(ids_len):
                        jid = ids[i]
                        fqid = self._fqnConverter(jid)
                        bp = min(i, sj_len)
                        for k in range(bp, sj_len) + range(0, bp):
                            if len(sjlist[k]) > 0:
                                if fqid == sjlist[k][0]:
                                    break
                        else:
                            msg = "Can not remove job %s" % str(fqid)
                            details[fqid] = RepositoryError(msg = msg)
                            msgs.append(msg)
                for md in sjlist:
                    del md[2:]
                    sjobs = eval(self._text2pstr(md[1]))
                    mfqid = list(md[0])
                    for i in range(len(sjobs)):
                        sfqid = mfqid[:]
                        sfqid.append(sjobs[i])
                        sjobs[i] = tuple(sfqid)
                    if deep:
                        md[1] = getSubjobs(sjobs, deep)
                    else:
                        md[1] = sjobs
                return sjlist

            def rm(sjlist):
                deleted_jobs = []
                for (fqid, ssjlist) in sjlist:
                    try:
                        self._generic_rm(fqid, forced_action)
                    except (RepositoryError, CommandException), e:
                        msg = "deleteJobs() command called while deleting job %s raised an exception: %s" % (str(fqid), str(e))
                        details[fqid] = RepositoryError(e = e, msg = msg)
                        msgs.append(msg)
                    else:
                        if fqid in self._job_cache:
                            del self._job_cache[fqid]
                        deleted_jobs.append([fqid, rm(ssjlist)])
                return deleted_jobs

            sjlist = getSubjobs(ids)

            self._initBulkRm()
            try:
                deleted_jobs = rm(sjlist)
            finally:
                self._finalizeBulkRm()

            if USE_FOLDERS_FOR_SUBJOBS:
                try:
                    self._forcedRemoveDir(self._getSubJobPath(fqid))
                except Exception, e:
                    msg = "Can not remove subjobs folder of the job %s because of the error: %s" % (str(fqid), str(e))
                    details[fqid] = RepositoryError(e = e, msg = msg)
                    msgs.append(msg)

            mfqids = {}
            for (fqid, ssjlist) in deleted_jobs:
                mfqid = fqid[:-1]
                if mfqid:
                    if not mfqid in mfqids:
                        mfqids[mfqid] = []
                    mfqids[mfqid].append(fqid)

            sjlist = getSubjobs(mfqids.keys(), deep = False)

            self._initCommand()
            try:
                attrs_r = [self._subjobs[0], self._lock[0], self._istate[0], self._time[0], self._counter[0]]
                values_r = ['', self.guid, '_', repr(time.time()), '1']
                attrs_s = attrs_r[:-1]
                values_s = values_r[:-1]
                for (mfqid, ssjlist) in sjlist:
                    try:
                        for fqid in mfqids[mfqid]:
                            ssjlist.remove(fqid)
                        for i in range(len(ssjlist)):
                            ssjlist[i] = ssjlist[i][-1]
                        if len(ssjlist) == 0:
                            attrs = attrs_r
                            values = values_r
                        else:
                            attrs = attrs_s
                            values = values_s
                        values[0] = self._pstr2text(repr(ssjlist))
                        self._generic_updateAttr(mfqid, attrs, values, forced_action)
                    except Exception, e:
                        msg = "Can not update master job %s of subjob %s because of the error: %s" % (str(mfqid), str(fqid), str(e))
                        details[fqid] = RepositoryError(e = e, msg = msg)
                        msgs.append(msg)
            finally:
                self._finalizeCommand()

            if USE_FOLDERS_FOR_SUBJOBS:
                for (mfqid, ssjlist) in sjlist:
                    try:
                        if len(ssjlist) == 0:
                            self._forcedRemoveDir(self._getSubJobPath(mfqid))
                    except Exception, e:
                        msg = "Can not unsplit master job %s after deleting the last subjob %s because of the error: %s" % (str(mfqid), str(fqid), str(e))
                        details[fqid] = RepositoryError(e = e, msg = msg)
                        msgs.append(msg)

            if details:
                raise BulkOperationRepositoryError(msg = '\n'.join(msgs), details = details)
        finally:
            self._rep_lock.release()

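    # Reconstructed def line (elided); the name is an assumption based on the
    # body, which returns the ids of matching jobs.
    def getJobIds(self, ids_or_attributes):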
        self._rep_lock.acquire(1)
        try:
            try:
                metadata = self._getMetaData(ids_or_attributes, ['id'])
            except RepositoryError, e:
                msg = 'Exception: %s while getting job ids.' % str(e)
                logger.error(msg)
                raise e
            return map(lambda x: x[0], metadata)
        finally:
            self._rep_lock.release()

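    # Reconstructed def line (elided); the name is an assumption based on the
    # body, which returns one attribute dictionary per job.
    def getJobAttributes(self, ids_or_attributes):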
        self._rep_lock.acquire(1)
        try:
            mdlist = []
            attr_list = self._commit_schema[:]
            attr_list.remove(self._blob)
            attr_list.remove(self._lock)
            attr_list = map(lambda x: x[0], attr_list)
            try:
                metadata = self._getMetaData(ids_or_attributes, attr_list)
            except RepositoryError, e:
                msg = 'Exception: %s while getting job attributes.' % str(e)
                logger.error(msg)
                raise e
            for md in metadata:
                mdd = {}
                for a, v in zip(attr_list, md[1:]):
                    if a == self._subjobs[0]:
                        v = eval(self._text2pstr(v))
                    mdd[a] = v
                mdlist.append(mdd)
            return mdlist
        finally:
            self._rep_lock.release()

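    # Reconstructed def line (elided): the name follows from the error
    # message in the body; statusList is a list of (id, status) pairs.
    def setJobsStatus(self, statusList, forced_action = False):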
        self._rep_lock.acquire(1)
        try:
            details = {}
            msgs = []
            attr_list = [self._compressed[0], self._blob[0]]
            attrs = ['status', self._compressed[0], self._blob[0], self._lock[0], self._istate[0], self._time[0]]

            statusList = map(lambda x: (self._fqnConverter(x[0]), x[1]), statusList)

            fqids = map(lambda x: x[0], statusList)
            metadata = self._getMetaData(fqids, attr_list)
            mdids = map(lambda x: x[0], metadata)

            istate = '_'
            timestamp = repr(time.time())

            self._initCommand()
            try:
                for fqid, status in statusList:
                    if fqid in mdids:
                        md = metadata[mdids.index(fqid)]
                        pstr = self._text2pstr(md[2])
                        if md[1] == 'Y':
                            pstr = self._decompress(pstr)
                        jdict = eval(pstr)
                        jdict['data']['status']['data'] = status
                        pstr = repr(jdict)
                        if USE_COMPRESSED_BLOBS:
                            compressed = 'Y'
                            pstr = self._compress(pstr)
                        else:
                            compressed = 'N'
                        blob = self._pstr2text(pstr)
                        vals = [status, compressed, blob, self.guid, istate, timestamp]
                        try:
                            self._generic_updateAttr(fqid, attrs, vals, forced_action)
                        except (CommandException, RepositoryError), e:
                            msg = "setJobsStatus() command called while committing job %s raised an exception: %s" % (str(fqid), str(e))
                            msgs.append(msg)
                            details[fqid] = RepositoryError(e = e, msg = msg)
                        else:
                            self._job_cache[fqid] = self.guid
                    else:
                        msg = "Job %s is not found in the repository" % str(fqid)
                        msgs.append(msg)
                        details[fqid] = RepositoryError(msg = msg)
                if details:
                    raise BulkOperationRepositoryError(msg = '\n'.join(msgs), details = details)
            finally:
                self._finalizeCommand()
        finally:
            self._rep_lock.release()

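    # Reconstructed def line (elided); the name is an assumption mirroring
    # setJobsStatus above.
    def getJobsStatus(self, ids_or_attributes):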
        self._rep_lock.acquire(1)
        try:
            attr_list = ['status']
            try:
                metadata = self._getMetaData(ids_or_attributes, attr_list)
            except RepositoryError, e:
                msg = 'Exception: %s while getting job status.' % str(e)
                logger.error(msg)
                raise e
            return metadata
        finally:
            self._rep_lock.release()

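    # Reconstructed def line (elided); the name and signature are an
    # assumption based on the body, which stores a streamed jobtree object.
    def setJobTree(self, jobtree, tree_id = 0):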
        self._rep_lock.acquire(1)
        try:
            jtreefldr = os.path.join(self.root_dir, self._jobsTreeFldr)
            attrs = [self._jobsTreeAttr[0]]
            fn = os.path.join(jtreefldr, str(tree_id))
            if self._tree_streamer:
                val = self._tree_streamer.getStreamFromTree(jobtree)
                values = [self._pstr2text(val)]
            else:
                logger.warning("JobTree streamer has not been set.")
                logger.warning("The jobtree object cannot be saved in the repository.")
                return
            try:
                self.listEntries(jtreefldr)
                while not self.eot():
                    try:
                        file, type = self.getEntry()
                    except Exception, e:
                        logger.error(str(e))
                    else:
                        if os.path.basename(file) == os.path.basename(fn):
                            self.setAttr(fn, attrs, values)
                            break
                else:
                    self.addEntry(fn, attrs, values)
            except CommandException, e:
                logger.debug(str(e))
                raise RepositoryError(e = e, msg = str(e))
        finally:
            self._rep_lock.release()

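
# Class header reconstructed: the original line was elided. The class name is
# used by RemoteOracleARDAJobRepository below; the base-class order is an
# assumption based on the __init__ calls in the constructor.
class RemoteARDAJobRepository(ARDARepositoryMixIn, MDClient):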

    def __init__(self,
                 schema,
                 role,
                 streamer,
                 tree_streamer,
                 root_dir,
                 host = 'gangamd.cern.ch',
                 port = 8822,
                 login = 'user',
                 password = 'ganga',
                 reqSSL = True,
                 keepalive = True,
                 init_schema = True,
                 **kwds):
        MDClient.__init__(self,
                          host = host,
                          port = port,
                          login = login,
                          password = password,
                          keepalive = keepalive)

        if reqSSL:
            fn = self._getGridProxy()
            key = kwds.get('key')
            if not key:
                key = fn
            cert = kwds.get('cert')
            if not cert:
                cert = fn

            MDClient.requireSSL(self, key, cert)
        try:
            MDClient.connect(self)
        except Exception, e:
            msg = "Can not connect to the repository because of the error: %s" % str(e)
            logger.error(msg)
            raise RepositoryError(e = e, msg = msg)

        ARDARepositoryMixIn.__init__(self, schema, role,
                                     streamer,
                                     tree_streamer,
                                     root_dir,
                                     init_schema)

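    # Placeholder name matching the reconstructed base-class helper for
    # error code 1; the original def line was elided.
    def _isNotFoundError(self, e):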
        if e.errorCode == 1:
            return True
        elif e.errorCode == 11 and e.msg.startswith('Not a directory'):
            return True
        return False

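    # Placeholder name matching the reconstructed base-class helper for
    # error code 17; the original def line was elided.
    def _isExistsError(self, e):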
        return e.errorCode == 17 or e.errorCode == 1

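    # Reconstructed def line (elided): the name follows from the error
    # message in the body.
    def removeAllLocks(self):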
        logger.error("method removeAllLocks should not be called for remote registry")

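    # Reconstructed def line (elided): the name follows from the error
    # message in the body.
    def listAllLocks(self):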
        logger.error("method listAllLocks should not be called for remote registry")
        return []

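
# Class header reconstructed: the original line was elided. The class name is
# referenced by the USE_ORACLE_AS_REMOTE_BACKEND branch of the factory below.
class RemoteOracleARDAJobRepository(RemoteARDAJobRepository):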

    _blobsFldr = 'blobs'
    _blobsFldrAttr = ('jobBlob', 'text')

    def __init__(self,
                 schema,
                 role,
                 streamer,
                 tree_streamer,
                 root_dir,
                 host = 'gangamd.cern.ch',
                 port = 8822,
                 login = 'user',
                 password = 'ganga',
                 reqSSL = True,
                 keepalive = True,
                 init_schema = True,
                 **kwds):
        RemoteARDAJobRepository.__init__(self,
                                         schema,
                                         role,
                                         streamer,
                                         tree_streamer,
                                         root_dir,
                                         host,
                                         port,
                                         login,
                                         password,
                                         reqSSL,
                                         keepalive,
                                         init_schema,
                                         **kwds)

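    # Placeholder name; the original def line was elided and it is not clear
    # which base-class error helper this overrides (it checks code 11 here).
    def _isNotFoundError(self, e):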
        return e.errorCode == 11

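    # Reconstructed def line (elided): the name follows from the call
    # self._getBlobFileName(fqid, md[blob_index]) in _generic_getattr below.
    def _getBlobFileName(self, fqid, guid):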
        path = self._getBlobsFldrName(self._getJobFolderName(fqid))
        basename = map(str, fqid)
        basename.append(guid)
        fn = '.'.join(basename)
        return os.path.join(path, fn)

    def _initDir(self, path, schema = None, create_sequence = True):

    def _addBlob(self, fqid, attrs, values):

    def _generic_addEntry(self, fqid, attrs, values):

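    # Reconstructed def line (elided): the name follows from the base-class
    # use of self._generic_eot() in the metadata reader.
    def _generic_eot(self):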
        return not len(self._command_buffer)

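    # Reconstructed def line (elided): this override buffers the returned
    # entries and resolves blob guids to the actual blob rows stored in the
    # blobs folder.
    def _generic_getattr(self, fqid, attr_list):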
        command_buffer = []
        RemoteARDAJobRepository._generic_getattr(self, fqid, attr_list)
        while not self.eot():
            command_buffer.append(self.getEntry())
        if self._blob[0] in attr_list:
            self._command_buffer = []
            blob_index = attr_list.index(self._blob[0])
            for (f, md) in command_buffer:
                blob_fn = self._getBlobFileName(fqid, md[blob_index])
                try:
                    self.getattr(blob_fn, [self._blobsFldrAttr[0]])
                except CommandException, e:
                    msg = "ARDA interface command getattr() called for job %s raised an exception: %s" % (str(fqid), str(e))
                    logger.error(msg)
                else:
                    while not self.eot():
                        fn, blob_md = self.getEntry()
                        assert(fn == os.path.basename(blob_fn))
                        md[blob_index] = blob_md[0]
                    self._command_buffer.append((f, md))
        else:
            self._command_buffer = command_buffer

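    # Reconstructed def line (elided): overrides the buffered entry reader.
    def _generic_getEntry(self):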
        return self._command_buffer.pop(0)

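    # Reconstructed def line (elided); by analogy with the base class this
    # pops the next buffered selectAttr row.
    def _generic_getSelectAttrEntry(self):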
        return self._command_buffer.pop(0)

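
# Class header reconstructed: the original line was elided. The class name is
# used by the factory below; the base-class order is an assumption based on
# the __init__ calls in the constructor.
class LocalARDAJobRepository(ARDARepositoryMixIn, MDStandalone):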

    def __init__(self,
                 schema,
                 role,
                 streamer,
                 tree_streamer,
                 root_dir,
                 local_root = '/tmp/',
                 blocklength = 1000,
                 cache_size = 100000,
                 tries_limit = 200,
                 lock_timeout = 1,
                 init_schema = True,
                 **kwds):

        self.__row = 0

        if not os.path.isdir(local_root):
            try:
                os.makedirs(local_root)
            except Exception, e:
                logger.error(str(e))
                raise e
            else:
                logger.debug("Root directory %s has been successfully created" % local_root)

        MDStandalone.__init__(self, local_root,
                              blocklength = blocklength,
                              cache_size = cache_size,
                              tries_limit = tries_limit)

        max_lock_time = 0.05*tries_limit
        if lock_timeout < max_lock_time:
            logger.warning("The 'lock_timeout' parameter is too small with respect to 'tries_limit' = %d" % tries_limit)
            logger.warning("To avoid the risk of deleting a valid lock file, the 'lock_timeout' parameter will be adjusted automatically")
            lock_timeout = max_lock_time
        self._lock_timeout = lock_timeout

        try:
            old_locks = MDStandalone.listAllLocks(self, self._lock_timeout)
            if old_locks:
                logger.warning("Lock files older than %d seconds were found in the repository" % self._lock_timeout)
                logger.warning("These locks most likely remain from previous errors and will be removed")
                logger.warning("You can suppress this operation by increasing the 'lock_timeout' parameter of the local repository constructor")
                logger.warning("Deleting old lock files: %s", old_locks)
                MDStandalone.removeAllLocks(self, self._lock_timeout)
        except Exception, e:
            logger.debug(str(e))
            Ganga.Utility.logging.log_user_exception(logger)
            raise RepositoryError(e = e, msg = str(e))

        ARDARepositoryMixIn.__init__(self, schema, role,
                                     streamer,
                                     tree_streamer,
                                     root_dir,
                                     init_schema)

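    # Reconstructed def line (elided): the name follows from the call to
    # ARDARepositoryMixIn._createAttrIfMissing at the end of the body.
    def _createAttrIfMissing(self, path, schema):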
        self._rep_lock.acquire(1)
        try:
            r_table = self._MDStandalone__absolutePath(self.root_dir)
            p_table = self._MDStandalone__absolutePath(path)
            if p_table != r_table:
                try:
                    if r_table in self.loaded_tables:
                        mdtable = self.loaded_tables[r_table]
                        attr_name = mdtable.attributes.blocks[0].name
                        attr_dir = mdtable.attributes.storage.dirname
                        path_dir = self._MDStandalone__systemPath(p_table)
                        shutil.copy(os.path.join(attr_dir, attr_name), path_dir)
                except Exception, e:
                    logger.debug(str(e))

            ARDARepositoryMixIn._createAttrIfMissing(self, path, schema)
        finally:
            self._rep_lock.release()

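    # Reconstructed def line (elided): the name follows from the call
    # self._get_iteration_list(mdtable) in _generic_getattr below. It rotates
    # the entry index list so that scanning resumes from the last hit.
    def _get_iteration_list(self, mdtable):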
        ii = range(len(mdtable.entries))
        return ii[self.__row:] + ii[:self.__row]

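    # Reconstructed def line (elided): a local, table-scanning implementation
    # of the selectAttr interface used by the metadata reader.
    def _generic_selectAttr(self, selection, path, attr_list):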
        self._MDStandalone__initTransaction()
        try:
            self.rows = []
            mdtable = self._MDStandalone__loadTable(path)[0]

            indx = []
            for k in attr_list:
                if k in mdtable.attributeDict:
                    ind = mdtable.attributeDict[k]+1
                    indx.append(ind)

            f_indx = []
            f_res = True
            for k in selection:
                if k in mdtable.attributeDict:
                    ind = mdtable.attributeDict[k]+1
                    f_indx.append((ind, k))
                else:
                    f_res = False

            def filterer(e):
                res = f_res
                if res:
                    for ind, k in f_indx:
                        res = res and (e[ind] == selection[k])
                        if not res:
                            break
                return res

            for e in mdtable.entries:
                if filterer(e):
                    md = [e[0]]
                    for ind in indx:
                        md.append(e[ind])
                    self.rows.append(md)
        finally:
            self.releaseAllLocks()

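    # Reconstructed def line (elided): a local implementation of the getattr
    # interface; note the body uses 'attrs' rather than 'attr_list'.
    def _generic_getattr(self, fqid, attrs):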
        self._MDStandalone__initTransaction()
        try:
            file = self._getJobFileName(fqid)
            path, entry = os.path.split(file)
            mdtable = self._MDStandalone__loadTable(path)[0]
            self.rows = []

            indx = []
            for k in attrs:
                ind = mdtable.attributeDict[k]+1
                indx.append(ind)

            for i in self._get_iteration_list(mdtable):
                e = mdtable.entries[i]
                if e[0] == entry:
                    row = []
                    row.append(e[0])
                    for ind in indx:
                        row.append(e[ind])
                    self.rows.append(row)
                    self.__row = i
                    break
        finally:
            self.releaseAllLocks()

1928
1929
1949
1950
1953
1954
1957
1958
1970
1971
1974
1975
1978
1979
1991
1992
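    # Reconstructed def line (elided): the local override mirroring the
    # remote stub above.
    def listAllLocks(self):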
        self._rep_lock.acquire(1)
        try:
            try:
                return MDStandalone.listAllLocks(self, self._lock_timeout)
            except Exception, e:
                logger.debug(str(e))
                Ganga.Utility.logging.log_user_exception(logger)
                raise RepositoryError(e = e, msg = str(e))
        finally:
            self._rep_lock.release()

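
# Reconstructed def line (elided): the factory's name and **kwargs signature
# are assumptions based on the body below.
def repositoryFactory(**kwargs):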
    config = Ganga.Utility.Config.getConfig('Configuration')

    repositoryType = "LocalAMGA"

    assert repositoryType in ['LocalAMGA', 'RemoteAMGA', 'Test']

    rep_config = all_configs[repositoryType]
    kwargs = kwargs.copy()
    for key in rep_config.options.keys():
        if key not in kwargs:
            kwargs[key] = rep_config[key]

    kw_schema = kwargs.get('schema')
    if kw_schema is None:
        kw_schema = schema[:]

    role = kwargs.get('role')
    if not role:
        role = 'Client'

    streamer = kwargs.get('streamer')
    if streamer is None:
        from Ganga.GPIDev.Streamers.SimpleStreamer import SimpleJobStreamer
        streamer = SimpleJobStreamer()

    tree_streamer = kwargs.get('tree_streamer')
    if tree_streamer is None:
        from Ganga.GPIDev.Streamers.SimpleStreamer import SimpleTreeStreamer
        tree_streamer = SimpleTreeStreamer()

    root_dir = kwargs.get('root_dir')
    if not root_dir:
        if repositoryType == 'RemoteAMGA':
            root_dir = '/'.join(['', 'users', config['user'], __version__])
        else:
            root_dir = '/'.join(['', __version__])

    subpath = kwargs.get('subpath')
    if subpath:
        root_dir = '/'.join([root_dir, subpath])

    for aname in ['schema', 'role', 'streamer', 'tree_streamer', 'root_dir']:
        if aname in kwargs:
            del kwargs[aname]

    info = 'AMGA Job Repository: type=%s ganga_user=%s' % (config['repositorytype'], config['user'])
    try:
        if repositoryType == 'LocalAMGA':
            if not kwargs.get('local_root'):
                kwargs['local_root'] = os.path.join(expandfilename(config['gangadir']), 'repository', config['user'], 'LocalAMGA')
            info += ' db_location=%s' % kwargs['local_root']
            logger.debug("Creating local repository ...")
            return LocalARDAJobRepository(kw_schema, role,
                                          streamer,
                                          tree_streamer,
                                          root_dir,
                                          **kwargs)

        if repositoryType == 'RemoteAMGA':
            if not kwargs.get('login'):
                kwargs['login'] = config['user']
            info += ' login=%s host=%s port=%s reqSSL=%s' % (kwargs['login'], kwargs['host'], kwargs['port'], kwargs['reqSSL'])
            if USE_ORACLE_AS_REMOTE_BACKEND:
                RemoteRepositoryClass = RemoteOracleARDAJobRepository
            else:
                RemoteRepositoryClass = RemoteARDAJobRepository
            logger.debug("Creating remote repository ...")
            return RemoteRepositoryClass(kw_schema, role,
                                         streamer,
                                         tree_streamer,
                                         root_dir,
                                         **kwargs)

        logger.debug("Creating test repository...")
        from TestRepository import TestRepository
        if not kwargs.get('local_root'):
            kwargs['local_root'] = os.path.join(expandfilename(config['gangadir']), 'repository', config['user'], 'Test')
        return TestRepository(kw_schema, role, streamer, root_dir, **kwargs)

    finally:
        info += ' root_dir=' + root_dir
        logger.info(info)
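
# Example usage (a sketch, not part of the original module): the factory
# reads its defaults from the '[LocalAMGA_Repository]' configuration section
# and returns a ready-to-use repository; 'job' stands for any GPI job
# object, and the method names follow the reconstructions above.
#
#   rep = repositoryFactory()
#   rep.registerJobs([job])          # allocate ids and add entries
#   rep.commitJobs([job])            # write back modified job blobs
#   jobs = rep.checkoutJobs({})      # rebuild all jobs from stored blobs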