Package tlib :: Package base :: Module ActiveMQHelper
[hide private]
[frames | no frames]

Source Code for Module tlib.base.ActiveMQHelper

  1  import stomp 
  2  from tlib.base.LogHelper import get_logger 
  3  from uuid import uuid4 
class ActiveMQListener(object):
    """
    Base class for objects that receive messages from ActiveMQ.

    It stores a name, a unique id and the headers supplied by the caller.
    Subclasses must override ``on_error`` and ``on_message``; the versions
    defined here only log an error and raise NotImplementedError.
    """

    # Placeholder only. The real id is generated per instance in __init__;
    # generating it here would make every listener share one id.
    _id = None

    def __init__(self, name, headers=None, **keyword_headers):
        """
        Store the listener settings and give this instance its own id.

        :param name: Name of the listener
        :type name: str
        :param headers: Headers kept for later use; a falsy value is replaced by an empty dict
        :type headers: dict
        :param keyword_headers: Additional headers passed as keyword arguments
        """
        self.logger = get_logger(name="ActiveMQ listener")
        # One id per instance, so two listeners never end up with the same id
        self._id = uuid4().hex
        self._name = name
        self._headers = headers if headers else {}
        self._keyword_headers = keyword_headers

        self.logger.debug("""Initialized ActiveMQListener with this parameters:
        Name: {name}
        Id: {id}
        headers: {headers}
        keyword_headers: {keyword_headers}""".format(name=name, id=self._id, headers=headers,
                                                    keyword_headers=keyword_headers))

    def on_error(self, headers, message):
        """
        Placeholder error callback; must be overridden by subclasses.

        :raises: NotImplementedError
        """
        self.logger.error("Method on_error must be implemented by classes receiving messages")
        raise NotImplementedError("Method on_error must be implemented by classes receiving messages")

    def on_message(self, headers, message):
        """
        Placeholder message callback; must be overridden by subclasses.

        :raises: NotImplementedError
        """
        self.logger.error("Method on_message must be implemented by classes receiving messages")
        raise NotImplementedError("Method on_message must be implemented by classes receiving messages")

    @property
    def name(self):
        """Name given to the listener when it was created."""
        return self._name

    @property
    def id(self):
        """Hex string of a random UUID generated for this instance."""
        return self._id

    @property
    def headers(self):
        """Headers dict given at creation (empty dict when none was given)."""
        return self._headers

    @property
    def keyword_headers(self):
        """Dict with the extra keyword arguments given at creation."""
        return self._keyword_headers
46
class ActiveMQHelper(object):
    """
    Thin wrapper around a ``stomp.Connection`` used to connect to ActiveMQ,
    register/remove listeners and send messages.
    """

    _logger = None

    def __init__(self, host, port, default_queue=None):
        """
        Create the connection object; no connection is opened here.

        :param host: ActiveMQ host
        :type host: str
        :param port: ActiveMQ port; converted with int()
        :param default_queue: Queue used by send_message when none is given
        :type default_queue: str
        """
        self._logger = get_logger(name="ActiveMQ helper")
        self._default_queue = default_queue
        self._conn = stomp.Connection([(host, int(port))])

        self._logger.debug("Will connect to ActiveMQ on {host}:{port}".format(host=host, port=port))
        if default_queue:
            self._logger.debug("Using '{queue} as default".format(queue=default_queue))

    def connect(self):
        """
        Connect to ActiveMQ unless the connection is already open.
        """
        if not self._conn.is_connected():
            self._logger.debug("Connecting to ActiveMQ")
            self._conn.start()
            self._conn.connect()
        else:
            self._logger.warn("Already connected to ActiveMQ")

    def disconnect(self):
        """
        If there is a connection with ActiveMQ, close it.
        """
        if self._conn.is_connected():
            self._logger.debug("Disconnecting from ActiveMQ")
            # self._conn.stop()
            self._conn.disconnect()
        else:
            self._logger.warn("Already disconnected from ActiveMQ")

    def subscribe(self, listener, queue):
        """
        Registers a listener under its name and subscribes it to a queue

        :param listener: Listener that will receive messages
        :type listener: ActiveMQListener
        :param queue: Queue to subscribe to
        :type queue: str
        :raises ValueError: if no listener is provided
        :raises RuntimeError: if a listener with the same name is already registered
        """
        # Input validation
        if not listener:
            self._logger.error("Listener was not provided")
            raise ValueError("Listener was not provided")

        if self._conn.get_listener(listener.name):
            self._logger.error("Listener name must be unique: %s" % listener.name)
            raise RuntimeError("Listener name must be unique: %s" % listener.name)

        # We need to be connected to subscribe
        self.connect()

        self._logger.debug("Registering listener '{name}'".format(name=listener.name))
        self._conn.set_listener(listener.name, listener)
        self._conn.subscribe(destination=queue, id=listener.id, headers=listener.headers, **listener.keyword_headers)

    def unsubscribe(self, listener):
        """
        Removes a listener

        When there is no open connection, only a warning is logged.

        :param listener: Listener to remove
        :type listener: ActiveMQListener
        :raises ValueError: if no listener is provided
        :raises RuntimeError: if the listener is not registered
        """
        # Input validation
        if not listener:
            self._logger.error("Listener was not provided")
            # The exception used to be created but never raised
            raise ValueError("Listener was not provided")

        if not self._conn.is_connected():
            self._logger.warn("Not connected to ActiveMQ. Can't unsubscribing")
            return

        self._logger.debug("Unsubscribing listener '{name}'".format(name=listener.name))
        if not self._conn.get_listener(listener.name):
            self._logger.error("Listener with name {name} is not registered".format(name=listener.name))
            raise RuntimeError("Listener with name {name} is not registered".format(name=listener.name))

        self._conn.unsubscribe(listener.id, headers=listener.headers, **listener.keyword_headers)
        self._conn.remove_listener(listener.name)

    def send_message(self, message, queue=None):
        """
        Send message to ActiveMQ.
        If queue is not provided, it uses the default queue

        :param message: message body
        :param queue: queue name
        :type queue: str
        :raises RuntimeError: if no queue is given and there is no default queue
        """
        destination = queue or self._default_queue

        # Validate before connecting so a bad call does not open a connection
        if not destination:
            self._logger.error("Queue was not provided and there is no default queue")
            raise RuntimeError("Queue was not provided and there is no default queue")

        # We need to be connected to send messages
        self.connect()

        self._logger.debug("Sending message to queue '{queue}'".format(queue=destination))
        self._conn.send(body=message, destination=destination)
151