Skip to content
Snippets Groups Projects
Unverified Commit 59e37902 authored by Bo-Chun Chen's avatar Bo-Chun Chen Committed by GitHub
Browse files

Add missing function to user state agent and tools (#114)

* Add return value to update_state

* Add timeout decorator

* Add 30 sec timeout for check and update state

* Update user state when all tasks are done

* Update delete user script

Remove user state records from db

* Add timeout to rabbit config

* User timeout variable for check and update state

* Update timeout variable name
parent 7aae9271
No related branches found
No related tags found
1 merge request!147Merge previous default branch feat-cod-rmq into main
...@@ -26,6 +26,9 @@ if id "$username" &>/dev/null; then ...@@ -26,6 +26,9 @@ if id "$username" &>/dev/null; then
echo "sqlite3 $path_to_db \"delete from users where username=\"$username\"" echo "sqlite3 $path_to_db \"delete from users where username=\"$username\""
sqlite3 $path_to_db "delete from users where username=\"$username\"" sqlite3 $path_to_db "delete from users where username=\"$username\""
echo "sqlite3 $path_to_db \"delete from user_state where username=\"$username\""
sqlite3 $path_to_db "delete from user_state where username=\"$username\""
echo "rm -r /data/user/${username}" echo "rm -r /data/user/${username}"
rm -rf "/data/user/${username}" rm -rf "/data/user/${username}"
......
...@@ -277,6 +277,8 @@ def task_manager(ch, method, properties, body): ...@@ -277,6 +277,8 @@ def task_manager(ch, method, properties, body):
update_db(username, {"reported": True}) update_db(username, {"reported": True})
rc_util.update_state(username, "ok")
tracking.pop(username) tracking.pop(username)
logger.debug("Admin report sent") logger.debug("Admin report sent")
......
...@@ -7,6 +7,9 @@ Port = 5672 ...@@ -7,6 +7,9 @@ Port = 5672
Valid_state = ["ok", "blocked", "certification"] Valid_state = ["ok", "blocked", "certification"]
# Default function timeout
Function_timeout = 30
# time delay to let account creation finish # time delay to let account creation finish
# to avoid concurrency with downstream agents # to avoid concurrency with downstream agents
Delay = 5 Delay = 5
......
import errno
import functools
import os
import signal
import logging import logging
import argparse import argparse
import pika import pika
...@@ -19,6 +23,31 @@ tasks = { ...@@ -19,6 +23,31 @@ tasks = {
logger_fmt = "%(asctime)s [%(module)s] - %(message)s" logger_fmt = "%(asctime)s [%(module)s] - %(message)s"
class TimeoutError(Exception):
pass
# From https://stackoverflow.com/questions/2281850
def timeout(seconds=30, error_message=os.strerror(errno.ETIME)):
def decorator(func):
def _handle_timeout(signum, frame):
raise TimeoutError(error_message)
@functools.wraps(func)
def wrapper(*args, **kwargs):
signal.signal(signal.SIGALRM, _handle_timeout)
signal.alarm(seconds)
try:
result = func(*args, **kwargs)
finally:
signal.alarm(0)
return result
return wrapper
return decorator
def add_account(username, queuename, email, full="", reason=""): def add_account(username, queuename, email, full="", reason=""):
rc_rmq.publish_msg( rc_rmq.publish_msg(
{ {
...@@ -114,6 +143,7 @@ def encode_name(uname): ...@@ -114,6 +143,7 @@ def encode_name(uname):
return uname_quote return uname_quote
@timeout(rcfg.Function_timeout)
def check_state(username, debug=False): def check_state(username, debug=False):
corr_id = str(uuid.uuid4()) corr_id = str(uuid.uuid4())
result = "" result = ""
...@@ -165,14 +195,15 @@ def check_state(username, debug=False): ...@@ -165,14 +195,15 @@ def check_state(username, debug=False):
return result return result
@timeout(rcfg.Function_timeout)
def update_state(username, state, debug=False): def update_state(username, state, debug=False):
if state not in rcfg.Valid_state: if state not in rcfg.Valid_state:
print(f"Invalid state '{state}'") print(f"Invalid state '{state}'")
return return False
corr_id = str(uuid.uuid4()) corr_id = str(uuid.uuid4())
result = ""
rpc_queue = "user_state" rpc_queue = "user_state"
def handler(ch, method, properties, body): def handler(ch, method, properties, body):
...@@ -181,12 +212,14 @@ def update_state(username, state, debug=False): ...@@ -181,12 +212,14 @@ def update_state(username, state, debug=False):
print(body) print(body)
nonlocal corr_id nonlocal corr_id
nonlocal result
msg = json.loads(body) msg = json.loads(body)
if corr_id == properties.correlation_id: if corr_id == properties.correlation_id:
if not msg["success"]: if not msg["success"]:
print("Something's wrong, please try again.") print("Something's wrong, please try again.")
result = msg["success"]
rc_rmq.stop_consume() rc_rmq.stop_consume()
rc_rmq.disconnect() rc_rmq.disconnect()
...@@ -210,3 +243,5 @@ def update_state(username, state, debug=False): ...@@ -210,3 +243,5 @@ def update_state(username, state, debug=False):
"cb": handler, "cb": handler,
} }
) )
return result
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment