Skip to content
Snippets Groups Projects
Unverified Commit b8c7d6bd authored by Ravi Tripathi's avatar Ravi Tripathi Committed by GitHub
Browse files

Merge pull request #111 from diedpigs/feat-update-rcrmq

Update rcrmq class
parents 3e80846f 9e2f78c2
No related branches found
No related tags found
No related merge requests found
...@@ -13,9 +13,6 @@ class RCRMQ(object): ...@@ -13,9 +13,6 @@ class RCRMQ(object):
VHOST = "/" VHOST = "/"
EXCHANGE = "" EXCHANGE = ""
EXCHANGE_TYPE = "direct" EXCHANGE_TYPE = "direct"
QUEUE = None
DURABLE = True
ROUTING_KEY = None
DEBUG = False DEBUG = False
def __init__(self, config=None, debug=False): def __init__(self, config=None, debug=False):
...@@ -83,55 +80,64 @@ class RCRMQ(object): ...@@ -83,55 +80,64 @@ class RCRMQ(object):
durable=True, durable=True,
) )
def bind_queue(self): def bind_queue(
self._channel.queue_declare(queue=self.QUEUE, durable=self.DURABLE) self, queue="", routing_key=None, durable=True, exclusive=False
):
if self._connection is None:
self.connect()
result = self._channel.queue_declare(
queue=queue, durable=durable, exclusive=exclusive
)
self._channel.queue_bind( self._channel.queue_bind(
exchange=self.EXCHANGE, exchange=self.EXCHANGE,
queue=self.QUEUE, queue=result.method.queue,
routing_key=self.ROUTING_KEY, routing_key=routing_key,
) )
return result.method.queue
def disconnect(self): def disconnect(self):
self._channel.close() self._channel.close()
self._connection.close() self._connection.close()
self._connection = None self._connection = None
def delete_queue(self): def delete_queue(self, queue):
self._channel.queue_delete(self.QUEUE) self._channel.queue_delete(queue)
def publish_msg(self, obj): def publish_msg(self, obj):
if "routing_key" in obj: routing_key = obj.get("routing_key")
self.ROUTING_KEY = obj["routing_key"] props = obj.get("props")
if self._connection is None: if self._connection is None:
self.connect() self.connect()
self._channel.basic_publish( self._channel.basic_publish(
exchange=self.EXCHANGE, exchange=self.EXCHANGE,
routing_key=self.ROUTING_KEY, routing_key=routing_key,
properties=props,
body=json.dumps(obj["msg"]), body=json.dumps(obj["msg"]),
) )
def start_consume(self, obj): def start_consume(self, obj):
if "queue" in obj: queue = obj.get("queue", "")
self.QUEUE = obj["queue"] routing_key = obj.get("routing_key", queue or None)
self.ROUTING_KEY = ( durable = obj.get("durable", True)
obj["routing_key"] if "routing_key" in obj else self.QUEUE exclusive = obj.get("exclusive", False)
) bind = obj.get("bind", True)
if "durable" in obj:
self.DURABLE = obj["durable"]
if self.DEBUG:
print(
"Queue: " + self.QUEUE + "\nRouting_key: " + self.ROUTING_KEY
)
if self._connection is None: if self._connection is None:
self.connect() self.connect()
self.bind_queue() if bind:
self.bind_queue(queue, routing_key, durable, exclusive)
if self.DEBUG:
print("Queue: " + queue + "\nRouting_key: " + routing_key)
self._consumer_tag = self._channel.basic_consume(self.QUEUE, obj["cb"]) self._consumer_tag = self._channel.basic_consume(queue, obj["cb"])
self._consuming = True self._consuming = True
try: try:
self._channel.start_consuming() self._channel.start_consuming()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment