diff --git a/account_manager.py b/account_manager.py index 98e7a3b89088fc6047c4389e45a02ee476a5632c..109a516bb9b875a18046a377d674c913975a61bd 100755 --- a/account_manager.py +++ b/account_manager.py @@ -14,7 +14,7 @@ parser = argparse.ArgumentParser(description = "Account management driver script parser.add_argument( "username", help="Username that should be locked/unlocked") parser.add_argument( - "state", help="Choose from states (ok,block,certify) to put the user in") + "state", choices=['ok', 'hold', 'certification', 'pre_certification'], help="Choose from states (ok,hold,certification,pre_certification)") parser.add_argument( "-s", "--service", nargs='+', default='all', choices=['ssh', 'newjobs', 'expiration', 'all'], help="List one or more services to be blocked (default: %(default)s)") parser.add_argument( @@ -40,6 +40,7 @@ msg["username"] = username msg["state"] = state msg["service"] = service msg["queuename"] = queuename +msg["updated_by"], msg["host"] = rc_util.get_caller_info() # publish msg with acctmgr.{uname} routing key. rc_rmq.publish_msg( diff --git a/create_account.py b/create_account.py index d5ae3a2a0e0ddcfef1d30e713fd905cfd486847d..42668c36a52677d9998601266ed999bd08ba6d77 100755 --- a/create_account.py +++ b/create_account.py @@ -25,6 +25,7 @@ args = parser.parse_args() timeout = 60 queuename = rc_util.encode_name(args.username) +updated_by, host = rc_util.get_caller_info() if args.email == "": args.email = args.username @@ -58,6 +59,8 @@ rc_util.add_account( email=args.email, full=args.full_name, reason=args.reason, + updated_by=updated_by, + host=host, ) print(f"Account for {args.username} requested.") diff --git a/init_user_state.py b/init_user_state.py new file mode 100644 index 0000000000000000000000000000000000000000..54ba947560e151fa12592d7f314bb5b916a1ccb4 --- /dev/null +++ b/init_user_state.py @@ -0,0 +1,72 @@ +#!/usr/bin/env python3 +import argparse +import dataset +import sys +import subprocess +import rabbit_config as rcfg +import rc_util +from datetime import datetime + +parser = argparse.ArgumentParser() +parser.add_argument("-f", "--force", action="store_true", help="force update") +parser.add_argument( + "--dry-run", action="store_true", help="enable dry run mode" +) +args = parser.parse_args() + +default_state = "ok" +today = datetime.now() +updated_by, host = rc_util.get_caller_info() + +# Chunk size for insert into db +size = 1000 + +db = dataset.connect(f"sqlite:///{rcfg.db_path}/user_reg.db") +table = db["user_state"] + +if table.__len__() > 0 and not args.force: + print("table user_state not empty, abort.") + sys.exit() + +# Getting user list +users = subprocess.run( + ["ls", "/data/user"], stdout=subprocess.PIPE, encoding="UTF-8" +).stdout.split() + +# Update user_state table +# Insert many +if len(users) > 50: + start = 0 + while start < len(users): + end = start + size if start + size < len(users) else len(users) + data = [ + dict( + username=user, + state=default_state, + date=today, + updated_by=updated_by, + host=host, + ) + for user in users[start:end] + ] + if args.dry_run: + print(f"Table insert many from {start} to {end - 1}") + else: + table.insert_many(data, chunk_size=size) + start = end + +# Insert one by one +else: + for user in users: + if args.dry_run: + print(f"Table insert user: {user}, state: {default_state}") + else: + table.insert( + { + "username": user, + "state": default_state, + "date": today, + "updated_by": updated_by, + "host": host, + } + ) diff --git a/prod_rmq_agents/acct_mgmt_workflow.py b/prod_rmq_agents/acct_mgmt_workflow.py index ae4257e7f27b16263474c648c669f0b4bd2b293a..124af99d6178dc2fddd34084c0fe22aa4282e05e 100755 --- a/prod_rmq_agents/acct_mgmt_workflow.py +++ b/prod_rmq_agents/acct_mgmt_workflow.py @@ -25,62 +25,69 @@ def manage_acct(ch, method, properties, body): service = msg["service"] queuename = msg["queuename"] - if username in tracking: - current = tracking[username] - else: - current = tracking[username] = {} - - if op == 'request': - if state == 'blocked' or state == 'certification': - msg["action"] = "lock" - elif state == 'ok': - msg["action"] = "unlock" + try: + if username in tracking: + current = tracking[username] else: - print("Invalid state provided. Check the help menu.") - - if service == 'all': - current["new_jobs"] = None - current["expire_account"] = None - # send a broadcast message to all agents - rc_rmq.publish_msg( - { - "routing_key": f"{msg['action']}.{queuename}", - "msg": msg, - } - ) - else: - for each_service in service: - current[each_service] = None + current = tracking[username] = {} + + if op == 'request': + if state == 'hold' or state == 'certification': + msg["action"] = "lock" + elif state == 'ok' or state == 'pre_certification': + msg["action"] = "unlock" + else: + print("Invalid state provided. Check the help menu.") + + if service == 'all': + current["new_jobs"] = None + current["expire_account"] = None + # send a broadcast message to all agents rc_rmq.publish_msg( { - "routing_key": f"{each_service}.{queuename}", - "msg": msg + "routing_key": f"{msg['action']}.{queuename}", + "msg": msg, } ) + else: + for each_service in service: + current[each_service] = None + rc_rmq.publish_msg( + { + "routing_key": f"{each_service}.{queuename}", + "msg": msg + } + ) + + + elif op == 'done': + # Check if each task/agent returned success + current[msg["task"]] = msg["success"] + + done = True + + for task in current.keys(): + if current[task] is None: + done = False + + if done: + rc_util.update_state( + username, state, msg.get("updated_by"), msg.get("host") + ) + # Send done msg to account_manager.py + rc_rmq.publish_msg( + { + "routing_key": f'certified.{queuename}', + "msg": msg, + } + ) - elif op == 'done': - # Check if each task/agent returned success - current[msg["task"]] = msg["success"] - - done = True - - for task in current.keys(): - if current[task] is None: - done = False - - if done: - rc_util.update_state(username, state) - - # Send done msg to account_manager.py - rc_rmq.publish_msg( - { - "routing_key": f'certified.{queuename}', - "msg": msg, - } - ) + ch.basic_ack(delivery_tag=method.delivery_tag) - ch.basic_ack(delivery_tag=method.delivery_tag) + except Exception: + msg["errmsg"] = "Exception raised in account manager workflow agent , check the logs for stack trace" + logger.error("", exc_info=True) rc_rmq.bind_queue(queue=task, routing_key='acctmgr.request.*', durable=True) rc_rmq.bind_queue(queue=task, routing_key='acctmgr.done.*', durable=True) diff --git a/prod_rmq_agents/get-next-uid-gid.py b/prod_rmq_agents/get-next-uid-gid.py index 42d6cc892b138253c9e3aad2af2f0512357dce16..03cb51f783a789610698ed1ba53700c637177439 100644 --- a/prod_rmq_agents/get-next-uid-gid.py +++ b/prod_rmq_agents/get-next-uid-gid.py @@ -30,10 +30,16 @@ def create_account(msg): msg["success"] = False # Bright command to create user - cmd = "/cm/local/apps/cmd/bin/cmsh -c " - cmd += f'"user; add {username}; set id {uid}; set email {email};' - cmd += f'set commonname \\"{fullname}\\"; ' - cmd += 'commit;"' + if str(rcfg.bright_cm_version).split(".")[0] == "8": + cmd = "/cm/local/apps/cmd/bin/cmsh -c " + cmd += f'"user; add {username}; set userid {uid}; set email {email};' + cmd += f'set commonname \\"{fullname}\\"; ' + cmd += 'commit;"' + else: + cmd = "/cm/local/apps/cmd/bin/cmsh -c " + cmd += f'"user; add {username}; set id {uid}; set email {email};' + cmd += f'set commonname \\"{fullname}\\"; ' + cmd += 'commit;"' if not args.dry_run: run(shlex.split(cmd)) diff --git a/prod_rmq_agents/task_manager.py b/prod_rmq_agents/task_manager.py index f843f0cffc8cc319ddbfd983090eda75e7e176c5..3b82f838f47ed8b926bb3ecc3b506ea59bb26fdf 100644 --- a/prod_rmq_agents/task_manager.py +++ b/prod_rmq_agents/task_manager.py @@ -277,7 +277,9 @@ def task_manager(ch, method, properties, body): update_db(username, {"reported": True}) - rc_util.update_state(username, "ok") + rc_util.update_state( + username, "ok", msg.get("updated_by"), msg.get("host") + ) tracking.pop(username) diff --git a/prod_rmq_agents/user_state.py b/prod_rmq_agents/user_state.py index 53445c284c92ae035ebadcb14c837d540ca9c3a2..208af16b4b68886161c79683f92ad77ab01df8e3 100644 --- a/prod_rmq_agents/user_state.py +++ b/prod_rmq_agents/user_state.py @@ -22,6 +22,8 @@ rc_rmq = RCRMQ({"exchange": rcfg.Exchange, "exchange_type": "topic"}) def user_state(ch, method, properties, body): msg = json.loads(body) username = msg["username"] + updated_by = msg.get("updated_by") + host = msg.get("host") op = msg["op"] msg["success"] = False errmsg = "" @@ -47,7 +49,13 @@ def user_state(ch, method, properties, body): state = msg["state"] errmsg = "Updating state of {username} to {state}" table.insert( - {"username": username, "state": state, "date": datetime.now()} + { + "username": username, + "state": state, + "date": datetime.now(), + "updated_by": updated_by, + "host": host, + } ) logger.debug(f"User {username} state updates to {state}") diff --git a/rc_util.py b/rc_util.py index 9becf2f93878e982d3cb89ff2ccb355a8a28e8c3..89f84e9169ad95fee359906673b6c473cd4c894f 100644 --- a/rc_util.py +++ b/rc_util.py @@ -5,6 +5,7 @@ import signal import logging import argparse import pika +import pwd import uuid from rc_rmq import RCRMQ import json @@ -48,7 +49,9 @@ def timeout(seconds=30, error_message=os.strerror(errno.ETIME)): return decorator -def add_account(username, queuename, email, full="", reason=""): +def add_account( + username, queuename, email, full="", reason="", updated_by="", host="" +): rc_rmq.publish_msg( { "routing_key": "request." + queuename, @@ -58,12 +61,17 @@ def add_account(username, queuename, email, full="", reason=""): "fullname": full, "reason": reason, "queuename": queuename, + "updated_by": updated_by, + "host": host, }, } ) rc_rmq.disconnect() -def certify_account(username, queuename, state="ok", service="all"): + +def certify_account( + username, queuename, state="ok", service="all", updated_by="", host="" +): rc_rmq.publish_msg( { "routing_key": "acctmgr.request." + queuename, @@ -72,11 +80,14 @@ def certify_account(username, queuename, state="ok", service="all"): "service": service, "state": state, "queuename": queuename, + "updated_by": updated_by, + "host": host, }, } ) rc_rmq.disconnect() + def worker(ch, method, properties, body): msg = json.loads(body) username = msg["username"] @@ -211,7 +222,7 @@ def check_state(username, debug=False): @timeout(rcfg.Function_timeout) -def update_state(username, state, debug=False): +def update_state(username, state, updated_by="", host="", debug=False): if state not in rcfg.Valid_state: print(f"Invalid state '{state}'") @@ -246,7 +257,13 @@ def update_state(username, state, debug=False): "props": pika.BasicProperties( reply_to=callback_queue, correlation_id=corr_id ), - "msg": {"op": "post", "username": username, "state": state}, + "msg": { + "op": "post", + "username": username, + "state": state, + "updated_by": updated_by, + "host": host, + }, } ) @@ -260,3 +277,9 @@ def update_state(username, state, debug=False): ) return result + + +def get_caller_info(): + username = pwd.getpwuid(os.getuid()).pw_name + hostname = os.uname().nodename + return (username, hostname)