Package Ganga :: Package GPIDev :: Package Lib :: Package Tasks :: Module TaskApplication
[hide private]
[frames | no frames]

Source Code for Module Ganga.GPIDev.Lib.Tasks.TaskApplication

  1   
  2  from Ganga.GPIDev.Schema import * 
  3  from common import * 
  4  from new import classobj 
  5  from Ganga.GPIDev.Base.Proxy import addProxy, stripProxy 
  6   
  7   
  8  handler_map = [] 
  9   
def __task__init__(self):
    """Initialise a taskified object through the class it was built from.

    Installed by taskify() as the ``__init__`` of every generated class.
    It assumes the method resolution order is
    [generated class, task mix-in, wrapped base class, ...] and therefore
    calls the ``__init__`` of the entry at index 2.
    """
    resolution_order = self.__class__.mro()
    wrapped_class = resolution_order[2]
    wrapped_class.__init__(self)
    ## A workaround that copied wrapped_class._name onto the instance (for
    ## code that checks the class name) is intentionally left disabled.
def taskify(baseclass, name):
    """Build a task-enabled subclass of an application or splitter class.

    The generated class derives from the matching task mix-in
    (TaskApplication or TaskSplitter) and from ``baseclass``.  Its schema is
    the schema of ``baseclass`` extended with the task bookkeeping items, and
    its ``__init__`` is ``__task__init__``.  The pair
    ``(baseclass.__name__, name)`` is appended to ``handler_map``.

    baseclass -- class providing ``_schema`` and ``_category``; the category
                 must be "applications" or "splitters".
    name      -- name of the generated class, also stored as its ``_name``.

    Returns the generated class.
    Raises ValueError when ``baseclass._category`` is not supported.
    """
    smajor = baseclass._schema.version.major
    sminor = baseclass._schema.version.minor

    category = baseclass._category
    if category == "applications":
        schema_items = {
            'id': SimpleItem(defvalue=-1, protected=1, copyable=1, splitable=1, doc='number of this application in the transform.', typelist=["int"]),
            'tasks_id': SimpleItem(defvalue="-1:-1", protected=1, copyable=1, splitable=1, doc='id of this task:transform', typelist=["str"]),
        }
        taskclass = TaskApplication
    elif category == "splitters":
        schema_items = {
            'task_partitions': SimpleItem(defvalue=[], copyable=1, doc='task partition numbers.', typelist=["list"]),
        }
        taskclass = TaskSplitter
    else:
        # Without this branch the function failed further down with an
        # UnboundLocalError that did not mention the offending category.
        raise ValueError("Cannot taskify '%s': unsupported category '%s'" % (name, category))

    # Base items first, task items second, so the task items win on a clash.
    datadict = dict(baseclass._schema.datadict.items())
    datadict.update(schema_items)

    classdict = {
        "_schema": Schema(Version(smajor, sminor), datadict),
        "_category": category,
        "_name": name,
        "__init__": __task__init__,
    }

    # Only attributes defined directly on baseclass are copied; inherited
    # ones are already reachable through normal attribute lookup.
    for var in ("_GUIPrefs", "_GUIAdvancedPrefs", "_exportmethods"):
        if var in baseclass.__dict__:
            classdict[var] = baseclass.__dict__[var]
    cls = classobj(name, (taskclass, baseclass), classdict)

    ## Use the same handlers as for the base class
    handler_map.append((baseclass.__name__, name))

    return cls
50
class TaskApplication(object):
    """Mix-in for applications that are managed by a task transform.

    The methods read ``self.tasks_id`` and ``self.id``, the schema items that
    taskify() adds to application classes.
    """

    def getTransform(self):
        """Return the transform referenced by ``self.tasks_id``, or None.

        Two layouts are understood: "<task>:<transform>" and a three-field
        form whose last two fields are "<task>:<transform>" (the first field
        is not inspected).  None is returned when the id has any other shape,
        when one of the two fields is not made of digits only, when
        ``GPI.tasks()`` raises KeyError, or when the task it returns is falsy.
        """
        fields = self.tasks_id.split(":")
        if len(fields) == 2:
            task_field, transform_field = fields
        elif len(fields) == 3:
            task_field, transform_field = fields[1], fields[2]
        else:
            return None
        if not (task_field.isdigit() and transform_field.isdigit()):
            return None
        # NOTE(review): GPI is not imported explicitly in this module;
        # presumably it arrives through one of the star imports - confirm.
        try:
            task = GPI.tasks(int(task_field))
        except KeyError:
            # Handled for both layouts; the three-field form used to let the
            # KeyError escape while the two-field form returned None.
            return None
        if not task:
            return None
        return task.transforms[int(transform_field)]

    def transition_update(self, new_status):
        """Forward a status change of this application to its transform.

        new_status -- the new status.  For an application whose ``tasks_id``
                      starts with "00" (a master job) the transform is not
                      informed; "new" is passed on to the application of
                      every subjob of the parent instead.

        Any exception is logged and reported on stderr together with its
        traceback; it is never propagated to the caller.
        """
        try:
            if self.tasks_id.startswith("00"):  ## Master job
                if new_status == "new":  ## something went wrong with submission
                    for sj in self._getParent().subjobs:
                        sj.application.transition_update(new_status)
                # NOTE(review): statement nesting was reconstructed from a
                # flattened listing; master jobs are assumed to return here
                # for every status - confirm against the repository copy.
                return
            transform = self.getTransform()
            if transform:
                transform._impl.setAppStatus(self, new_status)
        except Exception:
            import sys
            import traceback
            # sys.exc_info() is valid on every Python version, unlike the
            # "except Exception, x" spelling.
            x = sys.exc_info()[1]
            logger.error("Exception in call to transform[%s].setAppStatus(%i, %s)", self.tasks_id, self.id, new_status)
            sys.stderr.write("%s : %s\n" % (x.__class__.__name__, x))
            tb = sys.exc_info()[2]
            if tb:
                traceback.print_tb(tb)
            else:
                sys.stderr.write("No Traceback available\n")

            logger.error("%s", x)
89 90 91
class TaskSplitter(object):
    """Mix-in that registers the subjobs of a wrapped splitter with the transform."""

    ### Splitting based on numsubjobs
    def split(self, job):
        """Split ``job`` with the wrapped splitter and register the subjobs.

        The wrapped splitter is taken from index 2 of the method resolution
        order, i.e. [generated class, TaskSplitter, wrapped splitter, ...].
        The application of ``job`` is reported to the transform as "removed",
        every subjob application receives the task/transform id and a new
        application id for ``self.task_partitions[i]``, and the ``tasks_id``
        of ``job`` gets the "00:" master-job marker if it lacks one.

        job -- the job to split; ``job.application`` must offer
               ``getTransform()``, ``id`` and ``tasks_id``.

        Returns the list of subjobs produced by the wrapped splitter.
        """
        subjobs = self.__class__.mro()[2].split(self, job)
        ## Get information about the transform
        transform = stripProxy(job.application.getTransform())
        # NOTE(review): the looked-up value was never used; the lookup is kept
        # in case callers rely on it failing for an unregistered id - confirm.
        transform._app_partition[job.application.id]
        ## Tell the transform this job will never be executed ...
        transform.setAppStatus(job.application, "removed")

        # Subjobs must not inherit the master-job marker: a subjob whose
        # tasks_id starts with "00" is treated as a master job by
        # transition_update and would never reach the transform.
        master_tasks_id = job.application.tasks_id
        if master_tasks_id.startswith("00:"):
            subjob_tasks_id = master_tasks_id[3:]
        else:
            subjob_tasks_id = master_tasks_id

        ## .. but the subjobs will be
        for i, subjob in enumerate(subjobs):
            subjob.application.tasks_id = subjob_tasks_id
            subjob.application.id = transform.getNewAppID(self.task_partitions[i])
            # Do not set to submitting - failed submission will make the applications stuck...
        if not master_tasks_id.startswith("00"):
            job.application.tasks_id = "00:%s" % master_tasks_id
        return subjobs
## Task-enabled classes shipped with this module.  The imports stay below
## the definition of taskify(), which is called right after them.
from Ganga.Lib.Executable.Executable import Executable
from Ganga.Lib.Splitters import ArgSplitter

ExecutableTask = taskify(Executable, "ExecutableTask")
ArgSplitterTask = taskify(ArgSplitter, "ArgSplitterTask")

## Maps the _name of a plain application to its task-enabled class;
## taskApp() looks applications up here.
task_map = {"Executable": ExecutableTask}
def taskApp(app):
    """ Copy the application app into a task application. Returns a task application without proxy

    app -- an application, with or without proxy.

    An application whose ``_name`` already contains "Task" is returned
    unchanged (proxy stripped).  Otherwise a new instance of the class
    registered under that name in ``task_map`` is created and every entry of
    ``app._data`` is copied into its ``_data``.

    Raises AttributeError, carrying the logged message, when no task class is
    registered for the application.
    """
    a = stripProxy(app)
    if "Task" in a._name:
        return a
    if a._name not in task_map:
        message = "The application '%s' cannot be used with the tasks package yet!" % a._name
        logger.error(message)
        # Same exception type as before, but no longer raised without text.
        raise AttributeError(message)
    b = task_map[a._name]()
    for k in a._data:
        b._data[k] = a._data[k]
    return b
138