diff --git a/account_manager.py b/account_manager.py index 1604784968e2fc589bbc0693a51f907e8bef5743..3d1dba2f3eb6cabb3900484dc838914f944db9d2 100755 --- a/account_manager.py +++ b/account_manager.py @@ -11,15 +11,26 @@ import rabbit_config as rcfg import rc_util from rc_rmq import RCRMQ -parser = argparse.ArgumentParser(description = "Account management driver script") -parser.add_argument( - "username", help="Username that should be locked/unlocked") +parser = argparse.ArgumentParser( + description="Account management driver script" +) +parser.add_argument("username", help="Username that should be locked/unlocked") parser.add_argument( - "state", choices=['ok', 'hold', 'certification', 'pre_certification'], help="Choose from states (ok,hold,certification,pre_certification)") + "state", + choices=["ok", "hold", "certification", "pre_certification"], + help="Choose from states (ok,hold,certification,pre_certification)", +) parser.add_argument( - "-s", "--service", nargs='+', default='all', choices=['ssh', 'newjobs', 'expiration', 'all'], help="List one or more services to be blocked (default: %(default)s)") + "-s", + "--service", + nargs="+", + default="all", + choices=["ssh", "newjobs", "expiration", "all"], + help="List one or more services to be blocked (default: %(default)s)", +) parser.add_argument( - "-v", "--verbose", action="store_true", help="verbose output") + "-v", "--verbose", action="store_true", help="verbose output" +) parser.add_argument( "-n", "--dry-run", action="store_true", help="enable dry run mode" ) @@ -46,7 +57,7 @@ msg["updated_by"], msg["host"] = rc_util.get_caller_info() # publish msg with acctmgr.{uname} routing key. rc_rmq.publish_msg( { - "routing_key": f'acctmgr.request.{queuename}', + "routing_key": f"acctmgr.request.{queuename}", "msg": msg, } ) @@ -62,18 +73,23 @@ def callback(ch, method, properties, body): username = msg["username"] if msg["success"]: - print(f"Account for {username} has been {msg['action']}ed.\n Updating the user state in DB") + print( + f"Account for {username} has been {msg['action']}ed.\n Updating" + " the user state in DB" + ) else: - print(f"There's some issue in account management agents for {username}") + print( + f"There's some issue in account management agents for {username}" + ) errmsg = msg.get("errmsg", []) for err in errmsg: print(err) - ch.basic_ack(delivery_tag=method.delivery_tag) rc_rmq.stop_consume() rc_rmq.delete_queue(queuename) + print(f"Request {username} account state set to {state}.") # Set initial timeout timer @@ -84,7 +100,7 @@ print("Waiting for completion...") rc_rmq.start_consume( { "queue": queuename, - "routing_key": f'certified.{queuename}', + "routing_key": f"certified.{queuename}", "cb": callback, } ) diff --git a/prod_rmq_agents/acct_mgmt_workflow.py b/prod_rmq_agents/acct_mgmt_workflow.py index 79a60f937d77b43d2afc036f6bb47e33287503a2..fd83dd3f8b495badefe0b30ab6025c68abbdac01 100755 --- a/prod_rmq_agents/acct_mgmt_workflow.py +++ b/prod_rmq_agents/acct_mgmt_workflow.py @@ -18,6 +18,7 @@ rc_rmq = RCRMQ({"exchange": rcfg.Exchange, "exchange_type": "topic"}) tracking = {} + def manage_acct(ch, method, properties, body): msg = json.loads(body) op = method.routing_key.split(".")[1] @@ -30,24 +31,24 @@ def manage_acct(ch, method, properties, body): if username in tracking: current = tracking[username] else: - current = tracking[username] = {} + current = tracking[username] = {} - if op == 'request': - if state == 'hold' or state == 'certification': + if op == "request": + if state == "hold" or state == "certification": msg["action"] = "lock" - elif state == 'ok' or state == 'pre_certification': + elif state == "ok" or state == "pre_certification": msg["action"] = "unlock" else: print("Invalid state provided. Check the help menu.") - if service == 'all': + if service == "all": current["new_jobs"] = None current["expire_account"] = None # send a broadcast message to all agents rc_rmq.publish_msg( { "routing_key": f"{msg['action']}.{queuename}", - "msg": msg, + "msg": msg, } ) else: @@ -56,12 +57,11 @@ def manage_acct(ch, method, properties, body): rc_rmq.publish_msg( { "routing_key": f"{each_service}.{queuename}", - "msg": msg + "msg": msg, } ) - - elif op == 'done': + elif op == "done": # Check if each task/agent returned success current[msg["task"]] = msg["success"] @@ -79,7 +79,7 @@ def manage_acct(ch, method, properties, body): # Send done msg to account_manager.py rc_rmq.publish_msg( { - "routing_key": f'certified.{queuename}', + "routing_key": f"certified.{queuename}", "msg": msg, } ) @@ -87,13 +87,15 @@ def manage_acct(ch, method, properties, body): ch.basic_ack(delivery_tag=method.delivery_tag) except Exception: - msg["errmsg"] = "Exception raised in account manager workflow agent , check the logs for stack trace" - logger.error("", exc_info=True) + msg["errmsg"] = ( + "Exception raised in account manager workflow agent , check the" + " logs for stack trace" + ) + logger.error("", exc_info=True) + -rc_rmq.bind_queue(queue=task, routing_key='acctmgr.request.*', durable=True) -rc_rmq.bind_queue(queue=task, routing_key='acctmgr.done.*', durable=True) +rc_rmq.bind_queue(queue=task, routing_key="acctmgr.request.*", durable=True) +rc_rmq.bind_queue(queue=task, routing_key="acctmgr.done.*", durable=True) print("Waiting for completion...") -rc_rmq.start_consume( - {"queue": task, "cb": manage_acct} -) +rc_rmq.start_consume({"queue": task, "cb": manage_acct}) diff --git a/prod_rmq_agents/expire_account.py b/prod_rmq_agents/expire_account.py index b8de124510c0c9bf6cdd986180803bfffb5301d7..0ebfd729cc0fcca07635d8fead4f245c4bf95eae 100644 --- a/prod_rmq_agents/expire_account.py +++ b/prod_rmq_agents/expire_account.py @@ -29,12 +29,18 @@ def expire_account(ch, method, properties, body): yesterday = date.today() - timedelta(days=1) try: - expire_account_cmd = f'/cm/local/apps/cmd/bin/cmsh -n -c "user;use {username}; set expirationdate {yesterday}; commit;"' - unexpire_account_cmd = f'/cm/local/apps/cmd/bin/cmsh -n -c "user;use {username}; set expirationdate 2037/12/31; commit;"' - - if action == 'lock': + expire_account_cmd = ( + f'/cm/local/apps/cmd/bin/cmsh -n -c "user;use {username}; set' + f' expirationdate {yesterday}; commit;"' + ) + unexpire_account_cmd = ( + f'/cm/local/apps/cmd/bin/cmsh -n -c "user;use {username}; set' + ' expirationdate 2037/12/31; commit;"' + ) + + if action == "lock": block_ssh = popen(expire_account_cmd).read().rstrip() - elif action == 'unlock': + elif action == "unlock": unblock_ssh = popen(unexpire_account_cmd).read().rstrip() msg["success"] = True @@ -42,13 +48,15 @@ def expire_account(ch, method, properties, body): except Exception: msg["success"] = False - msg["errmsg"] = "Exception raised, while expiring user's ssh access, check the logs for stack trace" + msg["errmsg"] = ( + "Exception raised, while expiring user's ssh access, check the" + " logs for stack trace" + ) logger.error("", exc_info=True) # send response to callback queue with it's correlation ID rc_rmq.publish_msg( - {"routing_key": f'acctmgr.done.{queuename}', - "msg": msg} + {"routing_key": f"acctmgr.done.{queuename}", "msg": msg} ) logger.debug(f"User {username} confirmation sent for {action}ing {task}") @@ -57,13 +65,11 @@ def expire_account(ch, method, properties, body): logger.info(f"Start listening to queue: {task}") -rc_rmq.bind_queue(queue=task, routing_key='lock.*', durable=True) -rc_rmq.bind_queue(queue=task, routing_key='unlock.*', durable=True) -rc_rmq.bind_queue(queue=task, routing_key='expiration.*', durable=True) +rc_rmq.bind_queue(queue=task, routing_key="lock.*", durable=True) +rc_rmq.bind_queue(queue=task, routing_key="unlock.*", durable=True) +rc_rmq.bind_queue(queue=task, routing_key="expiration.*", durable=True) -rc_rmq.start_consume( - {"queue": task, "cb": expire_account} -) +rc_rmq.start_consume({"queue": task, "cb": expire_account}) logger.info("Disconnected") rc_rmq.disconnect() diff --git a/prod_rmq_agents/new_jobs.py b/prod_rmq_agents/new_jobs.py index f8d9abecd632176049631ea8360307af3c0ef501..861e6418094deb96c424ddc6c8047bce931f9178 100644 --- a/prod_rmq_agents/new_jobs.py +++ b/prod_rmq_agents/new_jobs.py @@ -27,26 +27,35 @@ def new_jobs(ch, method, properties, body): queuename = msg["queuename"] try: - block_new_jobs_cmd = f"/cm/shared/apps/slurm/19.05.5/bin/sacctmgr --immediate update user {username} set maxjobs=0" - unblock_new_jobs_cmd = f"/cm/shared/apps/slurm/19.05.5/bin/sacctmgr --immediate update user {username} set maxjobs=-1" - - if action == 'lock': + block_new_jobs_cmd = ( + "/cm/shared/apps/slurm/19.05.5/bin/sacctmgr --immediate update" + f" user {username} set maxjobs=0" + ) + unblock_new_jobs_cmd = ( + "/cm/shared/apps/slurm/19.05.5/bin/sacctmgr --immediate update" + f" user {username} set maxjobs=-1" + ) + + if action == "lock": block_new_jobs = popen(block_new_jobs_cmd).read().rstrip() - elif action == 'unlock': + elif action == "unlock": unblock_new_jobs = popen(unblock_new_jobs_cmd).read().rstrip() msg["success"] = True - logger.info(f"Succeeded in blocking {username}'s jobs getting to run state") + logger.info( + f"Succeeded in blocking {username}'s jobs getting to run state" + ) except Exception: msg["success"] = False - msg["errmsg"] = "Exception raised while setting maxjobs that can enter run state, check the logs for stack trace" + msg["errmsg"] = ( + "Exception raised while setting maxjobs that can enter run state," + " check the logs for stack trace" + ) logger.error("", exc_info=True) - rc_rmq.publish_msg( - {"routing_key": f'acctmgr.done.{queuename}', - "msg": msg} + {"routing_key": f"acctmgr.done.{queuename}", "msg": msg} ) logger.debug(f"User {username} confirmation sent for {action}ing {task}") @@ -55,12 +64,10 @@ def new_jobs(ch, method, properties, body): logger.info(f"Start listening to queue: {task}") -rc_rmq.bind_queue(queue=task, routing_key='lock.*', durable=True) -rc_rmq.bind_queue(queue=task, routing_key='unlock.*', durable=True) -rc_rmq.bind_queue(queue=task, routing_key='newjobs.*', durable=True) -rc_rmq.start_consume( - {"queue": task, "cb": new_jobs} -) +rc_rmq.bind_queue(queue=task, routing_key="lock.*", durable=True) +rc_rmq.bind_queue(queue=task, routing_key="unlock.*", durable=True) +rc_rmq.bind_queue(queue=task, routing_key="newjobs.*", durable=True) +rc_rmq.start_consume({"queue": task, "cb": new_jobs}) logger.info("Disconnected") rc_rmq.disconnect() diff --git a/prod_rmq_agents/ssh_access.py b/prod_rmq_agents/ssh_access.py index fea6eacdfbfaec05404cfe8fc7da4294bdcc3a86..8d84bf105b95d2425d5ad2e7f9568acddd622c5a 100644 --- a/prod_rmq_agents/ssh_access.py +++ b/prod_rmq_agents/ssh_access.py @@ -28,12 +28,18 @@ def ssh_access(ch, method, properties, body): reply_to = properties.reply_to try: - block_ssh_cmd = f'/cm/local/apps/cmd/bin/cmsh -n -c "group; use nossh; append members {username}; commit;"' - unblock_ssh_cmd = f'/cm/local/apps/cmd/bin/cmsh -n -c "group; use nossh; removefrom members {username}; commit;"' - - if action == 'lock': + block_ssh_cmd = ( + '/cm/local/apps/cmd/bin/cmsh -n -c "group; use nossh; append' + f' members {username}; commit;"' + ) + unblock_ssh_cmd = ( + '/cm/local/apps/cmd/bin/cmsh -n -c "group; use nossh; removefrom' + f' members {username}; commit;"' + ) + + if action == "lock": block_ssh = popen(block_ssh_cmd).read().rstrip() - elif action == 'unlock': + elif action == "unlock": unblock_ssh = popen(unblock_ssh_cmd).read().rstrip() msg["success"] = True @@ -41,15 +47,15 @@ def ssh_access(ch, method, properties, body): except Exception: msg["success"] = False - msg["errmsg"] = "Exception raised, while blocking user's ssh access, check the logs for stack trace" + msg["errmsg"] = ( + "Exception raised, while blocking user's ssh access, check the" + " logs for stack trace" + ) logger.error("", exc_info=True) # send response to callback queue with it's correlation ID rc_rmq.publish_msg( - { - "routing_key": f'acctmgr.done.{queuename}', - "msg": msg - } + {"routing_key": f"acctmgr.done.{queuename}", "msg": msg} ) logger.debug(f"User {username} confirmation sent for {action}ing {task}") @@ -58,11 +64,9 @@ def ssh_access(ch, method, properties, body): logger.info(f"Start listening to queue: {task}") -rc_rmq.bind_queue(queue=task, routing_key='ssh.*', durable=True) +rc_rmq.bind_queue(queue=task, routing_key="ssh.*", durable=True) -rc_rmq.start_consume( - {"queue": task, "cb": ssh_access} -) +rc_rmq.start_consume({"queue": task, "cb": ssh_access}) logger.info("Disconnected") rc_rmq.disconnect()