From 4ff2ffe15b507ffc8f14689c8ecdb01c8671744b Mon Sep 17 00:00:00 2001 From: atlurie <atlurie@uab.edu> Date: Mon, 18 Apr 2022 22:27:58 -0500 Subject: [PATCH] Move away from using RPC queues and use regular queues --- prod_rmq_agents/expire_account.py | 23 ++++++++--------------- prod_rmq_agents/new_jobs.py | 25 +++++++++---------------- prod_rmq_agents/ssh_access.py | 22 +++++++++------------- 3 files changed, 26 insertions(+), 44 deletions(-) diff --git a/prod_rmq_agents/expire_account.py b/prod_rmq_agents/expire_account.py index c5189db..94e4212 100644 --- a/prod_rmq_agents/expire_account.py +++ b/prod_rmq_agents/expire_account.py @@ -22,13 +22,10 @@ def expire_account(ch, method, properties, body): msg = json.loads(body) username = msg["username"] action = msg["action"] - msg["success"] = {} - msg["success"][task] = False + msg["task"] = task + queuename = msg["queuename"] yesterday = date.today() - timedelta(days=1) - corr_id = properties.correlation_id - reply_to = properties.reply_to - try: expire_account_cmd = f'/cm/local/apps/cmd/bin/cmsh -n -c "user;use {username}; set expirationdate {yesterday}; commit;"' unexpire_account_cmd = f'/cm/local/apps/cmd/bin/cmsh -n -c "user;use {username}; set expirationdate 2037/12/31; commit;"' @@ -38,23 +35,19 @@ def expire_account(ch, method, properties, body): elif action == 'unlock': unblock_ssh = popen(unexpire_account_cmd).read().rstrip() - msg["success"][task] = True + msg["success"] = True logger.info(f"ssh expiration set to yesterday for user {username}") except Exception: - msg["success"][task] = False + msg["success"] = False msg["errmsg"] = "Exception raised, while expiring user's ssh access, check the logs for stack trace" logger.error("", exc_info=True) # send response to callback queue with it's correlation ID - if reply_to: - rc_rmq.publish_msg( - {"routing_key": reply_to, - "props": pika.BasicProperties( - correlation_id=corr_id, - ), - "msg": msg} - ) + rc_rmq.publish_msg( + {"routing_key": f'acctmgr.done.{queuename}', + "msg": msg} + ) logger.debug(f"User {username} confirmation sent for {action}ing {task}") diff --git a/prod_rmq_agents/new_jobs.py b/prod_rmq_agents/new_jobs.py index 5f26077..cb51d18 100644 --- a/prod_rmq_agents/new_jobs.py +++ b/prod_rmq_agents/new_jobs.py @@ -21,11 +21,8 @@ def new_jobs(ch, method, properties, body): msg = json.loads(body) username = msg["username"] action = msg["action"] - msg["success"] = {} - msg["success"][task] = False - - corr_id = properties.correlation_id - reply_to = properties.reply_to + msg["task"] = task + queuename = msg["queuename"] try: block_new_jobs_cmd = f"/cm/shared/apps/slurm/19.05.5/bin/sacctmgr --immediate update user {username} set maxjobs=0" @@ -36,23 +33,19 @@ def new_jobs(ch, method, properties, body): elif action == 'unlock': unblock_new_jobs = popen(unblock_new_jobs_cmd).read().rstrip() - msg["success"][task] = True + msg["success"] = True logger.info(f"Succeeded in blocking {username}'s jobs getting to run state") except Exception: - msg["success"][task] = False + msg["success"] = False msg["errmsg"] = "Exception raised while setting maxjobs that can enter run state, check the logs for stack trace" logger.error("", exc_info=True) - # send response to callback queue with it's correlation ID - if reply_to: - rc_rmq.publish_msg( - {"routing_key": reply_to, - "props": pika.BasicProperties( - correlation_id=corr_id, - ), - "msg": msg} - ) + + rc_rmq.publish_msg( + {"routing_key": f'acctmgr.done.{queuename}', + "msg": msg} + ) logger.debug(f"User {username} confirmation sent for {action}ing {task}") diff --git a/prod_rmq_agents/ssh_access.py b/prod_rmq_agents/ssh_access.py index 6d85b43..e289e57 100644 --- a/prod_rmq_agents/ssh_access.py +++ b/prod_rmq_agents/ssh_access.py @@ -21,9 +21,7 @@ def ssh_access(ch, method, properties, body): msg = json.loads(body) username = msg["username"] action = msg["action"] - msg["success"] = {} - msg["success"][task] = False - + msg["task"] = task corr_id = properties.correlation_id reply_to = properties.reply_to @@ -36,23 +34,21 @@ def ssh_access(ch, method, properties, body): elif action == 'unlock': unblock_ssh = popen(unblock_ssh_cmd).read().rstrip() - msg["success"][task] = True + msg["success"] = True logger.info(f"User {username} is added to nossh group") except Exception: - msg["success"][task] = False + msg["success"] = False msg["errmsg"] = "Exception raised, while blocking user's ssh access, check the logs for stack trace" logger.error("", exc_info=True) # send response to callback queue with it's correlation ID - if reply_to: - rc_rmq.publish_msg( - {"routing_key": reply_to, - "props": pika.BasicProperties( - correlation_id=corr_id, - ), - "msg": msg} - ) + rc_rmq.publish_msg( + { + "routing_key": f'acctmgr.done.{queuename}', + "msg": msg + } + ) logger.debug(f"User {username} confirmation sent for {action}ing {task}") -- GitLab