diff --git a/account_manager.py b/account_manager.py index 98e7a3b89088fc6047c4389e45a02ee476a5632c..3e9632d10fe86006ef1ae380fb30c10e2105473e 100755 --- a/account_manager.py +++ b/account_manager.py @@ -40,6 +40,7 @@ msg["username"] = username msg["state"] = state msg["service"] = service msg["queuename"] = queuename +msg["updated_by"], msg["host"] = rc_util.get_caller_info() # publish msg with acctmgr.{uname} routing key. rc_rmq.publish_msg( diff --git a/create_account.py b/create_account.py index d5ae3a2a0e0ddcfef1d30e713fd905cfd486847d..42668c36a52677d9998601266ed999bd08ba6d77 100755 --- a/create_account.py +++ b/create_account.py @@ -25,6 +25,7 @@ args = parser.parse_args() timeout = 60 queuename = rc_util.encode_name(args.username) +updated_by, host = rc_util.get_caller_info() if args.email == "": args.email = args.username @@ -58,6 +59,8 @@ rc_util.add_account( email=args.email, full=args.full_name, reason=args.reason, + updated_by=updated_by, + host=host, ) print(f"Account for {args.username} requested.") diff --git a/init_user_state.py b/init_user_state.py index c2b7628ecbeaa0db389201018147d8d8e3e8412b..54ba947560e151fa12592d7f314bb5b916a1ccb4 100644 --- a/init_user_state.py +++ b/init_user_state.py @@ -4,6 +4,7 @@ import dataset import sys import subprocess import rabbit_config as rcfg +import rc_util from datetime import datetime parser = argparse.ArgumentParser() @@ -15,6 +16,7 @@ args = parser.parse_args() default_state = "ok" today = datetime.now() +updated_by, host = rc_util.get_caller_info() # Chunk size for insert into db size = 1000 @@ -38,7 +40,13 @@ if len(users) > 50: while start < len(users): end = start + size if start + size < len(users) else len(users) data = [ - dict(username=user, state=default_state, date=today) + dict( + username=user, + state=default_state, + date=today, + updated_by=updated_by, + host=host, + ) for user in users[start:end] ] if args.dry_run: @@ -54,5 +62,11 @@ else: print(f"Table insert user: {user}, state: {default_state}") else: table.insert( - {"username": user, "state": default_state, "date": today} + { + "username": user, + "state": default_state, + "date": today, + "updated_by": updated_by, + "host": host, + } ) diff --git a/prod_rmq_agents/acct_mgmt_workflow.py b/prod_rmq_agents/acct_mgmt_workflow.py index 7b7bc0cf7cb48ff9e045a918e8f599424a1c1a7a..922d802f77f285ad7a748789463bb7a41f8143a3 100755 --- a/prod_rmq_agents/acct_mgmt_workflow.py +++ b/prod_rmq_agents/acct_mgmt_workflow.py @@ -37,7 +37,7 @@ def manage_acct(ch, method, properties, body): msg["action"] = "unlock" else: print("Invalid state provided. Check the help menu.") - + if service == 'all': current["new_jobs"] = None current["expire_account"] = None @@ -54,7 +54,7 @@ def manage_acct(ch, method, properties, body): rc_rmq.publish_msg( { "routing_key": f"{each_service}.{queuename}", - "msg": msg + "msg": msg } ) @@ -70,8 +70,10 @@ def manage_acct(ch, method, properties, body): done = False if done: - rc_util.update_state(username, state) - + rc_util.update_state( + username, state, msg.get("updated_by"), msg.get("host") + ) + # Send done msg to account_manager.py rc_rmq.publish_msg( { diff --git a/prod_rmq_agents/task_manager.py b/prod_rmq_agents/task_manager.py index f843f0cffc8cc319ddbfd983090eda75e7e176c5..3b82f838f47ed8b926bb3ecc3b506ea59bb26fdf 100644 --- a/prod_rmq_agents/task_manager.py +++ b/prod_rmq_agents/task_manager.py @@ -277,7 +277,9 @@ def task_manager(ch, method, properties, body): update_db(username, {"reported": True}) - rc_util.update_state(username, "ok") + rc_util.update_state( + username, "ok", msg.get("updated_by"), msg.get("host") + ) tracking.pop(username) diff --git a/prod_rmq_agents/user_state.py b/prod_rmq_agents/user_state.py index 53445c284c92ae035ebadcb14c837d540ca9c3a2..208af16b4b68886161c79683f92ad77ab01df8e3 100644 --- a/prod_rmq_agents/user_state.py +++ b/prod_rmq_agents/user_state.py @@ -22,6 +22,8 @@ rc_rmq = RCRMQ({"exchange": rcfg.Exchange, "exchange_type": "topic"}) def user_state(ch, method, properties, body): msg = json.loads(body) username = msg["username"] + updated_by = msg.get("updated_by") + host = msg.get("host") op = msg["op"] msg["success"] = False errmsg = "" @@ -47,7 +49,13 @@ def user_state(ch, method, properties, body): state = msg["state"] errmsg = "Updating state of {username} to {state}" table.insert( - {"username": username, "state": state, "date": datetime.now()} + { + "username": username, + "state": state, + "date": datetime.now(), + "updated_by": updated_by, + "host": host, + } ) logger.debug(f"User {username} state updates to {state}") diff --git a/rc_util.py b/rc_util.py index 9becf2f93878e982d3cb89ff2ccb355a8a28e8c3..89f84e9169ad95fee359906673b6c473cd4c894f 100644 --- a/rc_util.py +++ b/rc_util.py @@ -5,6 +5,7 @@ import signal import logging import argparse import pika +import pwd import uuid from rc_rmq import RCRMQ import json @@ -48,7 +49,9 @@ def timeout(seconds=30, error_message=os.strerror(errno.ETIME)): return decorator -def add_account(username, queuename, email, full="", reason=""): +def add_account( + username, queuename, email, full="", reason="", updated_by="", host="" +): rc_rmq.publish_msg( { "routing_key": "request." + queuename, @@ -58,12 +61,17 @@ def add_account(username, queuename, email, full="", reason=""): "fullname": full, "reason": reason, "queuename": queuename, + "updated_by": updated_by, + "host": host, }, } ) rc_rmq.disconnect() -def certify_account(username, queuename, state="ok", service="all"): + +def certify_account( + username, queuename, state="ok", service="all", updated_by="", host="" +): rc_rmq.publish_msg( { "routing_key": "acctmgr.request." + queuename, @@ -72,11 +80,14 @@ def certify_account(username, queuename, state="ok", service="all"): "service": service, "state": state, "queuename": queuename, + "updated_by": updated_by, + "host": host, }, } ) rc_rmq.disconnect() + def worker(ch, method, properties, body): msg = json.loads(body) username = msg["username"] @@ -211,7 +222,7 @@ def check_state(username, debug=False): @timeout(rcfg.Function_timeout) -def update_state(username, state, debug=False): +def update_state(username, state, updated_by="", host="", debug=False): if state not in rcfg.Valid_state: print(f"Invalid state '{state}'") @@ -246,7 +257,13 @@ def update_state(username, state, debug=False): "props": pika.BasicProperties( reply_to=callback_queue, correlation_id=corr_id ), - "msg": {"op": "post", "username": username, "state": state}, + "msg": { + "op": "post", + "username": username, + "state": state, + "updated_by": updated_by, + "host": host, + }, } ) @@ -260,3 +277,9 @@ def update_state(username, state, debug=False): ) return result + + +def get_caller_info(): + username = pwd.getpwuid(os.getuid()).pw_name + hostname = os.uname().nodename + return (username, hostname)