diff --git a/.github/workflows/linting.yaml b/.github/workflows/linting.yaml new file mode 100644 index 0000000000000000000000000000000000000000..7e0b959620fa0d944f6aa0ad4c12af15961e889f --- /dev/null +++ b/.github/workflows/linting.yaml @@ -0,0 +1,33 @@ +name: Linting + +on: [push, pull_request] + +jobs: + build: + + runs-on: ubuntu-latest + + steps: + - name: Checkout + uses: actions/checkout@v2 + - name: Set up Python 3.6 + uses: actions/setup-python@v2 + with: + # Semantic version range syntax or exact version of a Python version + python-version: '3.11' + # Optional - x64 or x86 architecture, defaults to x64 + architecture: 'x64' + - name: find trailing whitespace + uses: harupy/find-trailing-whitespace@master + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install flake8 + #if [ -f requirements.txt ]; then pip install -r requirements.txt; fi + - name: Lint with flake8 + run: | + # stop the build if there are Python syntax errors or undefined names + #flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics + flake8 . --count --max-line-length=79 --show-source --statistics + # exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide + #flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000000000000000000000000000000000000..62835a7a794d70dc8626a1e721c0bab09729dbbb --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,15 @@ +repos: + - repo: https://github.com/pre-commit/pre-commit-hooks + rev: v3.2.0 + hooks: + - id: trailing-whitespace + - id: end-of-file-fixer + - id: check-yaml + - repo: https://github.com/psf/black + rev: 23.3.0 + hooks: + - id: black + - repo: https://github.com/pycqa/flake8 + rev: 3.9.1 + hooks: + - id: flake8 diff --git a/README.md b/README.md index 5d7b9155b49e6906eaa6451a4660fce3536a5c73..2672aaee92263fb5ffe9a2ee8ea1d805341a60b8 100644 --- a/README.md +++ b/README.md @@ -48,3 +48,6 @@ rc_rmq.start_consume({ # don't forget to close connection rc_rmq.disconnect() ``` + +### Account creation flowchart + \ No newline at end of file diff --git a/account-creation-flow.png b/account-creation-flow.png new file mode 100644 index 0000000000000000000000000000000000000000..d5a51b7e1e2c43db0eb710e7dd67f75035f6fa8d Binary files /dev/null and b/account-creation-flow.png differ diff --git a/account_manager.py b/account_manager.py new file mode 100755 index 0000000000000000000000000000000000000000..109a516bb9b875a18046a377d674c913975a61bd --- /dev/null +++ b/account_manager.py @@ -0,0 +1,89 @@ +#!/usr/bin/env python3 +import json +import rc_util +import argparse +import signal +import uuid +import pika +import rc_util +from rc_rmq import RCRMQ +import rabbit_config as rcfg +import time + +parser = argparse.ArgumentParser(description = "Account management driver script") +parser.add_argument( + "username", help="Username that should be locked/unlocked") +parser.add_argument( + "state", choices=['ok', 'hold', 'certification', 'pre_certification'], help="Choose from states (ok,hold,certification,pre_certification)") +parser.add_argument( + "-s", "--service", nargs='+', default='all', choices=['ssh', 'newjobs', 'expiration', 'all'], help="List one or more services to be blocked (default: %(default)s)") +parser.add_argument( + "-v", "--verbose", action="store_true", help="verbose output") +parser.add_argument( + "-n", "--dry-run", action="store_true", help="enable dry run mode" +) +args = parser.parse_args() + +timeout = 60 + +queuename = rc_util.encode_name(args.username) + +username = args.username +state = args.state +service = args.service + +# Instantiate rabbitmq object +rc_rmq = RCRMQ({"exchange": rcfg.Exchange, "exchange_type": "topic"}) + +msg = {} +msg["username"] = username +msg["state"] = state +msg["service"] = service +msg["queuename"] = queuename +msg["updated_by"], msg["host"] = rc_util.get_caller_info() + +# publish msg with acctmgr.{uname} routing key. +rc_rmq.publish_msg( + { + "routing_key": f'acctmgr.request.{queuename}', + "msg": msg, + } +) + + +def timeout_handler(signum, frame): + print("Process timeout, there's some issue with agents") + rc_rmq.stop_consume() + + +def callback(ch, method, properties, body): + msg = json.loads(body) + username = msg["username"] + + if msg["success"]: + print(f"Account for {username} has been {msg['action']}ed.\n Updating the user state in DB") + else: + print(f"There's some issue in account management agents for {username}") + errmsg = msg.get("errmsg", []) + for err in errmsg: + print(err) + + + ch.basic_ack(delivery_tag=method.delivery_tag) + rc_rmq.stop_consume() + rc_rmq.delete_queue(queuename) + +print(f"Request {username} account state set to {state}.") + +# Set initial timeout timer +signal.signal(signal.SIGALRM, timeout_handler) +signal.setitimer(signal.ITIMER_REAL, timeout) + +print("Waiting for completion...") +rc_rmq.start_consume( + { + "queue": queuename, + "routing_key": f'certified.{queuename}', + "cb": callback, + } +) diff --git a/agent_template.py b/agent_template.py index c6e722338bad04999621bd7dfd47a41d2b962d28..7858bdbfc144b7d2e858fb8fb525e5b4609659bd 100644 --- a/agent_template.py +++ b/agent_template.py @@ -1,32 +1,36 @@ #!/usr/bin/env python -import sys import json from rc_rmq import RCRMQ -task = 'task_name' +task = "task_name" # Instantiate rabbitmq object -rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'}) +rc_rmq = RCRMQ({"exchange": "RegUsr", "exchange_type": "topic"}) + # Define your callback function def on_message(ch, method, properties, body): # Retrieve routing key routing_key = method.routing_key + print(routing_key) # Retrieve message msg = json.loads(body) + print(msg) # Do Something - print('[{}]: Callback called.'.format(task)) + print("[{}]: Callback called.".format(task)) # Acknowledge message ch.basic_ack(delivery_tag=method.delivery_tag) print("Start listening to queue: {}".format(task)) -rc_rmq.start_consume({ - 'queue': task, # Define your Queue name - 'routing_key': "#", # Define your routing key - 'cb': on_message # Pass in callback function you just define -}) +rc_rmq.start_consume( + { + "queue": task, # Define your Queue name + "routing_key": "#", # Define your routing key + "cb": on_message, # Pass in callback function you just define + } +) diff --git a/create_account.py b/create_account.py index ef77d78b9eed202c406e127fd9b816dbf18dfd10..42668c36a52677d9998601266ed999bd08ba6d77 100755 --- a/create_account.py +++ b/create_account.py @@ -1,25 +1,77 @@ #!/usr/bin/env python3 -import sys +import json import rc_util +import argparse +import signal -if len(sys.argv) < 2: - print("Usage: {} USERNAME [EMAIL] [FULL_NAME] [REASON]".format(sys.argv[0]), file=sys.stderr) - exit(1) +parser = argparse.ArgumentParser() +parser.add_argument("username", help="username that will be created") +parser.add_argument("email", nargs="?", default="", help="User's email") +parser.add_argument( + "full_name", nargs="?", default="", help="User's full name" +) +parser.add_argument( + "reason", nargs="?", default="", help="Reason of requesting" +) +parser.add_argument("--domain", default="localhost", help="domain of email") +parser.add_argument( + "-v", "--verbose", action="store_true", help="verbose output" +) +parser.add_argument( + "-n", "--dry-run", action="store_true", help="enable dry run mode" +) +args = parser.parse_args() -domain = 'uab.edu' -user_name = sys.argv[1] -email = sys.argv[2] if len(sys.argv) >= 3 else '' -full_name = sys.argv[3] if len(sys.argv) >= 4 else '' -reason = sys.argv[4] if len(sys.argv) >= 5 else '' +timeout = 60 -if email == '': - if '@' in user_name: - email = user_name +queuename = rc_util.encode_name(args.username) +updated_by, host = rc_util.get_caller_info() + +if args.email == "": + args.email = args.username + if "@" not in args.email: + args.email = args.username + "@" + args.domain + + +def timeout_handler(signum, frame): + print("Process timeout, there's might some issue with agents") + rc_util.rc_rmq.disconnect() + + +def callback(channel, method, properties, body): + msg = json.loads(body) + username = msg["username"] + + if msg["success"]: + print(f"Account for {username} has been created.") else: - email = user_name + '@' + domain + print(f"There's some issue while creating account for {username}") + errmsg = msg.get("errmsg", []) + for err in errmsg: + print(err) + + rc_util.rc_rmq.disconnect() + + +rc_util.add_account( + args.username, + queuename=queuename, + email=args.email, + full=args.full_name, + reason=args.reason, + updated_by=updated_by, + host=host, +) +print(f"Account for {args.username} requested.") -rc_util.add_account(user_name, email=email, full=full_name, reason=reason) -print("Account requested for user: {}".format(user_name)) +# Set initial timeout timer +signal.signal(signal.SIGALRM, timeout_handler) +signal.setitimer(signal.ITIMER_REAL, timeout) -print("Waiting for confirmation...") -rc_util.consume(user_name) +print("Waiting for completion...") +rc_util.consume( + queuename, + routing_key=f"complete.{queuename}", + exclusive=True, + callback=callback, +) diff --git a/ohpc_account_create.py b/dev_rmq_agents/ohpc_account_create.py similarity index 57% rename from ohpc_account_create.py rename to dev_rmq_agents/ohpc_account_create.py index 4504702f8d8f8d611b8db1ab4b52811b08ec6ade..5cdabd5e6f0d0905a42abf85f1f92d10835cd145 100644 --- a/ohpc_account_create.py +++ b/dev_rmq_agents/ohpc_account_create.py @@ -8,44 +8,40 @@ from rc_rmq import RCRMQ task = "ohpc_account" # Instantiate rabbitmq object -rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'}) +rc_rmq = RCRMQ({"exchange": "RegUsr", "exchange_type": "topic"}) + def ohpc_account_create(ch, method, properties, body): msg = json.loads(body) print("Message received {}".format(msg)) - username = msg['username'] + username = msg["username"] success = False try: subprocess.call(["sudo", "useradd", username]) print("[{}]: User {} has been added".format(task, username)) success = True - except: + except Exception: e = sys.exc_info()[0] print("[{}]: Error: {}".format(task, e)) ch.basic_ack(delivery_tag=method.delivery_tag) - msg['uid'] = getpwnam(username).pw_uid - msg['gid'] = getpwnam(username).pw_gid + msg["uid"] = getpwnam(username).pw_uid + msg["gid"] = getpwnam(username).pw_gid # send confirm message - rc_rmq.publish_msg({ - 'routing_key': 'confirm.' + username, - 'msg': { - 'task': task, - 'success': success + rc_rmq.publish_msg( + { + "routing_key": "confirm." + username, + "msg": {"task": task, "success": success}, } - }) + ) if success: # send create message to other agent - rc_rmq.publish_msg({ - 'routing_key': 'create.' + username, - 'msg': msg - }) + rc_rmq.publish_msg({"routing_key": "create." + username, "msg": msg}) + print("Start Listening to queue: {}".format(task)) -rc_rmq.start_consume({ - 'queue': task, - 'routing_key': 'request.*', - 'cb': ohpc_account_create -}) +rc_rmq.start_consume( + {"queue": task, "routing_key": "request.*", "cb": ohpc_account_create} +) diff --git a/ood_account_create.py b/dev_rmq_agents/ood_account_create.py similarity index 54% rename from ood_account_create.py rename to dev_rmq_agents/ood_account_create.py index 1ff9cc9fd8a61df98ae1473c20e01480f6e4aa90..de4ed8a8d73d2af39e9fb86c29a0fb17bf83b67f 100644 --- a/ood_account_create.py +++ b/dev_rmq_agents/ood_account_create.py @@ -4,41 +4,42 @@ import json import subprocess from rc_rmq import RCRMQ -task = 'ood_account' +task = "ood_account" # Instantiate rabbitmq object -rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'}) +rc_rmq = RCRMQ({"exchange": "RegUsr", "exchange_type": "topic"}) + def ood_account_create(ch, method, properties, body): msg = json.loads(body) print("Message received {}".format(msg)) - username = msg['username'] - user_uid = str(msg['uid']) - user_gid = str(msg['gid']) + username = msg["username"] + user_uid = str(msg["uid"]) + user_gid = str(msg["gid"]) success = False try: subprocess.call(["sudo", "groupadd", "-r", "-g", user_gid, username]) - subprocess.call(["sudo", "useradd", "-u", user_uid, "-g", user_gid, username]) + subprocess.call( + ["sudo", "useradd", "-u", user_uid, "-g", user_gid, username] + ) print("[{}]: User {} has been added".format(task, username)) success = True - except: + except Exception: e = sys.exc_info()[0] print("[{}]: Error: {}".format(task, e)) ch.basic_ack(delivery_tag=method.delivery_tag) # send confirm message - rc_rmq.publish_msg({ - 'routing_key': 'confirm.' + username, - 'msg': { - 'task': task, - 'success': success + rc_rmq.publish_msg( + { + "routing_key": "confirm." + username, + "msg": {"task": task, "success": success}, } - }) + ) + print("Start listening to queue: {}".format(task)) -rc_rmq.start_consume({ - 'queue': task, - 'routing_key': "create.*", - 'cb': ood_account_create -}) +rc_rmq.start_consume( + {"queue": task, "routing_key": "create.*", "cb": ood_account_create} +) diff --git a/dev_rmq_agents/slurm_agent.py b/dev_rmq_agents/slurm_agent.py new file mode 100755 index 0000000000000000000000000000000000000000..f6c4350c6cad79e010a6a7c576260db201a19bd3 --- /dev/null +++ b/dev_rmq_agents/slurm_agent.py @@ -0,0 +1,61 @@ +#!/usr/bin/env python +import sys +import json +import subprocess +from rc_rmq import RCRMQ + +task = "slurm_account" + +# Instantiate rabbitmq object +rc_rmq = RCRMQ({"exchange": "RegUsr", "exchange_type": "topic"}) + + +def slurm_account_create(ch, method, properties, body): + msg = json.loads(body) + print("Message received {}".format(msg)) + username = msg["username"] + success = False + try: + subprocess.call( + [ + "sudo", + "sacctmgr", + "add", + "account", + username, + "-i", + "Descripition: Add user", + ] + ) + subprocess.call( + [ + "sudo", + "sacctmgr", + "add", + "user", + username, + "account=" + username, + "-i", + ] + ) + print("SLURM account for user {} has been added".format(username)) + success = True + except Exception: + e = sys.exc_info()[0] + print("[{}]: Error: {}".format(task, e)) + + ch.basic_ack(delivery_tag=method.delivery_tag) + + # send confirm message + rc_rmq.publish_msg( + { + "routing_key": "confirm." + username, + "msg": {"task": task, "success": success}, + } + ) + + +print("Start listening to queue: {}".format(task)) +rc_rmq.start_consume( + {"queue": task, "routing_key": "create.*", "cb": slurm_account_create} +) diff --git a/flask_producer.py b/flask_producer.py index dc3c44634745278fa426821d2ea0a2ecc9e61e3f..908b57f9d9e5c6f752421dc76dc2fd472a4e2259 100755 --- a/flask_producer.py +++ b/flask_producer.py @@ -15,7 +15,7 @@ user_name = sys.argv[2] message = { "username": user_name, "fullname": "Full Name", - "reason": "Reason1, Reason2." + "reason": "Reason1, Reason2.", } hostname = socket.gethostname().split(".", 1)[0] @@ -23,33 +23,39 @@ connect_host = rcfg.Server if hostname != rcfg.Server else "localhost" # Set up credentials to connect to RabbitMQ server credentials = pika.PlainCredentials(rcfg.User, rcfg.Password) -parameters = pika.ConnectionParameters(connect_host, - rcfg.Port, - rcfg.VHost, - credentials) +parameters = pika.ConnectionParameters( + connect_host, rcfg.Port, rcfg.VHost, credentials +) # Establish connection to RabbitMQ server connection = pika.BlockingConnection(parameters) channel = connection.channel() -channel.exchange_declare(exchange=rcfg.Exchange, exchange_type='direct') +channel.exchange_declare(exchange=rcfg.Exchange, exchange_type="direct") -channel.basic_publish(exchange=rcfg.Exchange, routing_key=node, body=json.dumps(message)) +channel.basic_publish( + exchange=rcfg.Exchange, routing_key=node, body=json.dumps(message) +) print(" [x] Sent {}: {}".format(node, json.dumps(message))) # creates a named queue result = channel.queue_declare(queue=user_name, exclusive=False, durable=True) # bind the queue with exchange -channel.queue_bind(exchange=rcfg.Exchange, queue=user_name, routing_key=user_name) +channel.queue_bind( + exchange=rcfg.Exchange, queue=user_name, routing_key=user_name +) + def work(ch, method, properties, body): msg = json.loads(body) print("Received message from {}: \n\t{}".format(method.routing_key, msg)) - #queue_unbind(queue, exchange=None, routing_key=None, arguments=None, callback=None) channel.queue_delete(method.routing_key) + # ingest messages, and assume delivered via auto_ack -channel.basic_consume(queue=sys.argv[2], on_message_callback=work, auto_ack=True) +channel.basic_consume( + queue=sys.argv[2], on_message_callback=work, auto_ack=True +) print("Subscribing to queue: {}".format(sys.argv[2])) # initiate message ingestion diff --git a/init_user_state.py b/init_user_state.py new file mode 100644 index 0000000000000000000000000000000000000000..54ba947560e151fa12592d7f314bb5b916a1ccb4 --- /dev/null +++ b/init_user_state.py @@ -0,0 +1,72 @@ +#!/usr/bin/env python3 +import argparse +import dataset +import sys +import subprocess +import rabbit_config as rcfg +import rc_util +from datetime import datetime + +parser = argparse.ArgumentParser() +parser.add_argument("-f", "--force", action="store_true", help="force update") +parser.add_argument( + "--dry-run", action="store_true", help="enable dry run mode" +) +args = parser.parse_args() + +default_state = "ok" +today = datetime.now() +updated_by, host = rc_util.get_caller_info() + +# Chunk size for insert into db +size = 1000 + +db = dataset.connect(f"sqlite:///{rcfg.db_path}/user_reg.db") +table = db["user_state"] + +if table.__len__() > 0 and not args.force: + print("table user_state not empty, abort.") + sys.exit() + +# Getting user list +users = subprocess.run( + ["ls", "/data/user"], stdout=subprocess.PIPE, encoding="UTF-8" +).stdout.split() + +# Update user_state table +# Insert many +if len(users) > 50: + start = 0 + while start < len(users): + end = start + size if start + size < len(users) else len(users) + data = [ + dict( + username=user, + state=default_state, + date=today, + updated_by=updated_by, + host=host, + ) + for user in users[start:end] + ] + if args.dry_run: + print(f"Table insert many from {start} to {end - 1}") + else: + table.insert_many(data, chunk_size=size) + start = end + +# Insert one by one +else: + for user in users: + if args.dry_run: + print(f"Table insert user: {user}, state: {default_state}") + else: + table.insert( + { + "username": user, + "state": default_state, + "date": today, + "updated_by": updated_by, + "host": host, + } + ) diff --git a/mail_config.py.example b/mail_config.py.example new file mode 100644 index 0000000000000000000000000000000000000000..e699d8b2733deda1c1510d11c4bc8503a1289a4a --- /dev/null +++ b/mail_config.py.example @@ -0,0 +1,29 @@ +import rabbit_config as rcfg + +Head = f"""From: {rcfg.Sender_alias} <{rcfg.Sender}> +To: <{{{{ to }}}}> +Subject: {rcfg.Subject} +""" + +Body = f""" +Hi {{{{ username }}}} +Your account has been set up with: + +============================ +User ID: {{{{ username }}}} +============================ + +If you have any questions, please visit: +{rcfg.Info_url} + +or email at {rcfg.Admin_email} + +Cheers, +""" + +Whole_mail = Head + Body + +UserReportHead = f"""From: {rcfg.Sender_alias} <{rcfg.Sender}> +To: <{rcfg.Admin_email}> +Subject: [{{{{ result }}}}]RC Account Creation Report: {{{{ fullname }}}}, {{{{ username }}}} """ + diff --git a/prod_rmq_agents/acct_mgmt_workflow.py b/prod_rmq_agents/acct_mgmt_workflow.py new file mode 100755 index 0000000000000000000000000000000000000000..ac89b6dee2b4c38e3bf89d58246f764640fc8c1c --- /dev/null +++ b/prod_rmq_agents/acct_mgmt_workflow.py @@ -0,0 +1,100 @@ +#!/usr/bin/env python3 +import json +import rc_util +import argparse +import signal +import uuid +import pika +import rc_util +from rc_rmq import RCRMQ +import rabbit_config as rcfg +import time + +task = "acctmgr" + +# Instantiate rabbitmq object +rc_rmq = RCRMQ({"exchange": rcfg.Exchange, "exchange_type": "topic"}) + +tracking = {} + +def manage_acct(ch, method, properties, body): + msg = json.loads(body) + op = method.routing_key.split(".")[1] + username = msg["username"] + state = msg["state"] + service = msg["service"] + queuename = msg["queuename"] + + try: + if username in tracking: + current = tracking[username] + else: + current = tracking[username] = {} + + if op == 'request': + if state == 'hold' or state == 'certification': + msg["action"] = "lock" + elif state == 'ok' or state == 'pre_certification': + msg["action"] = "unlock" + else: + print("Invalid state provided. Check the help menu.") + + if service == 'all': + current["new_jobs"] = None + current["expire_account"] = None + current["ssh_access"] = None + # send a broadcast message to all agents + rc_rmq.publish_msg( + { + "routing_key": f"{msg['action']}.{queuename}", + "msg": msg, + } + ) + else: + for each_service in service: + current[each_service] = None + rc_rmq.publish_msg( + { + "routing_key": f"{each_service}.{queuename}", + "msg": msg + } + ) + + + elif op == 'done': + # Check if each task/agent returned success + current[msg["task"]] = msg["success"] + + done = True + + for task in current.keys(): + if current[task] is None: + done = False + + if done: + rc_util.update_state( + username, state, msg.get("updated_by"), msg.get("host") + ) + + # Send done msg to account_manager.py + rc_rmq.publish_msg( + { + "routing_key": f'certified.{queuename}', + "msg": msg, + } + ) + + ch.basic_ack(delivery_tag=method.delivery_tag) + + except Exception: + msg["errmsg"] = "Exception raised in account manager workflow agent , check the logs for stack trace" + logger.error("", exc_info=True) + +rc_rmq.bind_queue(queue=task, routing_key='acctmgr.request.*', durable=True) +rc_rmq.bind_queue(queue=task, routing_key='acctmgr.done.*', durable=True) + +print("Waiting for completion...") +rc_rmq.start_consume( + {"queue": task, "cb": manage_acct} +) + diff --git a/prod_rmq_agents/dir_verify.py b/prod_rmq_agents/dir_verify.py new file mode 100644 index 0000000000000000000000000000000000000000..4b768c9428a4f1438eeb2b81d3f84d6d9c5c1ebc --- /dev/null +++ b/prod_rmq_agents/dir_verify.py @@ -0,0 +1,75 @@ +#!/usr/bin/env python +import os +import json +import rc_util +from pathlib import Path +from rc_rmq import RCRMQ +import rabbit_config as rcfg + +task = "dir_verify" +dirs = rcfg.User_dirs + +args = rc_util.get_args() +logger = rc_util.get_logger(args) + +# Instantiate rabbitmq object +rc_rmq = RCRMQ({"exchange": "RegUsr", "exchange_type": "topic"}) + + +def dir_verify(ch, method, properties, body): + msg = json.loads(body) + username = msg["username"] + msg["task"] = task + msg["success"] = True + + missing_dirs = [] + + try: + for d in dirs: + path = Path(d) / msg["username"] + + if args.dry_run: + logger.info(f"Checking dirs: {path}") + + else: + if not path.exists(): + # check if dirs exist and record any missing dirs + missing_dirs.append(path) + msg["success"] = False + msg["errmsg"] = f"Error: missing dirs {missing_dirs}" + logger.info(f"{path} does not exist") + else: + # check existing dirs for correct ownership and permissions + status = os.stat(path) + mask = oct(status.st_mode)[-3:] + uid = str(status.st_uid) + gid = str(status.st_gid) + if mask != "700" or uid != msg["uid"] or gid != msg["gid"]: + msg["success"] = False + msg["errmsg"] = ( + f"Error: dir {path} permissions or ownership are" + " wrong" + ) + + except Exception: + msg["success"] = False + msg["errmsg"] = "Exception raised, check the logs for stack trace" + logger.error("", exc_info=True) + + # send confirm message + rc_rmq.publish_msg( + {"routing_key": "confirm." + msg["queuename"], "msg": msg} + ) + + logger.debug(f"User {username} confirmation sent") + + ch.basic_ack(delivery_tag=method.delivery_tag) + + +logger.info(f"Start listening to queue: {task}") +rc_rmq.start_consume( + {"queue": task, "routing_key": "verify.*", "cb": dir_verify} +) + +logger.info("Disconnected") +rc_rmq.disconnect() diff --git a/prod_rmq_agents/examples/delete_user.sh b/prod_rmq_agents/examples/delete_user.sh new file mode 100755 index 0000000000000000000000000000000000000000..ca51baf74894bcc2e3e827a0a5b67885735521f1 --- /dev/null +++ b/prod_rmq_agents/examples/delete_user.sh @@ -0,0 +1,48 @@ +#!/bin/sh + +username="$1" +path_to_db="/cm/shared/rabbitmq_agents/prod_rmq_agents/.agent_db/user_reg.db" + +usage() { + echo "Usage: $0 USERNAME" +} + +if [[ "$EUID" -ne 0 ]]; then + echo "This script must be run as root!" + exit 1 +fi + +if [ -z "$username" ]; then + usage + exit 1 +fi + +if id "$username" &>/dev/null; then + echo "Deleting user: ${username}" + + echo "Clean PUN process on loginnode" + ssh login001 "/opt/ood/nginx_stage/sbin/nginx_stage nginx_clean --force --user $username" + + echo "Remove user via cmsh" + cmsh -c "user use ${username}; remove -d; commit;" + + echo "Remove user from sqlite db users table" + sqlite3 $path_to_db "delete from users where username=\"$username\"" + + echo "Remove user from sqlite db user_state table" + sqlite3 $path_to_db "delete from user_state where username=\"$username\"" + + echo "Remove user from sacctmgr" + sacctmgr -i delete user $username + sacctmgr -i delete account $username + + echo "Remove /data/user" + rm -rf "/data/user/${username}" + + echo "Remove /data/scratch" + rm -rf "/data/scratch/${username}" + +else + echo "user: ${username} not found." + exit 1 +fi diff --git a/prod_rmq_agents/expire_account.py b/prod_rmq_agents/expire_account.py new file mode 100644 index 0000000000000000000000000000000000000000..94e421276fdb68e2bc07800d75beb0f696fb64c3 --- /dev/null +++ b/prod_rmq_agents/expire_account.py @@ -0,0 +1,67 @@ +#!/usr/bin/env python +import os +import json +import pika +import rc_util +from os import popen +from pathlib import Path +from rc_rmq import RCRMQ +import rabbit_config as rcfg +from datetime import date, timedelta + +task = "expire_account" + +args = rc_util.get_args() +logger = rc_util.get_logger(args) + +# Instantiate rabbitmq object +rc_rmq = RCRMQ({"exchange": rcfg.Exchange, "exchange_type": "topic"}) + + +def expire_account(ch, method, properties, body): + msg = json.loads(body) + username = msg["username"] + action = msg["action"] + msg["task"] = task + queuename = msg["queuename"] + yesterday = date.today() - timedelta(days=1) + + try: + expire_account_cmd = f'/cm/local/apps/cmd/bin/cmsh -n -c "user;use {username}; set expirationdate {yesterday}; commit;"' + unexpire_account_cmd = f'/cm/local/apps/cmd/bin/cmsh -n -c "user;use {username}; set expirationdate 2037/12/31; commit;"' + + if action == 'lock': + block_ssh = popen(expire_account_cmd).read().rstrip() + elif action == 'unlock': + unblock_ssh = popen(unexpire_account_cmd).read().rstrip() + + msg["success"] = True + logger.info(f"ssh expiration set to yesterday for user {username}") + + except Exception: + msg["success"] = False + msg["errmsg"] = "Exception raised, while expiring user's ssh access, check the logs for stack trace" + logger.error("", exc_info=True) + + # send response to callback queue with it's correlation ID + rc_rmq.publish_msg( + {"routing_key": f'acctmgr.done.{queuename}', + "msg": msg} + ) + + logger.debug(f"User {username} confirmation sent for {action}ing {task}") + + ch.basic_ack(delivery_tag=method.delivery_tag) + + +logger.info(f"Start listening to queue: {task}") +rc_rmq.bind_queue(queue=task, routing_key='lock.*', durable=True) +rc_rmq.bind_queue(queue=task, routing_key='unlock.*', durable=True) +rc_rmq.bind_queue(queue=task, routing_key='expiration.*', durable=True) + +rc_rmq.start_consume( + {"queue": task, "cb": expire_account} +) + +logger.info("Disconnected") +rc_rmq.disconnect() diff --git a/prod_rmq_agents/get-next-uid-gid.py b/prod_rmq_agents/get-next-uid-gid.py new file mode 100644 index 0000000000000000000000000000000000000000..03cb51f783a789610698ed1ba53700c637177439 --- /dev/null +++ b/prod_rmq_agents/get-next-uid-gid.py @@ -0,0 +1,114 @@ +#!/usr/bin/env python +import json +import time +import rc_util +from os import popen +from rc_rmq import RCRMQ +import rabbit_config as rcfg +from subprocess import run +import shlex + +task = "create_account" + +# Instantiate rabbitmq object +rc_rmq = RCRMQ({"exchange": "RegUsr", "exchange_type": "topic"}) + +args = rc_util.get_args() + +# Logger +logger = rc_util.get_logger() + + +# Account creation +def create_account(msg): + + logger.info(f"Account creation request received: {msg}") + username = msg["username"] + uid = msg["uid"] + email = msg["email"] + fullname = msg["fullname"] + msg["success"] = False + + # Bright command to create user + if str(rcfg.bright_cm_version).split(".")[0] == "8": + cmd = "/cm/local/apps/cmd/bin/cmsh -c " + cmd += f'"user; add {username}; set userid {uid}; set email {email};' + cmd += f'set commonname \\"{fullname}\\"; ' + cmd += 'commit;"' + else: + cmd = "/cm/local/apps/cmd/bin/cmsh -c " + cmd += f'"user; add {username}; set id {uid}; set email {email};' + cmd += f'set commonname \\"{fullname}\\"; ' + cmd += 'commit;"' + + if not args.dry_run: + run(shlex.split(cmd)) + time.sleep(rcfg.Delay) + logger.info(f"Bright command to create user:{cmd}") + + +# Define your callback function +def resolve_uid_gid(ch, method, properties, body): + + # Retrieve message + msg = json.loads(body) + logger.info("Received {}".format(msg)) + username = msg["username"] + msg["success"] = False + + # Determine next available UID + try: + user_exists_cmd = f"/usr/bin/getent passwd {username}" + user_exists = popen(user_exists_cmd).read().rstrip() + + if user_exists: + logger.info("The user, {} already exists".format(username)) + msg["uid"] = user_exists.split(":")[2] + msg["gid"] = user_exists.split(":")[3] + + else: + cmd_uid = ( + "/usr/bin/getent passwd | awk -F: 'BEGIN { maxuid=10000 }" + " ($3>10000) && ($3<20000) && ($3>maxuid) { maxuid=$3; } END {" + " print maxuid+1; }'" + ) + msg["uid"] = popen(cmd_uid).read().rstrip() + logger.info(f"UID query: {cmd_uid}") + + cmd_gid = ( + "/usr/bin/getent group | awk -F: 'BEGIN { maxgid=10000 }" + " ($3>10000) && ($3<20000) && ($3>maxgid) { maxgid=$3; } END {" + " print maxgid+1; }'" + ) + msg["gid"] = popen(cmd_gid).read().rstrip() + logger.info(f"GID query: {cmd_gid}") + + create_account(msg) + msg["task"] = task + msg["success"] = True + except Exception: + msg["success"] = False + msg["errmsg"] = ( + "Exception raised during account creation, check logs for stack" + " trace" + ) + logger.error("", exc_info=True) + + # Acknowledge message + ch.basic_ack(delivery_tag=method.delivery_tag) + + # Send confirm message + logger.debug("rc_rmq.publish_msg()") + rc_rmq.publish_msg( + {"routing_key": "confirm." + msg["queuename"], "msg": msg} + ) + logger.info("confirmation sent") + + +logger.info("Start listening to queue: {}".format(task)) +rc_rmq.start_consume( + {"queue": task, "routing_key": "request.*", "cb": resolve_uid_gid} +) + +logger.info("Disconnected") +rc_rmq.disconnect() diff --git a/prod_rmq_agents/git_commit.py b/prod_rmq_agents/git_commit.py new file mode 100644 index 0000000000000000000000000000000000000000..c8665c17d3a596fa80923cf64b3b333e490a8034 --- /dev/null +++ b/prod_rmq_agents/git_commit.py @@ -0,0 +1,135 @@ +#!/usr/bin/env python +import os +import sh +import json +import rc_util +from rc_rmq import RCRMQ +import rabbit_config as rmq_cfg +import time + +task = "git_commit" + +# Instantiate rabbitmq object +rc_rmq = RCRMQ({"exchange": "RegUsr", "exchange_type": "topic"}) + +# Define some location +repo_location = os.path.expanduser(rmq_cfg.rc_users_ldap_repo_loc) +users_dir = repo_location + "/users" +groups_dir = repo_location + "/groups" + +args = rc_util.get_args() +logger = rc_util.get_logger(args) + +if not args.dry_run: + git = sh.git.bake( + "--git-dir", repo_location + "/.git", "--work-tree", repo_location + ) + ldapsearch = sh.Command("ldapsearch") +else: + git = sh.echo.bake( + "--git-dir", repo_location + "/.git", "--work-tree", repo_location + ) + ldapsearch = sh.echo.bake("ldapsearch") + + +def git_commit(ch, method, properties, body): + msg = json.loads(body) + username = msg["username"] + msg["task"] = task + msg["success"] = False + branch_name = ( + "issue-add-users-" + + username.lower() + + "-" + + time.strftime("%Y%m%d_%H%M%S") + ) + user_ldif = users_dir + f"/{username}.ldif" + group_ldif = groups_dir + f"/{username}.ldif" + + logger.info("Received: %s", msg) + logger.debug("branch_name: %s", branch_name) + + try: + + logger.debug("git checkout master") + git.checkout("master") + logger.debug("git pull") + git.pull() + branch_exists = git.branch("--list", branch_name) + if not branch_exists: + logger.debug("git checkout -b %s", branch_name) + git.checkout("-b", branch_name) + logger.debug("open(%s, 'w'), open(%s, 'w')", user_ldif, group_ldif) + with open(user_ldif, "w") as ldif_u, open( + group_ldif, "w" + ) as ldif_g: + logger.debug( + "ldapsearch -LLL -x -H ldaps://ldapserver -b 'dc=cm,dc=clu" + f"ster' uid={username} > {user_ldif}" + ) + ldapsearch( + "-LLL", + "-x", + "-H", + "ldaps://ldapserver", + "-b", + "dc=cm,dc=cluster", + f"uid={username}", + _out=ldif_u, + ) + logger.debug( + "ldapsearch -LLL -x -H ldapserver -b 'ou=Group,dc=cm,dc=cl" + f"uster' cn={username} > {group_ldif}" + ) + ldapsearch( + "-LLL", + "-x", + "-H", + "ldaps://ldapserver", + "-b", + "ou=Group,dc=cm,dc=cluster", + f"cn={username}", + _out=ldif_g, + ) + logger.info("user ldif files generated.") + + logger.debug("git add %s", user_ldif) + git.add(user_ldif) + logger.debug("git add %s", group_ldif) + git.add(group_ldif) + logger.debug("git commit -m 'Added new cheaha user: %s'", username) + git.commit(m="Added new cheaha user: " + username) + logger.debug("git checkout master") + git.checkout("master") + + logger.debug("git merge %s --no-ff --no-edit", branch_name) + git.merge(branch_name, "--no-ff", "--no-edit") + logger.debug("git push origin master") + git.push("origin", "master") + # merge with gitlab api + + logger.info("Added ldif files and committed to git repo") + + msg["success"] = True + except Exception: + logger.error("", exc_info=True) + + # Send confirm message + logger.debug("rc_rmq.publish_msge()") + rc_rmq.publish_msg( + {"routing_key": "confirm." + msg["queuename"], "msg": msg} + ) + logger.info("confirmation sent") + + # Acknowledge message + logger.debug("ch.basic_ack()") + ch.basic_ack(delivery_tag=method.delivery_tag) + + +logger.info("Start listening to queue: %s", task) +rc_rmq.start_consume( + {"queue": task, "routing_key": "verify.*", "cb": git_commit} +) + +logger.info("Disconnected") +rc_rmq.disconnect() diff --git a/prod_rmq_agents/group_member.py b/prod_rmq_agents/group_member.py new file mode 100644 index 0000000000000000000000000000000000000000..cda7e5ac0bdf97f4ef953cc4b3fde4951b65143f --- /dev/null +++ b/prod_rmq_agents/group_member.py @@ -0,0 +1,98 @@ +#!/usr/bin/env python +import os +import json +import pika +import shlex +import rc_util +from subprocess import Popen,PIPE +from pathlib import Path +from rc_rmq import RCRMQ +import rabbit_config as rcfg + +task = "group_member" + +args = rc_util.get_args() +logger = rc_util.get_logger(args) + +# Instantiate rabbitmq object +rc_rmq = RCRMQ({"exchange": rcfg.Exchange, "exchange_type": "topic"}) + + +def group_member(ch, method, properties, body): + msg = json.loads(body) + username = msg["username"] + action = msg["action"] + msg["task"] = task + state = msg["state"] + + try: + if 'remove' in msg["groups"]: + for each_group in msg["groups"]["remove"]: + logger.debug(f'Removing user {username} from group {each_group}') + if str(rcfg.bright_cm_version).split(".")[0] == "8": + grp_remove_user_cmd = f'/cm/local/apps/cmd/bin/cmsh -n -c "group; removefrom {each_group} groupmembers {username}; commit;"' + else: + grp_remove_user_cmd = f'/cm/local/apps/cmd/bin/cmsh -n -c "group; removefrom {each_group} members {username}; commit;"' + + logger.info(f'Running command: {grp_remove_user_cmd}') + proc = Popen(shlex.split(grp_remove_user_cmd), stdout=PIPE, stderr=PIPE) + out,err = proc.communicate() + logger.debug(f'Result: {err}') + logger.info(f'User {username} is removed from {each_group} group') + + if 'add' in msg["groups"]: + for each_group in msg["groups"]["add"]: + logger.debug(f'Adding user {username} to group {each_group}') + if str(rcfg.bright_cm_version).split(".")[0] == "8": + grp_add_user_cmd = f'/cm/local/apps/cmd/bin/cmsh -n -c "group; append {each_group} groupmembers {username}; commit;"' + else: + grp_add_user_cmd = f'/cm/local/apps/cmd/bin/cmsh -n -c "group; append {each_group} members {username}; commit;"' + + logger.info(f'Running command: {grp_add_user_cmd}') + proc = Popen(shlex.split(grp_add_user_cmd), stdout=PIPE, stderr=PIPE) + out,err = proc.communicate() + logger.debug(f'Result: {err}') + logger.info(f'User {username} is added to {each_group} group') + + + msg["success"] = True + + except Exception: + msg["success"] = False + msg["errmsg"] = "Exception raised, while adding user to group {groupname}, check the logs for stack trace" + logger.error("", exc_info=True) + + + corr_id = properties.correlation_id + reply_to = properties.reply_to + + logger.debug(f'corr_id: {corr_id} \n reply_to: {reply_to}') + # send response to the callback queue + if reply_to: + props = pika.BasicProperties(correlation_id=corr_id) + logger.debug("Sending confirmation back to reply_to") + rc_rmq.publish_msg( + { + "routing_key": reply_to, + "props": props, + "msg": msg + } + ) + else: + print("Error: no reply_to") + + logger.debug(f'User {username} confirmation sent from {task}') + + ch.basic_ack(delivery_tag=method.delivery_tag) + + +logger.info(f"Start listening to queue: {task}") +rc_rmq.bind_queue(queue=task, routing_key='group_member.*', durable=True) + +rc_rmq.start_consume( + {"queue": task, "cb": group_member} +) + +logger.info("Disconnected") +rc_rmq.disconnect() + diff --git a/prod_rmq_agents/new_jobs.py b/prod_rmq_agents/new_jobs.py new file mode 100644 index 0000000000000000000000000000000000000000..cb51d18e4f21e77abe2d1391724ca18b474a8727 --- /dev/null +++ b/prod_rmq_agents/new_jobs.py @@ -0,0 +1,64 @@ +#!/usr/bin/env python +import os +import json +import pika +import rc_util +from os import popen +from pathlib import Path +from rc_rmq import RCRMQ +import rabbit_config as rcfg + +task = "new_jobs" + +args = rc_util.get_args() +logger = rc_util.get_logger(args) + +# Instantiate rabbitmq object +rc_rmq = RCRMQ({"exchange": rcfg.Exchange, "exchange_type": "topic"}) + + +def new_jobs(ch, method, properties, body): + msg = json.loads(body) + username = msg["username"] + action = msg["action"] + msg["task"] = task + queuename = msg["queuename"] + + try: + block_new_jobs_cmd = f"/cm/shared/apps/slurm/19.05.5/bin/sacctmgr --immediate update user {username} set maxjobs=0" + unblock_new_jobs_cmd = f"/cm/shared/apps/slurm/19.05.5/bin/sacctmgr --immediate update user {username} set maxjobs=-1" + + if action == 'lock': + block_new_jobs = popen(block_new_jobs_cmd).read().rstrip() + elif action == 'unlock': + unblock_new_jobs = popen(unblock_new_jobs_cmd).read().rstrip() + + msg["success"] = True + logger.info(f"Succeeded in blocking {username}'s jobs getting to run state") + + except Exception: + msg["success"] = False + msg["errmsg"] = "Exception raised while setting maxjobs that can enter run state, check the logs for stack trace" + logger.error("", exc_info=True) + + + rc_rmq.publish_msg( + {"routing_key": f'acctmgr.done.{queuename}', + "msg": msg} + ) + + logger.debug(f"User {username} confirmation sent for {action}ing {task}") + + ch.basic_ack(delivery_tag=method.delivery_tag) + + +logger.info(f"Start listening to queue: {task}") +rc_rmq.bind_queue(queue=task, routing_key='lock.*', durable=True) +rc_rmq.bind_queue(queue=task, routing_key='unlock.*', durable=True) +rc_rmq.bind_queue(queue=task, routing_key='newjobs.*', durable=True) +rc_rmq.start_consume( + {"queue": task, "cb": new_jobs} +) + +logger.info("Disconnected") +rc_rmq.disconnect() diff --git a/prod_rmq_agents/notify_user.py b/prod_rmq_agents/notify_user.py new file mode 100644 index 0000000000000000000000000000000000000000..1c9c83b51809868babeecd8b375a62b844090458 --- /dev/null +++ b/prod_rmq_agents/notify_user.py @@ -0,0 +1,112 @@ +#!/usr/bin/env python +import json +import rc_util +import smtplib +import dataset +from rc_rmq import RCRMQ +from jinja2 import Template +from datetime import datetime +import rabbit_config as rcfg +import mail_config as mail_cfg + +task = "notify_user" + +args = rc_util.get_args() +logger = rc_util.get_logger(args) + +db = dataset.connect(f"sqlite:///{rcfg.db_path}/user_reg.db") +table = db["users"] + +# Instantiate rabbitmq object +rc_rmq = RCRMQ({"exchange": "RegUsr", "exchange_type": "topic"}) + + +# Email instruction to user +def notify_user(ch, method, properties, body): + msg = json.loads(body) + username = msg["username"] + user_email = msg["email"] + msg["task"] = task + msg["success"] = False + errmsg = "" + + try: + + # Search username in database + record = table.find_one(username=username) + + if record["sent"] is not None: + errmsg = "Updating database counter" + # Update counter + count = record["count"] + if args.dry_run: + logger.info("Update counter in database") + + else: + table.update( + {"username": username, "count": count + 1}, + ["username"], + ) + + logger.debug(f"User {username} counter updated to {count + 1}") + + else: + # Send email to user + receivers = [user_email, rcfg.Admin_email] + message = Template(mail_cfg.Whole_mail).render( + username=username, to=user_email + ) + + if args.dry_run: + logger.info(f"smtp = smtplib.SMTP({rcfg.Mail_server})") + logger.info( + f"smtp.sendmail({rcfg.Sender}, {receivers}, message)" + ) + logger.info( + f"table.update({{'username': {username}, 'count': 1," + " 'sent_at': datetime.now()}}, ['username'])" + ) + + else: + errmsg = "Sending email to user" + smtp = smtplib.SMTP(rcfg.Mail_server) + smtp.sendmail(rcfg.Sender_notification, receivers, message) + + logger.debug(f"Email sent to: {user_email}") + + errmsg = "Updating database email sent time" + table.update( + { + "username": username, + "count": 1, + "sent": datetime.now(), + }, + ["username"], + ) + + logger.debug(f"User {username} inserted into database") + + msg["success"] = True + except Exception: + logger.error("", exc_info=True) + msg["errmsg"] = errmsg if errmsg else "Unexpected error" + + # Send confirm message + rc_rmq.publish_msg( + {"routing_key": "confirm." + msg["queuename"], "msg": msg} + ) + + logger.debug(f"User {username} confirmation sent") + + # Acknowledge the message + ch.basic_ack(delivery_tag=method.delivery_tag) + + +if __name__ == "__main__": + logger.info(f"Start listening to queue: {task}") + rc_rmq.start_consume( + {"queue": task, "routing_key": "notify.*", "cb": notify_user} + ) + + logger.info("Disconnected") + rc_rmq.disconnect() diff --git a/prod_rmq_agents/ssh_access.py b/prod_rmq_agents/ssh_access.py new file mode 100644 index 0000000000000000000000000000000000000000..947d66a89077d4eb327955beaa1b1561a33ef2c0 --- /dev/null +++ b/prod_rmq_agents/ssh_access.py @@ -0,0 +1,115 @@ +#!/usr/bin/env python +import os +import json +import pika +import uuid +import rc_util +from subprocess import Popen,PIPE +from pathlib import Path +from rc_rmq import RCRMQ +import rabbit_config as rcfg + +task = "ssh_access" + +args = rc_util.get_args() +logger = rc_util.get_logger(args) + +# Instantiate rabbitmq object +rc_rmq = RCRMQ({"exchange": rcfg.Exchange, "exchange_type": "topic"}) + + +def ssh_access(ch, method, properties, body): + msg = json.loads(body) + routing_key = method.routing_key + username = msg["username"] + action = msg["action"] + msg["task"] = task + queuename = msg["queuename"] + state = msg["state"] + + global corr_id + try: + # check if it's a response from group_member_agent + if routing_key == task: + logger.debug(f"corr_id sent by group_member agent: {properties.correlation_id}") + if corr_id == properties.correlation_id: + logger.debug(f'group_member agent confirmation msg["success"]: {msg["success"]}') + # forward confirmation response to acct_mgmt_workflow agent + rc_rmq.publish_msg( + { + "routing_key": f'acctmgr.done.{queuename}', + "msg": msg + } + ) + logger.debug(f'User {username} confirmation sent for {action}ing {task}') + + else: + corr_id = str(uuid.uuid4()) + logger.debug(f'corr_id generated: {corr_id}') + msg["groups"] = {} + + proc = Popen(['/usr/bin/groups', username], stdout=PIPE, stderr=PIPE) + out,err = proc.communicate() + + user_groups = out.decode().strip().split(":")[1].split() + state_groups = rcfg.state_groups + """ + Filter the lock group a user is in and assign to spl + lambda function returns common elements between two lists. For all + the true values by returned lambda function for common elements + corresponding values are included as a list by filter function. + """ + user_state_groups = list(filter(lambda x:x in list(rcfg.state_groups.values()),user_groups)) + + # Depending on state add user to the group corresponding to state. + # Remove user from user_state_groups they are already part of. + # eg: {"groups": { "add":[a,b,c], "remove":[d,e,f] } + if state == 'certification': + msg["groups"]["add"] = [state_groups[state]] + msg["groups"]["remove"] = user_state_groups + + elif state == 'hold': + msg["groups"]["add"] = [state_groups[state]] + msg["groups"]["remove"] = user_state_groups + + elif state == 'pre_certification': + msg["groups"]["add"] = [state_groups[state]] + msg["groups"]["remove"] = user_state_groups + + elif state == 'ok': + msg["groups"]["remove"] = user_state_groups + + # send a message to group_member.py agent + logger.debug(f"sending msg to group agent: {msg}") + rc_rmq.publish_msg( + { + "routing_key": f'group_member.{queuename}', + "props": pika.BasicProperties( + correlation_id = corr_id, + reply_to = task, + ), + "msg": msg + } + ) + logger.info(f"Request sent to add/remove user {username} to/from spl groups") + + except Exception: + msg["success"] = False + msg["errmsg"] = "Exception raised in ssh_access agent, check the logs for stack trace" + logger.error("", exc_info=True) + + ch.basic_ack(delivery_tag=method.delivery_tag) + + +logger.info(f"Start listening to queue: {task}") +rc_rmq.bind_queue(queue=task, routing_key='lock.*', durable=True) +rc_rmq.bind_queue(queue=task, routing_key='unlock.*', durable=True) +rc_rmq.bind_queue(queue=task, routing_key='ssh.*', durable=True) +rc_rmq.bind_queue(queue=task, routing_key=task, durable=True) + +rc_rmq.start_consume( + {"queue": task, "cb": ssh_access} +) + +logger.info("Disconnected") +rc_rmq.disconnect() diff --git a/prod_rmq_agents/subscribe_mail_lists.py b/prod_rmq_agents/subscribe_mail_lists.py new file mode 100644 index 0000000000000000000000000000000000000000..d3dd7a8ac4ab694caf126859dc731731670f1bcb --- /dev/null +++ b/prod_rmq_agents/subscribe_mail_lists.py @@ -0,0 +1,88 @@ +#!/usr/bin/env python +import json +import smtplib +import rc_util +from email.message import EmailMessage +from rc_rmq import RCRMQ +import rabbit_config as rcfg + +task = "subscribe_mail_list" + +# Instantiate rabbitmq object +rc_rmq = RCRMQ({"exchange": "RegUsr", "exchange_type": "topic"}) + +# Parse arguments +args = rc_util.get_args() + +# Logger +logger = rc_util.get_logger() # Define your callback function + + +def mail_list_subscription(ch, method, properties, body): + + # Retrieve message + msg = json.loads(body) + logger.info("Received msg {}".format(msg)) + username = msg["username"] + fullname = msg["fullname"] + email = msg["email"] + + mail_list_admin = rcfg.Sender_subscription + mail_list = rcfg.Mail_list + mail_list_bcc = rcfg.Mail_list_bcc + server = rcfg.Mail_server + + listserv_cmd = ( + f"QUIET ADD hpc-announce {email} {fullname}\n" + f"QUIET ADD hpc-users {email} {fullname}" + ) + + logger.info("Adding user{} to mail list".format(username)) + msg["success"] = False + try: + # Create a text/plain message + email_msg = EmailMessage() + + email_msg["From"] = mail_list_admin + email_msg["To"] = mail_list + email_msg["Subject"] = "" + email_msg["Bcc"] = mail_list_bcc + + # Create an smtp object and send email + s = smtplib.SMTP(server) + + email_msg.set_content(listserv_cmd) + if not args.dry_run: + s.send_message(email_msg) + logger.info( + f"This email will add user {username} to listserv \n{email_msg}" + ) + + s.quit() + msg["task"] = task + msg["success"] = True + except Exception: + logger.error("", exc_info=True) + + # Acknowledge message + ch.basic_ack(delivery_tag=method.delivery_tag) + + # send confirm message + logger.debug("rc_rmq.publish_msg()") + rc_rmq.publish_msg( + {"routing_key": "confirm." + msg["queuename"], "msg": msg} + ) + logger.info("confirmation sent") + + +logger.info("Start listening to queue: {}".format(task)) +rc_rmq.start_consume( + { + "queue": task, # Define your Queue name + "routing_key": "verify.*", # Define your routing key + "cb": mail_list_subscription, # Pass callback function you just define + } +) + +logger.info("Disconnected") +rc_rmq.disconnect() diff --git a/prod_rmq_agents/task_manager.py b/prod_rmq_agents/task_manager.py new file mode 100644 index 0000000000000000000000000000000000000000..3b82f838f47ed8b926bb3ecc3b506ea59bb26fdf --- /dev/null +++ b/prod_rmq_agents/task_manager.py @@ -0,0 +1,330 @@ +#!/usr/bin/env python +import copy +import json +import signal +import dataset +import rc_util +import smtplib +from rc_rmq import RCRMQ +from jinja2 import Template +from datetime import datetime +import mail_config as mail_cfg +import rabbit_config as rcfg + +task = "task_manager" +timeout = 30 + +args = rc_util.get_args() +logger = rc_util.get_logger(args) + +db = dataset.connect(f"sqlite:///{rcfg.db_path}/user_reg.db") +table = db["users"] + +record = { + "uid": -1, + "gid": -1, + "email": "", + "reason": "", + "fullname": "", + "last_update": datetime.now(), + "errmsg": None, + "waiting": set(), + "request": {"create_account": None}, + "verify": { + "git_commit": None, + "dir_verify": None, + "subscribe_mail_list": None, + }, + "notify": {"notify_user": None}, + "reported": False, +} + +# Currently tracking users +tracking = {} + +# Instantiate rabbitmq object +rc_rmq = RCRMQ({"exchange": "RegUsr", "exchange_type": "topic"}) + + +def notify_admin(username, user_record): + receivers = [rcfg.Admin_email] + + result = ( + "SUCCESS" + if user_record["request"]["create_account"] + and user_record["verify"]["git_commit"] + and user_record["verify"]["dir_verify"] + and user_record["verify"]["subscribe_mail_list"] + and user_record["notify"]["notify_user"] + else "FAILED" + ) + + message = Template(mail_cfg.UserReportHead).render( + username=username, fullname=user_record["fullname"], result=result + ) + if user_record["reported"]: + message += " (Duplicate)" + message += f""" \n + User Creation Report for user {username} + uid: {user_record["uid"]}, gid: {user_record["gid"]} + Tasks: + 'create_account': {user_record["request"]["create_account"]} + 'git_commit': {user_record["verify"]["git_commit"]} + 'dir_verify': {user_record["verify"]["dir_verify"]} + 'subscribe_mail_list': {user_record["verify"]["subscribe_mail_list"]} + 'notify_user': {user_record["notify"]["notify_user"]} + """ + if user_record["errmsg"]: + message += """ + + Error(s): + """ + for msg in user_record["errmsg"]: + message += msg + "\n" + + if args.dry_run: + logger.info(f"smtp = smtplib.SMTP({rcfg.Mail_server})") + logger.info( + f"smtp.sendmail({rcfg.Sender}, {rcfg.Admin_email}, message)" + ) + logger.info(message) + + else: + smtp = smtplib.SMTP(rcfg.Mail_server) + smtp.sendmail(rcfg.Sender, receivers, message) + + logger.debug(f"User report sent to: {rcfg.Admin_email}") + + +def insert_db(username, msg): + # Search username in db + record = table.find_one(username=username) + + if not record: + # SQL insert + table.insert( + { + "username": username, + "uid": msg.get("uid", -1), + "gid": msg.get("gid", -1), + "email": msg.get("email", ""), + "reason": msg.get("reason", ""), + "fullname": msg.get("fullname", ""), + "create_account": None, + "git_commit": None, + "dir_verify": None, + "subscribe_mail_list": None, + "notify_user": None, + "sent": None, + "reported": False, + "last_update": datetime.now(), + "queuename": msg.get("queuename", ""), + } + ) + + +def update_db(username, data): + obj = {"username": username, **data} + table.update(obj, ["username"]) + + +def task_manager(ch, method, properties, body): + msg = json.loads(body) + queuename = method.routing_key.split(".")[1] + username = msg["username"] + task_name = msg["task"] + success = msg["success"] + send = completed = terminated = False + routing_key = "" + + if username in tracking: + current = tracking[username] + + else: + user_db = table.find_one(username=username) + + current = tracking[username] = copy.deepcopy(record) + current["errmsg"] = [] + current["queuename"] = ( + user_db["queuename"] if user_db else msg["queuename"] + ) + current["uid"] = user_db["uid"] if user_db else msg["uid"] + current["gid"] = user_db["gid"] if user_db else msg["gid"] + current["email"] = user_db["email"] if user_db else msg["email"] + current["reason"] = user_db["reason"] if user_db else msg["reason"] + current["fullname"] = ( + user_db["fullname"] if user_db else msg["fullname"] + ) + + if user_db: + # Restore task status + current["request"]["create_account"] = user_db["create_account"] + current["verify"]["git_commit"] = user_db["git_commit"] + current["verify"]["dir_verify"] = user_db["dir_verify"] + current["verify"]["subscribe_mail_list"] = user_db[ + "subscribe_mail_list" + ] + current["notify"]["notify_user"] = user_db["notify_user"] + + current["reported"] = user_db["reported"] + + for t in ["git_commit", "dir_verify", "subscribe_mail_list"]: + if user_db[t] is None: + current["waiting"].add(t) + + if not current["waiting"] and user_db["notify_user"] is None: + current["waiting"].add("notify_user") + + logger.debug(f"Loaded user {username} from DB") + + else: + insert_db(username, msg) + + logger.debug(f"Tracking user {username}") + + current["last_update"] = datetime.now() + + # Update Database + update_db( + username, + {task_name: success, "last_update": current["last_update"]}, + ) + + # Save error message if the task was failed + if not success: + errmsg = msg.get("errmsg", "") + if errmsg: + current["errmsg"].append(f"{task_name}: {errmsg}") + + # Define message that's going to be published + message = { + "username": username, + "queuename": queuename, + "uid": current["uid"], + "gid": current["gid"], + "email": current["email"], + "reason": current["reason"], + "fullname": current["fullname"], + } + + try: + if task_name in current["request"]: + current["request"][task_name] = success + routing_key = "verify." + queuename + + # Terminate the process if failed + if not success: + terminated = True + routing_key = "complete." + queuename + message["success"] = False + message["errmsg"] = current["errmsg"] + + send = True + current["waiting"] = { + "git_commit", + "dir_verify", + "subscribe_mail_list", + } + logger.debug(f"Request level {task_name}? {success}") + + elif task_name in current["verify"]: + current["verify"][task_name] = success + current["waiting"].discard(task_name) + routing_key = "notify." + queuename + + if not current["waiting"]: + send = True + current["waiting"] = {"notify_user"} + + # Terminate if dir_verify failed and all agents has responsed + if send and not current["verify"]["dir_verify"]: + terminated = True + routing_key = "complete." + queuename + message["success"] = False + message["errmsg"] = current["errmsg"] + + logger.debug(f"Verify level {task_name}? {success}") + + elif task_name in current["notify"]: + current["notify"][task_name] = success + current["waiting"].discard(task_name) + routing_key = "complete." + queuename + message["success"] = success + message["errmsg"] = current["errmsg"] + + send = True + + # The whole creation process has completed + completed = True + + logger.debug(f"Notify level {task_name}? {success}") + + except Exception: + logger.error("", exc_info=True) + + if send: + # Send trigger message + rc_rmq.publish_msg({"routing_key": routing_key, "msg": message}) + + logger.debug(f"Trigger message '{routing_key}' sent") + + logger.debug("Previous level messages acknowledged") + + # Send report to admin + if completed or terminated: + + notify_admin(username, current) + + update_db(username, {"reported": True}) + + rc_util.update_state( + username, "ok", msg.get("updated_by"), msg.get("host") + ) + + tracking.pop(username) + + logger.debug("Admin report sent") + + # Acknowledge message + ch.basic_ack(method.delivery_tag) + + +def timeout_handler(signum, frame): + current_time = datetime.now() + for user in tuple(tracking): + delta = current_time - tracking[user]["last_update"] + + if delta.seconds > timeout: + + rc_rmq.publish_msg( + { + "routing_key": "complete." + user, + "msg": { + "username": user, + "success": False, + "errmsg": [ + "Timeout on " + + ", ".join(tracking[user]["waiting"]) + ], + }, + } + ) + + notify_admin(user, tracking[user]) + + update_db(user, {"reported": True}) + + tracking.pop(user) + + +# Set initial timeout timer +signal.signal(signal.SIGALRM, timeout_handler) +signal.setitimer(signal.ITIMER_REAL, timeout, timeout) + +logger.info(f"Start listening to queue: {task}") +rc_rmq.start_consume( + {"queue": task, "routing_key": "confirm.*", "cb": task_manager} +) + +logger.info("Disconnected") +rc_rmq.disconnect() diff --git a/prod_rmq_agents/user_reg_event_logger.py b/prod_rmq_agents/user_reg_event_logger.py new file mode 100644 index 0000000000000000000000000000000000000000..177b2c97ff543e0f05936991a1512fa6a975cfdf --- /dev/null +++ b/prod_rmq_agents/user_reg_event_logger.py @@ -0,0 +1,35 @@ +#!/usr/bin/env python +import json +from rc_rmq import RCRMQ + +task = "user_reg_event_log" + +# Instantiate rabbitmq object +rc_rmq = RCRMQ({"exchange": "RegUsr", "exchange_type": "topic"}) + + +# Define your callback function +def log_user_reg_events(ch, method, properties, body): + + # Retrieve message + msg = json.loads(body) + + # Retrieve routing key + routing_key = method.routing_key + action = routing_key.split(".")[0] + user = routing_key.split(".")[1] + print(f"Got a {action} message for {user} with routing key: {routing_key}") + print(msg) + + # Acknowledge message + ch.basic_ack(delivery_tag=method.delivery_tag) + + +print("Start listening to queue: {}".format(task)) +rc_rmq.start_consume( + { + "queue": task, # Define your Queue name + "routing_key": "#", # Define your routing key + "cb": log_user_reg_events, # Pass in callback function you just define + } +) diff --git a/prod_rmq_agents/user_reg_logger.py b/prod_rmq_agents/user_reg_logger.py new file mode 100755 index 0000000000000000000000000000000000000000..ea8f9eed894c93c304f7c755c3f74f69b72f7029 --- /dev/null +++ b/prod_rmq_agents/user_reg_logger.py @@ -0,0 +1,42 @@ +#!/usr/bin/env python +import json +import dataset +import rc_util +from rc_rmq import RCRMQ +from datetime import datetime +import rabbit_config as rcfg + +# Define queue name +task = "reg_logger" + +# Instantiate rabbitmq object +rc_rmq = RCRMQ({"exchange": "RegUsr", "exchange_type": "topic"}) + +# Parse arguments +args = rc_util.get_args() + +# Logger +logger = rc_util.get_logger() + +# Open registry table in DB +db = dataset.connect(f"sqlite:///{rcfg.db_path}/reg_logger.db") +account_req_table = db["registry"] + + +# Define registration logger callback +def log_registration(ch, method, properties, body): + + account_req = json.loads(body) + account_req["req_time"] = datetime.now() + account_req_table.insert(account_req) + logger.info("logged account request for %s", account_req["username"]) + + ch.basic_ack(delivery_tag=method.delivery_tag) + + +logger.info("Start listening to queue: {}".format(task)) + +# Start consuming messages from queue with callback function +rc_rmq.start_consume( + {"queue": task, "routing_key": "request.*", "cb": log_registration} +) diff --git a/prod_rmq_agents/user_state.py b/prod_rmq_agents/user_state.py new file mode 100644 index 0000000000000000000000000000000000000000..208af16b4b68886161c79683f92ad77ab01df8e3 --- /dev/null +++ b/prod_rmq_agents/user_state.py @@ -0,0 +1,83 @@ +#!/usr/bin/env python +import json +import rc_util +import dataset +import pika +from rc_rmq import RCRMQ +from datetime import datetime +import rabbit_config as rcfg + +task = "user_state" + +args = rc_util.get_args() +logger = rc_util.get_logger(args) + +db = dataset.connect(f"sqlite:///{rcfg.db_path}/user_reg.db") +table = db["user_state"] + +# Instantiate rabbitmq object +rc_rmq = RCRMQ({"exchange": rcfg.Exchange, "exchange_type": "topic"}) + + +def user_state(ch, method, properties, body): + msg = json.loads(body) + username = msg["username"] + updated_by = msg.get("updated_by") + host = msg.get("host") + op = msg["op"] + msg["success"] = False + errmsg = "" + + corr_id = properties.correlation_id + reply_to = properties.reply_to + + try: + + if op == "get": + errmsg = "Getting latest state of {username}" + record = table.find_one(username=username, order_by="-date") + + if record: + msg["state"] = record["state"] + logger.debug( + f'The latest state of {username} is {msg["state"]}' + ) + else: + msg["state"] = "no-account" + + elif op == "post": + state = msg["state"] + errmsg = "Updating state of {username} to {state}" + table.insert( + { + "username": username, + "state": state, + "date": datetime.now(), + "updated_by": updated_by, + "host": host, + } + ) + logger.debug(f"User {username} state updates to {state}") + + msg["success"] = True + except Exception: + logger.error("", exc_info=True) + msg["errmsg"] = errmsg if errmsg else "Unexpected error" + + # Send response + if reply_to: + props = pika.BasicProperties(correlation_id=corr_id) + rc_rmq.publish_msg( + {"routing_key": reply_to, "msg": msg, "props": props} + ) + + # Acknowledge the message + ch.basic_ack(delivery_tag=method.delivery_tag) + + +if __name__ == "__main__": + logger.info(f"Start listening to queue: {task}") + rc_rmq.start_consume({"queue": task, "cb": user_state}) + + logger.info("Disconnected") + rc_rmq.disconnect() diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000000000000000000000000000000000000..1d15ad7d9a23fece8d2321b3f09abba072fa4c6e --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,3 @@ +[tool.black] +line-length=79 +target-version=['py36'] diff --git a/rabbit_config.py.example b/rabbit_config.py.example index 5643bb163e71c667dffdb6890c854b84d3810596..d1f3a924a21a81e307c9d607804a4ab54b5ff502 100644 --- a/rabbit_config.py.example +++ b/rabbit_config.py.example @@ -4,3 +4,55 @@ Password = 'CHANGE_IT_TO_YOUR_OWN_PASSWORD' VHost = '/' Server = 'ohpc' Port = 5672 + +Valid_state = ["ok", "blocked", "certification"] + +# Default function timeout +Function_timeout = 30 + +# time delay to let account creation finish +# to avoid concurrency with downstream agents +Delay = 5 + +# dir_verify agent config +User_dirs = ['/home', '/data/user', '/data/scratch'] + +# git_commit agent config +rc_users_ldap_repo_loc = "~/git/rc-users" +db_path = ".agent_db" + +# Config related to email +Mail_server = 'localhost' +Admin_email = 'root@localhost' +Sender = 'ROOT@LOCALHOST' +Sender_notification = 'NOTIFY@LOCALHOST' +Sender_subscription = 'SUBSCRIBE_EMAIL@LOCALHOST' +Sender_alias = 'Services' +Subject = 'New User Account' +Info_url = 'https://www.google.com' +Mail_list = 'root@localhost' +Mail_list_bcc = 'cmsupport@localhost' +Support_email = 'support@listserv.uab.edu' + +Head = f"""From: {Sender_alias} <{Sender}> +To: <{{{{ to }}}}> +Subject: {Subject} +""" + +Body = f""" +Hi {{{{ username }}}} +Your account has been set up with: + +============================ +User ID: {{{{ username }}}} +============================ + +If you have any questions, please visit: +{Info_url} + +or email at {Admin_email} + +Cheers, +""" + +Whole_mail = Head + Body diff --git a/rc_rmq.py b/rc_rmq.py index 37afa3f44f872454a3b96fccf2d5a6a538fe01cf..8227982803d0033dc6855fd2de8685fd65dfefbe 100644 --- a/rc_rmq.py +++ b/rc_rmq.py @@ -3,26 +3,24 @@ import pika import socket import rabbit_config as rcfg + class RCRMQ(object): - USER = 'guest' - PASSWORD = 'guest' - HOST = 'localhost' + USER = "guest" + PASSWORD = "guest" + HOST = "localhost" PORT = 5672 - VHOST = '/' - EXCHANGE = '' - EXCHANGE_TYPE = 'direct' - QUEUE = None - DURABLE = True - ROUTING_KEY = None + VHOST = "/" + EXCHANGE = "" + EXCHANGE_TYPE = "direct" DEBUG = False def __init__(self, config=None, debug=False): if config: - if 'exchange' in config: - self.EXCHANGE = config['exchange'] - if 'exchange_type' in config: - self.EXCHANGE_TYPE = config['exchange_type'] + if "exchange" in config: + self.EXCHANGE = config["exchange"] + if "exchange_type" in config: + self.EXCHANGE_TYPE = config["exchange_type"] hostname = socket.gethostname().split(".", 1)[0] @@ -34,7 +32,8 @@ class RCRMQ(object): self.DEBUG = debug if self.DEBUG: - print(""" + print( + """ Created RabbitMQ instance with: Exchange name: {}, Exchange type: {}, @@ -42,70 +41,104 @@ class RCRMQ(object): User: {}, VHost: {}, Port: {} - """.format(self.EXCHANGE, self.EXCHANGE_TYPE, self.HOST, self.USER, self.VHOST, self.PORT)) + """.format( + self.EXCHANGE, + self.EXCHANGE_TYPE, + self.HOST, + self.USER, + self.VHOST, + self.PORT, + ) + ) self._consumer_tag = None self._connection = None self._consuming = False self._channel = None self._parameters = pika.ConnectionParameters( - self.HOST, - self.PORT, - self.VHOST, - pika.PlainCredentials(self.USER, self.PASSWORD)) + self.HOST, + self.PORT, + self.VHOST, + pika.PlainCredentials(self.USER, self.PASSWORD), + ) def connect(self): if self.DEBUG: - print("Connecting...\n" + "Exchange: " + self.EXCHANGE + " Exchange type: " + self.EXCHANGE_TYPE) + print( + "Connecting...\n" + + "Exchange: " + + self.EXCHANGE + + " Exchange type: " + + self.EXCHANGE_TYPE + ) self._connection = pika.BlockingConnection(self._parameters) self._channel = self._connection.channel() self._channel.exchange_declare( - exchange=self.EXCHANGE, - exchange_type=self.EXCHANGE_TYPE, - durable=True) + exchange=self.EXCHANGE, + exchange_type=self.EXCHANGE_TYPE, + durable=True, + ) + + def bind_queue( + self, queue="", routing_key=None, durable=True, exclusive=False + ): + + if self._connection is None: + self.connect() + + result = self._channel.queue_declare( + queue=queue, durable=durable, exclusive=exclusive + ) - def bind_queue(self): - self._channel.queue_declare(queue=self.QUEUE, durable=self.DURABLE) - self._channel.queue_bind(exchange=self.EXCHANGE, - queue=self.QUEUE, - routing_key=self.ROUTING_KEY) + self._channel.queue_bind( + exchange=self.EXCHANGE, + queue=result.method.queue, + routing_key=routing_key, + ) + + return result.method.queue def disconnect(self): - self._channel.close() - self._connection.close() - self._connection = None + if self._connection: + self._channel.close() + self._connection.close() + self._connection = None - def delete_queue(self): - self._channel.queue_delete(self.QUEUE) + def delete_queue(self, queue): + self._channel.queue_delete(queue) def publish_msg(self, obj): - if 'routing_key' in obj: - self.ROUTING_KEY = obj['routing_key'] + routing_key = obj.get("routing_key") + props = obj.get("props") if self._connection is None: self.connect() - self._channel.basic_publish(exchange=self.EXCHANGE, - routing_key=self.ROUTING_KEY, - body=json.dumps(obj['msg'])) + self._channel.basic_publish( + exchange=self.EXCHANGE, + routing_key=routing_key, + properties=props, + body=json.dumps(obj["msg"]), + ) def start_consume(self, obj): - if 'queue' in obj: - self.QUEUE = obj['queue'] - self.ROUTING_KEY = obj['routing_key'] if 'routing_key' in obj else self.QUEUE - if 'durable' in obj: - self.DURABLE = obj['durable'] - - if self.DEBUG: - print("Queue: " + self.QUEUE + "\nRouting_key: " + self.ROUTING_KEY) + queue = obj.get("queue", "") + routing_key = obj.get("routing_key", queue or None) + durable = obj.get("durable", True) + exclusive = obj.get("exclusive", False) + bind = obj.get("bind", True) if self._connection is None: self.connect() - self.bind_queue() + if bind: + self.bind_queue(queue, routing_key, durable, exclusive) + + if self.DEBUG: + print("Queue: " + queue + "\nRouting_key: " + routing_key) - self._consumer_tag = self._channel.basic_consume(self.QUEUE,obj['cb']) + self._consumer_tag = self._channel.basic_consume(queue, obj["cb"]) self._consuming = True try: self._channel.start_consuming() diff --git a/rc_util.py b/rc_util.py index 0e7c4c1e6ec7d0cba4cc7370cf35370efa7f7354..89f84e9169ad95fee359906673b6c473cd4c894f 100644 --- a/rc_util.py +++ b/rc_util.py @@ -1,60 +1,151 @@ +import errno +import functools +import os +import signal import logging import argparse +import pika +import pwd +import uuid from rc_rmq import RCRMQ import json +from urllib.parse import quote +from time import sleep +import rabbit_config as rcfg + +rc_rmq = RCRMQ({"exchange": "RegUsr", "exchange_type": "topic"}) +tasks = { + "create_account": None, + "git_commit": None, + "dir_verify": None, + "subscribe_mail_list": None, + "notify_user": None, +} +logger_fmt = "%(asctime)s [%(module)s] - %(message)s" + + +class TimeoutError(Exception): + pass + + +# From https://stackoverflow.com/questions/2281850 +def timeout(seconds=30, error_message=os.strerror(errno.ETIME)): + def decorator(func): + def _handle_timeout(signum, frame): + raise TimeoutError(error_message) + + @functools.wraps(func) + def wrapper(*args, **kwargs): + signal.signal(signal.SIGALRM, _handle_timeout) + signal.alarm(seconds) + try: + result = func(*args, **kwargs) + finally: + signal.alarm(0) + return result + + return wrapper + + return decorator + + +def add_account( + username, queuename, email, full="", reason="", updated_by="", host="" +): + rc_rmq.publish_msg( + { + "routing_key": "request." + queuename, + "msg": { + "username": username, + "email": email, + "fullname": full, + "reason": reason, + "queuename": queuename, + "updated_by": updated_by, + "host": host, + }, + } + ) + rc_rmq.disconnect() + + +def certify_account( + username, queuename, state="ok", service="all", updated_by="", host="" +): + rc_rmq.publish_msg( + { + "routing_key": "acctmgr.request." + queuename, + "msg": { + "username": username, + "service": service, + "state": state, + "queuename": queuename, + "updated_by": updated_by, + "host": host, + }, + } + ) + rc_rmq.disconnect() -rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'}) -tasks = {'ohpc_account': None, 'ood_account': None, 'slurm_account': None} -logger_fmt = '%(asctime)s [%(module)s] - %(message)s' - -def add_account(username, email, full='', reason=''): - rc_rmq.publish_msg({ - 'routing_key': 'request.' + username, - 'msg': { - "username": username, - "email": email, - "fullname": full, - "reason": reason - } - }) - rc_rmq.disconnect() def worker(ch, method, properties, body): msg = json.loads(body) - task = msg['task'] - tasks[task] = msg['success'] - print("Got msg: {}({})".format(msg['task'], msg['success'])) - - # Check if all tasks are done - done = True - for key, status in tasks.items(): - if not status: - print("{} is not done yet.".format(key)) - done = False - if done: - rc_rmq.stop_consume() - rc_rmq.delete_queue() - -def consume(username, callback=worker, debug=False): + username = msg["username"] + + if msg["success"]: + print(f"Account for {username} has been created.") + else: + print(f"There's some issue while creating account for {username}") + errmsg = msg.get("errmsg", []) + for err in errmsg: + print(err) + + rc_rmq.stop_consume() + rc_rmq.delete_queue() + + +def consume( + queuename, + routing_key="", + callback=worker, + bind=True, + durable=True, + exclusive=False, + debug=False, +): + if routing_key == "": + routing_key = "complete." + queuename + if debug: sleep(5) else: - rc_rmq.start_consume({ - 'queue': username, - 'routing_key': 'confirm.' + username, - 'cb': callback - }) + rc_rmq.start_consume( + { + "queue": queuename, + "routing_key": routing_key, + "bind": bind, + "durable": durable, + "exclusive": exclusive, + "cb": callback, + } + ) rc_rmq.disconnect() - return { 'success' : True } + return {"success": True} + def get_args(): # Parse arguments parser = argparse.ArgumentParser() - parser.add_argument('-v', '--verbose', action='store_true', help='verbose output') - parser.add_argument('-n', '--dry-run', action='store_true', help='enable dry run mode') + parser.add_argument( + "-v", "--verbose", action="store_true", help="verbose output" + ) + parser.add_argument( + "-n", "--dry-run", action="store_true", help="enable dry run mode" + ) return parser.parse_args() + def get_logger(args=None): if args is None: args = get_args() @@ -70,3 +161,125 @@ def get_logger(args=None): logging.basicConfig(format=logger_fmt, level=logger_lvl) return logging.getLogger(__name__) + +def encode_name(uname): + uname_quote = quote(uname) + if "." in uname_quote: + uname_quote = uname_quote.replace(".", "%2E") + return uname_quote + + +@timeout(rcfg.Function_timeout) +def check_state(username, debug=False): + corr_id = str(uuid.uuid4()) + result = "" + rpc_queue = "user_state" + + def handler(ch, method, properties, body): + if debug: + print("Message received:") + print(body) + + nonlocal corr_id + nonlocal result + msg = json.loads(body) + + if corr_id == properties.correlation_id: + if not msg["success"]: + print("Something's wrong, please try again.") + else: + result = msg.get("state") + + rc_rmq.stop_consume() + rc_rmq.disconnect() + + callback_queue = rc_rmq.bind_queue(exclusive=True) + + if debug: + print(f"Checking state of user {username}") + print(f"Callback queue: {callback_queue}, correlation_id: {corr_id}") + + rc_rmq.publish_msg( + { + "routing_key": rpc_queue, + "props": pika.BasicProperties( + correlation_id=corr_id, reply_to=callback_queue + ), + "msg": {"op": "get", "username": username}, + } + ) + + rc_rmq.start_consume( + { + "queue": callback_queue, + "exclusive": True, + "bind": False, + "cb": handler, + } + ) + + return result + + +@timeout(rcfg.Function_timeout) +def update_state(username, state, updated_by="", host="", debug=False): + + if state not in rcfg.Valid_state: + print(f"Invalid state '{state}'") + return False + + corr_id = str(uuid.uuid4()) + result = "" + rpc_queue = "user_state" + + def handler(ch, method, properties, body): + if debug: + print("Message received:") + print(body) + + nonlocal corr_id + nonlocal result + msg = json.loads(body) + + if corr_id == properties.correlation_id: + if not msg["success"]: + print("Something's wrong, please try again.") + + result = msg["success"] + rc_rmq.stop_consume() + rc_rmq.disconnect() + + callback_queue = rc_rmq.bind_queue(exclusive=True) + + rc_rmq.publish_msg( + { + "routing_key": rpc_queue, + "props": pika.BasicProperties( + reply_to=callback_queue, correlation_id=corr_id + ), + "msg": { + "op": "post", + "username": username, + "state": state, + "updated_by": updated_by, + "host": host, + }, + } + ) + + rc_rmq.start_consume( + { + "queue": callback_queue, + "exclusive": True, + "bind": False, + "cb": handler, + } + ) + + return result + + +def get_caller_info(): + username = pwd.getpwuid(os.getuid()).pw_name + hostname = os.uname().nodename + return (username, hostname) diff --git a/requirements.txt b/requirements.txt index becc2ce1d48131ef086f528906361c6b94cd5487..2fd94d50a0505442d2fbdbd98fc6db4f7a6277f6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,6 @@ pika==1.1.0 +dataset==1.3.1 +Jinja2==2.11.2 +sh==1.12.14 +pre-commit==2.12.1 +greenlet==1.1.3 diff --git a/slurm_agent.py b/slurm_agent.py deleted file mode 100755 index e07af9bd233b952aa385848a4504859f31a22177..0000000000000000000000000000000000000000 --- a/slurm_agent.py +++ /dev/null @@ -1,42 +0,0 @@ -#!/usr/bin/env python -import sys -import json -import subprocess -from rc_rmq import RCRMQ - -task = 'slurm_account' - -# Instantiate rabbitmq object -rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'}) - -def slurm_account_create(ch, method, properties, body): - msg = json.loads(body) - print("Message received {}".format(msg)) - username = msg['username'] - success = False - try: - subprocess.call(["sudo", "sacctmgr", "add", "account", username, "-i", "Descripition: Add user"]) - subprocess.call(["sudo", "sacctmgr", "add", "user", username, "account=" + username, "-i"]) - print("SLURM account for user {} has been added".format(username)) - success = True - except: - e = sys.exc_info()[0] - print("[{}]: Error: {}".format(task, e)) - - ch.basic_ack(delivery_tag=method.delivery_tag) - - # send confirm message - rc_rmq.publish_msg({ - 'routing_key': 'confirm.' + username, - 'msg': { - 'task': task, - 'success': success - } - }) - -print("Start listening to queue: {}".format(task)) -rc_rmq.start_consume({ - 'queue': task, - 'routing_key': "create.*", - 'cb': slurm_account_create -}) diff --git a/user_auth.py b/user_auth.py new file mode 100644 index 0000000000000000000000000000000000000000..54b8f8ad93acf25980b393377e7b7c6806ad7a3c --- /dev/null +++ b/user_auth.py @@ -0,0 +1,26 @@ +#!/usr/bin/env python3 +import sys +import rc_util +import subprocess + +# During migration of this new script for ood +# e.g. not all of users are in the db +migration = True +default_state = "ok" +# migration = False # uncomment after migration's done +remote_user = sys.argv[1] + +result = rc_util.check_state(remote_user) + +if result == "ok": + print(remote_user) +else: + if migration and result == "no-account": + rc = subprocess.run( + ["getent", "passwd", remote_user], stdout=subprocess.DEVNULL + ).returncode + if rc == 0: + rc_util.update_state(remote_user, default_state) + print(remote_user) + sys.exit() + print()