diff --git a/account_manager.py b/account_manager.py new file mode 100755 index 0000000000000000000000000000000000000000..98e7a3b89088fc6047c4389e45a02ee476a5632c --- /dev/null +++ b/account_manager.py @@ -0,0 +1,88 @@ +#!/usr/bin/env python3 +import json +import rc_util +import argparse +import signal +import uuid +import pika +import rc_util +from rc_rmq import RCRMQ +import rabbit_config as rcfg +import time + +parser = argparse.ArgumentParser(description = "Account management driver script") +parser.add_argument( + "username", help="Username that should be locked/unlocked") +parser.add_argument( + "state", help="Choose from states (ok,block,certify) to put the user in") +parser.add_argument( + "-s", "--service", nargs='+', default='all', choices=['ssh', 'newjobs', 'expiration', 'all'], help="List one or more services to be blocked (default: %(default)s)") +parser.add_argument( + "-v", "--verbose", action="store_true", help="verbose output") +parser.add_argument( + "-n", "--dry-run", action="store_true", help="enable dry run mode" +) +args = parser.parse_args() + +timeout = 60 + +queuename = rc_util.encode_name(args.username) + +username = args.username +state = args.state +service = args.service + +# Instantiate rabbitmq object +rc_rmq = RCRMQ({"exchange": rcfg.Exchange, "exchange_type": "topic"}) + +msg = {} +msg["username"] = username +msg["state"] = state +msg["service"] = service +msg["queuename"] = queuename + +# publish msg with acctmgr.{uname} routing key. +rc_rmq.publish_msg( + { + "routing_key": f'acctmgr.request.{queuename}', + "msg": msg, + } +) + + +def timeout_handler(signum, frame): + print("Process timeout, there's some issue with agents") + rc_rmq.stop_consume() + + +def callback(ch, method, properties, body): + msg = json.loads(body) + username = msg["username"] + + if msg["success"]: + print(f"Account for {username} has been {msg['action']}ed.\n Updating the user state in DB") + else: + print(f"There's some issue in account management agents for {username}") + errmsg = msg.get("errmsg", []) + for err in errmsg: + print(err) + + + ch.basic_ack(delivery_tag=method.delivery_tag) + rc_rmq.stop_consume() + rc_rmq.delete_queue(queuename) + +print(f"Request {username} account state set to {state}.") + +# Set initial timeout timer +signal.signal(signal.SIGALRM, timeout_handler) +signal.setitimer(signal.ITIMER_REAL, timeout) + +print("Waiting for completion...") +rc_rmq.start_consume( + { + "queue": queuename, + "routing_key": f'certified.{queuename}', + "cb": callback, + } +) diff --git a/prod_rmq_agents/acct_mgmt_workflow.py b/prod_rmq_agents/acct_mgmt_workflow.py new file mode 100755 index 0000000000000000000000000000000000000000..7b7bc0cf7cb48ff9e045a918e8f599424a1c1a7a --- /dev/null +++ b/prod_rmq_agents/acct_mgmt_workflow.py @@ -0,0 +1,92 @@ +#!/usr/bin/env python3 +import json +import rc_util +import argparse +import signal +import uuid +import pika +import rc_util +from rc_rmq import RCRMQ +import rabbit_config as rcfg +import time + +task = "acctmgr" + +# Instantiate rabbitmq object +rc_rmq = RCRMQ({"exchange": rcfg.Exchange, "exchange_type": "topic"}) + +tracking = {} + +def manage_acct(ch, method, properties, body): + msg = json.loads(body) + op = method.routing_key.split(".")[1] + username = msg["username"] + state = msg["state"] + service = msg["service"] + queuename = msg["queuename"] + + if username in tracking: + current = tracking[username] + else: + current = tracking[username] = {} + + if op == 'request': + if state == 'blocked' or state == 'certification': + msg["action"] = "lock" + elif state == 'ok': + msg["action"] = "unlock" + else: + print("Invalid state provided. Check the help menu.") + + if service == 'all': + current["new_jobs"] = None + current["expire_account"] = None + # send a broadcast message to all agents + rc_rmq.publish_msg( + { + "routing_key": f"{msg['action']}.{queuename}", + "msg": msg, + } + ) + else: + for each_service in service: + current[each_service] = None + rc_rmq.publish_msg( + { + "routing_key": f"{each_service}.{queuename}", + "msg": msg + } + ) + + + elif op == 'done': + # Check if each task/agent returned success + current[msg["task"]] = msg["success"] + + done = True + + for task in current.keys(): + if current[task] is None: + done = False + + if done: + rc_util.update_state(username, state) + + # Send done msg to account_manager.py + rc_rmq.publish_msg( + { + "routing_key": f'certified.{queuename}', + "msg": msg, + } + ) + + ch.basic_ack(delivery_tag=method.delivery_tag) + +rc_rmq.bind_queue(queue=task, routing_key='acctmgr.request.*', durable=True) +rc_rmq.bind_queue(queue=task, routing_key='acctmgr.done.*', durable=True) + +print("Waiting for completion...") +rc_rmq.start_consume( + {"queue": task, "cb": manage_acct} +) + diff --git a/prod_rmq_agents/expire_account.py b/prod_rmq_agents/expire_account.py new file mode 100644 index 0000000000000000000000000000000000000000..94e421276fdb68e2bc07800d75beb0f696fb64c3 --- /dev/null +++ b/prod_rmq_agents/expire_account.py @@ -0,0 +1,67 @@ +#!/usr/bin/env python +import os +import json +import pika +import rc_util +from os import popen +from pathlib import Path +from rc_rmq import RCRMQ +import rabbit_config as rcfg +from datetime import date, timedelta + +task = "expire_account" + +args = rc_util.get_args() +logger = rc_util.get_logger(args) + +# Instantiate rabbitmq object +rc_rmq = RCRMQ({"exchange": rcfg.Exchange, "exchange_type": "topic"}) + + +def expire_account(ch, method, properties, body): + msg = json.loads(body) + username = msg["username"] + action = msg["action"] + msg["task"] = task + queuename = msg["queuename"] + yesterday = date.today() - timedelta(days=1) + + try: + expire_account_cmd = f'/cm/local/apps/cmd/bin/cmsh -n -c "user;use {username}; set expirationdate {yesterday}; commit;"' + unexpire_account_cmd = f'/cm/local/apps/cmd/bin/cmsh -n -c "user;use {username}; set expirationdate 2037/12/31; commit;"' + + if action == 'lock': + block_ssh = popen(expire_account_cmd).read().rstrip() + elif action == 'unlock': + unblock_ssh = popen(unexpire_account_cmd).read().rstrip() + + msg["success"] = True + logger.info(f"ssh expiration set to yesterday for user {username}") + + except Exception: + msg["success"] = False + msg["errmsg"] = "Exception raised, while expiring user's ssh access, check the logs for stack trace" + logger.error("", exc_info=True) + + # send response to callback queue with it's correlation ID + rc_rmq.publish_msg( + {"routing_key": f'acctmgr.done.{queuename}', + "msg": msg} + ) + + logger.debug(f"User {username} confirmation sent for {action}ing {task}") + + ch.basic_ack(delivery_tag=method.delivery_tag) + + +logger.info(f"Start listening to queue: {task}") +rc_rmq.bind_queue(queue=task, routing_key='lock.*', durable=True) +rc_rmq.bind_queue(queue=task, routing_key='unlock.*', durable=True) +rc_rmq.bind_queue(queue=task, routing_key='expiration.*', durable=True) + +rc_rmq.start_consume( + {"queue": task, "cb": expire_account} +) + +logger.info("Disconnected") +rc_rmq.disconnect() diff --git a/prod_rmq_agents/new_jobs.py b/prod_rmq_agents/new_jobs.py new file mode 100644 index 0000000000000000000000000000000000000000..cb51d18e4f21e77abe2d1391724ca18b474a8727 --- /dev/null +++ b/prod_rmq_agents/new_jobs.py @@ -0,0 +1,64 @@ +#!/usr/bin/env python +import os +import json +import pika +import rc_util +from os import popen +from pathlib import Path +from rc_rmq import RCRMQ +import rabbit_config as rcfg + +task = "new_jobs" + +args = rc_util.get_args() +logger = rc_util.get_logger(args) + +# Instantiate rabbitmq object +rc_rmq = RCRMQ({"exchange": rcfg.Exchange, "exchange_type": "topic"}) + + +def new_jobs(ch, method, properties, body): + msg = json.loads(body) + username = msg["username"] + action = msg["action"] + msg["task"] = task + queuename = msg["queuename"] + + try: + block_new_jobs_cmd = f"/cm/shared/apps/slurm/19.05.5/bin/sacctmgr --immediate update user {username} set maxjobs=0" + unblock_new_jobs_cmd = f"/cm/shared/apps/slurm/19.05.5/bin/sacctmgr --immediate update user {username} set maxjobs=-1" + + if action == 'lock': + block_new_jobs = popen(block_new_jobs_cmd).read().rstrip() + elif action == 'unlock': + unblock_new_jobs = popen(unblock_new_jobs_cmd).read().rstrip() + + msg["success"] = True + logger.info(f"Succeeded in blocking {username}'s jobs getting to run state") + + except Exception: + msg["success"] = False + msg["errmsg"] = "Exception raised while setting maxjobs that can enter run state, check the logs for stack trace" + logger.error("", exc_info=True) + + + rc_rmq.publish_msg( + {"routing_key": f'acctmgr.done.{queuename}', + "msg": msg} + ) + + logger.debug(f"User {username} confirmation sent for {action}ing {task}") + + ch.basic_ack(delivery_tag=method.delivery_tag) + + +logger.info(f"Start listening to queue: {task}") +rc_rmq.bind_queue(queue=task, routing_key='lock.*', durable=True) +rc_rmq.bind_queue(queue=task, routing_key='unlock.*', durable=True) +rc_rmq.bind_queue(queue=task, routing_key='newjobs.*', durable=True) +rc_rmq.start_consume( + {"queue": task, "cb": new_jobs} +) + +logger.info("Disconnected") +rc_rmq.disconnect() diff --git a/prod_rmq_agents/ssh_access.py b/prod_rmq_agents/ssh_access.py new file mode 100644 index 0000000000000000000000000000000000000000..e289e579ab618a8ccc3f66ad070db22c1016f48f --- /dev/null +++ b/prod_rmq_agents/ssh_access.py @@ -0,0 +1,66 @@ +#!/usr/bin/env python +import os +import json +import pika +import rc_util +from os import popen +from pathlib import Path +from rc_rmq import RCRMQ +import rabbit_config as rcfg + +task = "ssh_access" + +args = rc_util.get_args() +logger = rc_util.get_logger(args) + +# Instantiate rabbitmq object +rc_rmq = RCRMQ({"exchange": rcfg.Exchange, "exchange_type": "topic"}) + + +def ssh_access(ch, method, properties, body): + msg = json.loads(body) + username = msg["username"] + action = msg["action"] + msg["task"] = task + corr_id = properties.correlation_id + reply_to = properties.reply_to + + try: + block_ssh_cmd = f'/cm/local/apps/cmd/bin/cmsh -n -c "group; use nossh; append members {username}; commit;"' + unblock_ssh_cmd = f'/cm/local/apps/cmd/bin/cmsh -n -c "group; use nossh; removefrom members {username}; commit;"' + + if action == 'lock': + block_ssh = popen(block_ssh_cmd).read().rstrip() + elif action == 'unlock': + unblock_ssh = popen(unblock_ssh_cmd).read().rstrip() + + msg["success"] = True + logger.info(f"User {username} is added to nossh group") + + except Exception: + msg["success"] = False + msg["errmsg"] = "Exception raised, while blocking user's ssh access, check the logs for stack trace" + logger.error("", exc_info=True) + + # send response to callback queue with it's correlation ID + rc_rmq.publish_msg( + { + "routing_key": f'acctmgr.done.{queuename}', + "msg": msg + } + ) + + logger.debug(f"User {username} confirmation sent for {action}ing {task}") + + ch.basic_ack(delivery_tag=method.delivery_tag) + + +logger.info(f"Start listening to queue: {task}") +rc_rmq.bind_queue(queue=task, routing_key='ssh.*', durable=True) + +rc_rmq.start_consume( + {"queue": task, "cb": ssh_access} +) + +logger.info("Disconnected") +rc_rmq.disconnect()