1 """Dashboard Monitoring Service plugin.
2
3 N.B. This code is under development and should not generally be used or relied upon.
4
5 """
6
7
8 import sys
9
10 from Ganga.Lib.MonitoringServices.MSGMS import MSGUtil
11 from Ganga.Lib.MonitoringServices.Dashboard import FormatUtil
12
13
15 """Initialize DashboardMS configuration."""
16 try:
17 from Ganga.Utility import Config
18
19 config = Config.makeConfig('DashboardMS', 'Settings for Dashboard Messaging Service.')
20 config.addOption('server', 'ganga.msg.cern.ch', 'The MSG server name.')
21 config.addOption('port', 6163, 'The MSG server port.')
22 config.addOption('user', '', '')
23 config.addOption('password', '', '')
24 config.addOption('destination_job_status', '/topic/dashboard.atlas.jobStatus', 'The MSG destination (topic or queue) for job status messages.')
25 config.addOption('destination_job_processing_attributes', '/topic/dashboard.atlas.jobProcessingAttributes', 'The MSG destination (topic or queue) for job processing attributes messages.')
26 config.addOption('destination_job_meta', '/topic/dashboard.atlas.jobMeta', 'The MSG destination (topic or queue) for job meta messages.')
27 config.addOption('destination_task_meta', '/topic/dashboard.atlas.taskMeta', 'The MSG destination (topic or queue) for task meta messages.')
28 config.addOption('task_type', 'analysis', 'The type of task. e.g. analysis, production, hammercloud,...')
29
30 def deny_modification(name, value):
31 raise Config.ConfigError('Cannot modify [DashboardMS] settings (attempted %s=%s)' % (name, value))
32 config.attachUserHandler(deny_modification, None)
33 except ImportError:
34
35 pass
36 _initconfig()
37
38
39
40 _publisher = None
52
53
54 from Ganga.GPIDev.Adapters import IMonitoringService
55 -class DashboardMS(IMonitoringService.IMonitoringService):
56 """Dashboard Monitoring Service base class.
57
58 Subclasses should override getSandboxModules(), getJobInfo() and the event
59 methods: submit(), start(), progress(), stop(), etc.. Typically, the event
60 methods will use job_info and _send() to send job meta-data via MSG using
61 the WLCG format.
62 """
63
64 - def __init__(self, job_info, config_info):
73
78 getConfig = staticmethod(getConfig)
79
88
89 - def _send(self, destination, message):
90 """Send the message to the configured destination."""
91
92 p = _get_publisher(
93 self.config_info['server'],
94 self.config_info['port'],
95 self.config_info['user'],
96 self.config_info['password'],
97 )
98
99 headers = {'persistent':'true'}
100 wlcg_msg = FormatUtil.dictToWlcg(message, include_microseconds=False)
101 p.send(destination, wlcg_msg, headers)
102
103 - def _log(self, level='info', message=''):
104 """Log message to logger on client or stderr on worker node."""
105 if self._logger and hasattr(self._logger, level):
106 getattr(self._logger, level)(message)
107 else:
108 print >>sys.stderr, '[DashboardMS %s] %s' % (level, message)
109