diff --git a/prod_rmq_agents/expire_account.py b/prod_rmq_agents/expire_account.py index c5189db0a9c56ef5918e2cb26c0d5d97ccb2fb0b..94e421276fdb68e2bc07800d75beb0f696fb64c3 100644 --- a/prod_rmq_agents/expire_account.py +++ b/prod_rmq_agents/expire_account.py @@ -22,13 +22,10 @@ def expire_account(ch, method, properties, body): msg = json.loads(body) username = msg["username"] action = msg["action"] - msg["success"] = {} - msg["success"][task] = False + msg["task"] = task + queuename = msg["queuename"] yesterday = date.today() - timedelta(days=1) - corr_id = properties.correlation_id - reply_to = properties.reply_to - try: expire_account_cmd = f'/cm/local/apps/cmd/bin/cmsh -n -c "user;use {username}; set expirationdate {yesterday}; commit;"' unexpire_account_cmd = f'/cm/local/apps/cmd/bin/cmsh -n -c "user;use {username}; set expirationdate 2037/12/31; commit;"' @@ -38,23 +35,19 @@ def expire_account(ch, method, properties, body): elif action == 'unlock': unblock_ssh = popen(unexpire_account_cmd).read().rstrip() - msg["success"][task] = True + msg["success"] = True logger.info(f"ssh expiration set to yesterday for user {username}") except Exception: - msg["success"][task] = False + msg["success"] = False msg["errmsg"] = "Exception raised, while expiring user's ssh access, check the logs for stack trace" logger.error("", exc_info=True) # send response to callback queue with it's correlation ID - if reply_to: - rc_rmq.publish_msg( - {"routing_key": reply_to, - "props": pika.BasicProperties( - correlation_id=corr_id, - ), - "msg": msg} - ) + rc_rmq.publish_msg( + {"routing_key": f'acctmgr.done.{queuename}', + "msg": msg} + ) logger.debug(f"User {username} confirmation sent for {action}ing {task}") diff --git a/prod_rmq_agents/new_jobs.py b/prod_rmq_agents/new_jobs.py index 5f260773d6857e86aa630ca537087fbcc41fe632..cb51d18e4f21e77abe2d1391724ca18b474a8727 100644 --- a/prod_rmq_agents/new_jobs.py +++ b/prod_rmq_agents/new_jobs.py @@ -21,11 +21,8 @@ def new_jobs(ch, method, properties, body): msg = json.loads(body) username = msg["username"] action = msg["action"] - msg["success"] = {} - msg["success"][task] = False - - corr_id = properties.correlation_id - reply_to = properties.reply_to + msg["task"] = task + queuename = msg["queuename"] try: block_new_jobs_cmd = f"/cm/shared/apps/slurm/19.05.5/bin/sacctmgr --immediate update user {username} set maxjobs=0" @@ -36,23 +33,19 @@ def new_jobs(ch, method, properties, body): elif action == 'unlock': unblock_new_jobs = popen(unblock_new_jobs_cmd).read().rstrip() - msg["success"][task] = True + msg["success"] = True logger.info(f"Succeeded in blocking {username}'s jobs getting to run state") except Exception: - msg["success"][task] = False + msg["success"] = False msg["errmsg"] = "Exception raised while setting maxjobs that can enter run state, check the logs for stack trace" logger.error("", exc_info=True) - # send response to callback queue with it's correlation ID - if reply_to: - rc_rmq.publish_msg( - {"routing_key": reply_to, - "props": pika.BasicProperties( - correlation_id=corr_id, - ), - "msg": msg} - ) + + rc_rmq.publish_msg( + {"routing_key": f'acctmgr.done.{queuename}', + "msg": msg} + ) logger.debug(f"User {username} confirmation sent for {action}ing {task}") diff --git a/prod_rmq_agents/ssh_access.py b/prod_rmq_agents/ssh_access.py index 6d85b4326d149606efd32afad8a6b0418bc0a63d..e289e579ab618a8ccc3f66ad070db22c1016f48f 100644 --- a/prod_rmq_agents/ssh_access.py +++ b/prod_rmq_agents/ssh_access.py @@ -21,9 +21,7 @@ def ssh_access(ch, method, properties, body): msg = json.loads(body) username = msg["username"] action = msg["action"] - msg["success"] = {} - msg["success"][task] = False - + msg["task"] = task corr_id = properties.correlation_id reply_to = properties.reply_to @@ -36,23 +34,21 @@ def ssh_access(ch, method, properties, body): elif action == 'unlock': unblock_ssh = popen(unblock_ssh_cmd).read().rstrip() - msg["success"][task] = True + msg["success"] = True logger.info(f"User {username} is added to nossh group") except Exception: - msg["success"][task] = False + msg["success"] = False msg["errmsg"] = "Exception raised, while blocking user's ssh access, check the logs for stack trace" logger.error("", exc_info=True) # send response to callback queue with it's correlation ID - if reply_to: - rc_rmq.publish_msg( - {"routing_key": reply_to, - "props": pika.BasicProperties( - correlation_id=corr_id, - ), - "msg": msg} - ) + rc_rmq.publish_msg( + { + "routing_key": f'acctmgr.done.{queuename}', + "msg": msg + } + ) logger.debug(f"User {username} confirmation sent for {action}ing {task}")