1 """Utilities for using MSG within Ganga.
2
3 N.B. Take care to minimise dependencies on external packages since this module
4 will very likely be copied to the sandbox on the worker node.
5 """
6
7
8
# Best-effort set-up of the 'stomp.py' logger. Nothing here is allowed to stop
# this module from importing (presumably Ganga's configuration machinery is
# not importable on the worker node -- confirm).
try:
    import logging
    import Ganga.Utility.Config as Config
    config = Config.getConfig('Logging')
    try:
        # Probe for an existing 'stomp.py' option; if the lookup succeeds
        # nothing further is done here.
        config['stomp.py']
    except Config.ConfigError:
        # The lookup failed: restrict the stomp.py logger to CRITICAL ...
        logging.getLogger('stomp.py').setLevel(logging.CRITICAL)
        # ... and add the option to the 'Logging' configuration.
        config.addOption('stomp.py', 'CRITICAL', 'logger for stomp.py external package')
except Exception:
    # Deliberately ignored (best-effort), but unlike a bare ``except:`` this
    # lets KeyboardInterrupt and SystemExit propagate.
    pass
24
25
26
28 """Return the list of sandbox modules required for MSG monitoring services."""
29 import stomp, stomputil
30 import Ganga.Lib.MonitoringServices.MSGMS
31 return [
32 Ganga,
33 Ganga.Lib,
34 Ganga.Lib.MonitoringServices,
35 Ganga.Lib.MonitoringServices.MSGMS,
36 Ganga.Lib.MonitoringServices.MSGMS.MSGUtil,
37 stomp,
38 stomp.cli,
39 stomp.exception,
40 stomp.listener,
41 stomp.stomp,
42 stomp.utils,
43 stomputil,
44 stomputil.publisher,
45 ]
46
47
48
49 from stomputil.publisher import IDLE_TIMEOUT, EXIT_TIMEOUT
def createPublisher(server, port, user='', password='',
                    idle_timeout=IDLE_TIMEOUT, exit_timeout=EXIT_TIMEOUT):
    """Build a publisher thread via stomputil.createPublisher.

    The thread class handed to stomputil is GangaThread when it can be
    imported (i.e. on the client) and threading.Thread otherwise (i.e. on the
    worker node). Likewise the Ganga logger is passed on when
    Ganga.Utility.logging can be imported, and None otherwise.

    N.B. With GangaThread the publisher is marked as non-critical; without it
    an exit handler is added to the publisher, with the given exit_timeout.

    @param server: The server host name.
    @param port: The server port.
    @param user: The user name.
    @param password: The password.
    @param idle_timeout: Maximum seconds to idle before closing connection.
        Negative value indicates never close connection.
    @param exit_timeout: Maximum seconds to clear message queue on exit.
        Negative value indicates clear queue without timeout.
    @return: The publisher object; it is not started by this function.

    Usage::
        from Ganga.Lib.MonitoringServices.MSGMS import MSGUtil
        p = MSGUtil.createPublisher('ganga.msg.cern.ch', 6163)
        p.start()
        p.send('/topic/ganga.dashboard.test', 'Hello World!')

    See also stomputil.publisher
    """
    # Choose the base thread class: Ganga-managed if importable, plain otherwise.
    try:
        from Ganga.Core.GangaThread import GangaThread as thread_class
    except ImportError:
        from threading import Thread as thread_class
        ganga_managed = False
    else:
        ganga_managed = True

    # Use the Ganga logger when available; fall back to no logger at all.
    try:
        import Ganga.Utility.logging
        ganga_logger = Ganga.Utility.logging.getLogger()
    except ImportError:
        ganga_logger = None

    import stomputil
    new_publisher = stomputil.createPublisher(
        thread_class, server, port, user, password, ganga_logger, idle_timeout)

    if not ganga_managed:
        # Plain thread: register the exit handler with the requested timeout.
        new_publisher.addExitHandler(exit_timeout)
    else:
        # Ganga-managed thread: flag it as non-critical.
        new_publisher.setCritical(False)
    return new_publisher
97