From 08141d636e5167d95e3e6b1e24493af992b47a09 Mon Sep 17 00:00:00 2001 From: atlurie <atlurie@uab.edu> Date: Mon, 18 Apr 2022 22:09:23 -0500 Subject: [PATCH] move acctmgr agent workflow out of driver script --- account_manager.py | 73 +++++++++++++++------------------------------- 1 file changed, 24 insertions(+), 49 deletions(-) diff --git a/account_manager.py b/account_manager.py index 8992745..9c8c17f 100755 --- a/account_manager.py +++ b/account_manager.py @@ -26,78 +26,54 @@ args = parser.parse_args() timeout = 60 +queuename = rc_util.encode_name(args.username) + username = args.username state = args.state service = args.service -corr_id = str(uuid.uuid4()) - # Instantiate rabbitmq object rc_rmq = RCRMQ({"exchange": rcfg.Exchange, "exchange_type": "topic"}) -callback_queue = rc_rmq.bind_queue(exclusive=True) msg = {} +msg["username"] = username +msg["state"] = state +msg["service"] = service +msg["queuename"] = queuename + +# publish msg with acctmgr.{uname} routing key. +rc_rmq.publish_msg( + { + "routing_key": f'acctmgr.request.{queuename}', + "msg": msg, + } +) -if state == 'blocked' or state == 'certification': - action = "lock" -elif state == 'ok': - action = "unlock" -else: - print("Invalid state provided. Check the help menu.") - -if args.service == 'all': - # send a broadcast message to all agents - rc_rmq.publish_msg( - { - "routing_key": f"{action}.{username}", - "props": pika.BasicProperties( - correlation_id=corr_id, reply_to=callback_queue - ), - "msg": {"username": username, "action": action, "service": service}, - } - ) -else: - for each_service in service: - rc_rmq.publish_msg( - { - "routing_key": f"{each_service}.{username}", - "props": pika.BasicProperties( - correlation_id=corr_id, reply_to=callback_queue - ), - "msg": {"username": username, "action": action, "service": service}, - } - ) def timeout_handler(signum, frame): print("Process timeout, there's some issue with agents") rc_rmq.stop_consume() -def callback(channel, method, properties, body): +def callback(ch, method, properties, body): msg = json.loads(body) username = msg["username"] - # Check if each task returned success - for each_task in msg["success"].values(): - if each_task == True: - success=True - else: - success=False - break - - if success: - print(f"Account for {username} has been {action}ed.\n Updating the user state in DB") - rc_util.update_state(username, state) + print(msg) + if msg["success"]: + print(f"Account for {username} has been {msg['action']}ed.\n Updating the user state in DB") else: print(f"There's some issue in account management agents for {username}") errmsg = msg.get("errmsg", []) for err in errmsg: print(err) + + ch.basic_ack(delivery_tag=method.delivery_tag) rc_rmq.stop_consume() - rc_rmq.disconnect() + rc_rmq.delete_queue(queuename) -print(f"{action} action for {args.username} requested.") +print(f"Request {username} account state set to {state}.") # Set initial timeout timer signal.signal(signal.SIGALRM, timeout_handler) @@ -106,9 +82,8 @@ signal.setitimer(signal.ITIMER_REAL, timeout) print("Waiting for completion...") rc_rmq.start_consume( { - "queue": callback_queue, - "exclusive": True, - "bind": False, + "queue": queuename, + "routing_key": f'certified.{queuename}', "cb": callback, } ) -- GitLab