diff --git a/account_manager.py b/account_manager.py index 98e7a3b89088fc6047c4389e45a02ee476a5632c..0b9af882ed4b7ba5643433b241bc5f7691d522e1 100755 --- a/account_manager.py +++ b/account_manager.py @@ -3,22 +3,27 @@ import json import rc_util import argparse import signal -import uuid -import pika -import rc_util from rc_rmq import RCRMQ import rabbit_config as rcfg -import time -parser = argparse.ArgumentParser(description = "Account management driver script") -parser.add_argument( - "username", help="Username that should be locked/unlocked") +parser = argparse.ArgumentParser( + description="Account management driver script" +) +parser.add_argument("username", help="Username that should be locked/unlocked") parser.add_argument( - "state", help="Choose from states (ok,block,certify) to put the user in") + "state", help="Choose from states (ok,block,certify) to put the user in" +) parser.add_argument( - "-s", "--service", nargs='+', default='all', choices=['ssh', 'newjobs', 'expiration', 'all'], help="List one or more services to be blocked (default: %(default)s)") + "-s", + "--service", + nargs="+", + default="all", + choices=["ssh", "newjobs", "expiration", "all"], + help="List one or more services to be blocked (default: %(default)s)", +) parser.add_argument( - "-v", "--verbose", action="store_true", help="verbose output") + "-v", "--verbose", action="store_true", help="verbose output" +) parser.add_argument( "-n", "--dry-run", action="store_true", help="enable dry run mode" ) @@ -44,7 +49,7 @@ msg["queuename"] = queuename # publish msg with acctmgr.{uname} routing key. rc_rmq.publish_msg( { - "routing_key": f'acctmgr.request.{queuename}', + "routing_key": f"acctmgr.request.{queuename}", "msg": msg, } ) @@ -60,18 +65,23 @@ def callback(ch, method, properties, body): username = msg["username"] if msg["success"]: - print(f"Account for {username} has been {msg['action']}ed.\n Updating the user state in DB") + print( + f"Account for {username} has been {msg['action']}ed.\n Updating" + " the user state in DB" + ) else: - print(f"There's some issue in account management agents for {username}") + print( + f"There's some issue in account management agents for {username}" + ) errmsg = msg.get("errmsg", []) for err in errmsg: print(err) - ch.basic_ack(delivery_tag=method.delivery_tag) rc_rmq.stop_consume() rc_rmq.delete_queue(queuename) + print(f"Request {username} account state set to {state}.") # Set initial timeout timer @@ -82,7 +92,7 @@ print("Waiting for completion...") rc_rmq.start_consume( { "queue": queuename, - "routing_key": f'certified.{queuename}', + "routing_key": f"certified.{queuename}", "cb": callback, } ) diff --git a/prod_rmq_agents/acct_mgmt_workflow.py b/prod_rmq_agents/acct_mgmt_workflow.py index 7b7bc0cf7cb48ff9e045a918e8f599424a1c1a7a..2be72986d8b30fcbb9972a9761696a87c89ee34b 100755 --- a/prod_rmq_agents/acct_mgmt_workflow.py +++ b/prod_rmq_agents/acct_mgmt_workflow.py @@ -1,14 +1,8 @@ #!/usr/bin/env python3 import json import rc_util -import argparse -import signal -import uuid -import pika -import rc_util from rc_rmq import RCRMQ import rabbit_config as rcfg -import time task = "acctmgr" @@ -17,6 +11,7 @@ rc_rmq = RCRMQ({"exchange": rcfg.Exchange, "exchange_type": "topic"}) tracking = {} + def manage_acct(ch, method, properties, body): msg = json.loads(body) op = method.routing_key.split(".")[1] @@ -28,38 +23,34 @@ def manage_acct(ch, method, properties, body): if username in tracking: current = tracking[username] else: - current = tracking[username] = {} + current = tracking[username] = {} - if op == 'request': - if state == 'blocked' or state == 'certification': + if op == "request": + if state == "blocked" or state == "certification": msg["action"] = "lock" - elif state == 'ok': + elif state == "ok": msg["action"] = "unlock" else: print("Invalid state provided. Check the help menu.") - - if service == 'all': + + if service == "all": current["new_jobs"] = None current["expire_account"] = None # send a broadcast message to all agents rc_rmq.publish_msg( { "routing_key": f"{msg['action']}.{queuename}", - "msg": msg, + "msg": msg, } ) else: for each_service in service: current[each_service] = None rc_rmq.publish_msg( - { - "routing_key": f"{each_service}.{queuename}", - "msg": msg - } + {"routing_key": f"{each_service}.{queuename}", "msg": msg} ) - - elif op == 'done': + elif op == "done": # Check if each task/agent returned success current[msg["task"]] = msg["success"] @@ -71,22 +62,20 @@ def manage_acct(ch, method, properties, body): if done: rc_util.update_state(username, state) - + # Send done msg to account_manager.py rc_rmq.publish_msg( { - "routing_key": f'certified.{queuename}', + "routing_key": f"certified.{queuename}", "msg": msg, } ) ch.basic_ack(delivery_tag=method.delivery_tag) -rc_rmq.bind_queue(queue=task, routing_key='acctmgr.request.*', durable=True) -rc_rmq.bind_queue(queue=task, routing_key='acctmgr.done.*', durable=True) -print("Waiting for completion...") -rc_rmq.start_consume( - {"queue": task, "cb": manage_acct} -) +rc_rmq.bind_queue(queue=task, routing_key="acctmgr.request.*", durable=True) +rc_rmq.bind_queue(queue=task, routing_key="acctmgr.done.*", durable=True) +print("Waiting for completion...") +rc_rmq.start_consume({"queue": task, "cb": manage_acct}) diff --git a/prod_rmq_agents/new_jobs.py b/prod_rmq_agents/new_jobs.py index cb51d18e4f21e77abe2d1391724ca18b474a8727..588be10320513398cb5ff564ff94e74f02231140 100644 --- a/prod_rmq_agents/new_jobs.py +++ b/prod_rmq_agents/new_jobs.py @@ -1,10 +1,7 @@ #!/usr/bin/env python -import os import json -import pika import rc_util from os import popen -from pathlib import Path from rc_rmq import RCRMQ import rabbit_config as rcfg @@ -25,26 +22,35 @@ def new_jobs(ch, method, properties, body): queuename = msg["queuename"] try: - block_new_jobs_cmd = f"/cm/shared/apps/slurm/19.05.5/bin/sacctmgr --immediate update user {username} set maxjobs=0" - unblock_new_jobs_cmd = f"/cm/shared/apps/slurm/19.05.5/bin/sacctmgr --immediate update user {username} set maxjobs=-1" - - if action == 'lock': - block_new_jobs = popen(block_new_jobs_cmd).read().rstrip() - elif action == 'unlock': - unblock_new_jobs = popen(unblock_new_jobs_cmd).read().rstrip() + block_new_jobs_cmd = ( + "/cm/shared/apps/slurm/19.05.5/bin/sacctmgr --immediate update" + f" user {username} set maxjobs=0" + ) + unblock_new_jobs_cmd = ( + "/cm/shared/apps/slurm/19.05.5/bin/sacctmgr --immediate update" + f" user {username} set maxjobs=-1" + ) + + if action == "lock": + popen(block_new_jobs_cmd).read().rstrip() + elif action == "unlock": + popen(unblock_new_jobs_cmd).read().rstrip() msg["success"] = True - logger.info(f"Succeeded in blocking {username}'s jobs getting to run state") + logger.info( + f"Succeeded in blocking {username}'s jobs getting to run state" + ) except Exception: msg["success"] = False - msg["errmsg"] = "Exception raised while setting maxjobs that can enter run state, check the logs for stack trace" + msg["errmsg"] = ( + "Exception raised while setting maxjobs that can enter run state," + " check the logs for stack trace" + ) logger.error("", exc_info=True) - rc_rmq.publish_msg( - {"routing_key": f'acctmgr.done.{queuename}', - "msg": msg} + {"routing_key": f"acctmgr.done.{queuename}", "msg": msg} ) logger.debug(f"User {username} confirmation sent for {action}ing {task}") @@ -53,12 +59,10 @@ def new_jobs(ch, method, properties, body): logger.info(f"Start listening to queue: {task}") -rc_rmq.bind_queue(queue=task, routing_key='lock.*', durable=True) -rc_rmq.bind_queue(queue=task, routing_key='unlock.*', durable=True) -rc_rmq.bind_queue(queue=task, routing_key='newjobs.*', durable=True) -rc_rmq.start_consume( - {"queue": task, "cb": new_jobs} -) +rc_rmq.bind_queue(queue=task, routing_key="lock.*", durable=True) +rc_rmq.bind_queue(queue=task, routing_key="unlock.*", durable=True) +rc_rmq.bind_queue(queue=task, routing_key="newjobs.*", durable=True) +rc_rmq.start_consume({"queue": task, "cb": new_jobs}) logger.info("Disconnected") rc_rmq.disconnect()