diff --git a/prod_rmq_agents/user_state.py b/prod_rmq_agents/user_state.py new file mode 100644 index 0000000000000000000000000000000000000000..53445c284c92ae035ebadcb14c837d540ca9c3a2 --- /dev/null +++ b/prod_rmq_agents/user_state.py @@ -0,0 +1,75 @@ +#!/usr/bin/env python +import json +import rc_util +import dataset +import pika +from rc_rmq import RCRMQ +from datetime import datetime +import rabbit_config as rcfg + +task = "user_state" + +args = rc_util.get_args() +logger = rc_util.get_logger(args) + +db = dataset.connect(f"sqlite:///{rcfg.db_path}/user_reg.db") +table = db["user_state"] + +# Instantiate rabbitmq object +rc_rmq = RCRMQ({"exchange": rcfg.Exchange, "exchange_type": "topic"}) + + +def user_state(ch, method, properties, body): + msg = json.loads(body) + username = msg["username"] + op = msg["op"] + msg["success"] = False + errmsg = "" + + corr_id = properties.correlation_id + reply_to = properties.reply_to + + try: + + if op == "get": + errmsg = "Getting latest state of {username}" + record = table.find_one(username=username, order_by="-date") + + if record: + msg["state"] = record["state"] + logger.debug( + f'The latest state of {username} is {msg["state"]}' + ) + else: + msg["state"] = "no-account" + + elif op == "post": + state = msg["state"] + errmsg = "Updating state of {username} to {state}" + table.insert( + {"username": username, "state": state, "date": datetime.now()} + ) + logger.debug(f"User {username} state updates to {state}") + + msg["success"] = True + except Exception: + logger.error("", exc_info=True) + msg["errmsg"] = errmsg if errmsg else "Unexpected error" + + # Send response + if reply_to: + props = pika.BasicProperties(correlation_id=corr_id) + rc_rmq.publish_msg( + {"routing_key": reply_to, "msg": msg, "props": props} + ) + + # Acknowledge the message + ch.basic_ack(delivery_tag=method.delivery_tag) + + +if __name__ == "__main__": + logger.info(f"Start listening to queue: {task}") + rc_rmq.start_consume({"queue": task, "cb": user_state}) + + logger.info("Disconnected") + rc_rmq.disconnect() diff --git a/rabbit_config.py.example b/rabbit_config.py.example index bffaaab2a8f1b815c5ca6886264d5c3e79236822..22be7250dbceb9614e18965f15ddcf05c26a57fd 100644 --- a/rabbit_config.py.example +++ b/rabbit_config.py.example @@ -5,6 +5,8 @@ VHost = '/' Server = 'ohpc' Port = 5672 +Valid_state = ["ok", "blocked", "certification"] + # time delay to let account creation finish # to avoid concurrency with downstream agents Delay = 5 diff --git a/rc_util.py b/rc_util.py index 0741cd18fb4f3cf91321c88fe4af0a11e6532a3d..c266a454e18846c06c7afc092223834a2473edf8 100644 --- a/rc_util.py +++ b/rc_util.py @@ -1,9 +1,12 @@ import logging import argparse +import pika +import uuid from rc_rmq import RCRMQ import json from urllib.parse import quote from time import sleep +import rabbit_config as rcfg rc_rmq = RCRMQ({"exchange": "RegUsr", "exchange_type": "topic"}) tasks = { @@ -100,3 +103,101 @@ def encode_name(uname): if "." in uname_quote: uname_quote = uname_quote.replace(".", "%2E") return uname_quote + + +def check_state(username, debug=False): + corr_id = str(uuid.uuid4()) + result = "" + rpc_queue = "user_state" + + def handler(ch, method, properties, body): + if debug: + print("Message received:") + print(body) + + nonlocal corr_id + nonlocal result + msg = json.loads(body) + + if corr_id == properties.correlation_id: + if not msg["success"]: + print("Something's wrong, please try again.") + else: + result = msg.get("state") + + rc_rmq.stop_consume() + rc_rmq.disconnect() + + callback_queue = rc_rmq.bind_queue(exclusive=True) + + if debug: + print(f"Checking state of user {username}") + print(f"Callback queue: {callback_queue}, correlation_id: {corr_id}") + + rc_rmq.publish_msg( + { + "routing_key": rpc_queue, + "props": pika.BasicProperties( + correlation_id=corr_id, reply_to=callback_queue + ), + "msg": {"op": "get", "username": username}, + } + ) + + rc_rmq.start_consume( + { + "queue": callback_queue, + "exclusive": True, + "bind": False, + "cb": handler, + } + ) + + return result + + +def update_state(username, state, debug=False): + + if state not in rcfg.Valid_state: + print(f"Invalid state '{state}'") + return + + corr_id = str(uuid.uuid4()) + + rpc_queue = "user_state" + + def handler(ch, method, properties, body): + if debug: + print("Message received:") + print(body) + + nonlocal corr_id + msg = json.loads(body) + + if corr_id == properties.correlation_id: + if not msg["success"]: + print("Something's wrong, please try again.") + + rc_rmq.stop_consume() + rc_rmq.disconnect() + + callback_queue = rc_rmq.bind_queue(exclusive=True) + + rc_rmq.publish_msg( + { + "routing_key": rpc_queue, + "props": pika.BasicProperties( + reply_to=callback_queue, correlation_id=corr_id + ), + "msg": {"op": "post", "username": username, "state": state}, + } + ) + + rc_rmq.start_consume( + { + "queue": callback_queue, + "exclusive": True, + "bind": False, + "cb": handler, + } + ) diff --git a/user_auth.py b/user_auth.py new file mode 100644 index 0000000000000000000000000000000000000000..54b8f8ad93acf25980b393377e7b7c6806ad7a3c --- /dev/null +++ b/user_auth.py @@ -0,0 +1,26 @@ +#!/usr/bin/env python3 +import sys +import rc_util +import subprocess + +# During migration of this new script for ood +# e.g. not all of users are in the db +migration = True +default_state = "ok" +# migration = False # uncomment after migration's done +remote_user = sys.argv[1] + +result = rc_util.check_state(remote_user) + +if result == "ok": + print(remote_user) +else: + if migration and result == "no-account": + rc = subprocess.run( + ["getent", "passwd", remote_user], stdout=subprocess.DEVNULL + ).returncode + if rc == 0: + rc_util.update_state(remote_user, default_state) + print(remote_user) + sys.exit() + print()