diff --git a/create_account.py b/create_account.py index 99f6d590a8d75f61e4e32df91e089d7244a23ff7..6869f98918653e01d15f587d89d0cc31024e639f 100755 --- a/create_account.py +++ b/create_account.py @@ -6,13 +6,23 @@ import argparse import signal parser = argparse.ArgumentParser() -parser.add_argument('username', help='username that will be created') -parser.add_argument('email', nargs='?', default='', help="User's email") -parser.add_argument('full_name', nargs='?', default='', help="User's full name") -parser.add_argument('reason', nargs='?', default='', help='Reason of requesting') -parser.add_argument('--domain', default='localhost', help='domain of email') -parser.add_argument('-v', '--verbose', action='store_true', help='verbose output') -parser.add_argument('-n', '--dry-run', action='store_true', help='enable dry run mode') +parser.add_argument("username", help="username that will be created") +parser.add_argument("email", nargs="?", default="", help="User's email") +parser.add_argument( + "full_name", nargs="?", default="", help="User's full name" +) +parser.add_argument( + "reason", nargs="?", default="", help="Reason of requesting" +) +parser.add_argument( + "--domain", default="localhost", help="domain of email" +) +parser.add_argument( + "-v", "--verbose", action="store_true", help="verbose output" +) +parser.add_argument( + "-n", "--dry-run", action="store_true", help="enable dry run mode" +) args = parser.parse_args() timeout = 60 @@ -21,8 +31,9 @@ queuename = rc_util.encode_name(args.username) if args.email == "": args.email = args.username - if '@' not in args.email: - args.email = args.username + '@' + args.domain + if "@" not in args.email: + args.email = args.username + "@" + args.domain + def timeout_handler(signum, frame): print("Process timeout, there's might some issue with agents") @@ -31,13 +42,13 @@ def timeout_handler(signum, frame): def callback(channel, method, properties, body): msg = json.loads(body) - username = msg['username'] + username = msg["username"] - if msg['success']: - print(f'Account for {username} has been created.') + if msg["success"]: + print(f"Account for {username} has been created.") else: print(f"There's some issue while creating account for {username}") - errmsg = msg.get('errmsg', []) + errmsg = msg.get("errmsg", []) for err in errmsg: print(err) diff --git a/prod_rmq_agents/dir_verify.py b/prod_rmq_agents/dir_verify.py index 108ad85001270876bb453ec40e47c15a5d049fef..fd2fa8842e891e80674c75e1d6f6a96440c0b1ef 100644 --- a/prod_rmq_agents/dir_verify.py +++ b/prod_rmq_agents/dir_verify.py @@ -8,68 +8,73 @@ from pathlib import Path from rc_rmq import RCRMQ import rabbit_config as rcfg -task = 'dir_verify' +task = "dir_verify" dirs = rcfg.User_dirs args = rc_util.get_args() logger = rc_util.get_logger(args) # Instantiate rabbitmq object -rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'}) +rc_rmq = RCRMQ({"exchange": "RegUsr", "exchange_type": "topic"}) + def dir_verify(ch, method, properties, body): msg = json.loads(body) - username = msg['username'] - msg['task'] = task - msg['success'] = True + username = msg["username"] + msg["task"] = task + msg["success"] = True missing_dirs = [] try: for d in dirs: - path = Path(d) / msg['username'] + path = Path(d) / msg["username"] if args.dry_run: - logger.info(f'Checking dirs: {path}') + logger.info(f"Checking dirs: {path}") else: if not path.exists(): # check if dirs exist and record any missing dirs missing_dirs.append(path) - msg['success'] = False - msg['errmsg'] = f"Error: missing dirs {missing_dirs}" - logger.info(f'{path} does not exist') + msg["success"] = False + msg["errmsg"] = f"Error: missing dirs {missing_dirs}" + logger.info(f"{path} does not exist") else: # check existing dirs for correct ownership and permissions status = os.stat(path) mask = oct(status.st_mode)[-3:] uid = str(status.st_uid) gid = str(status.st_gid) - if mask!='700' or uid!=msg['uid'] or gid!=msg['gid']: - msg['success'] = False - msg['errmsg'] = f"Error: dir {path} permissions or ownership are wrong" + if ( + mask != "700" + or uid != msg["uid"] + or gid != msg["gid"] + ): + msg["success"] = False + msg[ + "errmsg" + ] = f"Error: dir {path} permissions or ownership are wrong" except Exception as exception: - msg['success'] = False - msg['errmsg'] = "Exception raised, check the logs for stack trace" - logger.error('', exc_info=True) + msg["success"] = False + msg["errmsg"] = "Exception raised, check the logs for stack trace" + logger.error("", exc_info=True) # send confirm message rc_rmq.publish_msg( {"routing_key": "confirm." + msg["queuename"], "msg": msg} ) - logger.debug(f'User {username} confirmation sent') + logger.debug(f"User {username} confirmation sent") ch.basic_ack(delivery_tag=method.delivery_tag) -logger.info(f'Start listening to queue: {task}') -rc_rmq.start_consume({ - 'queue': task, - 'routing_key': "verify.*", - 'cb': dir_verify -}) +logger.info(f"Start listening to queue: {task}") +rc_rmq.start_consume( + {"queue": task, "routing_key": "verify.*", "cb": dir_verify} +) -logger.info('Disconnected') +logger.info("Disconnected") rc_rmq.disconnect() diff --git a/prod_rmq_agents/get-next-uid-gid.py b/prod_rmq_agents/get-next-uid-gid.py index 9a1e2f82edfefbea7ea9a3c00a63ff6f2edeb014..fc3d816f223fe5968e5beefa1e0dbeb5efcb529a 100644 --- a/prod_rmq_agents/get-next-uid-gid.py +++ b/prod_rmq_agents/get-next-uid-gid.py @@ -10,35 +10,36 @@ from os import popen from rc_rmq import RCRMQ import rabbit_config as rcfg -task = 'create_account' +task = "create_account" # Instantiate rabbitmq object -rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'}) +rc_rmq = RCRMQ({"exchange": "RegUsr", "exchange_type": "topic"}) args = rc_util.get_args() # Logger logger = rc_util.get_logger() -#Account creation +# Account creation def create_account(msg): - logger.info(f'Account creation request received: {msg}') - username = msg['username'] - uid = msg['uid'] - email = msg['email'] - fullname = msg['fullname'] - msg['success'] = False + logger.info(f"Account creation request received: {msg}") + username = msg["username"] + uid = msg["uid"] + email = msg["email"] + fullname = msg["fullname"] + msg["success"] = False - # Bright command to create user - cmd = '/cm/local/apps/cmd/bin/cmsh -c ' + # Bright command to create user + cmd = "/cm/local/apps/cmd/bin/cmsh -c " cmd += f'"user; add {username}; set id {uid}; set email {email}; set commonname \\"{fullname}\\"; ' cmd += 'commit;"' if not args.dry_run: popen(cmd) time.sleep(rcfg.Delay) - logger.info(f'Bright command to create user:{cmd}') + logger.info(f"Bright command to create user:{cmd}") + # Define your callback function def resolve_uid_gid(ch, method, properties, body): @@ -46,8 +47,8 @@ def resolve_uid_gid(ch, method, properties, body): # Retrieve message msg = json.loads(body) logger.info("Received {}".format(msg)) - username = msg['username'] - msg['success'] = False + username = msg["username"] + msg["success"] = False # Determine next available UID try: @@ -56,27 +57,29 @@ def resolve_uid_gid(ch, method, properties, body): if user_exists: logger.info("The user, {} already exists".format(username)) - msg['uid'] = user_exists.split(':')[2] - msg['gid'] = user_exists.split(':')[3] + msg["uid"] = user_exists.split(":")[2] + msg["gid"] = user_exists.split(":")[3] else: cmd_uid = "/usr/bin/getent passwd | \ awk -F: 'BEGIN { maxuid=10000 } ($3>10000) && ($3<20000) && ($3>maxuid) { maxuid=$3; } END { print maxuid+1; }'" - msg['uid'] = popen(cmd_uid).read().rstrip() + msg["uid"] = popen(cmd_uid).read().rstrip() logger.info(f"UID query: {cmd_uid}") cmd_gid = "/usr/bin/getent group | \ awk -F: 'BEGIN { maxgid=10000 } ($3>10000) && ($3<20000) && ($3>maxgid) { maxgid=$3; } END { print maxgid+1; }'" - msg['gid'] = popen(cmd_gid).read().rstrip() + msg["gid"] = popen(cmd_gid).read().rstrip() logger.info(f"GID query: {cmd_gid}") create_account(msg) - msg['task'] = task - msg['success'] = True + msg["task"] = task + msg["success"] = True except Exception as exception: - msg['success'] = False - msg['errmsg'] = f"Exception raised during account creation, check logs for stack trace" - logger.error('', exc_info=True) + msg["success"] = False + msg[ + "errmsg" + ] = f"Exception raised during account creation, check logs for stack trace" + logger.error("", exc_info=True) # Acknowledge message ch.basic_ack(delivery_tag=method.delivery_tag) @@ -90,11 +93,9 @@ def resolve_uid_gid(ch, method, properties, body): logger.info("Start listening to queue: {}".format(task)) -rc_rmq.start_consume({ - 'queue': task, - 'routing_key': "request.*", - 'cb': resolve_uid_gid -}) +rc_rmq.start_consume( + {"queue": task, "routing_key": "request.*", "cb": resolve_uid_gid} +) logger.info("Disconnected") rc_rmq.disconnect() diff --git a/prod_rmq_agents/git_commit.py b/prod_rmq_agents/git_commit.py index 81a0e2b30c647d5aff72073991226036b797e7d1..1eeeec729cce8ef0cf0f7bbad02a90719871aeb6 100644 --- a/prod_rmq_agents/git_commit.py +++ b/prod_rmq_agents/git_commit.py @@ -7,77 +7,109 @@ import rc_util from rc_rmq import RCRMQ import rabbit_config as rmq_cfg -task = 'git_commit' +task = "git_commit" # Instantiate rabbitmq object -rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'}) +rc_rmq = RCRMQ({"exchange": "RegUsr", "exchange_type": "topic"}) # Define some location repo_location = os.path.expanduser(rmq_cfg.rc_users_ldap_repo_loc) -users_dir = repo_location + '/users' -groups_dir = repo_location + '/groups' +users_dir = repo_location + "/users" +groups_dir = repo_location + "/groups" args = rc_util.get_args() logger = rc_util.get_logger(args) if not args.dry_run: - git = sh.git.bake('--git-dir', repo_location+'/.git', '--work-tree', repo_location) - ldapsearch = sh.Command('ldapsearch') + git = sh.git.bake( + "--git-dir", repo_location + "/.git", "--work-tree", repo_location + ) + ldapsearch = sh.Command("ldapsearch") else: - git = sh.echo.bake('--git-dir', repo_location+'/.git', '--work-tree', repo_location) - ldapsearch = sh.echo.bake('ldapsearch') + git = sh.echo.bake( + "--git-dir", repo_location + "/.git", "--work-tree", repo_location + ) + ldapsearch = sh.echo.bake("ldapsearch") + def git_commit(ch, method, properties, body): msg = json.loads(body) - username = msg['username'] - msg['task'] = task - msg['success'] = False - branch_name = 'issue-add-users-' + username.lower() - user_ldif = users_dir + f'/{username}.ldif' - group_ldif = groups_dir + f'/{username}.ldif' + username = msg["username"] + msg["task"] = task + msg["success"] = False + branch_name = "issue-add-users-" + username.lower() + user_ldif = users_dir + f"/{username}.ldif" + group_ldif = groups_dir + f"/{username}.ldif" logger.info("Received: %s", msg) logger.debug("branch_name: %s", branch_name) try: - logger.debug('git checkout master') - git.checkout('master') - logger.debug('git pull') + logger.debug("git checkout master") + git.checkout("master") + logger.debug("git pull") git.pull() - branch_exists = git.branch('--list', branch_name) + branch_exists = git.branch("--list", branch_name) if not branch_exists: - logger.debug('git checkout -b %s', branch_name) - git.checkout('-b', branch_name) - logger.debug("open(%s, 'w'), open(%s, 'w')", user_ldif, group_ldif) - with open(user_ldif, 'w') as ldif_u,\ - open(group_ldif, 'w') as ldif_g: - logger.debug(f"ldapsearch -LLL -x -H ldaps://ldapserver -b 'dc=cm,dc=cluster' uid={username} > {user_ldif}") - ldapsearch('-LLL', '-x', '-H', 'ldaps://ldapserver', '-b', "dc=cm,dc=cluster", f"uid={username}", _out=ldif_u) - logger.debug(f"ldapsearch -LLL -x -H ldapserver -b 'ou=Group,dc=cm,dc=cluster' cn={username} > {group_ldif}") - ldapsearch('-LLL', '-x', '-H', 'ldaps://ldapserver', '-b', "ou=Group,dc=cm,dc=cluster", f"cn={username}", _out=ldif_g) - logger.info('user ldif files generated.') - - logger.debug('git add %s', user_ldif) + logger.debug("git checkout -b %s", branch_name) + git.checkout("-b", branch_name) + logger.debug( + "open(%s, 'w'), open(%s, 'w')", user_ldif, group_ldif + ) + with open(user_ldif, "w") as ldif_u, open( + group_ldif, "w" + ) as ldif_g: + logger.debug( + f"ldapsearch -LLL -x -H ldaps://ldapserver -b 'dc=cm,dc=cluster' uid={username} > {user_ldif}" + ) + ldapsearch( + "-LLL", + "-x", + "-H", + "ldaps://ldapserver", + "-b", + "dc=cm,dc=cluster", + f"uid={username}", + _out=ldif_u, + ) + logger.debug( + f"ldapsearch -LLL -x -H ldapserver -b 'ou=Group,dc=cm,dc=cluster' cn={username} > {group_ldif}" + ) + ldapsearch( + "-LLL", + "-x", + "-H", + "ldaps://ldapserver", + "-b", + "ou=Group,dc=cm,dc=cluster", + f"cn={username}", + _out=ldif_g, + ) + logger.info("user ldif files generated.") + + logger.debug("git add %s", user_ldif) git.add(user_ldif) - logger.debug('git add %s', group_ldif) + logger.debug("git add %s", group_ldif) git.add(group_ldif) - logger.debug("git commit -m 'Added new cheaha user: %s'", username) + logger.debug( + "git commit -m 'Added new cheaha user: %s'", username + ) git.commit(m="Added new cheaha user: " + username) - logger.debug('git checkout master') - git.checkout('master') + logger.debug("git checkout master") + git.checkout("master") - logger.debug('git merge %s --no-ff --no-edit', branch_name) - git.merge(branch_name, '--no-ff', '--no-edit') - logger.debug('git push origin master') - git.push('origin', 'master') + logger.debug("git merge %s --no-ff --no-edit", branch_name) + git.merge(branch_name, "--no-ff", "--no-edit") + logger.debug("git push origin master") + git.push("origin", "master") # merge with gitlab api - logger.info('Added ldif files and committed to git repo') + logger.info("Added ldif files and committed to git repo") - msg['success'] = True + msg["success"] = True except Exception as exception: - logger.error('', exc_info=True) + logger.error("", exc_info=True) # Send confirm message logger.debug("rc_rmq.publish_msge()") @@ -87,16 +119,14 @@ def git_commit(ch, method, properties, body): logger.info("confirmation sent") # Acknowledge message - logger.debug('ch.basic_ack()') + logger.debug("ch.basic_ack()") ch.basic_ack(delivery_tag=method.delivery_tag) logger.info("Start listening to queue: %s", task) -rc_rmq.start_consume({ - 'queue': task, - 'routing_key': "verify.*", - 'cb': git_commit -}) +rc_rmq.start_consume( + {"queue": task, "routing_key": "verify.*", "cb": git_commit} +) logger.info("Disconnected") rc_rmq.disconnect() diff --git a/prod_rmq_agents/notify_user.py b/prod_rmq_agents/notify_user.py index b378dea929da4b55f48507318cb608d9fbc1ddee..92b442458a4c04d7695b32da897432f6225181f5 100644 --- a/prod_rmq_agents/notify_user.py +++ b/prod_rmq_agents/notify_user.py @@ -9,24 +9,25 @@ from jinja2 import Template from datetime import datetime import rabbit_config as rcfg import mail_config as mail_cfg -task = 'notify_user' + +task = "notify_user" args = rc_util.get_args() logger = rc_util.get_logger(args) -db = dataset.connect(f'sqlite:///.agent_db/user_reg.db') -table = db['users'] +db = dataset.connect(f"sqlite:///.agent_db/user_reg.db") +table = db["users"] # Instantiate rabbitmq object -rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'}) +rc_rmq = RCRMQ({"exchange": "RegUsr", "exchange_type": "topic"}) # Email instruction to user def notify_user(ch, method, properties, body): msg = json.loads(body) - username = msg['username'] - user_email = msg['email'] - msg['task'] = task - msg['success'] = False + username = msg["username"] + user_email = msg["email"] + msg["task"] = task + msg["success"] = False errmsg = "" try: @@ -34,70 +35,77 @@ def notify_user(ch, method, properties, body): # Search username in database record = table.find_one(username=username) - if record['sent'] is not None: - errmsg = 'Updating database counter' + if record["sent"] is not None: + errmsg = "Updating database counter" # Update counter - count = record['count'] + count = record["count"] if args.dry_run: - logger.info('Update counter in database') + logger.info("Update counter in database") else: - table.update({ - 'username': username, - 'count': count + 1 - }, ['username']) + table.update( + {"username": username, "count": count + 1}, + ["username"], + ) - logger.debug(f'User {username} counter updated to {count + 1}') + logger.debug(f"User {username} counter updated to {count + 1}") else: # Send email to user receivers = [user_email, rcfg.Admin_email] - message = Template(mail_cfg.Whole_mail).render(username=username, to=user_email) + message = Template(mail_cfg.Whole_mail).render( + username=username, to=user_email + ) if args.dry_run: - logger.info(f'smtp = smtplib.SMTP({rcfg.Mail_server})') - logger.info(f'smtp.sendmail({rcfg.Sender}, {receivers}, message)') - logger.info(f"table.update({{'username': {username}, 'count': 1, 'sent_at': datetime.now()}}, ['username'])") + logger.info(f"smtp = smtplib.SMTP({rcfg.Mail_server})") + logger.info( + f"smtp.sendmail({rcfg.Sender}, {receivers}, message)" + ) + logger.info( + f"table.update({{'username': {username}, 'count': 1, 'sent_at': datetime.now()}}, ['username'])" + ) else: - errmsg = 'Sending email to user' + errmsg = "Sending email to user" smtp = smtplib.SMTP(rcfg.Mail_server) smtp.sendmail(rcfg.Sender, receivers, message) - logger.debug(f'Email sent to: {user_email}') + logger.debug(f"Email sent to: {user_email}") - errmsg = 'Updating database email sent time' - table.update({ - 'username': username, - 'count': 1, - 'sent': datetime.now() - }, ['username']) + errmsg = "Updating database email sent time" + table.update( + { + "username": username, + "count": 1, + "sent": datetime.now(), + }, + ["username"], + ) - logger.debug(f'User {username} inserted into database') + logger.debug(f"User {username} inserted into database") - msg['success'] = True + msg["success"] = True except Exception as exception: - logger.error('', exc_info=True) - msg['errmsg'] = errmsg if errmsg else 'Unexpected error' + logger.error("", exc_info=True) + msg["errmsg"] = errmsg if errmsg else "Unexpected error" # Send confirm message rc_rmq.publish_msg( {"routing_key": "confirm." + msg["queuename"], "msg": msg} ) - logger.debug(f'User {username} confirmation sent') + logger.debug(f"User {username} confirmation sent") # Acknowledge the message ch.basic_ack(delivery_tag=method.delivery_tag) if __name__ == "__main__": - logger.info(f'Start listening to queue: {task}') - rc_rmq.start_consume({ - 'queue': task, - 'routing_key': "notify.*", - 'cb': notify_user - }) - - logger.info('Disconnected') + logger.info(f"Start listening to queue: {task}") + rc_rmq.start_consume( + {"queue": task, "routing_key": "notify.*", "cb": notify_user} + ) + + logger.info("Disconnected") rc_rmq.disconnect() diff --git a/prod_rmq_agents/subscribe_mail_lists.py b/prod_rmq_agents/subscribe_mail_lists.py index 8d9a91aba8fd5f4a1d8cbf4cc10b1c0e2cdd12c9..fcbc8c8e9c93c41ff66e569a83a8b475d72a534f 100644 --- a/prod_rmq_agents/subscribe_mail_lists.py +++ b/prod_rmq_agents/subscribe_mail_lists.py @@ -9,44 +9,45 @@ from email.message import EmailMessage from rc_rmq import RCRMQ import rabbit_config as rcfg -task = 'subscribe_mail_list' +task = "subscribe_mail_list" # Instantiate rabbitmq object -rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'}) +rc_rmq = RCRMQ({"exchange": "RegUsr", "exchange_type": "topic"}) # Parse arguments args = rc_util.get_args() # Logger -logger = rc_util.get_logger()# Define your callback function +logger = rc_util.get_logger() # Define your callback function + def mail_list_subscription(ch, method, properties, body): # Retrieve message msg = json.loads(body) logger.info("Received msg {}".format(msg)) - username = msg['username'] - fullname = msg['fullname'] - email = msg['email'] + username = msg["username"] + fullname = msg["fullname"] + email = msg["email"] mail_list_admin = rcfg.Sender mail_list = rcfg.Mail_list mail_list_bcc = rcfg.Mail_list_bcc server = rcfg.Mail_server - listserv_cmd = f'QUIET ADD hpc-announce {email} {fullname} \ - \nQUIET ADD hpc-users {email} {fullname}' + listserv_cmd = f"QUIET ADD hpc-announce {email} {fullname} \ + \nQUIET ADD hpc-users {email} {fullname}" logger.info("Adding user{} to mail list".format(username)) - msg['success'] = False + msg["success"] = False try: # Create a text/plain message email_msg = EmailMessage() - email_msg['From'] = mail_list_admin - email_msg['To'] = mail_list - email_msg['Subject'] = '' - email_msg['Bcc'] = mail_list_bcc + email_msg["From"] = mail_list_admin + email_msg["To"] = mail_list + email_msg["Subject"] = "" + email_msg["Bcc"] = mail_list_bcc # Create an smtp object and send email s = smtplib.SMTP(server) @@ -54,13 +55,15 @@ def mail_list_subscription(ch, method, properties, body): email_msg.set_content(listserv_cmd) if not args.dry_run: s.send_message(email_msg) - logging.info(f'This email will add user {username} to listserv \n{email_msg}') + logging.info( + f"This email will add user {username} to listserv \n{email_msg}" + ) s.quit() - msg['task'] = task - msg['success'] = True + msg["task"] = task + msg["success"] = True except Exception as exception: - logger.error('', exc_info=True) + logger.error("", exc_info=True) # Acknowledge message ch.basic_ack(delivery_tag=method.delivery_tag) @@ -74,11 +77,13 @@ def mail_list_subscription(ch, method, properties, body): logger.info("Start listening to queue: {}".format(task)) -rc_rmq.start_consume({ - 'queue': task, # Define your Queue name - 'routing_key': "verify.*", # Define your routing key - 'cb': mail_list_subscription # Pass in callback function you just define -}) +rc_rmq.start_consume( + { + "queue": task, # Define your Queue name + "routing_key": "verify.*", # Define your routing key + "cb": mail_list_subscription, # Pass in callback function you just define + } +) logger.info("Disconnected") rc_rmq.disconnect() diff --git a/prod_rmq_agents/task_manager.py b/prod_rmq_agents/task_manager.py index 135c714474078bb5a436b87d696a64ba4aeccc54..036540ac65c837881e74601eea4ab4d366be590c 100644 --- a/prod_rmq_agents/task_manager.py +++ b/prod_rmq_agents/task_manager.py @@ -12,57 +12,59 @@ from datetime import datetime import mail_config as mail_cfg import rabbit_config as rcfg -task = 'task_manager' +task = "task_manager" timeout = 30 args = rc_util.get_args() logger = rc_util.get_logger(args) -db = dataset.connect(f'sqlite:///.agent_db/user_reg.db') -table = db['users'] +db = dataset.connect(f"sqlite:///.agent_db/user_reg.db") +table = db["users"] record = { - 'uid': -1, - 'gid': -1, - 'email': '', - 'reason': '', - 'fullname': '', - 'last_update': datetime.now(), - 'errmsg': None, - 'waiting': set(), - 'request': { - 'create_account': None + "uid": -1, + "gid": -1, + "email": "", + "reason": "", + "fullname": "", + "last_update": datetime.now(), + "errmsg": None, + "waiting": set(), + "request": {"create_account": None}, + "verify": { + "git_commit": None, + "dir_verify": None, + "subscribe_mail_list": None, }, - 'verify': { - 'git_commit': None, - 'dir_verify': None, - 'subscribe_mail_list': None - }, - 'notify': { - 'notify_user': None - }, - 'reported': False + "notify": {"notify_user": None}, + "reported": False, } # Currently tracking users tracking = {} # Instantiate rabbitmq object -rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'}) +rc_rmq = RCRMQ({"exchange": "RegUsr", "exchange_type": "topic"}) + def notify_admin(username, user_record): receivers = [rcfg.Admin_email] - result = "SUCCESS" if user_record["request"]["create_account"] and\ - user_record["verify"]["git_commit"] and\ - user_record["verify"]["dir_verify"] and\ - user_record["verify"]["subscribe_mail_list"] and\ - user_record["notify"]["notify_user"]\ - else "FAILED" + result = ( + "SUCCESS" + if user_record["request"]["create_account"] + and user_record["verify"]["git_commit"] + and user_record["verify"]["dir_verify"] + and user_record["verify"]["subscribe_mail_list"] + and user_record["notify"]["notify_user"] + else "FAILED" + ) - message = Template(mail_cfg.UserReportHead).render(username=username, fullname=user_record['fullname'], result=result) - if user_record['reported']: - message += ' (Duplicate)' + message = Template(mail_cfg.UserReportHead).render( + username=username, fullname=user_record["fullname"], result=result + ) + if user_record["reported"]: + message += " (Duplicate)" message += f""" \n User Creation Report for user {username} uid: {user_record["uid"]}, gid: {user_record["gid"]} @@ -73,24 +75,26 @@ def notify_admin(username, user_record): 'subscribe_mail_list': {user_record["verify"]["subscribe_mail_list"]} 'notify_user': {user_record["notify"]["notify_user"]} """ - if user_record['errmsg']: + if user_record["errmsg"]: message += """ Error(s): """ - for msg in user_record['errmsg']: + for msg in user_record["errmsg"]: message += msg + "\n" if args.dry_run: - logger.info(f'smtp = smtplib.SMTP({rcfg.Mail_server})') - logger.info(f'smtp.sendmail({rcfg.Sender}, {rcfg.Admin_email}, message)') + logger.info(f"smtp = smtplib.SMTP({rcfg.Mail_server})") + logger.info( + f"smtp.sendmail({rcfg.Sender}, {rcfg.Admin_email}, message)" + ) logger.info(message) else: smtp = smtplib.SMTP(rcfg.Mail_server) smtp.sendmail(rcfg.Sender, receivers, message) - logger.debug(f'User report sent to: {rcfg.Admin_email}') + logger.debug(f"User report sent to: {rcfg.Admin_email}") def insert_db(username, msg): @@ -121,8 +125,8 @@ def insert_db(username, msg): def update_db(username, data): - obj = { 'username': username, **data } - table.update(obj, ['username']) + obj = {"username": username, **data} + table.update(obj, ["username"]) def task_manager(ch, method, properties, body): @@ -155,41 +159,45 @@ def task_manager(ch, method, properties, body): if user_db: # Restore task status - current['request']['create_account'] = user_db['create_account'] - current['verify']['git_commit'] = user_db['git_commit'] - current['verify']['dir_verify'] = user_db['dir_verify'] - current['verify']['subscribe_mail_list'] = user_db['subscribe_mail_list'] - current['notify']['notify_user'] = user_db['notify_user'] - - current['reported'] = user_db['reported'] - - for t in ['git_commit', 'dir_verify', 'subscribe_mail_list']: + current["request"]["create_account"] = user_db[ + "create_account" + ] + current["verify"]["git_commit"] = user_db["git_commit"] + current["verify"]["dir_verify"] = user_db["dir_verify"] + current["verify"]["subscribe_mail_list"] = user_db[ + "subscribe_mail_list" + ] + current["notify"]["notify_user"] = user_db["notify_user"] + + current["reported"] = user_db["reported"] + + for t in ["git_commit", "dir_verify", "subscribe_mail_list"]: if user_db[t] is None: - current['waiting'].add(t) + current["waiting"].add(t) - if not current['waiting'] and user_db['notify_user'] is None: - current['waiting'].add('notify_user') + if not current["waiting"] and user_db["notify_user"] is None: + current["waiting"].add("notify_user") - logger.debug(f'Loaded user {username} from DB') + logger.debug(f"Loaded user {username} from DB") else: insert_db(username, msg) - logger.debug(f'Tracking user {username}') + logger.debug(f"Tracking user {username}") - current['last_update'] = datetime.now() + current["last_update"] = datetime.now() # Update Database - update_db(username, { - task_name: success, - 'last_update': current['last_update']} + update_db( + username, + {task_name: success, "last_update": current["last_update"]}, ) # Save error message if the task was failed if not success: - errmsg = msg.get('errmsg', '') + errmsg = msg.get("errmsg", "") if errmsg: - current['errmsg'].append(f"{task_name}: {errmsg}") + current["errmsg"].append(f"{task_name}: {errmsg}") # Define message that's going to be published message = { @@ -215,26 +223,30 @@ def task_manager(ch, method, properties, body): message["errmsg"] = current["errmsg"] send = True - current['waiting'] = {'git_commit', 'dir_verify', 'subscribe_mail_list'} - logger.debug(f'Request level {task_name}? {success}') + current["waiting"] = { + "git_commit", + "dir_verify", + "subscribe_mail_list", + } + logger.debug(f"Request level {task_name}? {success}") elif task_name in current["verify"]: current["verify"][task_name] = success current["waiting"].discard(task_name) routing_key = "notify." + queuename - if not current['waiting']: + if not current["waiting"]: send = True - current['waiting'] = {'notify_user'} + current["waiting"] = {"notify_user"} # Terminate if dir_verify failed and all agents has responsed - if send and not current['verify']['dir_verify']: + if send and not current["verify"]["dir_verify"]: terminated = True routing_key = "complete." + queuename message["success"] = False message["errmsg"] = current["errmsg"] - logger.debug(f'Verify level {task_name}? {success}') + logger.debug(f"Verify level {task_name}? {success}") elif task_name in current["notify"]: current["notify"][task_name] = success @@ -248,32 +260,29 @@ def task_manager(ch, method, properties, body): # The whole creation process has completed completed = True - logger.debug(f'Notify level {task_name}? {success}') + logger.debug(f"Notify level {task_name}? {success}") except Exception as exception: - logger.error('', exc_info=True) + logger.error("", exc_info=True) if send: # Send trigger message - rc_rmq.publish_msg({ - 'routing_key': routing_key, - 'msg': message - }) + rc_rmq.publish_msg({"routing_key": routing_key, "msg": message}) logger.debug(f"Trigger message '{routing_key}' sent") - logger.debug('Previous level messages acknowledged') + logger.debug("Previous level messages acknowledged") # Send report to admin if completed or terminated: notify_admin(username, current) - update_db(username, {'reported': True}) + update_db(username, {"reported": True}) tracking.pop(username) - logger.debug('Admin report sent') + logger.debug("Admin report sent") # Acknowledge message ch.basic_ack(method.delivery_tag) @@ -282,22 +291,27 @@ def task_manager(ch, method, properties, body): def timeout_handler(signum, frame): current_time = datetime.now() for user in tuple(tracking): - delta = current_time - tracking[user]['last_update'] + delta = current_time - tracking[user]["last_update"] if delta.seconds > timeout: - rc_rmq.publish_msg({ - 'routing_key': 'complete.' + user, - 'msg': { - 'username': user, - 'success': False, - 'errmsg': ["Timeout on " + ', '.join(tracking[user]['waiting'])] + rc_rmq.publish_msg( + { + "routing_key": "complete." + user, + "msg": { + "username": user, + "success": False, + "errmsg": [ + "Timeout on " + + ", ".join(tracking[user]["waiting"]) + ], + }, } - }) + ) notify_admin(user, tracking[user]) - update_db(user, {'reported': True}) + update_db(user, {"reported": True}) tracking.pop(user) @@ -306,12 +320,10 @@ def timeout_handler(signum, frame): signal.signal(signal.SIGALRM, timeout_handler) signal.setitimer(signal.ITIMER_REAL, timeout, timeout) -logger.info(f'Start listening to queue: {task}') -rc_rmq.start_consume({ - 'queue': task, - 'routing_key': "confirm.*", - 'cb': task_manager -}) +logger.info(f"Start listening to queue: {task}") +rc_rmq.start_consume( + {"queue": task, "routing_key": "confirm.*", "cb": task_manager} +) -logger.info('Disconnected') +logger.info("Disconnected") rc_rmq.disconnect() diff --git a/prod_rmq_agents/user_reg_event_logger.py b/prod_rmq_agents/user_reg_event_logger.py index 02c12483ce3540e8f36c2d902f998c6e1ba50f9b..63b6f5bfb0e557a3030631c70d221b2d45aeef9e 100644 --- a/prod_rmq_agents/user_reg_event_logger.py +++ b/prod_rmq_agents/user_reg_event_logger.py @@ -3,10 +3,10 @@ import sys import json from rc_rmq import RCRMQ -task = 'user_reg_event_log' +task = "user_reg_event_log" # Instantiate rabbitmq object -rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'}) +rc_rmq = RCRMQ({"exchange": "RegUsr", "exchange_type": "topic"}) # Define your callback function def log_user_reg_events(ch, method, properties, body): @@ -18,7 +18,9 @@ def log_user_reg_events(ch, method, properties, body): routing_key = method.routing_key action = routing_key.split(".")[0] user = routing_key.split(".")[1] - print(f'Got a {action} message for {user} with routing key: {routing_key}') + print( + f"Got a {action} message for {user} with routing key: {routing_key}" + ) print(msg) # Acknowledge message @@ -26,8 +28,10 @@ def log_user_reg_events(ch, method, properties, body): print("Start listening to queue: {}".format(task)) -rc_rmq.start_consume({ - 'queue': task, # Define your Queue name - 'routing_key': "#", # Define your routing key - 'cb': log_user_reg_events # Pass in callback function you just define -}) +rc_rmq.start_consume( + { + "queue": task, # Define your Queue name + "routing_key": "#", # Define your routing key + "cb": log_user_reg_events, # Pass in callback function you just define + } +) diff --git a/prod_rmq_agents/user_reg_logger.py b/prod_rmq_agents/user_reg_logger.py index 402c9e319584d36560287be9e14c55d16ab509e6..dfe8d612b280cd9aa994e1be0f8b3e314a359259 100755 --- a/prod_rmq_agents/user_reg_logger.py +++ b/prod_rmq_agents/user_reg_logger.py @@ -6,11 +6,11 @@ import rc_util from rc_rmq import RCRMQ from datetime import datetime -# Define queue name -task = 'reg_logger' +# Define queue name +task = "reg_logger" # Instantiate rabbitmq object -rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'}) +rc_rmq = RCRMQ({"exchange": "RegUsr", "exchange_type": "topic"}) # Parse arguments args = rc_util.get_args() @@ -19,25 +19,23 @@ args = rc_util.get_args() logger = rc_util.get_logger() # Open registry table in DB -db = dataset.connect('sqlite:///reg_logger.db') -account_req_table = db['registry'] +db = dataset.connect("sqlite:///reg_logger.db") +account_req_table = db["registry"] # Define registration logger callback def log_registration(ch, method, properties, body): account_req = json.loads(body) - account_req['req_time'] = datetime.now(), + account_req["req_time"] = (datetime.now(),) account_req_table.insert(account_req) - logger.info("logged account request for %s", account_req['username']) + logger.info("logged account request for %s", account_req["username"]) ch.basic_ack(delivery_tag=method.delivery_tag) + logger.info("Start listening to queue: {}".format(task)) # Start consuming messages from queue with callback function -rc_rmq.start_consume({ - 'queue': task, - 'routing_key': "create.*", - 'cb': log_registration -}) - +rc_rmq.start_consume( + {"queue": task, "routing_key": "create.*", "cb": log_registration} +) diff --git a/rc_util.py b/rc_util.py index 74d8b7e57888f3e79c95303aa3e6e63ee37dd0bb..8caba5d04bb432294993706a59d71ca81bab0e6e 100644 --- a/rc_util.py +++ b/rc_util.py @@ -4,40 +4,49 @@ from rc_rmq import RCRMQ import json from urllib.parse import quote -rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'}) -tasks = {'create_account': None, 'git_commit': None, 'dir_verify': None, 'subscribe_mail_list': None, 'notify_user': None} -logger_fmt = '%(asctime)s [%(module)s] - %(message)s' +rc_rmq = RCRMQ({"exchange": "RegUsr", "exchange_type": "topic"}) +tasks = { + "create_account": None, + "git_commit": None, + "dir_verify": None, + "subscribe_mail_list": None, + "notify_user": None, +} +logger_fmt = "%(asctime)s [%(module)s] - %(message)s" + def add_account(username, queuename, email, full="", reason=""): rc_rmq.publish_msg( { "routing_key": "request." + queuename, "msg": { - "username": username, - "email": email, - "fullname": full, + "username": username, + "email": email, + "fullname": full, "reason": reason, "queuename": queuename, }, - } + } ) rc_rmq.disconnect() + def worker(ch, method, properties, body): msg = json.loads(body) - username = msg['username'] + username = msg["username"] - if msg['success']: - print(f'Account for {username} has been created.') + if msg["success"]: + print(f"Account for {username} has been created.") else: print(f"There's some issue while creating account for {username}") - errmsg = msg.get('errmsg', []) + errmsg = msg.get("errmsg", []) for err in errmsg: print(err) rc_rmq.stop_consume() rc_rmq.delete_queue() + def consume(queuename, routing_key="", callback=worker, debug=False): if routing_key == "": routing_key = "complete." + queuename @@ -54,15 +63,21 @@ def consume(queuename, routing_key="", callback=worker, debug=False): ) rc_rmq.disconnect() - return { 'success' : True } + return {"success": True} + def get_args(): # Parse arguments parser = argparse.ArgumentParser() - parser.add_argument('-v', '--verbose', action='store_true', help='verbose output') - parser.add_argument('-n', '--dry-run', action='store_true', help='enable dry run mode') + parser.add_argument( + "-v", "--verbose", action="store_true", help="verbose output" + ) + parser.add_argument( + "-n", "--dry-run", action="store_true", help="enable dry run mode" + ) return parser.parse_args() + def get_logger(args=None): if args is None: args = get_args()