Skip to content
Snippets Groups Projects
Commit 384498db authored by Bo-Chun Chen's avatar Bo-Chun Chen
Browse files

Merge branch 'feat-cod-rmq' into fix-linting

parents d0f83148 3fda80bd
No related branches found
No related tags found
1 merge request!131WIP: Fix linting
......@@ -14,7 +14,7 @@ parser = argparse.ArgumentParser(description = "Account management driver script
parser.add_argument(
"username", help="Username that should be locked/unlocked")
parser.add_argument(
"state", help="Choose from states (ok,block,certify) to put the user in")
"state", choices=['ok', 'hold', 'certification', 'pre_certification'], help="Choose from states (ok,hold,certification,pre_certification)")
parser.add_argument(
"-s", "--service", nargs='+', default='all', choices=['ssh', 'newjobs', 'expiration', 'all'], help="List one or more services to be blocked (default: %(default)s)")
parser.add_argument(
......@@ -40,6 +40,7 @@ msg["username"] = username
msg["state"] = state
msg["service"] = service
msg["queuename"] = queuename
msg["updated_by"], msg["host"] = rc_util.get_caller_info()
# publish msg with acctmgr.{uname} routing key.
rc_rmq.publish_msg(
......
......@@ -25,6 +25,7 @@ args = parser.parse_args()
timeout = 60
queuename = rc_util.encode_name(args.username)
updated_by, host = rc_util.get_caller_info()
if args.email == "":
args.email = args.username
......@@ -58,6 +59,8 @@ rc_util.add_account(
email=args.email,
full=args.full_name,
reason=args.reason,
updated_by=updated_by,
host=host,
)
print(f"Account for {args.username} requested.")
......
#!/usr/bin/env python3
import argparse
import dataset
import sys
import subprocess
import rabbit_config as rcfg
import rc_util
from datetime import datetime
parser = argparse.ArgumentParser()
parser.add_argument("-f", "--force", action="store_true", help="force update")
parser.add_argument(
"--dry-run", action="store_true", help="enable dry run mode"
)
args = parser.parse_args()
default_state = "ok"
today = datetime.now()
updated_by, host = rc_util.get_caller_info()
# Chunk size for insert into db
size = 1000
db = dataset.connect(f"sqlite:///{rcfg.db_path}/user_reg.db")
table = db["user_state"]
if table.__len__() > 0 and not args.force:
print("table user_state not empty, abort.")
sys.exit()
# Getting user list
users = subprocess.run(
["ls", "/data/user"], stdout=subprocess.PIPE, encoding="UTF-8"
).stdout.split()
# Update user_state table
# Insert many
if len(users) > 50:
start = 0
while start < len(users):
end = start + size if start + size < len(users) else len(users)
data = [
dict(
username=user,
state=default_state,
date=today,
updated_by=updated_by,
host=host,
)
for user in users[start:end]
]
if args.dry_run:
print(f"Table insert many from {start} to {end - 1}")
else:
table.insert_many(data, chunk_size=size)
start = end
# Insert one by one
else:
for user in users:
if args.dry_run:
print(f"Table insert user: {user}, state: {default_state}")
else:
table.insert(
{
"username": user,
"state": default_state,
"date": today,
"updated_by": updated_by,
"host": host,
}
)
......@@ -25,62 +25,69 @@ def manage_acct(ch, method, properties, body):
service = msg["service"]
queuename = msg["queuename"]
if username in tracking:
current = tracking[username]
else:
current = tracking[username] = {}
if op == 'request':
if state == 'blocked' or state == 'certification':
msg["action"] = "lock"
elif state == 'ok':
msg["action"] = "unlock"
try:
if username in tracking:
current = tracking[username]
else:
print("Invalid state provided. Check the help menu.")
if service == 'all':
current["new_jobs"] = None
current["expire_account"] = None
# send a broadcast message to all agents
rc_rmq.publish_msg(
{
"routing_key": f"{msg['action']}.{queuename}",
"msg": msg,
}
)
else:
for each_service in service:
current[each_service] = None
current = tracking[username] = {}
if op == 'request':
if state == 'hold' or state == 'certification':
msg["action"] = "lock"
elif state == 'ok' or state == 'pre_certification':
msg["action"] = "unlock"
else:
print("Invalid state provided. Check the help menu.")
if service == 'all':
current["new_jobs"] = None
current["expire_account"] = None
# send a broadcast message to all agents
rc_rmq.publish_msg(
{
"routing_key": f"{each_service}.{queuename}",
"msg": msg
"routing_key": f"{msg['action']}.{queuename}",
"msg": msg,
}
)
else:
for each_service in service:
current[each_service] = None
rc_rmq.publish_msg(
{
"routing_key": f"{each_service}.{queuename}",
"msg": msg
}
)
elif op == 'done':
# Check if each task/agent returned success
current[msg["task"]] = msg["success"]
done = True
for task in current.keys():
if current[task] is None:
done = False
if done:
rc_util.update_state(
username, state, msg.get("updated_by"), msg.get("host")
)
# Send done msg to account_manager.py
rc_rmq.publish_msg(
{
"routing_key": f'certified.{queuename}',
"msg": msg,
}
)
elif op == 'done':
# Check if each task/agent returned success
current[msg["task"]] = msg["success"]
done = True
for task in current.keys():
if current[task] is None:
done = False
if done:
rc_util.update_state(username, state)
# Send done msg to account_manager.py
rc_rmq.publish_msg(
{
"routing_key": f'certified.{queuename}',
"msg": msg,
}
)
ch.basic_ack(delivery_tag=method.delivery_tag)
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception:
msg["errmsg"] = "Exception raised in account manager workflow agent , check the logs for stack trace"
logger.error("", exc_info=True)
rc_rmq.bind_queue(queue=task, routing_key='acctmgr.request.*', durable=True)
rc_rmq.bind_queue(queue=task, routing_key='acctmgr.done.*', durable=True)
......
......@@ -30,10 +30,16 @@ def create_account(msg):
msg["success"] = False
# Bright command to create user
cmd = "/cm/local/apps/cmd/bin/cmsh -c "
cmd += f'"user; add {username}; set id {uid}; set email {email};'
cmd += f'set commonname \\"{fullname}\\"; '
cmd += 'commit;"'
if str(rcfg.bright_cm_version).split(".")[0] == "8":
cmd = "/cm/local/apps/cmd/bin/cmsh -c "
cmd += f'"user; add {username}; set userid {uid}; set email {email};'
cmd += f'set commonname \\"{fullname}\\"; '
cmd += 'commit;"'
else:
cmd = "/cm/local/apps/cmd/bin/cmsh -c "
cmd += f'"user; add {username}; set id {uid}; set email {email};'
cmd += f'set commonname \\"{fullname}\\"; '
cmd += 'commit;"'
if not args.dry_run:
run(shlex.split(cmd))
......
......@@ -277,7 +277,9 @@ def task_manager(ch, method, properties, body):
update_db(username, {"reported": True})
rc_util.update_state(username, "ok")
rc_util.update_state(
username, "ok", msg.get("updated_by"), msg.get("host")
)
tracking.pop(username)
......
......@@ -22,6 +22,8 @@ rc_rmq = RCRMQ({"exchange": rcfg.Exchange, "exchange_type": "topic"})
def user_state(ch, method, properties, body):
msg = json.loads(body)
username = msg["username"]
updated_by = msg.get("updated_by")
host = msg.get("host")
op = msg["op"]
msg["success"] = False
errmsg = ""
......@@ -47,7 +49,13 @@ def user_state(ch, method, properties, body):
state = msg["state"]
errmsg = "Updating state of {username} to {state}"
table.insert(
{"username": username, "state": state, "date": datetime.now()}
{
"username": username,
"state": state,
"date": datetime.now(),
"updated_by": updated_by,
"host": host,
}
)
logger.debug(f"User {username} state updates to {state}")
......
......@@ -5,6 +5,7 @@ import signal
import logging
import argparse
import pika
import pwd
import uuid
from rc_rmq import RCRMQ
import json
......@@ -48,7 +49,9 @@ def timeout(seconds=30, error_message=os.strerror(errno.ETIME)):
return decorator
def add_account(username, queuename, email, full="", reason=""):
def add_account(
username, queuename, email, full="", reason="", updated_by="", host=""
):
rc_rmq.publish_msg(
{
"routing_key": "request." + queuename,
......@@ -58,12 +61,17 @@ def add_account(username, queuename, email, full="", reason=""):
"fullname": full,
"reason": reason,
"queuename": queuename,
"updated_by": updated_by,
"host": host,
},
}
)
rc_rmq.disconnect()
def certify_account(username, queuename, state="ok", service="all"):
def certify_account(
username, queuename, state="ok", service="all", updated_by="", host=""
):
rc_rmq.publish_msg(
{
"routing_key": "acctmgr.request." + queuename,
......@@ -72,11 +80,14 @@ def certify_account(username, queuename, state="ok", service="all"):
"service": service,
"state": state,
"queuename": queuename,
"updated_by": updated_by,
"host": host,
},
}
)
rc_rmq.disconnect()
def worker(ch, method, properties, body):
msg = json.loads(body)
username = msg["username"]
......@@ -211,7 +222,7 @@ def check_state(username, debug=False):
@timeout(rcfg.Function_timeout)
def update_state(username, state, debug=False):
def update_state(username, state, updated_by="", host="", debug=False):
if state not in rcfg.Valid_state:
print(f"Invalid state '{state}'")
......@@ -246,7 +257,13 @@ def update_state(username, state, debug=False):
"props": pika.BasicProperties(
reply_to=callback_queue, correlation_id=corr_id
),
"msg": {"op": "post", "username": username, "state": state},
"msg": {
"op": "post",
"username": username,
"state": state,
"updated_by": updated_by,
"host": host,
},
}
)
......@@ -260,3 +277,9 @@ def update_state(username, state, debug=False):
)
return result
def get_caller_info():
username = pwd.getpwuid(os.getuid()).pw_name
hostname = os.uname().nodename
return (username, hostname)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment