1 """
2 Core package defines the fundamental subsystems of Ganga Architecture.
3 Subsystems are autonomous components (such as a remote services) which may be independetly deployed.
4 Subsystems may also be created as local objects in the Ganga Client process.
5 """
6
7 from exceptions import *
8
9 monitoring_component = None
10
12 """
13 Change the default value of autostart of the monitoring, depending if the session is interactive or batch.
14 The autostart value may be overriden in the config file, so warn if it differs from the default.
15 This function should be called
16 """
17 from Ganga.Core.MonitoringComponent.Local_GangaMC_Service import config
18
19
20 t_last = None
21
23 """
24 Create local subsystems. In the future this procedure should be enhanced to connect to remote subsystems.
25 FIXME: this procedure should be moved to the Runtime package.
26
27 This function will change the default value of autostart of the monitoring, depending if the session is interactive or batch.
28 The autostart value may be overriden in the config file, so warn if it differs from the default.
29 """
30 from Ganga.Core.MonitoringComponent.Local_GangaMC_Service import JobRegistry_Monitor, config
31
32 config.addOption('forced_shutdown_policy','session_type','If there are remaining background activities at exit such as monitoring, output download Ganga will attempt to wait for the activities to complete. You may select if a user is prompted to answer if he wants to force shutdown ("interactive") or if the system waits on a timeout without questions ("timeout"). The default is "session_type" which will do interactive shutdown for CLI and timeout for scripts.')
33
34 config.addOption('forced_shutdown_timeout',60,"Timeout in seconds for forced Ganga shutdown in batch mode.")
35 config.addOption('forced_shutdown_prompt_time',10,"User will get the prompt every N seconds, as specified by this parameter.")
36 config.addOption('forced_shutdown_first_prompt_time',5,"User will get the FIRST prompt after N seconds, as specified by this parameter. This parameter also defines the time that Ganga will wait before shutting down, if there are only non-critical threads alive, in both interactive and batch mode.")
37
38 from Ganga.Utility.logging import getLogger
39
40 logger = getLogger()
41
42 from Ganga.Core.GangaThread import GangaThreadPool
43
44
45 thread_pool = GangaThreadPool.getInstance()
46
47
48 from Ganga.Core.InternalServices import Coordinator,ShutdownManager
49 Coordinator.bootstrap()
50
51
52
53
54 for j in reg:
55 if hasattr(j,'status') and j.status in ['submitted','running']:
56 if hasattr(j,'backend'):
57 if hasattr(j.backend,'setup'):
58 j.backend.setup()
59
60
61 global monitoring_component
62 monitoring_component = JobRegistry_Monitor( reg )
63 monitoring_component.start()
64
65
66 import atexit
67
68 def should_wait_interactive_cb(t_total, critical_thread_ids, non_critical_thread_ids):
69 global t_last
70 if t_last is None:
71 t_last = -time.time()
72
73 if critical_thread_ids:
74 if ((t_last<0 and time.time()+t_last > config['forced_shutdown_first_prompt_time']) or
75 (t_last>0 and time.time()-t_last > config['forced_shutdown_prompt_time'])):
76 msg = """Job status update or output download still in progress (shutdown not completed after %d seconds).
77 %d background thread(s) still running: %s.
78 Do you want to force the exit (y/[n])? """ % (t_total, len(critical_thread_ids), critical_thread_ids)
79 resp = raw_input(msg)
80 t_last = time.time()
81 return resp.lower() != 'y'
82 else:
83 return True
84
85 elif non_critical_thread_ids:
86 if t_total < config['forced_shutdown_first_prompt_time']:
87 return True
88 else:
89 return False
90
91 else:
92 return False
93
94 def should_wait_batch_cb(t_total, critical_thread_ids, non_critical_thread_ids):
95
96 if critical_thread_ids:
97 if t_total < config['forced_shutdown_timeout']:
98 return True
99 else:
100 logger.warning('Shutdown was forced after waiting for %d seconds for background activities to finish (monitoring, output download, etc). This may result in some jobs being corrupted.',t_total)
101 return False
102
103 elif non_critical_thread_ids:
104 if t_total < config['forced_shutdown_first_prompt_time']:
105 return True
106 else:
107 return False
108
109 else:
110 return False
111
112
113
114
115
116
117 forced_shutdown_policy = config['forced_shutdown_policy']
118
119 if forced_shutdown_policy == 'interactive':
120 should_wait_cb = should_wait_interactive_cb
121 else:
122 if forced_shutdown_policy == 'batch':
123 should_wait_cb = should_wait_batch_cb
124 else:
125 if interactive_session:
126 should_wait_cb = should_wait_interactive_cb
127 else:
128 should_wait_cb = should_wait_batch_cb
129
130 atexit.register((0,thread_pool.shutdown), should_wait_cb=should_wait_cb)
131
132
133 from Ganga.Runtime.GPIexport import exportToGPI
134 exportToGPI('runMonitoring',monitoring_component.runMonitoring,'Functions')
135
136 autostart_default = interactive_session
137 config.overrideDefaultValue('autostart',bool(autostart_default))
138
139 if config['autostart'] is not autostart_default:
140 msg = 'monitoring loop %s (the default setting for %s session is %s)'
141 val = { True : ('enabled', 'batch', 'disabled'),
142 False: ('disabled', 'interactive', 'enabled')}
143 logger.warning(msg%val[config['autostart']])
144
145 if config['autostart']:
146 monitoring_component.enableMonitoring()
147
148
149
150 import time
151 class Stuck(GangaThread.GangaThread):
152 def __init__(self):
153 GangaThread.GangaThread.__init__(self,name='Stuck')
154 def run(self):
155 i = 0
156 while i < 10:
157 time.sleep(3)
158
159 i += 1
160 def stop(self):
161 print "I was asked to stop..."
162
163
164
165