diff --git a/README.md b/README.md index df4209656d1e0616c589282240d77d7c0b40eeb3..5d7b9155b49e6906eaa6451a4660fce3536a5c73 100644 --- a/README.md +++ b/README.md @@ -41,7 +41,10 @@ def callback_function(ch, method, properties, body): # start consume messagre from queue with callback function rc_rmq.start_consume({ 'queue': 'queue_name', + 'routing_key: 'your_key', 'cb': callback_function }) +# don't forget to close connection +rc_rmq.disconnect() ``` diff --git a/create_account.py b/create_account.py new file mode 100755 index 0000000000000000000000000000000000000000..b316cd0bca978d77f27a51ea067a3634c675049b --- /dev/null +++ b/create_account.py @@ -0,0 +1,25 @@ +#!/usr/bin/env python +import sys +import rc_util + +if len(sys.argv) < 2: + print("Usage: {} USERNAME [EMAIL] [FULL_NAME] [REASON]".format(sys.argv[0]), file=sys.stderr) + exit(1) + +domain = 'uab.edu' +user_name = sys.argv[1] +email = sys.argv[2] if len(sys.argv) >= 3 else '' +full_name = sys.argv[3] if len(sys.argv) >= 4 else '' +reason = sys.argv[4] if len(sys.argv) >= 5 else '' + +if email == '': + if '@' in user_name: + email = user_name + else: + email = user_name + '@' + domain + +rc_util.add_account(user_name, email=email, full=full_name, reason=reason) +print("Account requested for user: {}".format(user_name)) + +print("Waiting for confirmation...") +rc_util.consume(user_name) diff --git a/rc_rmq.py b/rc_rmq.py index 42e49137457f7ffdbe6f3156881a1fceca029ad8..37afa3f44f872454a3b96fccf2d5a6a538fe01cf 100644 --- a/rc_rmq.py +++ b/rc_rmq.py @@ -65,15 +65,16 @@ class RCRMQ(object): exchange_type=self.EXCHANGE_TYPE, durable=True) - if self.QUEUE is not None: - self._channel.queue_declare(queue=self.QUEUE, durable=self.DURABLE) - self._channel.queue_bind(exchange=self.EXCHANGE, - queue=self.QUEUE, - routing_key=self.ROUTING_KEY) + def bind_queue(self): + self._channel.queue_declare(queue=self.QUEUE, durable=self.DURABLE) + self._channel.queue_bind(exchange=self.EXCHANGE, + queue=self.QUEUE, + routing_key=self.ROUTING_KEY) def disconnect(self): self._channel.close() self._connection.close() + self._connection = None def delete_queue(self): self._channel.queue_delete(self.QUEUE) @@ -89,9 +90,6 @@ class RCRMQ(object): routing_key=self.ROUTING_KEY, body=json.dumps(obj['msg'])) - if not self._consuming: - self.disconnect() - def start_consume(self, obj): if 'queue' in obj: self.QUEUE = obj['queue'] @@ -105,6 +103,8 @@ class RCRMQ(object): if self._connection is None: self.connect() + self.bind_queue() + self._consumer_tag = self._channel.basic_consume(self.QUEUE,obj['cb']) self._consuming = True try: @@ -112,7 +112,5 @@ class RCRMQ(object): except KeyboardInterrupt: self._channel.stop_consuming() - self.disconnect() - def stop_consume(self): self._channel.basic_cancel(self._consumer_tag) diff --git a/rc_util.py b/rc_util.py index 8b90fce92b86c7e8465d3664c0c1271bdca404db..0e7c4c1e6ec7d0cba4cc7370cf35370efa7f7354 100644 --- a/rc_util.py +++ b/rc_util.py @@ -1,18 +1,23 @@ +import logging +import argparse from rc_rmq import RCRMQ import json rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'}) tasks = {'ohpc_account': None, 'ood_account': None, 'slurm_account': None} +logger_fmt = '%(asctime)s [%(module)s] - %(message)s' -def add_account(username, full='', reason=''): +def add_account(username, email, full='', reason=''): rc_rmq.publish_msg({ 'routing_key': 'request.' + username, 'msg': { "username": username, + "email": email, "fullname": full, "reason": reason } }) + rc_rmq.disconnect() def worker(ch, method, properties, body): msg = json.loads(body) @@ -27,17 +32,41 @@ def worker(ch, method, properties, body): print("{} is not done yet.".format(key)) done = False if done: - confirm_rmq.stop_consume() - confirm_rmq.delete_queue() + rc_rmq.stop_consume() + rc_rmq.delete_queue() -def consume(username, callback, debug=False): +def consume(username, callback=worker, debug=False): if debug: sleep(5) else: - confirm_rmq.start_consume({ + rc_rmq.start_consume({ 'queue': username, 'routing_key': 'confirm.' + username, 'cb': callback }) + rc_rmq.disconnect() return { 'success' : True } + +def get_args(): + # Parse arguments + parser = argparse.ArgumentParser() + parser.add_argument('-v', '--verbose', action='store_true', help='verbose output') + parser.add_argument('-n', '--dry-run', action='store_true', help='enable dry run mode') + return parser.parse_args() + +def get_logger(args=None): + if args is None: + args = get_args() + + logger_lvl = logging.WARNING + + if args.verbose: + logger_lvl = logging.DEBUG + + if args.dry_run: + logger_lvl = logging.INFO + + logging.basicConfig(format=logger_fmt, level=logger_lvl) + return logging.getLogger(__name__) +