Skip to content
Snippets Groups Projects
Unverified Commit fe0dea03 authored by Ravi Tripathi's avatar Ravi Tripathi Committed by GitHub
Browse files

Merge pull request #119 from eesaanatluri/feat-account-management

Feat account management
parents 79793636 5dcf31a9
No related branches found
No related tags found
1 merge request!147Merge previous default branch feat-cod-rmq into main
#!/usr/bin/env python3
import json
import rc_util
import argparse
import signal
import uuid
import pika
import rc_util
from rc_rmq import RCRMQ
import rabbit_config as rcfg
import time
parser = argparse.ArgumentParser(description = "Account management driver script")
parser.add_argument(
"username", help="Username that should be locked/unlocked")
parser.add_argument(
"state", help="Choose from states (ok,block,certify) to put the user in")
parser.add_argument(
"-s", "--service", nargs='+', default='all', choices=['ssh', 'newjobs', 'expiration', 'all'], help="List one or more services to be blocked (default: %(default)s)")
parser.add_argument(
"-v", "--verbose", action="store_true", help="verbose output")
parser.add_argument(
"-n", "--dry-run", action="store_true", help="enable dry run mode"
)
args = parser.parse_args()
timeout = 60
queuename = rc_util.encode_name(args.username)
username = args.username
state = args.state
service = args.service
# Instantiate rabbitmq object
rc_rmq = RCRMQ({"exchange": rcfg.Exchange, "exchange_type": "topic"})
msg = {}
msg["username"] = username
msg["state"] = state
msg["service"] = service
msg["queuename"] = queuename
# publish msg with acctmgr.{uname} routing key.
rc_rmq.publish_msg(
{
"routing_key": f'acctmgr.request.{queuename}',
"msg": msg,
}
)
def timeout_handler(signum, frame):
print("Process timeout, there's some issue with agents")
rc_rmq.stop_consume()
def callback(ch, method, properties, body):
msg = json.loads(body)
username = msg["username"]
if msg["success"]:
print(f"Account for {username} has been {msg['action']}ed.\n Updating the user state in DB")
else:
print(f"There's some issue in account management agents for {username}")
errmsg = msg.get("errmsg", [])
for err in errmsg:
print(err)
ch.basic_ack(delivery_tag=method.delivery_tag)
rc_rmq.stop_consume()
rc_rmq.delete_queue(queuename)
print(f"Request {username} account state set to {state}.")
# Set initial timeout timer
signal.signal(signal.SIGALRM, timeout_handler)
signal.setitimer(signal.ITIMER_REAL, timeout)
print("Waiting for completion...")
rc_rmq.start_consume(
{
"queue": queuename,
"routing_key": f'certified.{queuename}',
"cb": callback,
}
)
#!/usr/bin/env python3
import json
import rc_util
import argparse
import signal
import uuid
import pika
import rc_util
from rc_rmq import RCRMQ
import rabbit_config as rcfg
import time
task = "acctmgr"
# Instantiate rabbitmq object
rc_rmq = RCRMQ({"exchange": rcfg.Exchange, "exchange_type": "topic"})
tracking = {}
def manage_acct(ch, method, properties, body):
msg = json.loads(body)
op = method.routing_key.split(".")[1]
username = msg["username"]
state = msg["state"]
service = msg["service"]
queuename = msg["queuename"]
if username in tracking:
current = tracking[username]
else:
current = tracking[username] = {}
if op == 'request':
if state == 'blocked' or state == 'certification':
msg["action"] = "lock"
elif state == 'ok':
msg["action"] = "unlock"
else:
print("Invalid state provided. Check the help menu.")
if service == 'all':
current["new_jobs"] = None
current["expire_account"] = None
# send a broadcast message to all agents
rc_rmq.publish_msg(
{
"routing_key": f"{msg['action']}.{queuename}",
"msg": msg,
}
)
else:
for each_service in service:
current[each_service] = None
rc_rmq.publish_msg(
{
"routing_key": f"{each_service}.{queuename}",
"msg": msg
}
)
elif op == 'done':
# Check if each task/agent returned success
current[msg["task"]] = msg["success"]
done = True
for task in current.keys():
if current[task] is None:
done = False
if done:
rc_util.update_state(username, state)
# Send done msg to account_manager.py
rc_rmq.publish_msg(
{
"routing_key": f'certified.{queuename}',
"msg": msg,
}
)
ch.basic_ack(delivery_tag=method.delivery_tag)
rc_rmq.bind_queue(queue=task, routing_key='acctmgr.request.*', durable=True)
rc_rmq.bind_queue(queue=task, routing_key='acctmgr.done.*', durable=True)
print("Waiting for completion...")
rc_rmq.start_consume(
{"queue": task, "cb": manage_acct}
)
#!/usr/bin/env python
import os
import json
import pika
import rc_util
from os import popen
from pathlib import Path
from rc_rmq import RCRMQ
import rabbit_config as rcfg
from datetime import date, timedelta
task = "expire_account"
args = rc_util.get_args()
logger = rc_util.get_logger(args)
# Instantiate rabbitmq object
rc_rmq = RCRMQ({"exchange": rcfg.Exchange, "exchange_type": "topic"})
def expire_account(ch, method, properties, body):
msg = json.loads(body)
username = msg["username"]
action = msg["action"]
msg["task"] = task
queuename = msg["queuename"]
yesterday = date.today() - timedelta(days=1)
try:
expire_account_cmd = f'/cm/local/apps/cmd/bin/cmsh -n -c "user;use {username}; set expirationdate {yesterday}; commit;"'
unexpire_account_cmd = f'/cm/local/apps/cmd/bin/cmsh -n -c "user;use {username}; set expirationdate 2037/12/31; commit;"'
if action == 'lock':
block_ssh = popen(expire_account_cmd).read().rstrip()
elif action == 'unlock':
unblock_ssh = popen(unexpire_account_cmd).read().rstrip()
msg["success"] = True
logger.info(f"ssh expiration set to yesterday for user {username}")
except Exception:
msg["success"] = False
msg["errmsg"] = "Exception raised, while expiring user's ssh access, check the logs for stack trace"
logger.error("", exc_info=True)
# send response to callback queue with it's correlation ID
rc_rmq.publish_msg(
{"routing_key": f'acctmgr.done.{queuename}',
"msg": msg}
)
logger.debug(f"User {username} confirmation sent for {action}ing {task}")
ch.basic_ack(delivery_tag=method.delivery_tag)
logger.info(f"Start listening to queue: {task}")
rc_rmq.bind_queue(queue=task, routing_key='lock.*', durable=True)
rc_rmq.bind_queue(queue=task, routing_key='unlock.*', durable=True)
rc_rmq.bind_queue(queue=task, routing_key='expiration.*', durable=True)
rc_rmq.start_consume(
{"queue": task, "cb": expire_account}
)
logger.info("Disconnected")
rc_rmq.disconnect()
#!/usr/bin/env python
import os
import json
import pika
import rc_util
from os import popen
from pathlib import Path
from rc_rmq import RCRMQ
import rabbit_config as rcfg
task = "new_jobs"
args = rc_util.get_args()
logger = rc_util.get_logger(args)
# Instantiate rabbitmq object
rc_rmq = RCRMQ({"exchange": rcfg.Exchange, "exchange_type": "topic"})
def new_jobs(ch, method, properties, body):
msg = json.loads(body)
username = msg["username"]
action = msg["action"]
msg["task"] = task
queuename = msg["queuename"]
try:
block_new_jobs_cmd = f"/cm/shared/apps/slurm/19.05.5/bin/sacctmgr --immediate update user {username} set maxjobs=0"
unblock_new_jobs_cmd = f"/cm/shared/apps/slurm/19.05.5/bin/sacctmgr --immediate update user {username} set maxjobs=-1"
if action == 'lock':
block_new_jobs = popen(block_new_jobs_cmd).read().rstrip()
elif action == 'unlock':
unblock_new_jobs = popen(unblock_new_jobs_cmd).read().rstrip()
msg["success"] = True
logger.info(f"Succeeded in blocking {username}'s jobs getting to run state")
except Exception:
msg["success"] = False
msg["errmsg"] = "Exception raised while setting maxjobs that can enter run state, check the logs for stack trace"
logger.error("", exc_info=True)
rc_rmq.publish_msg(
{"routing_key": f'acctmgr.done.{queuename}',
"msg": msg}
)
logger.debug(f"User {username} confirmation sent for {action}ing {task}")
ch.basic_ack(delivery_tag=method.delivery_tag)
logger.info(f"Start listening to queue: {task}")
rc_rmq.bind_queue(queue=task, routing_key='lock.*', durable=True)
rc_rmq.bind_queue(queue=task, routing_key='unlock.*', durable=True)
rc_rmq.bind_queue(queue=task, routing_key='newjobs.*', durable=True)
rc_rmq.start_consume(
{"queue": task, "cb": new_jobs}
)
logger.info("Disconnected")
rc_rmq.disconnect()
#!/usr/bin/env python
import os
import json
import pika
import rc_util
from os import popen
from pathlib import Path
from rc_rmq import RCRMQ
import rabbit_config as rcfg
task = "ssh_access"
args = rc_util.get_args()
logger = rc_util.get_logger(args)
# Instantiate rabbitmq object
rc_rmq = RCRMQ({"exchange": rcfg.Exchange, "exchange_type": "topic"})
def ssh_access(ch, method, properties, body):
msg = json.loads(body)
username = msg["username"]
action = msg["action"]
msg["task"] = task
corr_id = properties.correlation_id
reply_to = properties.reply_to
try:
block_ssh_cmd = f'/cm/local/apps/cmd/bin/cmsh -n -c "group; use nossh; append members {username}; commit;"'
unblock_ssh_cmd = f'/cm/local/apps/cmd/bin/cmsh -n -c "group; use nossh; removefrom members {username}; commit;"'
if action == 'lock':
block_ssh = popen(block_ssh_cmd).read().rstrip()
elif action == 'unlock':
unblock_ssh = popen(unblock_ssh_cmd).read().rstrip()
msg["success"] = True
logger.info(f"User {username} is added to nossh group")
except Exception:
msg["success"] = False
msg["errmsg"] = "Exception raised, while blocking user's ssh access, check the logs for stack trace"
logger.error("", exc_info=True)
# send response to callback queue with it's correlation ID
rc_rmq.publish_msg(
{
"routing_key": f'acctmgr.done.{queuename}',
"msg": msg
}
)
logger.debug(f"User {username} confirmation sent for {action}ing {task}")
ch.basic_ack(delivery_tag=method.delivery_tag)
logger.info(f"Start listening to queue: {task}")
rc_rmq.bind_queue(queue=task, routing_key='ssh.*', durable=True)
rc_rmq.start_consume(
{"queue": task, "cb": ssh_access}
)
logger.info("Disconnected")
rc_rmq.disconnect()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment