From 346992a78e5f179ce8cb0ee7d98b7b683428a8ce Mon Sep 17 00:00:00 2001
From: "Bo-Chun Louis Chen(VM)" <louistw@uab.edu>
Date: Sat, 22 Feb 2020 04:45:57 +0000
Subject: [PATCH] Add consume method into RCRMQ class

---
 rc_rmq.py | 42 ++++++++++++++++++++++++++++++++++++++----
 1 file changed, 38 insertions(+), 4 deletions(-)

diff --git a/rc_rmq.py b/rc_rmq.py
index 8871768..8a005de 100644
--- a/rc_rmq.py
+++ b/rc_rmq.py
@@ -12,14 +12,16 @@ class RCRMQ(object):
     VHOST = '/'
     EXCHANGE = ''
     EXCHANGE_TYPE = 'direct'
-    QUEUE = ''
+    QUEUE = None
+    DURABLE = False
+    ROUTING_KEY = None
 
     def __init__(self, config=None):
         if config:
             if 'exchange' in config:
                 self.EXCHANGE = config['exchange']
-            if 'queue' in config:
-                self.QUEUE = config['queue']
+            if 'exchange_type' in config:
+                self.EXCHANGE_TYPE = config['exchange_type']
 
         hostname = socket.gethostname().split(".", 1)[0]
 
@@ -42,11 +44,43 @@ class RCRMQ(object):
                 exchange_type=self.EXCHANGE_TYPE,
                 durable=True)
 
+        if self.QUEUE is not None:
+            self._channel.queue_declare(queue=self.QUEUE, durable=self.DURABLE)
+            self._channel.queue_bind(exchange=self.EXCHANGE,
+                    queue=self.QUEUE,
+                    routing_key=self.ROUTING_KEY)
+
     def disconnect(self):
+        self._channel.close()
         self._connection.close()
 
     def publish_msg(self, obj):
+        if 'routing_key' in obj:
+            self.ROUTING_KEY = obj['routing_key']
+
+        self.connect()
+
+        self._channel.basic_publish(exchange=self.EXCHANGE,
+                routing_key=self.ROUTING_KEY,
+                body=json.dumps(obj['msg']))
+
+        self.disconnect()
+
+    def start_consume(self, obj):
+        if 'queue' in obj:
+            self.QUEUE = obj['queue']
+            self.ROUTING_KEY = obj['routing_key'] if 'routing_key' in obj else self.QUEUE
+        if 'durable' in obj:
+            self.DURABLE = obj['durable']
+
         self.connect()
-        self._channel.basic_publish(exchange=self.EXCHANGE, routing_key=obj['routing_key'], body=json.dumps(obj['msg']))
+
+        self._consumer_tag = self._channel.basic_consume(self.QUEUE,obj['cb'])
+        self._channel.start_consuming()
+
         self.disconnect()
 
+    def stop_consume(self):
+        self._channel.basic_cancel(self._consumer_tag)
+        if not self.DURABLE:
+            self._channel.queue_delete(self.QUEUE)
-- 
GitLab