From 933285e5d147fdb1f3bee1367a4724516f83b235 Mon Sep 17 00:00:00 2001
From: diedpigs <louistw@uab.edu>
Date: Thu, 7 Apr 2022 17:41:06 -0500
Subject: [PATCH] User state agent (#112)

* Remove some variables from the RCRMQ class

The previous design was base on these assumptions:
  RCRMQ instance only need to do either publish or consume
  RCRMQ instance only need to interact with one single queue

However, in order to perform RPC-like call, an instance will need to do both
publish and consume. In addition, publish and consume are not necessary
to the same queue. So, save queue name as single variable inside the
instance is not viable anymore.

* Add exclusive queue support

* Check connection before bind queue

* Add auto-generate queue support

* Make bind queue optional when start consume

* Add properties support when publish msg

* Add user_state agent

* Add check_state in rc_util

* Add update_state in rc_util

* Add no-account when no hit in db

* Move valid state into config file

* Add user auth example

* Add migration flag

During the first phase, e.g. not all of users are in the user_state DB,
  using alternative method, getnet passwd, to see if user is an existing
  user.

* Update user state when using alternative lookup

* Allow no reply-to message to the user_state

* Add path for venv packages

* Fix typo

* Remove sys path to venv

* Fix condition for migration phase code

* Drop output from subprocess

* Move user_auth to top level

* Add default state as variable
---
 prod_rmq_agents/user_state.py |  75 +++++++++++++++++++++++++
 rabbit_config.py.example      |   2 +
 rc_util.py                    | 101 ++++++++++++++++++++++++++++++++++
 user_auth.py                  |  26 +++++++++
 4 files changed, 204 insertions(+)
 create mode 100644 prod_rmq_agents/user_state.py
 create mode 100644 user_auth.py

diff --git a/prod_rmq_agents/user_state.py b/prod_rmq_agents/user_state.py
new file mode 100644
index 0000000..53445c2
--- /dev/null
+++ b/prod_rmq_agents/user_state.py
@@ -0,0 +1,75 @@
+#!/usr/bin/env python
+import json
+import rc_util
+import dataset
+import pika
+from rc_rmq import RCRMQ
+from datetime import datetime
+import rabbit_config as rcfg
+
+task = "user_state"
+
+args = rc_util.get_args()
+logger = rc_util.get_logger(args)
+
+db = dataset.connect(f"sqlite:///{rcfg.db_path}/user_reg.db")
+table = db["user_state"]
+
+# Instantiate rabbitmq object
+rc_rmq = RCRMQ({"exchange": rcfg.Exchange, "exchange_type": "topic"})
+
+
+def user_state(ch, method, properties, body):
+    msg = json.loads(body)
+    username = msg["username"]
+    op = msg["op"]
+    msg["success"] = False
+    errmsg = ""
+
+    corr_id = properties.correlation_id
+    reply_to = properties.reply_to
+
+    try:
+
+        if op == "get":
+            errmsg = "Getting latest state of {username}"
+            record = table.find_one(username=username, order_by="-date")
+
+            if record:
+                msg["state"] = record["state"]
+                logger.debug(
+                    f'The latest state of {username} is {msg["state"]}'
+                )
+            else:
+                msg["state"] = "no-account"
+
+        elif op == "post":
+            state = msg["state"]
+            errmsg = "Updating state of {username} to {state}"
+            table.insert(
+                {"username": username, "state": state, "date": datetime.now()}
+            )
+            logger.debug(f"User {username} state updates to {state}")
+
+        msg["success"] = True
+    except Exception:
+        logger.error("", exc_info=True)
+        msg["errmsg"] = errmsg if errmsg else "Unexpected error"
+
+    # Send response
+    if reply_to:
+        props = pika.BasicProperties(correlation_id=corr_id)
+        rc_rmq.publish_msg(
+            {"routing_key": reply_to, "msg": msg, "props": props}
+        )
+
+    # Acknowledge the message
+    ch.basic_ack(delivery_tag=method.delivery_tag)
+
+
+if __name__ == "__main__":
+    logger.info(f"Start listening to queue: {task}")
+    rc_rmq.start_consume({"queue": task, "cb": user_state})
+
+    logger.info("Disconnected")
+    rc_rmq.disconnect()
diff --git a/rabbit_config.py.example b/rabbit_config.py.example
index bffaaab..22be725 100644
--- a/rabbit_config.py.example
+++ b/rabbit_config.py.example
@@ -5,6 +5,8 @@ VHost = '/'
 Server = 'ohpc'
 Port = 5672
 
+Valid_state = ["ok", "blocked", "certification"]
+
 # time delay to let account creation finish
 # to avoid concurrency with downstream agents
 Delay = 5
diff --git a/rc_util.py b/rc_util.py
index 0741cd1..c266a45 100644
--- a/rc_util.py
+++ b/rc_util.py
@@ -1,9 +1,12 @@
 import logging
 import argparse
+import pika
+import uuid
 from rc_rmq import RCRMQ
 import json
 from urllib.parse import quote
 from time import sleep
+import rabbit_config as rcfg
 
 rc_rmq = RCRMQ({"exchange": "RegUsr", "exchange_type": "topic"})
 tasks = {
@@ -100,3 +103,101 @@ def encode_name(uname):
     if "." in uname_quote:
         uname_quote = uname_quote.replace(".", "%2E")
     return uname_quote
+
+
+def check_state(username, debug=False):
+    corr_id = str(uuid.uuid4())
+    result = ""
+    rpc_queue = "user_state"
+
+    def handler(ch, method, properties, body):
+        if debug:
+            print("Message received:")
+            print(body)
+
+        nonlocal corr_id
+        nonlocal result
+        msg = json.loads(body)
+
+        if corr_id == properties.correlation_id:
+            if not msg["success"]:
+                print("Something's wrong, please try again.")
+            else:
+                result = msg.get("state")
+
+            rc_rmq.stop_consume()
+            rc_rmq.disconnect()
+
+    callback_queue = rc_rmq.bind_queue(exclusive=True)
+
+    if debug:
+        print(f"Checking state of user {username}")
+        print(f"Callback queue: {callback_queue}, correlation_id: {corr_id}")
+
+    rc_rmq.publish_msg(
+        {
+            "routing_key": rpc_queue,
+            "props": pika.BasicProperties(
+                correlation_id=corr_id, reply_to=callback_queue
+            ),
+            "msg": {"op": "get", "username": username},
+        }
+    )
+
+    rc_rmq.start_consume(
+        {
+            "queue": callback_queue,
+            "exclusive": True,
+            "bind": False,
+            "cb": handler,
+        }
+    )
+
+    return result
+
+
+def update_state(username, state, debug=False):
+
+    if state not in rcfg.Valid_state:
+        print(f"Invalid state '{state}'")
+        return
+
+    corr_id = str(uuid.uuid4())
+
+    rpc_queue = "user_state"
+
+    def handler(ch, method, properties, body):
+        if debug:
+            print("Message received:")
+            print(body)
+
+        nonlocal corr_id
+        msg = json.loads(body)
+
+        if corr_id == properties.correlation_id:
+            if not msg["success"]:
+                print("Something's wrong, please try again.")
+
+            rc_rmq.stop_consume()
+            rc_rmq.disconnect()
+
+    callback_queue = rc_rmq.bind_queue(exclusive=True)
+
+    rc_rmq.publish_msg(
+        {
+            "routing_key": rpc_queue,
+            "props": pika.BasicProperties(
+                reply_to=callback_queue, correlation_id=corr_id
+            ),
+            "msg": {"op": "post", "username": username, "state": state},
+        }
+    )
+
+    rc_rmq.start_consume(
+        {
+            "queue": callback_queue,
+            "exclusive": True,
+            "bind": False,
+            "cb": handler,
+        }
+    )
diff --git a/user_auth.py b/user_auth.py
new file mode 100644
index 0000000..54b8f8a
--- /dev/null
+++ b/user_auth.py
@@ -0,0 +1,26 @@
+#!/usr/bin/env python3
+import sys
+import rc_util
+import subprocess
+
+# During migration of this new script for ood
+# e.g. not all of users are in the db
+migration = True
+default_state = "ok"
+# migration = False  # uncomment after migration's done
+remote_user = sys.argv[1]
+
+result = rc_util.check_state(remote_user)
+
+if result == "ok":
+    print(remote_user)
+else:
+    if migration and result == "no-account":
+        rc = subprocess.run(
+            ["getent", "passwd", remote_user], stdout=subprocess.DEVNULL
+        ).returncode
+        if rc == 0:
+            rc_util.update_state(remote_user, default_state)
+            print(remote_user)
+            sys.exit()
+    print()
-- 
GitLab