1 """GangaMon Monitoring Service plugin."""
2
3 from Ganga.Lib.MonitoringServices.MSGMS import MSGUtil
4
5
7 """Initialize MSGMS configuration."""
8 try:
9 from Ganga.Utility import Config
10
11 config = Config.makeConfig('MSGMS','Settings for the MSGMS monitoring plugin. Cannot be changed ruding the interactive Ganga session.')
12 config.addOption('server', 'ganga.msg.cern.ch', 'The server to connect to')
13 config.addOption('port', 6163, 'The port to connect to')
14 config.addOption('username', '', '')
15 config.addOption('password', '', '')
16 config.addOption('message_destination', '/queue/ganga.status', '')
17 config.addOption('usage_message_destination',"/queue/ganga.usage",'')
18 config.addOption('job_submission_message_destination',"/queue/ganga.jobsubmission",'')
19
20
21 def deny_modification(name,x):
22 raise Config.ConfigError('Cannot modify [MSGMS] settings (attempted %s=%s)' % (name, x))
23 config.attachUserHandler(deny_modification, None)
24 except ImportError:
25
26 pass
27 _initconfig()
28
29
30
31 _publisher = None
39
40
41 from Ganga.GPIDev.Adapters.IMonitoringService import IMonitoringService
42 -class MSGMS(IMonitoringService):
43 """GangaMon Monitoring Service based on MSG.
44
45 Publishes job meta-data to an MSG destination on submit, start and stop events
46 for consumption by a GangaMon subscriber.
47
48 See IMonitoringService for implementation details.
49 """
50
51 - def __init__(self, job_info, config_info):
54
59 getConfig = staticmethod(getConfig)
60
68
70 """Create job_info from Job object."""
71 if self.job_info.master is None:
72 ganga_job_id = str(self.job_info.id)
73 ganga_job_uuid = self.job_info.info.uuid
74 ganga_master_uuid = 0
75 else:
76 ganga_job_id = str(self.job_info.master.id) + '.' + str(self.job_info.id)
77 ganga_job_uuid = self.job_info.info.uuid
78 ganga_master_uuid = self.job_info.master.info.uuid
79 from Ganga.Utility import Config
80 return { 'ganga_job_uuid' : ganga_job_uuid
81 , 'ganga_master_uuid' : ganga_master_uuid
82 , 'ganga_user_repository' : Config.getConfig('Configuration')['user']
83 + '@' + Config.getConfig('System')['GANGA_HOSTNAME']
84 + ':' + Config.getConfig('Configuration')['gangadir']
85 , 'ganga_job_id' : ganga_job_id
86 , 'subjobs' : len(self.job_info.subjobs)
87 , 'backend' : self.job_info.backend.__class__.__name__
88 , 'application' : self.job_info.application.__class__.__name__
89 , 'job_name' : self.job_info.name
90 , 'hostname' : ''
91 , 'event' : ''
92 }
93
95 """Create message from job_info adding hostname and event."""
96 import types
97 if isinstance(self.job_info, types.DictType):
98
99 message = self.job_info.copy()
100 else:
101
102 message = self.getJobInfo()
103 message['hostname'] = hostname()
104 message['event'] = event
105 return message
106
107 - def send(self, message):
108 """Send the message to the configured destination."""
109
110 p = _get_publisher(
111 self.config_info['server'],
112 self.config_info['port'],
113 self.config_info['username'],
114 self.config_info['password'],
115 )
116
117 headers = {'persistent':'true'}
118 p.send(self.config_info['message_destination'], repr(message), headers)
119
121 """Log submit event on client."""
122
123 if self.job_info.master is not None:
124 if self.job_info.id == 0:
125 masterjob_msg = self.getMessage('submitted')
126 masterjob_msg['subjobs'] = len(self.job_info.master.subjobs)
127 masterjob_msg['ganga_job_id'] = str(masterjob_msg['ganga_job_id']).split('.')[0]
128
129 masterjob_msg['ganga_job_uuid'] = masterjob_msg['ganga_master_uuid']
130 masterjob_msg['ganga_master_uuid'] = 0
131 self.send(masterjob_msg)
132
133 self.job_info.info.monitoring_links.append(('http://gangamon.cern.ch/ganga/#tid=%s'%self.job_info.info.uuid,'dashboard'))
134
135
136 msg = self.getMessage('submitted')
137 self.send(msg)
138
139 - def start(self, **opts):
140 """Log start event on worker node."""
141 message = self.getMessage('running')
142 self.send(message)
143
145 """Log progress event on worker node. NOP."""
146 pass
147
148 - def stop(self, exitcode, **opts):
149 """Log stop event on worker node."""
150 if exitcode == 0:
151 event = 'finished'
152 else:
153 event = 'failed'
154 message = self.getMessage(event)
155 self.send(message)
156
157
158
160 """ Try to get the hostname in the most possible reliable way as described in the Python
161 LibRef."""
162 import socket
163 try:
164 return socket.gethostbyaddr(socket.gethostname())[0]
165
166
167
168 except:
169 return 'localhost'
170