Skip to content
Snippets Groups Projects
Commit 29ad8ead authored by Eesaan Atluri's avatar Eesaan Atluri
Browse files

Merge branch 'feat-redesign-mq' into feat-subscribe-mail-lists

parents ed4d9510 495e522a
No related branches found
No related tags found
6 merge requests!147Merge previous default branch feat-cod-rmq into main,!85kill nginx process running under user from login node,!51Fix acct create wait,!39WIP:Feat cod rmq,!38WIP: Feat cod rmq,!30Feat subscribe mail lists
...@@ -41,7 +41,10 @@ def callback_function(ch, method, properties, body): ...@@ -41,7 +41,10 @@ def callback_function(ch, method, properties, body):
# start consume messagre from queue with callback function # start consume messagre from queue with callback function
rc_rmq.start_consume({ rc_rmq.start_consume({
'queue': 'queue_name', 'queue': 'queue_name',
'routing_key: 'your_key',
'cb': callback_function 'cb': callback_function
}) })
# don't forget to close connection
rc_rmq.disconnect()
``` ```
#!/usr/bin/env python
import sys
import rc_util
if len(sys.argv) < 2:
print("Usage: {} USERNAME [FULL_NAME] [REASON]".format(sys.argv[0]), file=sys.stderr)
exit(1)
user_name = sys.argv[1]
full_name = sys.argv[2] if len(sys.argv) >= 3 else ''
reason = sys.argv[3] if len(sys.argv) >= 4 else ''
rc_util.add_account(user_name, full=full_name, reason=reason)
print("Account requested for user: {}".format(user_name))
print("Waiting for confirmation...")
rc_util.consume(user_name)
...@@ -65,15 +65,16 @@ class RCRMQ(object): ...@@ -65,15 +65,16 @@ class RCRMQ(object):
exchange_type=self.EXCHANGE_TYPE, exchange_type=self.EXCHANGE_TYPE,
durable=True) durable=True)
if self.QUEUE is not None: def bind_queue(self):
self._channel.queue_declare(queue=self.QUEUE, durable=self.DURABLE) self._channel.queue_declare(queue=self.QUEUE, durable=self.DURABLE)
self._channel.queue_bind(exchange=self.EXCHANGE, self._channel.queue_bind(exchange=self.EXCHANGE,
queue=self.QUEUE, queue=self.QUEUE,
routing_key=self.ROUTING_KEY) routing_key=self.ROUTING_KEY)
def disconnect(self): def disconnect(self):
self._channel.close() self._channel.close()
self._connection.close() self._connection.close()
self._connection = None
def delete_queue(self): def delete_queue(self):
self._channel.queue_delete(self.QUEUE) self._channel.queue_delete(self.QUEUE)
...@@ -89,9 +90,6 @@ class RCRMQ(object): ...@@ -89,9 +90,6 @@ class RCRMQ(object):
routing_key=self.ROUTING_KEY, routing_key=self.ROUTING_KEY,
body=json.dumps(obj['msg'])) body=json.dumps(obj['msg']))
if not self._consuming:
self.disconnect()
def start_consume(self, obj): def start_consume(self, obj):
if 'queue' in obj: if 'queue' in obj:
self.QUEUE = obj['queue'] self.QUEUE = obj['queue']
...@@ -105,6 +103,8 @@ class RCRMQ(object): ...@@ -105,6 +103,8 @@ class RCRMQ(object):
if self._connection is None: if self._connection is None:
self.connect() self.connect()
self.bind_queue()
self._consumer_tag = self._channel.basic_consume(self.QUEUE,obj['cb']) self._consumer_tag = self._channel.basic_consume(self.QUEUE,obj['cb'])
self._consuming = True self._consuming = True
try: try:
...@@ -112,7 +112,5 @@ class RCRMQ(object): ...@@ -112,7 +112,5 @@ class RCRMQ(object):
except KeyboardInterrupt: except KeyboardInterrupt:
self._channel.stop_consuming() self._channel.stop_consuming()
self.disconnect()
def stop_consume(self): def stop_consume(self):
self._channel.basic_cancel(self._consumer_tag) self._channel.basic_cancel(self._consumer_tag)
...@@ -13,6 +13,7 @@ def add_account(username, full='', reason=''): ...@@ -13,6 +13,7 @@ def add_account(username, full='', reason=''):
"reason": reason "reason": reason
} }
}) })
rc_rmq.disconnect()
def worker(ch, method, properties, body): def worker(ch, method, properties, body):
msg = json.loads(body) msg = json.loads(body)
...@@ -27,17 +28,18 @@ def worker(ch, method, properties, body): ...@@ -27,17 +28,18 @@ def worker(ch, method, properties, body):
print("{} is not done yet.".format(key)) print("{} is not done yet.".format(key))
done = False done = False
if done: if done:
confirm_rmq.stop_consume() rc_rmq.stop_consume()
confirm_rmq.delete_queue() rc_rmq.delete_queue()
def consume(username, callback, debug=False): def consume(username, callback=worker, debug=False):
if debug: if debug:
sleep(5) sleep(5)
else: else:
confirm_rmq.start_consume({ rc_rmq.start_consume({
'queue': username, 'queue': username,
'routing_key': 'confirm.' + username, 'routing_key': 'confirm.' + username,
'cb': callback 'cb': callback
}) })
rc_rmq.disconnect()
return { 'success' : True } return { 'success' : True }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment