diff --git a/prod_rmq_agents/examples/delete_user.sh b/prod_rmq_agents/examples/delete_user.sh index 28459c20d0c73ec76016d9a182423174aa6b8b72..2b2a1d98fe8d292a14dce280734588fc2fd13a8b 100755 --- a/prod_rmq_agents/examples/delete_user.sh +++ b/prod_rmq_agents/examples/delete_user.sh @@ -26,6 +26,9 @@ if id "$username" &>/dev/null; then echo "sqlite3 $path_to_db \"delete from users where username=\"$username\"" sqlite3 $path_to_db "delete from users where username=\"$username\"" + echo "sqlite3 $path_to_db \"delete from user_state where username=\"$username\"" + sqlite3 $path_to_db "delete from user_state where username=\"$username\"" + echo "rm -r /data/user/${username}" rm -rf "/data/user/${username}" diff --git a/prod_rmq_agents/task_manager.py b/prod_rmq_agents/task_manager.py index e2e3edc5a746e13c319df8a7f40e4a9bc539a930..f843f0cffc8cc319ddbfd983090eda75e7e176c5 100644 --- a/prod_rmq_agents/task_manager.py +++ b/prod_rmq_agents/task_manager.py @@ -277,6 +277,8 @@ def task_manager(ch, method, properties, body): update_db(username, {"reported": True}) + rc_util.update_state(username, "ok") + tracking.pop(username) logger.debug("Admin report sent") diff --git a/rabbit_config.py.example b/rabbit_config.py.example index 22be7250dbceb9614e18965f15ddcf05c26a57fd..1445130c68672f7dec00f8644e62df407e3242c1 100644 --- a/rabbit_config.py.example +++ b/rabbit_config.py.example @@ -7,6 +7,9 @@ Port = 5672 Valid_state = ["ok", "blocked", "certification"] +# Default function timeout +Function_timeout = 30 + # time delay to let account creation finish # to avoid concurrency with downstream agents Delay = 5 diff --git a/rc_util.py b/rc_util.py index 5bd7627a92cff4734b073f8d18621f0e140efee1..edf72d3e5d90bdf8f3e4529c02bbe940d682687a 100644 --- a/rc_util.py +++ b/rc_util.py @@ -1,3 +1,7 @@ +import errno +import functools +import os +import signal import logging import argparse import pika @@ -19,6 +23,31 @@ tasks = { logger_fmt = "%(asctime)s [%(module)s] - %(message)s" +class TimeoutError(Exception): + pass + + +# From https://stackoverflow.com/questions/2281850 +def timeout(seconds=30, error_message=os.strerror(errno.ETIME)): + def decorator(func): + def _handle_timeout(signum, frame): + raise TimeoutError(error_message) + + @functools.wraps(func) + def wrapper(*args, **kwargs): + signal.signal(signal.SIGALRM, _handle_timeout) + signal.alarm(seconds) + try: + result = func(*args, **kwargs) + finally: + signal.alarm(0) + return result + + return wrapper + + return decorator + + def add_account(username, queuename, email, full="", reason=""): rc_rmq.publish_msg( { @@ -114,6 +143,7 @@ def encode_name(uname): return uname_quote +@timeout(rcfg.Function_timeout) def check_state(username, debug=False): corr_id = str(uuid.uuid4()) result = "" @@ -165,14 +195,15 @@ def check_state(username, debug=False): return result +@timeout(rcfg.Function_timeout) def update_state(username, state, debug=False): if state not in rcfg.Valid_state: print(f"Invalid state '{state}'") - return + return False corr_id = str(uuid.uuid4()) - + result = "" rpc_queue = "user_state" def handler(ch, method, properties, body): @@ -181,12 +212,14 @@ def update_state(username, state, debug=False): print(body) nonlocal corr_id + nonlocal result msg = json.loads(body) if corr_id == properties.correlation_id: if not msg["success"]: print("Something's wrong, please try again.") + result = msg["success"] rc_rmq.stop_consume() rc_rmq.disconnect() @@ -210,3 +243,5 @@ def update_state(username, state, debug=False): "cb": handler, } ) + + return result