diff --git a/create_account.py b/create_account.py index d46906ff10aa105ffa2688a4406881ea85692840..99f6d590a8d75f61e4e32df91e089d7244a23ff7 100755 --- a/create_account.py +++ b/create_account.py @@ -17,7 +17,9 @@ args = parser.parse_args() timeout = 60 -if args.email == '': +queuename = rc_util.encode_name(args.username) + +if args.email == "": args.email = args.username if '@' not in args.email: args.email = args.username + '@' + args.domain @@ -43,12 +45,20 @@ def callback(channel, method, properties, body): rc_util.rc_rmq.delete_queue() -rc_util.add_account(args.username, email=args.email, full=args.full_name, reason=args.reason) -print(f'Account for {args.username} requested.') +rc_util.add_account( + args.username, + queuename=queuename, + email=args.email, + full=args.full_name, + reason=args.reason, +) +print(f"Account for {args.username} requested.") # Set initial timeout timer signal.signal(signal.SIGALRM, timeout_handler) signal.setitimer(signal.ITIMER_REAL, timeout) -print('Waiting for completion...') -rc_util.consume(args.username, routing_key=f'complete.{args.username}', callback=callback) +print("Waiting for completion...") +rc_util.consume( + queuename, routing_key=f"complete.{queuename}", callback=callback +) diff --git a/prod_rmq_agents/dir_verify.py b/prod_rmq_agents/dir_verify.py index 156db1cff7f62ac3dea884614f37d716b6846acf..108ad85001270876bb453ec40e47c15a5d049fef 100644 --- a/prod_rmq_agents/dir_verify.py +++ b/prod_rmq_agents/dir_verify.py @@ -55,10 +55,9 @@ def dir_verify(ch, method, properties, body): logger.error('', exc_info=True) # send confirm message - rc_rmq.publish_msg({ - 'routing_key': 'confirm.' + username, - 'msg': msg - }) + rc_rmq.publish_msg( + {"routing_key": "confirm." + msg["queuename"], "msg": msg} + ) logger.debug(f'User {username} confirmation sent') diff --git a/prod_rmq_agents/get-next-uid-gid.py b/prod_rmq_agents/get-next-uid-gid.py index a59255d9f2e147b467cb329c997d8102e26865d1..9a1e2f82edfefbea7ea9a3c00a63ff6f2edeb014 100644 --- a/prod_rmq_agents/get-next-uid-gid.py +++ b/prod_rmq_agents/get-next-uid-gid.py @@ -82,12 +82,12 @@ def resolve_uid_gid(ch, method, properties, body): ch.basic_ack(delivery_tag=method.delivery_tag) # Send confirm message - logger.debug('rc_rmq.publish_msg()') - rc_rmq.publish_msg({ - 'routing_key': 'confirm.' + username, - 'msg': msg - }) - logger.info('confirmation sent') + logger.debug("rc_rmq.publish_msg()") + rc_rmq.publish_msg( + {"routing_key": "confirm." + msg["queuename"], "msg": msg} + ) + logger.info("confirmation sent") + logger.info("Start listening to queue: {}".format(task)) rc_rmq.start_consume({ diff --git a/prod_rmq_agents/git_commit.py b/prod_rmq_agents/git_commit.py index 5c0de34d8582a97750228df0e37bb034e12955c7..81a0e2b30c647d5aff72073991226036b797e7d1 100644 --- a/prod_rmq_agents/git_commit.py +++ b/prod_rmq_agents/git_commit.py @@ -80,12 +80,11 @@ def git_commit(ch, method, properties, body): logger.error('', exc_info=True) # Send confirm message - logger.debug('rc_rmq.publish_msge()') - rc_rmq.publish_msg({ - 'routing_key': 'confirm.' + username, - 'msg': msg - }) - logger.info('confirmation sent') + logger.debug("rc_rmq.publish_msge()") + rc_rmq.publish_msg( + {"routing_key": "confirm." + msg["queuename"], "msg": msg} + ) + logger.info("confirmation sent") # Acknowledge message logger.debug('ch.basic_ack()') diff --git a/prod_rmq_agents/notify_user.py b/prod_rmq_agents/notify_user.py index 431692b19a5b9d307c38e21c5e99adc1bb9210e6..b378dea929da4b55f48507318cb608d9fbc1ddee 100644 --- a/prod_rmq_agents/notify_user.py +++ b/prod_rmq_agents/notify_user.py @@ -81,10 +81,9 @@ def notify_user(ch, method, properties, body): msg['errmsg'] = errmsg if errmsg else 'Unexpected error' # Send confirm message - rc_rmq.publish_msg({ - 'routing_key': 'confirm.' + username, - 'msg': msg - }) + rc_rmq.publish_msg( + {"routing_key": "confirm." + msg["queuename"], "msg": msg} + ) logger.debug(f'User {username} confirmation sent') diff --git a/prod_rmq_agents/subscribe_mail_lists.py b/prod_rmq_agents/subscribe_mail_lists.py index 236d0b5b9e6d64b0c0e4732b616796fd85718d8f..8d9a91aba8fd5f4a1d8cbf4cc10b1c0e2cdd12c9 100644 --- a/prod_rmq_agents/subscribe_mail_lists.py +++ b/prod_rmq_agents/subscribe_mail_lists.py @@ -66,12 +66,12 @@ def mail_list_subscription(ch, method, properties, body): ch.basic_ack(delivery_tag=method.delivery_tag) # send confirm message - logger.debug('rc_rmq.publish_msg()') - rc_rmq.publish_msg({ - 'routing_key': 'confirm.' + username, - 'msg': msg - }) - logger.info('confirmation sent') + logger.debug("rc_rmq.publish_msg()") + rc_rmq.publish_msg( + {"routing_key": "confirm." + msg["queuename"], "msg": msg} + ) + logger.info("confirmation sent") + logger.info("Start listening to queue: {}".format(task)) rc_rmq.start_consume({ diff --git a/prod_rmq_agents/task_manager.py b/prod_rmq_agents/task_manager.py index a65db78ac76c190b3bc30465c202548cb4fbd9eb..135c714474078bb5a436b87d696a64ba4aeccc54 100644 --- a/prod_rmq_agents/task_manager.py +++ b/prod_rmq_agents/task_manager.py @@ -99,22 +99,25 @@ def insert_db(username, msg): if not record: # SQL insert - table.insert({ - 'username': username, - 'uid': msg.get('uid', -1), - 'gid': msg.get('gid', -1), - 'email': msg.get('email', ''), - 'reason': msg.get('reason', ''), - 'fullname': msg.get('fullname', ''), - 'create_account': None, - 'git_commit': None, - 'dir_verify': None, - 'subscribe_mail_list': None, - 'notify_user': None, - 'sent': None, - 'reported': False, - 'last_update': datetime.now() - }) + table.insert( + { + "username": username, + "uid": msg.get("uid", -1), + "gid": msg.get("gid", -1), + "email": msg.get("email", ""), + "reason": msg.get("reason", ""), + "fullname": msg.get("fullname", ""), + "create_account": None, + "git_commit": None, + "dir_verify": None, + "subscribe_mail_list": None, + "notify_user": None, + "sent": None, + "reported": False, + "last_update": datetime.now(), + "queuename": msg.get("queuename", ""), + } + ) def update_db(username, data): @@ -124,9 +127,10 @@ def update_db(username, data): def task_manager(ch, method, properties, body): msg = json.loads(body) - username = method.routing_key.split('.')[1] - task_name = msg['task'] - success = msg['success'] + queuename = method.routing_key.split(".")[1] + username = msg["username"] + task_name = msg["task"] + success = msg["success"] send = completed = terminated = False routing_key = "" @@ -137,12 +141,17 @@ def task_manager(ch, method, properties, body): user_db = table.find_one(username=username) current = tracking[username] = copy.deepcopy(record) - current['errmsg'] = [] - current['uid'] = user_db['uid'] if user_db else msg['uid'] - current['gid'] = user_db['gid'] if user_db else msg['gid'] - current['email'] = user_db['email'] if user_db else msg['email'] - current['reason'] = user_db['reason'] if user_db else msg['reason'] - current['fullname'] = user_db['fullname'] if user_db else msg['fullname'] + current["errmsg"] = [] + current["queuename"] = ( + user_db["queuename"] if user_db else msg["queuename"] + ) + current["uid"] = user_db["uid"] if user_db else msg["uid"] + current["gid"] = user_db["gid"] if user_db else msg["gid"] + current["email"] = user_db["email"] if user_db else msg["email"] + current["reason"] = user_db["reason"] if user_db else msg["reason"] + current["fullname"] = ( + user_db["fullname"] if user_db else msg["fullname"] + ) if user_db: # Restore task status @@ -184,34 +193,35 @@ def task_manager(ch, method, properties, body): # Define message that's going to be published message = { - 'username': username, - 'uid': current['uid'], - 'gid': current['gid'], - 'email': current['email'], - 'reason': current['reason'], - 'fullname': current['fullname'] + "username": username, + "queuename": queuename, + "uid": current["uid"], + "gid": current["gid"], + "email": current["email"], + "reason": current["reason"], + "fullname": current["fullname"], } try: - if task_name in current['request']: - current['request'][task_name] = success - routing_key = 'verify.' + username + if task_name in current["request"]: + current["request"][task_name] = success + routing_key = "verify." + queuename # Terminate the process if failed if not success: terminated = True - routing_key = 'complete.' + username - message['success'] = False - message['errmsg'] = current['errmsg'] + routing_key = "complete." + queuename + message["success"] = False + message["errmsg"] = current["errmsg"] send = True current['waiting'] = {'git_commit', 'dir_verify', 'subscribe_mail_list'} logger.debug(f'Request level {task_name}? {success}') - elif task_name in current['verify']: - current['verify'][task_name] = success - current['waiting'].discard(task_name) - routing_key = 'notify.' + username + elif task_name in current["verify"]: + current["verify"][task_name] = success + current["waiting"].discard(task_name) + routing_key = "notify." + queuename if not current['waiting']: send = True @@ -220,18 +230,18 @@ def task_manager(ch, method, properties, body): # Terminate if dir_verify failed and all agents has responsed if send and not current['verify']['dir_verify']: terminated = True - routing_key = 'complete.' + username - message['success'] = False - message['errmsg'] = current['errmsg'] + routing_key = "complete." + queuename + message["success"] = False + message["errmsg"] = current["errmsg"] logger.debug(f'Verify level {task_name}? {success}') - elif task_name in current['notify']: - current['notify'][task_name] = success - current['waiting'].discard(task_name) - routing_key = 'complete.' + username - message['success'] = success - message['errmsg'] = current['errmsg'] + elif task_name in current["notify"]: + current["notify"][task_name] = success + current["waiting"].discard(task_name) + routing_key = "complete." + queuename + message["success"] = success + message["errmsg"] = current["errmsg"] send = True diff --git a/rc_util.py b/rc_util.py index e16fe306583750d200fb56de6ff9408ea55a70e7..74d8b7e57888f3e79c95303aa3e6e63ee37dd0bb 100644 --- a/rc_util.py +++ b/rc_util.py @@ -2,22 +2,26 @@ import logging import argparse from rc_rmq import RCRMQ import json +from urllib.parse import quote rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'}) tasks = {'create_account': None, 'git_commit': None, 'dir_verify': None, 'subscribe_mail_list': None, 'notify_user': None} logger_fmt = '%(asctime)s [%(module)s] - %(message)s' -def add_account(username, email, full='', reason=''): - rc_rmq.publish_msg({ - 'routing_key': 'request.' + username, - 'msg': { +def add_account(username, queuename, email, full="", reason=""): + rc_rmq.publish_msg( + { + "routing_key": "request." + queuename, + "msg": { "username": username, "email": email, "fullname": full, - "reason": reason + "reason": reason, + "queuename": queuename, + }, } - }) - rc_rmq.disconnect() + ) + rc_rmq.disconnect() def worker(ch, method, properties, body): msg = json.loads(body) @@ -34,18 +38,20 @@ def worker(ch, method, properties, body): rc_rmq.stop_consume() rc_rmq.delete_queue() -def consume(username, routing_key='', callback=worker, debug=False): - if routing_key == '': - routing_key = 'complete.' + username +def consume(queuename, routing_key="", callback=worker, debug=False): + if routing_key == "": + routing_key = "complete." + queuename if debug: sleep(5) else: - rc_rmq.start_consume({ - 'queue': username, - 'routing_key': routing_key, - 'cb': callback - }) + rc_rmq.start_consume( + { + "queue": queuename, + "routing_key": routing_key, + "cb": callback, + } + ) rc_rmq.disconnect() return { 'success' : True } @@ -72,3 +78,9 @@ def get_logger(args=None): logging.basicConfig(format=logger_fmt, level=logger_lvl) return logging.getLogger(__name__) + +def encode_name(uname): + uname_quote = quote(uname) + if "." in uname_quote: + uname_quote = uname_quote.replace(".", "%2E") + return uname_quote