Skip to content
Snippets Groups Projects
Commit 4cdd4ae3 authored by Bo-Chun Chen
Browse files
parent 742db5e1
No related tags found
No related merge requests found
...@@ -3,22 +3,27 @@ import json ...@@ -3,22 +3,27 @@ import json
import rc_util import rc_util
import argparse import argparse
import signal import signal
import uuid
import pika
import rc_util
from rc_rmq import RCRMQ from rc_rmq import RCRMQ
import rabbit_config as rcfg import rabbit_config as rcfg
import time
parser = argparse.ArgumentParser(description = "Account management driver script") parser = argparse.ArgumentParser(
parser.add_argument( description="Account management driver script"
"username", help="Username that should be locked/unlocked") )
parser.add_argument("username", help="Username that should be locked/unlocked")
parser.add_argument( parser.add_argument(
"state", help="Choose from states (ok,block,certify) to put the user in") "state", help="Choose from states (ok,block,certify) to put the user in"
)
parser.add_argument( parser.add_argument(
"-s", "--service", nargs='+', default='all', choices=['ssh', 'newjobs', 'expiration', 'all'], help="List one or more services to be blocked (default: %(default)s)") "-s",
"--service",
nargs="+",
default="all",
choices=["ssh", "newjobs", "expiration", "all"],
help="List one or more services to be blocked (default: %(default)s)",
)
parser.add_argument( parser.add_argument(
"-v", "--verbose", action="store_true", help="verbose output") "-v", "--verbose", action="store_true", help="verbose output"
)
parser.add_argument( parser.add_argument(
"-n", "--dry-run", action="store_true", help="enable dry run mode" "-n", "--dry-run", action="store_true", help="enable dry run mode"
) )
...@@ -44,7 +49,7 @@ msg["queuename"] = queuename ...@@ -44,7 +49,7 @@ msg["queuename"] = queuename
# publish msg with acctmgr.{uname} routing key. # publish msg with acctmgr.{uname} routing key.
rc_rmq.publish_msg( rc_rmq.publish_msg(
{ {
"routing_key": f'acctmgr.request.{queuename}', "routing_key": f"acctmgr.request.{queuename}",
"msg": msg, "msg": msg,
} }
) )
...@@ -60,18 +65,23 @@ def callback(ch, method, properties, body): ...@@ -60,18 +65,23 @@ def callback(ch, method, properties, body):
username = msg["username"] username = msg["username"]
if msg["success"]: if msg["success"]:
print(f"Account for {username} has been {msg['action']}ed.\n Updating the user state in DB") print(
f"Account for {username} has been {msg['action']}ed.\n Updating"
" the user state in DB"
)
else: else:
print(f"There's some issue in account management agents for {username}") print(
f"There's some issue in account management agents for {username}"
)
errmsg = msg.get("errmsg", []) errmsg = msg.get("errmsg", [])
for err in errmsg: for err in errmsg:
print(err) print(err)
ch.basic_ack(delivery_tag=method.delivery_tag) ch.basic_ack(delivery_tag=method.delivery_tag)
rc_rmq.stop_consume() rc_rmq.stop_consume()
rc_rmq.delete_queue(queuename) rc_rmq.delete_queue(queuename)
print(f"Request {username} account state set to {state}.") print(f"Request {username} account state set to {state}.")
# Set initial timeout timer # Set initial timeout timer
...@@ -82,7 +92,7 @@ print("Waiting for completion...") ...@@ -82,7 +92,7 @@ print("Waiting for completion...")
rc_rmq.start_consume( rc_rmq.start_consume(
{ {
"queue": queuename, "queue": queuename,
"routing_key": f'certified.{queuename}', "routing_key": f"certified.{queuename}",
"cb": callback, "cb": callback,
} }
) )
#!/usr/bin/env python3 #!/usr/bin/env python3
import json import json
import rc_util import rc_util
import argparse
import signal
import uuid
import pika
import rc_util
from rc_rmq import RCRMQ from rc_rmq import RCRMQ
import rabbit_config as rcfg import rabbit_config as rcfg
import time
task = "acctmgr" task = "acctmgr"
...@@ -17,6 +11,7 @@ rc_rmq = RCRMQ({"exchange": rcfg.Exchange, "exchange_type": "topic"}) ...@@ -17,6 +11,7 @@ rc_rmq = RCRMQ({"exchange": rcfg.Exchange, "exchange_type": "topic"})
tracking = {} tracking = {}
def manage_acct(ch, method, properties, body): def manage_acct(ch, method, properties, body):
msg = json.loads(body) msg = json.loads(body)
op = method.routing_key.split(".")[1] op = method.routing_key.split(".")[1]
...@@ -28,38 +23,34 @@ def manage_acct(ch, method, properties, body): ...@@ -28,38 +23,34 @@ def manage_acct(ch, method, properties, body):
if username in tracking: if username in tracking:
current = tracking[username] current = tracking[username]
else: else:
current = tracking[username] = {} current = tracking[username] = {}
if op == 'request': if op == "request":
if state == 'blocked' or state == 'certification': if state == "blocked" or state == "certification":
msg["action"] = "lock" msg["action"] = "lock"
elif state == 'ok': elif state == "ok":
msg["action"] = "unlock" msg["action"] = "unlock"
else: else:
print("Invalid state provided. Check the help menu.") print("Invalid state provided. Check the help menu.")
if service == 'all': if service == "all":
current["new_jobs"] = None current["new_jobs"] = None
current["expire_account"] = None current["expire_account"] = None
# send a broadcast message to all agents # send a broadcast message to all agents
rc_rmq.publish_msg( rc_rmq.publish_msg(
{ {
"routing_key": f"{msg['action']}.{queuename}", "routing_key": f"{msg['action']}.{queuename}",
"msg": msg, "msg": msg,
} }
) )
else: else:
for each_service in service: for each_service in service:
current[each_service] = None current[each_service] = None
rc_rmq.publish_msg( rc_rmq.publish_msg(
{ {"routing_key": f"{each_service}.{queuename}", "msg": msg}
"routing_key": f"{each_service}.{queuename}",
"msg": msg
}
) )
elif op == "done":
elif op == 'done':
# Check if each task/agent returned success # Check if each task/agent returned success
current[msg["task"]] = msg["success"] current[msg["task"]] = msg["success"]
...@@ -71,22 +62,20 @@ def manage_acct(ch, method, properties, body): ...@@ -71,22 +62,20 @@ def manage_acct(ch, method, properties, body):
if done: if done:
rc_util.update_state(username, state) rc_util.update_state(username, state)
# Send done msg to account_manager.py # Send done msg to account_manager.py
rc_rmq.publish_msg( rc_rmq.publish_msg(
{ {
"routing_key": f'certified.{queuename}', "routing_key": f"certified.{queuename}",
"msg": msg, "msg": msg,
} }
) )
ch.basic_ack(delivery_tag=method.delivery_tag) ch.basic_ack(delivery_tag=method.delivery_tag)
rc_rmq.bind_queue(queue=task, routing_key='acctmgr.request.*', durable=True)
rc_rmq.bind_queue(queue=task, routing_key='acctmgr.done.*', durable=True)
print("Waiting for completion...") rc_rmq.bind_queue(queue=task, routing_key="acctmgr.request.*", durable=True)
rc_rmq.start_consume( rc_rmq.bind_queue(queue=task, routing_key="acctmgr.done.*", durable=True)
{"queue": task, "cb": manage_acct}
)
print("Waiting for completion...")
rc_rmq.start_consume({"queue": task, "cb": manage_acct})
#!/usr/bin/env python #!/usr/bin/env python
import os
import json import json
import pika
import rc_util import rc_util
from os import popen from os import popen
from pathlib import Path
from rc_rmq import RCRMQ from rc_rmq import RCRMQ
import rabbit_config as rcfg import rabbit_config as rcfg
...@@ -25,26 +22,35 @@ def new_jobs(ch, method, properties, body): ...@@ -25,26 +22,35 @@ def new_jobs(ch, method, properties, body):
queuename = msg["queuename"] queuename = msg["queuename"]
try: try:
block_new_jobs_cmd = f"/cm/shared/apps/slurm/19.05.5/bin/sacctmgr --immediate update user {username} set maxjobs=0" block_new_jobs_cmd = (
unblock_new_jobs_cmd = f"/cm/shared/apps/slurm/19.05.5/bin/sacctmgr --immediate update user {username} set maxjobs=-1" "/cm/shared/apps/slurm/19.05.5/bin/sacctmgr --immediate update"
f" user {username} set maxjobs=0"
if action == 'lock': )
block_new_jobs = popen(block_new_jobs_cmd).read().rstrip() unblock_new_jobs_cmd = (
elif action == 'unlock': "/cm/shared/apps/slurm/19.05.5/bin/sacctmgr --immediate update"
unblock_new_jobs = popen(unblock_new_jobs_cmd).read().rstrip() f" user {username} set maxjobs=-1"
)
if action == "lock":
popen(block_new_jobs_cmd).read().rstrip()
elif action == "unlock":
popen(unblock_new_jobs_cmd).read().rstrip()
msg["success"] = True msg["success"] = True
logger.info(f"Succeeded in blocking {username}'s jobs getting to run state") logger.info(
f"Succeeded in blocking {username}'s jobs getting to run state"
)
except Exception: except Exception:
msg["success"] = False msg["success"] = False
msg["errmsg"] = "Exception raised while setting maxjobs that can enter run state, check the logs for stack trace" msg["errmsg"] = (
"Exception raised while setting maxjobs that can enter run state,"
" check the logs for stack trace"
)
logger.error("", exc_info=True) logger.error("", exc_info=True)
rc_rmq.publish_msg( rc_rmq.publish_msg(
{"routing_key": f'acctmgr.done.{queuename}', {"routing_key": f"acctmgr.done.{queuename}", "msg": msg}
"msg": msg}
) )
logger.debug(f"User {username} confirmation sent for {action}ing {task}") logger.debug(f"User {username} confirmation sent for {action}ing {task}")
...@@ -53,12 +59,10 @@ def new_jobs(ch, method, properties, body): ...@@ -53,12 +59,10 @@ def new_jobs(ch, method, properties, body):
logger.info(f"Start listening to queue: {task}") logger.info(f"Start listening to queue: {task}")
rc_rmq.bind_queue(queue=task, routing_key='lock.*', durable=True) rc_rmq.bind_queue(queue=task, routing_key="lock.*", durable=True)
rc_rmq.bind_queue(queue=task, routing_key='unlock.*', durable=True) rc_rmq.bind_queue(queue=task, routing_key="unlock.*", durable=True)
rc_rmq.bind_queue(queue=task, routing_key='newjobs.*', durable=True) rc_rmq.bind_queue(queue=task, routing_key="newjobs.*", durable=True)
rc_rmq.start_consume( rc_rmq.start_consume({"queue": task, "cb": new_jobs})
{"queue": task, "cb": new_jobs}
)
logger.info("Disconnected") logger.info("Disconnected")
rc_rmq.disconnect() rc_rmq.disconnect()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment