Package Ganga :: Package Lib :: Package LCG :: Module LCGOutputDownloader
[hide private]
[frames | no frames]

Source Code for Module Ganga.Lib.LCG.LCGOutputDownloader

  1  from Ganga.Utility.logging import getLogger 
  2  from Ganga.Core.GangaThread.MTRunner import MTRunner, Data, Algorithm 
  3  from Ganga.Lib.LCG.Utility import *  
  4   
  5  logger = getLogger() 
  6   
class LCGOutputDownloadTask:
    """
    Class for defining a data object for each output downloading task.

    A task simply bundles three values: the grid object, the job object
    and the ``use_wms_proxy`` flag.  Two tasks compare equal when their
    job objects report the same fully-qualified id (``getFQID('.')``).
    """

    _attributes = ('gridObj', 'jobObj', 'use_wms_proxy')

    def __init__(self, gridObj, jobObj, use_wms_proxy):
        """
        stores the grid object, the job object and the WMS-proxy flag
        on the task without any further processing
        """
        self.gridObj = gridObj
        self.jobObj = jobObj
        self.use_wms_proxy = use_wms_proxy

    def __eq__(self, other):
        """
        download task comparison based on job's FQID.

        Returns NotImplemented for objects that are not download tasks so
        that comparing against an arbitrary object yields "not equal"
        instead of failing with an AttributeError on ``other.jobObj``.
        """
        if not isinstance(other, LCGOutputDownloadTask):
            return NotImplemented
        return self.jobObj.getFQID('.') == other.jobObj.getFQID('.')

    def __ne__(self, other):
        """
        inverse of __eq__; defined explicitly because Python 2 does not
        derive ``!=`` from ``__eq__``
        """
        outcome = self.__eq__(other)
        if outcome is NotImplemented:
            return outcome
        return not outcome

    def __str__(self):
        """
        represents the task by the job object
        """
        return 'downloading task for job %s' % self.jobObj.getFQID('.')
33
class LCGOutputDownloadAlgorithm(Algorithm):
    """
    Class for implementing the logic of each downloading task.
    """

    def process(self, item):
        """
        downloads output of one LCG job

        ``item`` is expected to expose ``gridObj``, ``jobObj`` and
        ``use_wms_proxy``.  The job status is moved to 'completing' and
        then to 'completed' or 'failed' depending on the first element of
        the value returned by ``gridObj.get_output``.  Every path returns
        True.
        """
        grid = item.gridObj
        job = item.jobObj
        use_proxy = item.use_wms_proxy

        # The downloading task of this job has most likely been created
        # and handled already in a previous monitoring loop: nothing to do.
        if job.status in ('completing', 'completed', 'failed'):
            return True

        # The job may also have been killed/removed by the user between
        # the moment the task was queued and the moment a downloading
        # thread picked it up. Ignore such kind of cases.
        if job.status in ('removed', 'killed'):
            return True

        job.updateStatus('completing')
        workspace = job.getOutputWorkspace()

        outcome = grid.get_output(job.backend.id, workspace.getPath(), wms_proxy=use_proxy)

        if not outcome[0]:
            job.updateStatus('failed')
            # update the backend's reason if the failure was detected in
            # the Ganga's pps
            if outcome[1] != 0:
                job.backend.reason = 'non-zero app. exit code: %s' % outcome[1]
                job.backend.exitcode = outcome[1]
        else:
            job.updateStatus('completed')
            job.backend.exitcode = 0

        # the master job's status has to be refreshed as well so that the
        # status of the whole job stays up-to-date
        if job.master:
            job.master.updateMasterJobStatus()

        self.__appendResult__(job.getFQID('.'), True)

        return True
84
class LCGOutputDownloader(MTRunner):
    """
    Class for managing the LCG output downloading activities based on MTRunner.
    """

    def __init__(self, numThread=10):
        """
        sets up the runner named 'lcg_output_downloader' with an empty
        data collection and the output downloading algorithm, then
        records the keep-alive flag and the requested number of threads
        """
        task_queue = Data(collection=[])
        downloader_logic = LCGOutputDownloadAlgorithm()

        MTRunner.__init__(
            self,
            name='lcg_output_downloader',
            data=task_queue,
            algorithm=downloader_logic,
        )

        self.keepAlive = True
        self.numThread = numThread

    def countAliveAgent(self):
        """
        returns whatever the runner's __cnt_alive_threads__() reports
        """
        alive = self.__cnt_alive_threads__()
        return alive

    def addTask(self, grid, job, use_wms_proxy):
        """
        wraps the given grid object, job and WMS-proxy flag into a
        downloading task and hands it over to the runner via
        addDataItem(); always returns True
        """
        download_task = LCGOutputDownloadTask(grid, job, use_wms_proxy)

        logger.debug('add output downloading task: job %s' % job.getFQID('.'))

        self.addDataItem(download_task)

        return True
111