diff --git a/rc_rmq.py b/rc_rmq.py index 887176890d0b7eb2d4986869ad5f08ca032e12f4..8a005de245c5be68fc721e6f0ecfc327d7e890a2 100644 --- a/rc_rmq.py +++ b/rc_rmq.py @@ -12,14 +12,16 @@ class RCRMQ(object): VHOST = '/' EXCHANGE = '' EXCHANGE_TYPE = 'direct' - QUEUE = '' + QUEUE = None + DURABLE = False + ROUTING_KEY = None def __init__(self, config=None): if config: if 'exchange' in config: self.EXCHANGE = config['exchange'] - if 'queue' in config: - self.QUEUE = config['queue'] + if 'exchange_type' in config: + self.EXCHANGE_TYPE = config['exchange_type'] hostname = socket.gethostname().split(".", 1)[0] @@ -42,11 +44,43 @@ class RCRMQ(object): exchange_type=self.EXCHANGE_TYPE, durable=True) + if self.QUEUE is not None: + self._channel.queue_declare(queue=self.QUEUE, durable=self.DURABLE) + self._channel.queue_bind(exchange=self.EXCHANGE, + queue=self.QUEUE, + routing_key=self.ROUTING_KEY) + def disconnect(self): + self._channel.close() self._connection.close() def publish_msg(self, obj): + if 'routing_key' in obj: + self.ROUTING_KEY = obj['routing_key'] + + self.connect() + + self._channel.basic_publish(exchange=self.EXCHANGE, + routing_key=self.ROUTING_KEY, + body=json.dumps(obj['msg'])) + + self.disconnect() + + def start_consume(self, obj): + if 'queue' in obj: + self.QUEUE = obj['queue'] + self.ROUTING_KEY = obj['routing_key'] if 'routing_key' in obj else self.QUEUE + if 'durable' in obj: + self.DURABLE = obj['durable'] + self.connect() - self._channel.basic_publish(exchange=self.EXCHANGE, routing_key=obj['routing_key'], body=json.dumps(obj['msg'])) + + self._consumer_tag = self._channel.basic_consume(self.QUEUE,obj['cb']) + self._channel.start_consuming() + self.disconnect() + def stop_consume(self): + self._channel.basic_cancel(self._consumer_tag) + if not self.DURABLE: + self._channel.queue_delete(self.QUEUE)