1 import stomp
2 from tlib.base.LogHelper import get_logger
3 from uuid import uuid4
7
8 _id = uuid4().hex
9
10 - def __init__(self, name, headers=None, **keyword_headers):
22
24 self.logger.error("Method on_error must be implemented by classes receiving messages")
25 raise NotImplementedError("Method on_error must be implemented by classes receiving messages")
26
28 self.logger.error("Method on_message must be implemented by classes receiving messages")
29 raise NotImplementedError("Method on_message must be implemented by classes receiving messages")
30
31 @property
34
35 @property
38
39 @property
42
@property
def keyword_headers(self):
    """
    Keyword headers of this listener, as stored in ``_keyword_headers``.
    """
    # NOTE(review): the "def" line is absent from this copy of the file; the
    # name was reconstructed from the ``listener.keyword_headers`` accesses
    # elsewhere in the file -- confirm against the original source.
    return self._keyword_headers
46
49 _logger = None
50
def __init__(self, host, port, default_queue=None):
    """
    Create the logger and the STOMP connection object for an ActiveMQ broker.

    This method only builds the connection object; it does not call
    ``start()`` or ``connect()`` on it.

    :param host: Host of the ActiveMQ broker
    :type host: str
    :param port: Port of the ActiveMQ broker, converted with ``int()``
    :type port: int or str
    :param default_queue: Queue stored as the default destination
    :type default_queue: str
    """
    self._logger = get_logger(name="ActiveMQ helper")
    self._default_queue = default_queue
    self._conn = stomp.Connection([(host, int(port))])

    self._logger.debug("Will connect to ActiveMQ on {host}:{port}".format(host=host, port=port))
    if default_queue:
        # The original message was missing the closing quote around the queue name.
        self._logger.debug("Using '{queue}' as default".format(queue=default_queue))
59
def connect(self):
    """
    Get configuration and connect to ActiveMQ

    Starts and connects the underlying connection when it is not connected
    yet; otherwise only a warning is logged.
    """
    # NOTE(review): the "def" line is absent from this copy of the file; the
    # signature was reconstructed from the ``self.connect()`` calls elsewhere
    # in the file -- confirm against the original source.
    if self._conn.is_connected():
        self._logger.warn("Already connected to ActiveMQ")
        return

    self._logger.debug("Connecting to ActiveMQ")
    self._conn.start()
    self._conn.connect()
70
def disconnect(self):
    """
    If there is a connection with ActiveMQ, close it.

    When there is no open connection only a warning is logged.
    """
    # NOTE(review): the "def" line is absent from this copy of the file; the
    # method name was inferred from the docstring and body -- confirm against
    # the original source.
    if not self._conn.is_connected():
        self._logger.warn("Already disconnected from ActiveMQ")
        return

    self._logger.debug("Disconnecting from ActiveMQ")
    self._conn.disconnect()
81
def add_listener(self, listener, queue):
    """
    Adds a listener with the specified name

    The listener is registered on the connection under ``listener.name`` and
    subscribed to ``queue`` using its id, headers and keyword headers.
    ``self.connect()`` is called before registering.

    :param listener: Listener that will receive messages
    :type listener: ActiveMQListener
    :param queue: Queue to subscribe to
    :type queue: str
    :raises: ValueError if no listener is given, RuntimeError if a listener
             with the same name is already registered
    """
    # NOTE(review): the "def" line is absent from this copy of the file; the
    # signature was reconstructed from the docstring parameters and the body
    # -- confirm against the original source.
    if not listener:
        self._logger.error("Listener was not provided")
        raise ValueError("Listener was not provided")

    if self._conn.get_listener(listener.name):
        duplicate = "Listener name must be unique: %s" % listener.name
        self._logger.error(duplicate)
        raise RuntimeError(duplicate)

    self.connect()

    self._logger.debug("Registering listener '{name}'".format(name=listener.name))
    self._conn.set_listener(listener.name, listener)
    self._conn.subscribe(destination=queue, id=listener.id, headers=listener.headers, **listener.keyword_headers)
107
def remove_listener(self, listener):
    """
    Removes a listener

    Unsubscribes the listener and removes it from the connection. When there
    is no open connection only a warning is logged.

    :param listener: Listener to unsubscribe and remove
    :type listener: ActiveMQListener
    :raises: ValueError if no listener is given, RuntimeError if the listener
             is not registered on the connection
    """
    # NOTE(review): the "def" line is absent from this copy of the file; the
    # signature was reconstructed from the docstring parameters and the body
    # -- confirm against the original source.
    if not listener:
        self._logger.error("Listener was not provided")
        # The original built this exception without raising it, so execution
        # fell through to the code below with no listener.
        raise ValueError("Listener was not provided")

    if not self._conn.is_connected():
        self._logger.warn("Not connected to ActiveMQ. Can't unsubscribing")
        return

    self._logger.debug("Unsubscribing listener '{name}'".format(name=listener.name))
    if not self._conn.get_listener(listener.name):
        not_registered = "Listener with name {name} is not registered".format(name=listener.name)
        self._logger.error(not_registered)
        raise RuntimeError(not_registered)

    self._conn.unsubscribe(listener.id, headers=listener.headers, **listener.keyword_headers)
    self._conn.remove_listener(listener.name)
130
def send_message(self, message, queue=None):
    """
    Send message to ActiveMQ.
    If queue is not provided, it uses default queue

    ``self.connect()`` is called first, before the destination is resolved.

    :param message: message body
    :param queue: queue name; falls back to the default queue when empty
    :raises: RuntimeError if neither a queue nor a default queue is available
    """
    # NOTE(review): the "def" line is absent from this copy of the file; the
    # name and the ``queue=None`` default were inferred from the docstring
    # and body -- confirm against the original source.
    self.connect()

    # An explicit queue wins; otherwise fall back to the configured default.
    destination = queue or self._default_queue
    if not destination:
        self._logger.error("Queue was not provided and there is no default queue")
        raise RuntimeError("Queue was not provided and there is no default queue")

    self._logger.debug("Sending message to queue '{queue}'".format(queue=destination))
    self._conn.send(body=message, destination=destination)
151