Package Ganga :: Package Core
[hide private]
[frames] | no frames]

Source Code for Package Ganga.Core

  1  """ 
  2  Core package defines the fundamental subsystems of Ganga Architecture. 
  3  Subsystems are autonomous components (such as remote services) which may be independently deployed. 
  4  Subsystems may also be created as local objects in the Ganga Client process. 
  5  """ 
  6   
  7  from exceptions import * 
  8   
# Module-level handle to the monitoring component. It stays None until
# bootstrap() assigns the JobRegistry_Monitor instance it creates.
monitoring_component = None
 10   
def set_autostart_policy(interactive_session):
    """
    Placeholder for selecting the monitoring autostart default.

    The intent is to change the default value of the monitoring
    'autostart' option depending on whether the session is interactive
    or batch, and to warn when the configured value differs from that
    default (the value may be overridden in the config file).

    As written, this function only imports the monitoring ``config``
    object and does nothing else; ``interactive_session`` is not used.
    """
    from Ganga.Core.MonitoringComponent.Local_GangaMC_Service import config
# Internal helper variable for the interactive shutdown callback defined in
# bootstrap().  Encoding used by that callback:
#   None      -> the callback has not run yet
#   negative  -> minus the time of the first callback invocation
#                (no prompt has been shown so far)
#   positive  -> time at which the user was last prompted
t_last = None

def bootstrap(reg, interactive_session):
    """
    Create local subsystems. In the future this procedure should be enhanced
    to connect to remote subsystems.
    FIXME: this procedure should be moved to the Runtime package.

    Steps performed, in order:
      * register the forced-shutdown options on the monitoring config,
      * obtain the Ganga thread pool and bootstrap the internal services
        coordinator,
      * call ``backend.setup()`` for every job in ``reg`` whose status is
        'submitted' or 'running' (when the backend provides ``setup``),
      * create and start the JobRegistry_Monitor and store it in the
        module-level ``monitoring_component``,
      * register the thread pool shutdown with a "should wait" callback
        chosen from the 'forced_shutdown_policy' option / session type,
      * export ``runMonitoring`` to the GPI,
      * set the default of the 'autostart' option from the session type,
        warn if the effective value differs, and enable monitoring when
        'autostart' is set.

    reg                 -- iterable of jobs; also passed to JobRegistry_Monitor
    interactive_session -- true value for an interactive (CLI) session,
                           false value for a batch (script) session
    """
    # 'time' is needed by the shutdown callback below, so import it up front
    # rather than relying on the debugging section at the end of the function.
    import atexit
    import time

    from Ganga.Core.MonitoringComponent.Local_GangaMC_Service import JobRegistry_Monitor, config

    config.addOption('forced_shutdown_policy', 'session_type', 'If there are remaining background activities at exit such as monitoring, output download Ganga will attempt to wait for the activities to complete. You may select if a user is prompted to answer if he wants to force shutdown ("interactive") or if the system waits on a timeout without questions ("timeout"). The default is "session_type" which will do interactive shutdown for CLI and timeout for scripts.')

    config.addOption('forced_shutdown_timeout', 60, "Timeout in seconds for forced Ganga shutdown in batch mode.")
    config.addOption('forced_shutdown_prompt_time', 10, "User will get the prompt every N seconds, as specified by this parameter.")
    config.addOption('forced_shutdown_first_prompt_time', 5, "User will get the FIRST prompt after N seconds, as specified by this parameter. This parameter also defines the time that Ganga will wait before shutting down, if there are only non-critical threads alive, in both interactive and batch mode.")

    from Ganga.Utility.logging import getLogger

    logger = getLogger()

    from Ganga.Core.GangaThread import GangaThreadPool

    # create generic Ganga thread pool
    thread_pool = GangaThreadPool.getInstance()

    # start the internal services coordinator
    # (ShutdownManager is imported but its install() call is disabled here)
    from Ganga.Core.InternalServices import Coordinator, ShutdownManager
    Coordinator.bootstrap()
    # load the shutdown manager
    #ShutdownManager.install()

    # backend-specific setup (e.g. Remote: setup any remote ssh pipes)
    for j in reg:
        if not (hasattr(j, 'status') and j.status in ['submitted', 'running']):
            continue
        # protect: EmptyGangaObject has neither backend nor setup() method
        if hasattr(j, 'backend') and hasattr(j.backend, 'setup'):
            j.backend.setup()

    # start the monitoring loop
    global monitoring_component
    monitoring_component = JobRegistry_Monitor(reg)
    monitoring_component.start()

    def should_wait_interactive_cb(t_total, critical_thread_ids, non_critical_thread_ids):
        """
        Return True to keep waiting for background threads, False to shut down.

        With critical threads alive the user is prompted (first after
        'forced_shutdown_first_prompt_time' seconds, then every
        'forced_shutdown_prompt_time' seconds); answering 'y' forces the exit.
        With only non-critical threads alive, wait until t_total reaches
        'forced_shutdown_first_prompt_time'. With no threads, do not wait.
        """
        global t_last
        if t_last is None:
            # negative value: minus the start time, i.e. "not prompted yet"
            t_last = -time.time()

        # if there are critical threads then prompt user or wait depending on configuration
        if critical_thread_ids:
            now = time.time()
            first_prompt_due = (t_last < 0 and
                                now + t_last > config['forced_shutdown_first_prompt_time'])
            next_prompt_due = (t_last > 0 and
                               now - t_last > config['forced_shutdown_prompt_time'])
            if not (first_prompt_due or next_prompt_due):
                return True
            msg = ("Job status update or output download still in progress (shutdown not completed after %d seconds).\n"
                   "%d background thread(s) still running: %s.\n"
                   "Do you want to force the exit (y/[n])? ") % (t_total, len(critical_thread_ids), critical_thread_ids)
            resp = raw_input(msg)
            # positive value: time of the last prompt
            t_last = time.time()
            return resp.lower() != 'y'

        # if there are non-critical threads then wait or shutdown depending on configuration
        if non_critical_thread_ids:
            return t_total < config['forced_shutdown_first_prompt_time']

        # if there are no threads then shutdown
        return False

    def should_wait_batch_cb(t_total, critical_thread_ids, non_critical_thread_ids):
        """
        Return True to keep waiting for background threads, False to shut down.

        With critical threads alive, wait until t_total reaches
        'forced_shutdown_timeout' and then log a warning and stop waiting.
        With only non-critical threads alive, wait until t_total reaches
        'forced_shutdown_first_prompt_time'. With no threads, do not wait.
        """
        # if there are critical threads then wait or shutdown depending on configuration
        if critical_thread_ids:
            if t_total < config['forced_shutdown_timeout']:
                return True
            logger.warning('Shutdown was forced after waiting for %d seconds for background activities to finish (monitoring, output download, etc). This may result in some jobs being corrupted.', t_total)
            return False

        # if there are non-critical threads then wait or shutdown depending on configuration
        if non_critical_thread_ids:
            return t_total < config['forced_shutdown_first_prompt_time']

        # if there are no threads then shutdown
        return False

    # register the exit function with the highest priority (==0)
    #atexit.register((0,monitoring_component.stop), fail_cb=mc_fail_cb,max_retries=config['max_shutdown_retries'])

    # select the shutdown method based on configuration and/or session type
    forced_shutdown_policy = config['forced_shutdown_policy']

    if forced_shutdown_policy == 'interactive':
        should_wait_cb = should_wait_interactive_cb
    elif forced_shutdown_policy in ('timeout', 'batch'):
        # "timeout" is the value documented in the option's help text;
        # "batch" is the value historically tested here. Accept both.
        should_wait_cb = should_wait_batch_cb
    elif interactive_session:
        should_wait_cb = should_wait_interactive_cb
    else:
        should_wait_cb = should_wait_batch_cb

    # NOTE(review): a (priority, callable) tuple plus a should_wait_cb keyword
    # is not the standard-library atexit.register() calling convention;
    # presumably atexit.register has been replaced by Ganga's shutdown
    # manager before this point - confirm.
    atexit.register((0, thread_pool.shutdown), should_wait_cb=should_wait_cb)

    # export to GPI
    from Ganga.Runtime.GPIexport import exportToGPI
    exportToGPI('runMonitoring', monitoring_component.runMonitoring, 'Functions')

    # Normalise to bool once so that the comparison below is a value
    # comparison and a truthy non-bool argument does not trigger the warning.
    autostart_default = bool(interactive_session)
    config.overrideDefaultValue('autostart', autostart_default)

    autostart = bool(config['autostart'])
    if autostart != autostart_default:
        msg = 'monitoring loop %s (the default setting for %s session is %s)'
        val = {True: ('enabled', 'batch', 'disabled'),
               False: ('disabled', 'interactive', 'enabled')}
        logger.warning(msg % val[autostart])

    if config['autostart']:
        monitoring_component.enableMonitoring()

    # THIS IS FOR DEBUGGING ONLY
    # NOTE(review): 'GangaThread' is not imported by name in this function;
    # presumably it resolves to the Ganga.Core.GangaThread submodule bound on
    # this package by the import above - confirm.
    class Stuck(GangaThread.GangaThread):
        def __init__(self):
            GangaThread.GangaThread.__init__(self, name='Stuck')

        def run(self):
            i = 0
            while i < 10:
                time.sleep(3)
                #print '*'*30,i
                i += 1

        def stop(self):
            print("I was asked to stop...")
    #DISABLED
    #s = Stuck()
    #s.start()