Package Ganga :: Package Lib :: Package MonitoringServices :: Package Dashboard :: Module DashboardMS
[hide private]
[frames] | no frames]

Source Code for Module Ganga.Lib.MonitoringServices.Dashboard.DashboardMS

  1  """Dashboard Monitoring Service plugin. 
  2   
  3  N.B. This code is under development and should not generally be used or relied upon. 
  4   
  5  """ 
  6   
  7   
  8  import sys 
  9   
 10  from Ganga.Lib.MonitoringServices.MSGMS import MSGUtil 
 11  from Ganga.Lib.MonitoringServices.Dashboard import FormatUtil 
 12   
 13   
14 -def _initconfig():
15 """Initialize DashboardMS configuration.""" 16 try: 17 from Ganga.Utility import Config 18 # create configuration 19 config = Config.makeConfig('DashboardMS', 'Settings for Dashboard Messaging Service.') 20 config.addOption('server', 'ganga.msg.cern.ch', 'The MSG server name.') 21 config.addOption('port', 6163, 'The MSG server port.') 22 config.addOption('user', '', '') 23 config.addOption('password', '', '') 24 config.addOption('destination_job_status', '/topic/dashboard.atlas.jobStatus', 'The MSG destination (topic or queue) for job status messages.') 25 config.addOption('destination_job_processing_attributes', '/topic/dashboard.atlas.jobProcessingAttributes', 'The MSG destination (topic or queue) for job processing attributes messages.') 26 config.addOption('destination_job_meta', '/topic/dashboard.atlas.jobMeta', 'The MSG destination (topic or queue) for job meta messages.') 27 config.addOption('destination_task_meta', '/topic/dashboard.atlas.taskMeta', 'The MSG destination (topic or queue) for task meta messages.') 28 config.addOption('task_type', 'analysis', 'The type of task. e.g. analysis, production, hammercloud,...') 29 # prevent modification during the interactive ganga session 30 def deny_modification(name, value): 31 raise Config.ConfigError('Cannot modify [DashboardMS] settings (attempted %s=%s)' % (name, value))
32 config.attachUserHandler(deny_modification, None) 33 except ImportError: 34 # on worker node so Config is not needed since it is copied to DashboardMS constructor 35 pass 36 _initconfig() 37 38 39 # singleton publisher 40 _publisher = None
41 -def _get_publisher(server, port, username, password):
42 """Return the singleton publisher, lazily instantiating it. 43 44 N.B. The configuration enforces that server/port/username/password cannot 45 change so this method can cache a singleton publisher. 46 """ 47 global _publisher 48 if _publisher is None: 49 _publisher = MSGUtil.createPublisher(server, port, username, password) 50 _publisher.start() 51 return _publisher
52 53 54 from Ganga.GPIDev.Adapters import IMonitoringService
55 -class DashboardMS(IMonitoringService.IMonitoringService):
56 """Dashboard Monitoring Service base class. 57 58 Subclasses should override getSandboxModules(), getJobInfo() and the event 59 methods: submit(), start(), progress(), stop(), etc.. Typically, the event 60 methods will use job_info and _send() to send job meta-data via MSG using 61 the WLCG format. 62 """ 63
64 - def __init__(self, job_info, config_info):
65 """Construct the Dashboard Monitoring Service.""" 66 IMonitoringService.IMonitoringService.__init__(self, job_info, config_info) 67 # try to initialize logger or default to None if on worker node. see log() 68 try: 69 import Ganga.Utility.logging 70 self._logger = Ganga.Utility.logging.getLogger() 71 except ImportError: 72 self._logger = None
73
74 - def getConfig():
75 """Return DashboardMS Config object.""" 76 from Ganga.Utility import Config 77 return Config.getConfig('DashboardMS')
78 getConfig = staticmethod(getConfig) 79
80 - def getSandboxModules(self):
88
89 - def _send(self, destination, message):
90 """Send the message to the configured destination.""" 91 # get publisher 92 p = _get_publisher( 93 self.config_info['server'], 94 self.config_info['port'], 95 self.config_info['user'], 96 self.config_info['password'], 97 ) 98 # send message 99 headers = {'persistent':'true'} 100 wlcg_msg = FormatUtil.dictToWlcg(message, include_microseconds=False) 101 p.send(destination, wlcg_msg, headers)
102
103 - def _log(self, level='info', message=''):
104 """Log message to logger on client or stderr on worker node.""" 105 if self._logger and hasattr(self._logger, level): 106 getattr(self._logger, level)(message) 107 else: 108 print >>sys.stderr, '[DashboardMS %s] %s' % (level, message)
109