Skip to content
Snippets Groups Projects
Unverified Commit 933285e5 authored by Bo-Chun Chen's avatar Bo-Chun Chen Committed by GitHub
Browse files

User state agent (#112)

* Remove some variables from the RCRMQ class

The previous design was base on these assumptions:
  RCRMQ instance only need to do either publish or consume
  RCRMQ instance only need to interact with one single queue

However, in order to perform RPC-like call, an instance will need to do both
publish and consume. In addition, publish and consume are not necessary
to the same queue. So, save queue name as single variable inside the
instance is not viable anymore.

* Add exclusive queue support

* Check connection before bind queue

* Add auto-generate queue support

* Make bind queue optional when start consume

* Add properties support when publish msg

* Add user_state agent

* Add check_state in rc_util

* Add update_state in rc_util

* Add no-account when no hit in db

* Move valid state into config file

* Add user auth example

* Add migration flag

During the first phase, e.g. not all of users are in the user_state DB,
  using alternative method, getnet passwd, to see if user is an existing
  user.

* Update user state when using alternative lookup

* Allow no reply-to message to the user_state

* Add path for venv packages

* Fix typo

* Remove sys path to venv

* Fix condition for migration phase code

* Drop output from subprocess

* Move user_auth to top level

* Add default state as variable
parent b8c7d6bd
No related branches found
No related tags found
1 merge request!147Merge previous default branch feat-cod-rmq into main
#!/usr/bin/env python
import json
import rc_util
import dataset
import pika
from rc_rmq import RCRMQ
from datetime import datetime
import rabbit_config as rcfg
task = "user_state"
args = rc_util.get_args()
logger = rc_util.get_logger(args)
db = dataset.connect(f"sqlite:///{rcfg.db_path}/user_reg.db")
table = db["user_state"]
# Instantiate rabbitmq object
rc_rmq = RCRMQ({"exchange": rcfg.Exchange, "exchange_type": "topic"})
def user_state(ch, method, properties, body):
msg = json.loads(body)
username = msg["username"]
op = msg["op"]
msg["success"] = False
errmsg = ""
corr_id = properties.correlation_id
reply_to = properties.reply_to
try:
if op == "get":
errmsg = "Getting latest state of {username}"
record = table.find_one(username=username, order_by="-date")
if record:
msg["state"] = record["state"]
logger.debug(
f'The latest state of {username} is {msg["state"]}'
)
else:
msg["state"] = "no-account"
elif op == "post":
state = msg["state"]
errmsg = "Updating state of {username} to {state}"
table.insert(
{"username": username, "state": state, "date": datetime.now()}
)
logger.debug(f"User {username} state updates to {state}")
msg["success"] = True
except Exception:
logger.error("", exc_info=True)
msg["errmsg"] = errmsg if errmsg else "Unexpected error"
# Send response
if reply_to:
props = pika.BasicProperties(correlation_id=corr_id)
rc_rmq.publish_msg(
{"routing_key": reply_to, "msg": msg, "props": props}
)
# Acknowledge the message
ch.basic_ack(delivery_tag=method.delivery_tag)
if __name__ == "__main__":
logger.info(f"Start listening to queue: {task}")
rc_rmq.start_consume({"queue": task, "cb": user_state})
logger.info("Disconnected")
rc_rmq.disconnect()
...@@ -5,6 +5,8 @@ VHost = '/' ...@@ -5,6 +5,8 @@ VHost = '/'
Server = 'ohpc' Server = 'ohpc'
Port = 5672 Port = 5672
Valid_state = ["ok", "blocked", "certification"]
# time delay to let account creation finish # time delay to let account creation finish
# to avoid concurrency with downstream agents # to avoid concurrency with downstream agents
Delay = 5 Delay = 5
......
import logging import logging
import argparse import argparse
import pika
import uuid
from rc_rmq import RCRMQ from rc_rmq import RCRMQ
import json import json
from urllib.parse import quote from urllib.parse import quote
from time import sleep from time import sleep
import rabbit_config as rcfg
rc_rmq = RCRMQ({"exchange": "RegUsr", "exchange_type": "topic"}) rc_rmq = RCRMQ({"exchange": "RegUsr", "exchange_type": "topic"})
tasks = { tasks = {
...@@ -100,3 +103,101 @@ def encode_name(uname): ...@@ -100,3 +103,101 @@ def encode_name(uname):
if "." in uname_quote: if "." in uname_quote:
uname_quote = uname_quote.replace(".", "%2E") uname_quote = uname_quote.replace(".", "%2E")
return uname_quote return uname_quote
def check_state(username, debug=False):
corr_id = str(uuid.uuid4())
result = ""
rpc_queue = "user_state"
def handler(ch, method, properties, body):
if debug:
print("Message received:")
print(body)
nonlocal corr_id
nonlocal result
msg = json.loads(body)
if corr_id == properties.correlation_id:
if not msg["success"]:
print("Something's wrong, please try again.")
else:
result = msg.get("state")
rc_rmq.stop_consume()
rc_rmq.disconnect()
callback_queue = rc_rmq.bind_queue(exclusive=True)
if debug:
print(f"Checking state of user {username}")
print(f"Callback queue: {callback_queue}, correlation_id: {corr_id}")
rc_rmq.publish_msg(
{
"routing_key": rpc_queue,
"props": pika.BasicProperties(
correlation_id=corr_id, reply_to=callback_queue
),
"msg": {"op": "get", "username": username},
}
)
rc_rmq.start_consume(
{
"queue": callback_queue,
"exclusive": True,
"bind": False,
"cb": handler,
}
)
return result
def update_state(username, state, debug=False):
if state not in rcfg.Valid_state:
print(f"Invalid state '{state}'")
return
corr_id = str(uuid.uuid4())
rpc_queue = "user_state"
def handler(ch, method, properties, body):
if debug:
print("Message received:")
print(body)
nonlocal corr_id
msg = json.loads(body)
if corr_id == properties.correlation_id:
if not msg["success"]:
print("Something's wrong, please try again.")
rc_rmq.stop_consume()
rc_rmq.disconnect()
callback_queue = rc_rmq.bind_queue(exclusive=True)
rc_rmq.publish_msg(
{
"routing_key": rpc_queue,
"props": pika.BasicProperties(
reply_to=callback_queue, correlation_id=corr_id
),
"msg": {"op": "post", "username": username, "state": state},
}
)
rc_rmq.start_consume(
{
"queue": callback_queue,
"exclusive": True,
"bind": False,
"cb": handler,
}
)
#!/usr/bin/env python3
import sys
import rc_util
import subprocess
# During migration of this new script for ood
# e.g. not all of users are in the db
migration = True
default_state = "ok"
# migration = False # uncomment after migration's done
remote_user = sys.argv[1]
result = rc_util.check_state(remote_user)
if result == "ok":
print(remote_user)
else:
if migration and result == "no-account":
rc = subprocess.run(
["getent", "passwd", remote_user], stdout=subprocess.DEVNULL
).returncode
if rc == 0:
rc_util.update_state(remote_user, default_state)
print(remote_user)
sys.exit()
print()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment