diff --git a/create_account.py b/create_account.py index ef77d78b9eed202c406e127fd9b816dbf18dfd10..d46906ff10aa105ffa2688a4406881ea85692840 100755 --- a/create_account.py +++ b/create_account.py @@ -1,25 +1,54 @@ #!/usr/bin/env python3 +import json import sys import rc_util +import argparse +import signal -if len(sys.argv) < 2: - print("Usage: {} USERNAME [EMAIL] [FULL_NAME] [REASON]".format(sys.argv[0]), file=sys.stderr) - exit(1) +parser = argparse.ArgumentParser() +parser.add_argument('username', help='username that will be created') +parser.add_argument('email', nargs='?', default='', help="User's email") +parser.add_argument('full_name', nargs='?', default='', help="User's full name") +parser.add_argument('reason', nargs='?', default='', help='Reason of requesting') +parser.add_argument('--domain', default='localhost', help='domain of email') +parser.add_argument('-v', '--verbose', action='store_true', help='verbose output') +parser.add_argument('-n', '--dry-run', action='store_true', help='enable dry run mode') +args = parser.parse_args() -domain = 'uab.edu' -user_name = sys.argv[1] -email = sys.argv[2] if len(sys.argv) >= 3 else '' -full_name = sys.argv[3] if len(sys.argv) >= 4 else '' -reason = sys.argv[4] if len(sys.argv) >= 5 else '' +timeout = 60 -if email == '': - if '@' in user_name: - email = user_name +if args.email == '': + args.email = args.username + if '@' not in args.email: + args.email = args.username + '@' + args.domain + +def timeout_handler(signum, frame): + print("Process timeout, there's might some issue with agents") + rc_util.rc_rmq.stop_consume() + + +def callback(channel, method, properties, body): + msg = json.loads(body) + username = msg['username'] + + if msg['success']: + print(f'Account for {username} has been created.') else: - email = user_name + '@' + domain + print(f"There's some issue while creating account for {username}") + errmsg = msg.get('errmsg', []) + for err in errmsg: + print(err) + + rc_util.rc_rmq.stop_consume() + rc_util.rc_rmq.delete_queue() + + +rc_util.add_account(args.username, email=args.email, full=args.full_name, reason=args.reason) +print(f'Account for {args.username} requested.') -rc_util.add_account(user_name, email=email, full=full_name, reason=reason) -print("Account requested for user: {}".format(user_name)) +# Set initial timeout timer +signal.signal(signal.SIGALRM, timeout_handler) +signal.setitimer(signal.ITIMER_REAL, timeout) -print("Waiting for confirmation...") -rc_util.consume(user_name) +print('Waiting for completion...') +rc_util.consume(args.username, routing_key=f'complete.{args.username}', callback=callback) diff --git a/ohpc_account_create.py b/dev_rmq_agents/ohpc_account_create.py similarity index 100% rename from ohpc_account_create.py rename to dev_rmq_agents/ohpc_account_create.py diff --git a/ood_account_create.py b/dev_rmq_agents/ood_account_create.py similarity index 100% rename from ood_account_create.py rename to dev_rmq_agents/ood_account_create.py diff --git a/slurm_agent.py b/dev_rmq_agents/slurm_agent.py similarity index 100% rename from slurm_agent.py rename to dev_rmq_agents/slurm_agent.py diff --git a/mail_config.py.example b/mail_config.py.example new file mode 100644 index 0000000000000000000000000000000000000000..434cb44e7928dacc9143afd00ad0cdf3f8b87679 --- /dev/null +++ b/mail_config.py.example @@ -0,0 +1,29 @@ +import rabbit_config as rcfg + +Head = f"""From: {rcfg.Sender_alias} <{rcfg.Sender}> +To: <{{{{ to }}}}> +Subject: {rcfg.Subject} +""" + +Body = f""" +Hi {{{{ username }}}} +Your account has been set up with: + +============================ +User ID: {{{{ username }}}} +============================ + +If you have any questions, please visit: +{rcfg.Info_url} + +or email at {rcfg.Admin_email} + +Cheers, +""" + +Whole_mail = Head + Body + +UserReportHead = f"""From: {rcfg.Sender_alias} <{rcfg.Sender}> +To: <{rcfg.Admin_email}> +Subject: RC Account Creation Report: {{{{ fullname }}}}, {{{{ username }}}} """ + diff --git a/prod_rmq_agents/dir_verify.py b/prod_rmq_agents/dir_verify.py new file mode 100644 index 0000000000000000000000000000000000000000..156db1cff7f62ac3dea884614f37d716b6846acf --- /dev/null +++ b/prod_rmq_agents/dir_verify.py @@ -0,0 +1,76 @@ +#!/usr/bin/env python +import os +import sys +import json +import shutil +import rc_util +from pathlib import Path +from rc_rmq import RCRMQ +import rabbit_config as rcfg + +task = 'dir_verify' +dirs = rcfg.User_dirs + +args = rc_util.get_args() +logger = rc_util.get_logger(args) + +# Instantiate rabbitmq object +rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'}) + +def dir_verify(ch, method, properties, body): + msg = json.loads(body) + username = msg['username'] + msg['task'] = task + msg['success'] = True + + missing_dirs = [] + + try: + for d in dirs: + path = Path(d) / msg['username'] + + if args.dry_run: + logger.info(f'Checking dirs: {path}') + + else: + if not path.exists(): + # check if dirs exist and record any missing dirs + missing_dirs.append(path) + msg['success'] = False + msg['errmsg'] = f"Error: missing dirs {missing_dirs}" + logger.info(f'{path} does not exist') + else: + # check existing dirs for correct ownership and permissions + status = os.stat(path) + mask = oct(status.st_mode)[-3:] + uid = str(status.st_uid) + gid = str(status.st_gid) + if mask!='700' or uid!=msg['uid'] or gid!=msg['gid']: + msg['success'] = False + msg['errmsg'] = f"Error: dir {path} permissions or ownership are wrong" + + except Exception as exception: + msg['success'] = False + msg['errmsg'] = "Exception raised, check the logs for stack trace" + logger.error('', exc_info=True) + + # send confirm message + rc_rmq.publish_msg({ + 'routing_key': 'confirm.' + username, + 'msg': msg + }) + + logger.debug(f'User {username} confirmation sent') + + ch.basic_ack(delivery_tag=method.delivery_tag) + + +logger.info(f'Start listening to queue: {task}') +rc_rmq.start_consume({ + 'queue': task, + 'routing_key': "verify.*", + 'cb': dir_verify +}) + +logger.info('Disconnected') +rc_rmq.disconnect() diff --git a/prod_rmq_agents/get-next-uid-gid.py b/prod_rmq_agents/get-next-uid-gid.py new file mode 100644 index 0000000000000000000000000000000000000000..565501b7cf7dc1e371718ad01992e73fe516ce8c --- /dev/null +++ b/prod_rmq_agents/get-next-uid-gid.py @@ -0,0 +1,100 @@ +#!/usr/bin/env python +import sys +import json +import ldap +import time +import logging +import argparse +import rc_util +from os import popen +from rc_rmq import RCRMQ +import rabbit_config as rcfg + +task = 'create_account' + +# Instantiate rabbitmq object +rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'}) + +args = rc_util.get_args() + +# Logger +logger = rc_util.get_logger() + +#Account creation +def create_account(msg): + + logger.info(f'Account creation request received: {msg}') + username = msg['username'] + uid = msg['uid'] + email = msg['email'] + fullname = msg['fullname'] + msg['success'] = False + + # Bright command to create user + cmd = '/cm/local/apps/cmd/bin/cmsh -c ' + cmd += f'"user; add {username}; set id {uid}; set email {email}; set commonname \\"{fullname}\\"; ' + cmd += 'commit;"' + + if not args.dry_run: + popen(cmd) + time.sleep(rcfg.Delay) + logger.info(f'Bright command to create user:{cmd}') + +# Define your callback function +def resolve_uid_gid(ch, method, properties, body): + + # Retrieve message + msg = json.loads(body) + logger.info("Received {}".format(msg)) + username = msg['username'] + msg['success'] = False + + # Determine next available UID + try: + user_exists_cmd = f"/usr/bin/getent passwd {username}" + user_exists = popen(user_exists_cmd).read().rstrip() + + if user_exists: + logger.info("The user, {} already exists".format(username)) + msg['uid'] = user_exists.split(':')[2] + msg['gid'] = user_exists.split(':')[3] + + else: + cmd_uid = "/usr/bin/getent passwd | \ + awk -F: '($3>10000) && ($3<20000) && ($3>maxuid) { maxuid=$3; } END { print maxuid+1; }'" + msg['uid'] = popen(cmd_uid).read().rstrip() + logger.info(f"UID query: {cmd_uid}") + + cmd_gid = "/usr/bin/getent group | \ + awk -F: '($3>10000) && ($3<20000) && ($3>maxgid) { maxgid=$3; } END { print maxgid+1; }'" + msg['gid'] = popen(cmd_gid).read().rstrip() + logger.info(f"GID query: {cmd_gid}") + + create_account(msg) + msg['task'] = task + msg['success'] = True + except Exception as exception: + msg['success'] = False + msg['errmsg'] = f"Exception raised during account creation, check logs for stack trace" + logger.error('', exc_info=True) + + # Acknowledge message + ch.basic_ack(delivery_tag=method.delivery_tag) + + # Send confirm message + logger.debug('rc_rmq.publish_msg()') + rc_rmq.publish_msg({ + 'routing_key': 'confirm.' + username, + 'msg': msg + }) + logger.info('confirmation sent') + +logger.info("Start listening to queue: {}".format(task)) +rc_rmq.start_consume({ + 'queue': task, + 'routing_key': "request.*", + 'cb': resolve_uid_gid +}) + +logger.info("Disconnected") +rc_rmq.disconnect() diff --git a/prod_rmq_agents/git_commit.py b/prod_rmq_agents/git_commit.py new file mode 100644 index 0000000000000000000000000000000000000000..5c0de34d8582a97750228df0e37bb034e12955c7 --- /dev/null +++ b/prod_rmq_agents/git_commit.py @@ -0,0 +1,103 @@ +#!/usr/bin/env python +import os +import sh +import sys +import json +import rc_util +from rc_rmq import RCRMQ +import rabbit_config as rmq_cfg + +task = 'git_commit' + +# Instantiate rabbitmq object +rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'}) + +# Define some location +repo_location = os.path.expanduser(rmq_cfg.rc_users_ldap_repo_loc) +users_dir = repo_location + '/users' +groups_dir = repo_location + '/groups' + +args = rc_util.get_args() +logger = rc_util.get_logger(args) + +if not args.dry_run: + git = sh.git.bake('--git-dir', repo_location+'/.git', '--work-tree', repo_location) + ldapsearch = sh.Command('ldapsearch') +else: + git = sh.echo.bake('--git-dir', repo_location+'/.git', '--work-tree', repo_location) + ldapsearch = sh.echo.bake('ldapsearch') + +def git_commit(ch, method, properties, body): + msg = json.loads(body) + username = msg['username'] + msg['task'] = task + msg['success'] = False + branch_name = 'issue-add-users-' + username.lower() + user_ldif = users_dir + f'/{username}.ldif' + group_ldif = groups_dir + f'/{username}.ldif' + + logger.info("Received: %s", msg) + logger.debug("branch_name: %s", branch_name) + + try: + + logger.debug('git checkout master') + git.checkout('master') + logger.debug('git pull') + git.pull() + branch_exists = git.branch('--list', branch_name) + if not branch_exists: + logger.debug('git checkout -b %s', branch_name) + git.checkout('-b', branch_name) + logger.debug("open(%s, 'w'), open(%s, 'w')", user_ldif, group_ldif) + with open(user_ldif, 'w') as ldif_u,\ + open(group_ldif, 'w') as ldif_g: + logger.debug(f"ldapsearch -LLL -x -H ldaps://ldapserver -b 'dc=cm,dc=cluster' uid={username} > {user_ldif}") + ldapsearch('-LLL', '-x', '-H', 'ldaps://ldapserver', '-b', "dc=cm,dc=cluster", f"uid={username}", _out=ldif_u) + logger.debug(f"ldapsearch -LLL -x -H ldapserver -b 'ou=Group,dc=cm,dc=cluster' cn={username} > {group_ldif}") + ldapsearch('-LLL', '-x', '-H', 'ldaps://ldapserver', '-b', "ou=Group,dc=cm,dc=cluster", f"cn={username}", _out=ldif_g) + logger.info('user ldif files generated.') + + logger.debug('git add %s', user_ldif) + git.add(user_ldif) + logger.debug('git add %s', group_ldif) + git.add(group_ldif) + logger.debug("git commit -m 'Added new cheaha user: %s'", username) + git.commit(m="Added new cheaha user: " + username) + logger.debug('git checkout master') + git.checkout('master') + + logger.debug('git merge %s --no-ff --no-edit', branch_name) + git.merge(branch_name, '--no-ff', '--no-edit') + logger.debug('git push origin master') + git.push('origin', 'master') + # merge with gitlab api + + logger.info('Added ldif files and committed to git repo') + + msg['success'] = True + except Exception as exception: + logger.error('', exc_info=True) + + # Send confirm message + logger.debug('rc_rmq.publish_msge()') + rc_rmq.publish_msg({ + 'routing_key': 'confirm.' + username, + 'msg': msg + }) + logger.info('confirmation sent') + + # Acknowledge message + logger.debug('ch.basic_ack()') + ch.basic_ack(delivery_tag=method.delivery_tag) + + +logger.info("Start listening to queue: %s", task) +rc_rmq.start_consume({ + 'queue': task, + 'routing_key': "verify.*", + 'cb': git_commit +}) + +logger.info("Disconnected") +rc_rmq.disconnect() diff --git a/prod_rmq_agents/notify_user.py b/prod_rmq_agents/notify_user.py new file mode 100644 index 0000000000000000000000000000000000000000..b7ed9489b30931e475331c713380ca5f1275eb0e --- /dev/null +++ b/prod_rmq_agents/notify_user.py @@ -0,0 +1,104 @@ +#!/usr/bin/env python +import sys +import json +import rc_util +import smtplib +import dataset +from rc_rmq import RCRMQ +from jinja2 import Template +from datetime import datetime +import rabbit_config as rcfg +import mail_config as mail_cfg +task = 'notify_user' + +args = rc_util.get_args() +logger = rc_util.get_logger(args) + +db = dataset.connect(f'sqlite:///.agent_db/user_reg.db') +table = db['users'] + +# Instantiate rabbitmq object +rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'}) + +# Email instruction to user +def notify_user(ch, method, properties, body): + msg = json.loads(body) + username = msg['username'] + user_email = msg['email'] + msg['task'] = task + msg['success'] = False + errmsg = "" + + try: + + # Search username in database + record = table.find_one(username=username) + + if record['sent'] is not None: + errmsg = 'Updating database counter' + # Update counter + count = record['count'] + if args.dry_run: + logger.info('Update counter in database') + + else: + table.update({ + 'username': username, + 'count': count + 1 + }, ['username']) + + logger.debug(f'User {username} counter updated to {count + 1}') + + else: + # Send email to user + receivers = [user_email, rcfg.Admin_email] + message = Template(mail_cfg.Whole_mail).render(username=username, to=user_email) + + if args.dry_run: + logger.info(f'smtp = smtplib.SMTP({rcfg.Server})') + logger.info(f'smtp.sendmail({rcfg.Sender}, {receivers}, message)') + logger.info(f"table.update({{'username': {username}, 'count': 1, 'sent_at': datetime.now()}}, ['username'])") + + else: + errmsg = 'Sending email to user' + smtp = smtplib.SMTP(rcfg.Server) + smtp.sendmail(rcfg.Sender, receivers, message) + + logger.debug(f'Email sent to: {user_email}') + + errmsg = 'Updating database email sent time' + table.update({ + 'username': username, + 'count': 1, + 'sent': datetime.now() + }, ['username']) + + logger.debug(f'User {username} inserted into database') + + msg['success'] = True + except Exception as exception: + logger.error('', exc_info=True) + msg['errmsg'] = errmsg if errmsg else 'Unexpected error' + + # Send confirm message + rc_rmq.publish_msg({ + 'routing_key': 'confirm.' + username, + 'msg': msg + }) + + logger.debug(f'User {username} confirmation sent') + + # Acknowledge the message + ch.basic_ack(delivery_tag=method.delivery_tag) + + +if __name__ == "__main__": + logger.info(f'Start listening to queue: {task}') + rc_rmq.start_consume({ + 'queue': task, + 'routing_key': "notify.*", + 'cb': notify_user + }) + + logger.info('Disconnected') + rc_rmq.disconnect() diff --git a/prod_rmq_agents/subscribe_mail_lists.py b/prod_rmq_agents/subscribe_mail_lists.py new file mode 100644 index 0000000000000000000000000000000000000000..9ff0af16ef9a7e4628234657f1712d00740ec0f6 --- /dev/null +++ b/prod_rmq_agents/subscribe_mail_lists.py @@ -0,0 +1,84 @@ +#!/usr/bin/env python +import sys +import json +import smtplib +import logging +import argparse +import rc_util +from email.message import EmailMessage +from rc_rmq import RCRMQ +import rabbit_config as rcfg + +task = 'subscribe_mail_list' + +# Instantiate rabbitmq object +rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'}) + +# Parse arguments +args = rc_util.get_args() + +# Logger +logger = rc_util.get_logger()# Define your callback function + +def mail_list_subscription(ch, method, properties, body): + + # Retrieve message + msg = json.loads(body) + logger.info("Received msg {}".format(msg)) + username = msg['username'] + fullname = msg['fullname'] + email = msg['email'] + + mail_list_admin = rcfg.Sender + mail_list = rcfg.Mail_list + mail_list_bcc = rcfg.Mail_list_bcc + server = rcfg.Server + + listserv_cmd = f'QUIET ADD hpc-announce {email} {fullname} \ + \nQUIET ADD hpc-users {email} {fullname}' + + logger.info("Adding user{} to mail list".format(username)) + msg['success'] = False + try: + # Create a text/plain message + email_msg = EmailMessage() + + email_msg['From'] = mail_list_admin + email_msg['To'] = mail_list + email_msg['Subject'] = '' + email_msg['Bcc'] = mail_list_bcc + + # Create an smtp object and send email + s = smtplib.SMTP(server) + + email_msg.set_content(listserv_cmd) + if not args.dry_run: + s.send_message(email_msg) + logging.info(f'This email will add user {username} to listserv \n{email_msg}') + + s.quit() + msg['task'] = task + msg['success'] = True + except Exception as exception: + logger.error('', exc_info=True) + + # Acknowledge message + ch.basic_ack(delivery_tag=method.delivery_tag) + + # send confirm message + logger.debug('rc_rmq.publish_msg()') + rc_rmq.publish_msg({ + 'routing_key': 'confirm.' + username, + 'msg': msg + }) + logger.info('confirmation sent') + +logger.info("Start listening to queue: {}".format(task)) +rc_rmq.start_consume({ + 'queue': task, # Define your Queue name + 'routing_key': "verify.*", # Define your routing key + 'cb': mail_list_subscription # Pass in callback function you just define +}) + +logger.info("Disconnected") +rc_rmq.disconnect() diff --git a/prod_rmq_agents/task_manager.py b/prod_rmq_agents/task_manager.py new file mode 100644 index 0000000000000000000000000000000000000000..795f7c322dce8ba1c5b2e40a583e685f86884c03 --- /dev/null +++ b/prod_rmq_agents/task_manager.py @@ -0,0 +1,299 @@ +#!/usr/bin/env python +import sys +import copy +import json +import signal +import dataset +import rc_util +import smtplib +from rc_rmq import RCRMQ +from jinja2 import Template +from datetime import datetime +import mail_config as mail_cfg +import rabbit_config as rcfg + +task = 'task_manager' +timeout = 30 + +args = rc_util.get_args() +logger = rc_util.get_logger(args) + +db = dataset.connect(f'sqlite:///.agent_db/user_reg.db') +table = db['users'] + +record = { + 'uid': -1, + 'gid': -1, + 'email': '', + 'reason': '', + 'fullname': '', + 'last_update': datetime.now(), + 'errmsg': None, + 'waiting': set(), + 'request': { + 'create_account': None + }, + 'verify': { + 'git_commit': None, + 'dir_verify': None, + 'subscribe_mail_list': None + }, + 'notify': { + 'notify_user': None + }, + 'reported': False +} + +# Currently tracking users +tracking = {} + +# Instantiate rabbitmq object +rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'}) + +def notify_admin(username, user_record): + receivers = [rcfg.Admin_email] + message = Template(mail_cfg.UserReportHead).render(username=username, fullname=user_record['fullname']) + if user_record['reported']: + message += ' (Duplicate)' + message += f""" \n + User Creation Report for user {username} + uid: {user_record["uid"]}, gid: {user_record["gid"]} + Tasks: + 'create_account': {user_record["request"]["create_account"]} + 'git_commit': {user_record["verify"]["git_commit"]} + 'dir_verify': {user_record["verify"]["dir_verify"]} + 'subscribe_mail_list': {user_record["verify"]["subscribe_mail_list"]} + 'notify_user': {user_record["notify"]["notify_user"]} + """ + if user_record['errmsg']: + message += """ + + Error(s): + """ + for msg in user_record['errmsg']: + message += msg + "\n" + + if args.dry_run: + logger.info(f'smtp = smtplib.SMTP({rcfg.Server})') + logger.info(f'smtp.sendmail({rcfg.Sender}, {rcfg.Admin_email}, message)') + logger.info(message) + + else: + smtp = smtplib.SMTP(rcfg.Server) + smtp.sendmail(rcfg.Sender, receivers, message) + + logger.debug(f'User report sent to: {rcfg.Admin_email}') + + +def insert_db(username, msg): + # Search username in db + record = table.find_one(username=username) + + if not record: + # SQL insert + table.insert({ + 'username': username, + 'uid': msg.get('uid', -1), + 'gid': msg.get('gid', -1), + 'email': msg.get('email', ''), + 'reason': msg.get('reason', ''), + 'fullname': msg.get('fullname', ''), + 'create_account': None, + 'git_commit': None, + 'dir_verify': None, + 'subscribe_mail_list': None, + 'notify_user': None, + 'sent': None, + 'reported': False, + 'last_update': datetime.now() + }) + + +def update_db(username, data): + obj = { 'username': username, **data } + table.update(obj, ['username']) + + +def task_manager(ch, method, properties, body): + msg = json.loads(body) + username = method.routing_key.split('.')[1] + task_name = msg['task'] + success = msg['success'] + send = completed = terminated = False + routing_key = "" + + if username in tracking: + current = tracking[username] + + else: + user_db = table.find_one(username=username) + + current = tracking[username] = copy.deepcopy(record) + current['errmsg'] = [] + current['uid'] = user_db['uid'] if user_db else msg['uid'] + current['gid'] = user_db['gid'] if user_db else msg['gid'] + current['email'] = user_db['email'] if user_db else msg['email'] + current['reason'] = user_db['reason'] if user_db else msg['reason'] + current['fullname'] = user_db['fullname'] if user_db else msg['fullname'] + + if user_db: + # Restore task status + current['request']['create_account'] = user_db['create_account'] + current['verify']['git_commit'] = user_db['git_commit'] + current['verify']['dir_verify'] = user_db['dir_verify'] + current['verify']['subscribe_mail_list'] = user_db['subscribe_mail_list'] + current['notify']['notify_user'] = user_db['notify_user'] + + current['reported'] = user_db['reported'] + + for t in ['git_commit', 'dir_verify', 'subscribe_mail_list']: + if user_db[t] is None: + current['waiting'].add(t) + + if not current['waiting'] and user_db['notify_user'] is None: + current['waiting'].add('notify_user') + + logger.debug(f'Loaded user {username} from DB') + + else: + insert_db(username, msg) + + logger.debug(f'Tracking user {username}') + + current['last_update'] = datetime.now() + + # Update Database + update_db(username, { + task_name: success, + 'last_update': current['last_update']} + ) + + # Save error message if the task was failed + if not success: + errmsg = msg.get('errmsg', '') + if errmsg: + current['errmsg'].append(f"{task_name}: {errmsg}") + + # Define message that's going to be published + message = { + 'username': username, + 'uid': current['uid'], + 'gid': current['gid'], + 'email': current['email'], + 'reason': current['reason'], + 'fullname': current['fullname'] + } + + try: + if task_name in current['request']: + current['request'][task_name] = success + routing_key = 'verify.' + username + + # Terminate the process if failed + if not success: + terminated = True + routing_key = 'complete.' + username + message['success'] = False + message['errmsg'] = current['errmsg'] + + send = True + current['waiting'] = {'git_commit', 'dir_verify', 'subscribe_mail_list'} + logger.debug(f'Request level {task_name}? {success}') + + elif task_name in current['verify']: + current['verify'][task_name] = success + current['waiting'].discard(task_name) + routing_key = 'notify.' + username + + if not current['waiting']: + send = True + current['waiting'] = {'notify_user'} + + # Terminate if dir_verify failed and all agents has responsed + if send and not current['verify']['dir_verify']: + terminated = True + routing_key = 'complete.' + username + message['success'] = False + message['errmsg'] = current['errmsg'] + + logger.debug(f'Verify level {task_name}? {success}') + + elif task_name in current['notify']: + current['notify'][task_name] = success + current['waiting'].discard(task_name) + routing_key = 'complete.' + username + message['success'] = success + message['errmsg'] = current['errmsg'] + + send = True + + # The whole creation process has completed + completed = True + + logger.debug(f'Notify level {task_name}? {success}') + + except Exception as exception: + logger.error('', exc_info=True) + + if send: + # Send trigger message + rc_rmq.publish_msg({ + 'routing_key': routing_key, + 'msg': message + }) + + logger.debug(f"Trigger message '{routing_key}' sent") + + logger.debug('Previous level messages acknowledged') + + # Send report to admin + if completed or terminated: + + notify_admin(username, current) + + update_db(username, {'reported': True}) + + tracking.pop(username) + + logger.debug('Admin report sent') + + # Acknowledge message + ch.basic_ack(method.delivery_tag) + + +def timeout_handler(signum, frame): + current_time = datetime.now() + for user in tuple(tracking): + delta = current_time - tracking[user]['last_update'] + + if delta.seconds > timeout: + + rc_rmq.publish_msg({ + 'routing_key': 'complete.' + user, + 'msg': { + 'username': user, + 'success': False, + 'errmsg': ["Timeout on " + ', '.join(tracking[user]['waiting'])] + } + }) + + notify_admin(user, tracking[user]) + + update_db(user, {'reported': True}) + + tracking.pop(user) + + +# Set initial timeout timer +signal.signal(signal.SIGALRM, timeout_handler) +signal.setitimer(signal.ITIMER_REAL, timeout, timeout) + +logger.info(f'Start listening to queue: {task}') +rc_rmq.start_consume({ + 'queue': task, + 'routing_key': "confirm.*", + 'cb': task_manager +}) + +logger.info('Disconnected') +rc_rmq.disconnect() diff --git a/prod_rmq_agents/user_reg_event_logger.py b/prod_rmq_agents/user_reg_event_logger.py new file mode 100644 index 0000000000000000000000000000000000000000..02c12483ce3540e8f36c2d902f998c6e1ba50f9b --- /dev/null +++ b/prod_rmq_agents/user_reg_event_logger.py @@ -0,0 +1,33 @@ +#!/usr/bin/env python +import sys +import json +from rc_rmq import RCRMQ + +task = 'user_reg_event_log' + +# Instantiate rabbitmq object +rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'}) + +# Define your callback function +def log_user_reg_events(ch, method, properties, body): + + # Retrieve message + msg = json.loads(body) + + # Retrieve routing key + routing_key = method.routing_key + action = routing_key.split(".")[0] + user = routing_key.split(".")[1] + print(f'Got a {action} message for {user} with routing key: {routing_key}') + print(msg) + + # Acknowledge message + ch.basic_ack(delivery_tag=method.delivery_tag) + + +print("Start listening to queue: {}".format(task)) +rc_rmq.start_consume({ + 'queue': task, # Define your Queue name + 'routing_key': "#", # Define your routing key + 'cb': log_user_reg_events # Pass in callback function you just define +}) diff --git a/prod_rmq_agents/user_reg_logger.py b/prod_rmq_agents/user_reg_logger.py new file mode 100755 index 0000000000000000000000000000000000000000..402c9e319584d36560287be9e14c55d16ab509e6 --- /dev/null +++ b/prod_rmq_agents/user_reg_logger.py @@ -0,0 +1,43 @@ +#!/usr/bin/env python +import json +import sys +import dataset +import rc_util +from rc_rmq import RCRMQ +from datetime import datetime + +# Define queue name +task = 'reg_logger' + +# Instantiate rabbitmq object +rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'}) + +# Parse arguments +args = rc_util.get_args() + +# Logger +logger = rc_util.get_logger() + +# Open registry table in DB +db = dataset.connect('sqlite:///reg_logger.db') +account_req_table = db['registry'] + +# Define registration logger callback +def log_registration(ch, method, properties, body): + + account_req = json.loads(body) + account_req['req_time'] = datetime.now(), + account_req_table.insert(account_req) + logger.info("logged account request for %s", account_req['username']) + + ch.basic_ack(delivery_tag=method.delivery_tag) + +logger.info("Start listening to queue: {}".format(task)) + +# Start consuming messages from queue with callback function +rc_rmq.start_consume({ + 'queue': task, + 'routing_key': "create.*", + 'cb': log_registration +}) + diff --git a/rabbit_config.py.example b/rabbit_config.py.example index 5643bb163e71c667dffdb6890c854b84d3810596..acd1535855531a517e062e60cae7ecfa332d4dee 100644 --- a/rabbit_config.py.example +++ b/rabbit_config.py.example @@ -4,3 +4,46 @@ Password = 'CHANGE_IT_TO_YOUR_OWN_PASSWORD' VHost = '/' Server = 'ohpc' Port = 5672 + +# time delay to let account creation finish +# to avoid concurrency with downstream agents +Delay = 5 + +# dir_verify agent config +User_dirs = ['/home', '/data/user', '/data/scratch'] + +# git_commit agent config +rc_users_ldap_repo_loc = "~/git/rc-users" + +# Config related to email +Server = 'localhost' +Admin_email = 'root@localhost' +Sender = 'ROOT@LOCALHOST' +Sender_alias = 'Services' +Subject = 'New User Account' +Info_url = 'https://www.google.com' +Mail_list = 'root@localhost' +Mail_list_bcc = 'cmsupport@localhost' + +Head = f"""From: {Sender_alias} <{Sender}> +To: <{{{{ to }}}}> +Subject: {Subject} +""" + +Body = f""" +Hi {{{{ username }}}} +Your account has been set up with: + +============================ +User ID: {{{{ username }}}} +============================ + +If you have any questions, please visit: +{Info_url} + +or email at {Admin_email} + +Cheers, +""" + +Whole_mail = Head + Body diff --git a/rc_util.py b/rc_util.py index 0e7c4c1e6ec7d0cba4cc7370cf35370efa7f7354..e16fe306583750d200fb56de6ff9408ea55a70e7 100644 --- a/rc_util.py +++ b/rc_util.py @@ -4,7 +4,7 @@ from rc_rmq import RCRMQ import json rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'}) -tasks = {'ohpc_account': None, 'ood_account': None, 'slurm_account': None} +tasks = {'create_account': None, 'git_commit': None, 'dir_verify': None, 'subscribe_mail_list': None, 'notify_user': None} logger_fmt = '%(asctime)s [%(module)s] - %(message)s' def add_account(username, email, full='', reason=''): @@ -21,27 +21,29 @@ def add_account(username, email, full='', reason=''): def worker(ch, method, properties, body): msg = json.loads(body) - task = msg['task'] - tasks[task] = msg['success'] - print("Got msg: {}({})".format(msg['task'], msg['success'])) + username = msg['username'] - # Check if all tasks are done - done = True - for key, status in tasks.items(): - if not status: - print("{} is not done yet.".format(key)) - done = False - if done: - rc_rmq.stop_consume() - rc_rmq.delete_queue() + if msg['success']: + print(f'Account for {username} has been created.') + else: + print(f"There's some issue while creating account for {username}") + errmsg = msg.get('errmsg', []) + for err in errmsg: + print(err) + + rc_rmq.stop_consume() + rc_rmq.delete_queue() + +def consume(username, routing_key='', callback=worker, debug=False): + if routing_key == '': + routing_key = 'complete.' + username -def consume(username, callback=worker, debug=False): if debug: sleep(5) else: rc_rmq.start_consume({ 'queue': username, - 'routing_key': 'confirm.' + username, + 'routing_key': routing_key, 'cb': callback }) rc_rmq.disconnect() diff --git a/remove_user_login_node.sh b/remove_user_login_node.sh new file mode 100644 index 0000000000000000000000000000000000000000..ef984547cf06261ca6c26fa6afd490d7883750c1 --- /dev/null +++ b/remove_user_login_node.sh @@ -0,0 +1,21 @@ +#!/bin/sh +username="$1" +usage() { + echo "Usage: $0 USERNAME" +} +if [[ "$EUID" -ne 0 ]]; then + echo "This script must be run as root!" + exit 1 +fi +if [ -z "$username" ]; then + usage + exit 1 +fi +if id "$username" &>/dev/null; then + echo "Deleting nginx process running under user: ${username}" + kill -9 ` ps -ef | grep 'nginx' | grep ${username} | awk '{print $2}'` + echo "Deleted process" +else + echo "user: ${username} not found." + exit 1 +fi diff --git a/requirements.txt b/requirements.txt index becc2ce1d48131ef086f528906361c6b94cd5487..562cb606a776a6080f5acab0f53490d766dd2a2b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,5 @@ pika==1.1.0 +pyldap==3.0.0.post1 +dataset==1.3.1 +Jinja2==2.11.2 +sh==1.12.14 \ No newline at end of file