Package Ganga :: Package Lib :: Package MonitoringServices :: Package MSGMS :: Module MSGMS
[hide private]
[frames | no frames]

Source Code for Module Ganga.Lib.MonitoringServices.MSGMS.MSGMS

  1  """GangaMon Monitoring Service plugin.""" 
  2   
  3  from Ganga.Lib.MonitoringServices.MSGMS import MSGUtil 
  4   
  5   
def _initconfig():
    """Initialize MSGMS configuration.

    Creates the [MSGMS] configuration section with its default options and
    attaches a user handler that rejects any later modification of them.

    If Ganga.Utility.Config cannot be imported the function silently does
    nothing: on the worker node the settings are handed to the MSGMS
    constructor instead.
    """
    try:
        from Ganga.Utility import Config
        # create configuration
        config = Config.makeConfig('MSGMS', 'Settings for the MSGMS monitoring plugin. Cannot be changed during the interactive Ganga session.')
        config.addOption('server', 'ganga.msg.cern.ch', 'The server to connect to')
        config.addOption('port', 6163, 'The port to connect to')
        config.addOption('username', '', '')
        config.addOption('password', '', '')
        config.addOption('message_destination', '/queue/ganga.status', '')
        config.addOption('usage_message_destination', "/queue/ganga.usage", '')
        config.addOption('job_submission_message_destination', "/queue/ganga.jobsubmission", '')

        # prevent modification during the interactive ganga session
        def deny_modification(name, x):
            raise Config.ConfigError('Cannot modify [MSGMS] settings (attempted %s=%s)' % (name, x))
        config.attachUserHandler(deny_modification, None)
    except ImportError:
        # on worker node so Config is not needed since it is copied to MSGMS constructor
        pass


# run once at import time so the [MSGMS] section exists before it is read
_initconfig()


# singleton publisher
_publisher = None
def _get_publisher(server, port, username, password):
    """Return the module-wide publisher, creating and starting it on first use.

    server, port, username, password -- connection settings passed straight to
        MSGUtil.createPublisher() the first time this function is called.
        Later calls ignore them and return the cached publisher.
    """
    #FIXME: this assumes server/port/username/password cannot change and caches a singleton publisher
    global _publisher
    if _publisher is None:
        publisher = MSGUtil.createPublisher(server, port, username, password)
        publisher.start()
        # Cache only after start() has succeeded: if it raises, the next call
        # retries instead of handing out a publisher that was never started.
        _publisher = publisher
    return _publisher
39 40 41 from Ganga.GPIDev.Adapters.IMonitoringService import IMonitoringService
class MSGMS(IMonitoringService):
    """GangaMon Monitoring Service based on MSG.

    Publishes job meta-data to an MSG destination on submit, start and stop events
    for consumption by a GangaMon subscriber.

    The same class is used on the client, where job_info is a Job object, and
    on the worker node, where job_info is a dictionary.

    See IMonitoringService for implementation details.
    """

    def __init__(self, job_info, config_info):
        """Construct the GangaMon monitoring service.

        job_info -- Job object on the client, dictionary on the worker node
            (presumably the one produced by getJobInfo(); confirm against
            IMonitoringService).
        config_info -- mapping of MSGMS settings; send() reads the keys
            'server', 'port', 'username', 'password' and 'message_destination'.
        """
        IMonitoringService.__init__(self, job_info, config_info)

    def getConfig():
        """Return MSGMS Config object."""
        from Ganga.Utility import Config
        return Config.getConfig('MSGMS')
    # explicit staticmethod() call (no decorator syntax), as in the original code
    getConfig = staticmethod(getConfig)

    def getSandboxModules(self):
        """Return list of MSGMS module dependencies.

        The result is the base-class list, followed by this package and this
        module, followed by the modules reported by MSGUtil.
        """
        import Ganga.Lib.MonitoringServices.MSGMS
        return IMonitoringService.getSandboxModules(self) + [
            Ganga.Lib.MonitoringServices.MSGMS,
            Ganga.Lib.MonitoringServices.MSGMS.MSGMS,
            ] + MSGUtil.getSandboxModules()

    def getJobInfo(self): # called on client, so job_info is Job object
        """Create job_info from Job object.

        Returns a dictionary describing the job. 'hostname' and 'event' are
        empty place-holders that getMessage() fills in.
        """
        job = self.job_info
        # the job's own uuid is read the same way for plain jobs and subjobs
        ganga_job_uuid = job.info.uuid
        if job.master is None: # no master job; this job is not splitjob
            ganga_job_id = str(job.id)
            ganga_master_uuid = 0
        else: # there is a master job; we are in a subjob
            ganga_job_id = str(job.master.id) + '.' + str(job.id)
            ganga_master_uuid = job.master.info.uuid
        from Ganga.Utility import Config
        return { 'ganga_job_uuid' : ganga_job_uuid
               , 'ganga_master_uuid' : ganga_master_uuid
               , 'ganga_user_repository' : Config.getConfig('Configuration')['user']
                                           + '@' + Config.getConfig('System')['GANGA_HOSTNAME']
                                           + ':' + Config.getConfig('Configuration')['gangadir']
               , 'ganga_job_id' : ganga_job_id
               , 'subjobs' : len(job.subjobs)
               , 'backend' : job.backend.__class__.__name__
               , 'application' : job.application.__class__.__name__
               , 'job_name' : job.name
               , 'hostname' : '' # place-holder updated in getMessage
               , 'event' : '' # place-holder updated in getMessage
               }

    def getMessage(self, event):
        """Create message from job_info adding hostname and event.

        event -- value stored under the 'event' key of the returned message.

        Returns a new dictionary; a dictionary job_info is copied, not modified.
        """
        # the builtin dict is used instead of types.DictType, which exists only
        # in Python 2 (where it is the same object as dict)
        if isinstance(self.job_info, dict):
            # on worker node so just copy job_info
            message = self.job_info.copy()
        else:
            # on client so just create job_info
            message = self.getJobInfo()
        message['hostname'] = hostname()
        message['event'] = event
        return message

    def send(self, message):
        """Send the message to the configured destination.

        The message is published as its repr() with a 'persistent' header.
        """
        # get publisher
        p = _get_publisher(
            self.config_info['server'],
            self.config_info['port'],
            self.config_info['username'],
            self.config_info['password'],
            )
        # send message
        headers = {'persistent':'true'}
        p.send(self.config_info['message_destination'], repr(message), headers)

    def submit(self, **opts): # called on client, so job_info is Job object
        """Log submit event on client.

        For the first subjob (id 0) of a split job an additional 'submitted'
        message is sent on behalf of the master job before the subjob's own.
        """
        # if this job has a master and it is the first subjob then sent submitted for master job
        if self.job_info.master is not None:
            if self.job_info.id == 0:
                masterjob_msg = self.getMessage('submitted')
                masterjob_msg['subjobs'] = len(self.job_info.master.subjobs)
                # keep only the master part of the '<master id>.<subjob id>' identifier
                masterjob_msg['ganga_job_id'] = str(masterjob_msg['ganga_job_id']).split('.')[0]
                # override ganga_job_uuid as the message 'from the master' is really sent from the subjob
                masterjob_msg['ganga_job_uuid'] = masterjob_msg['ganga_master_uuid']
                masterjob_msg['ganga_master_uuid'] = 0
                self.send(masterjob_msg)

        self.job_info.info.monitoring_links.append(('http://gangamon.cern.ch/ganga/#tid=%s'%self.job_info.info.uuid,'dashboard'))

        # send submitted for this job
        msg = self.getMessage('submitted')
        self.send(msg)

    def start(self, **opts): # called on worker node, so job_info is dictionary
        """Log start event on worker node."""
        message = self.getMessage('running')
        self.send(message)

    def progress(self, **opts): # called on worker node, so job_info is dictionary
        """Log progress event on worker node. NOP."""
        pass

    def stop(self, exitcode, **opts): # called on worker node, so job_info is dictionary
        """Log stop event on worker node.

        exitcode -- 0 is reported as 'finished', any other value as 'failed'.
        """
        if exitcode == 0:
            event = 'finished'
        else:
            event = 'failed'
        message = self.getMessage(event)
        self.send(message)


# utility method copied from Ganga.Utility.util
def hostname():
    """ Try to get the hostname in the most possible reliable way as described in the Python
    LibRef.

    Returns the primary host name reported by socket.gethostbyaddr() for the
    local host name, or 'localhost' when that lookup fails.
    """
    import socket
    try:
        return socket.gethostbyaddr(socket.gethostname())[0]
    # [bugfix #20333]:
    # while working offline and with an improper /etc/hosts configuration
    # the localhost cannot be resolved
    # Exception, not a bare except, so that on interpreters where they are not
    # Exception subclasses KeyboardInterrupt and SystemExit still propagate.
    except Exception:
        return 'localhost'
170