Skip to content
Snippets Groups Projects
task_manager.py 5.77 KiB
Newer Older
Bo-Chun Chen's avatar
Bo-Chun Chen committed
#!/usr/bin/env python
import sys
import json
Bo-Chun Chen's avatar
Bo-Chun Chen committed
import rc_util
Bo-Chun Chen's avatar
Bo-Chun Chen committed
import smtplib
Bo-Chun Chen's avatar
Bo-Chun Chen committed
from rc_rmq import RCRMQ
from datetime import datetime
Bo-Chun Chen's avatar
Bo-Chun Chen committed
import mail_config as mail_cfg
Bo-Chun Chen's avatar
Bo-Chun Chen committed

task = 'task_manager'

Bo-Chun Chen's avatar
Bo-Chun Chen committed
args = rc_util.get_args()
logger = rc_util.get_logger(args)

Bo-Chun Chen's avatar
Bo-Chun Chen committed
record = {
    'uid': -1,
    'gid': -1,
    'email': '',
Bo-Chun Chen's avatar
Bo-Chun Chen committed
    'reason': '',
Bo-Chun Chen's avatar
Bo-Chun Chen committed
    'fullname': '',
Bo-Chun Chen's avatar
Bo-Chun Chen committed
    'last_update': datetime.now(),
Bo-Chun Chen's avatar
Bo-Chun Chen committed
    'errmsg': [],
    'waiting': set(),
Bo-Chun Chen's avatar
Bo-Chun Chen committed
    'request': {
Bo-Chun Chen's avatar
Bo-Chun Chen committed
    },
    'verify': {
        'git_commit': None,
        'dir_verify': None,
        'subscribe_mail_list': None
    },
    'notify': {
        'notify_user': None
    'delivery_tags': None
Bo-Chun Chen's avatar
Bo-Chun Chen committed
}

# Currently tracking users
tracking = {}

# Instantiate rabbitmq object
rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'})

Bo-Chun Chen's avatar
Bo-Chun Chen committed
def notify_admin(username, user_record):
    receivers = [user_record['email'], mail_cfg.Admin_email]
Bo-Chun Chen's avatar
Bo-Chun Chen committed
    message = mail_cfg.UserReportHead
    message += f"""
        User Creation Report for user {username}
        uid: {user_record["uid"]}, gid: {user_record["gid"]}
        Tasks:
            'create_account':      {user_record["request"]["create_account"]}
            'git_commit':          {user_record["verify"]["git_commit"]}
            'dir_verify':          {user_record["verify"]["dir_verify"]}
            'subscribe_mail_list': {user_record["verify"]["subscribe_mail_list"]}
            'notify_user':         {user_record["notify"]["notify_user"]}
    """
    if user_record['errmsg']:
        message += """

        Error(s):
        """
        for msg in user_record['errmsg']:
            message += msg + "\n"
Bo-Chun Chen's avatar
Bo-Chun Chen committed

    if args.dry_run:
        logger.info(f'smtp = smtplib.SMTP({mail_cfg.Server})')
        logger.info(f'smtp.sendmail({mail_cfg.Sender}, {mail_cfg.Admin_email}, message)')
        logger.info(message)

    else:
        smtp = smtplib.SMTP(mail_cfg.Server)
        smtp.sendmail(mail_cfg.Sender, receivers, message)

        logger.debug(f'User report sent to: {mail_cfg.Admin_email}')


Bo-Chun Chen's avatar
Bo-Chun Chen committed
def task_manager(ch, method, properties, body):
    msg = json.loads(body)
    username = method.routing_key.split('.')[1]
    task_name = msg['task']
Bo-Chun Chen's avatar
Bo-Chun Chen committed
    success = msg['success']
    send = completed = terminated = False
Bo-Chun Chen's avatar
Bo-Chun Chen committed

    if username not in tracking:
        current = tracking[username] = record.copy()
        current['delivery_tags'] = []
Bo-Chun Chen's avatar
Bo-Chun Chen committed
        current['uid'] = msg.get('uid', -1)
        current['gid'] = msg.get('gid', -1)
        current['email'] = msg.get('email', '')
Bo-Chun Chen's avatar
Bo-Chun Chen committed
        current['reason'] = msg.get('reason', '')
Bo-Chun Chen's avatar
Bo-Chun Chen committed
        current['fullname'] = msg.get('fullname', '')
Bo-Chun Chen's avatar
Bo-Chun Chen committed

        logger.debug(f'Tracking user {username}')
    else:
        current = tracking[username]
    # Save the delivery tags for future use
    current['delivery_tags'].append(method.delivery_tag)

Bo-Chun Chen's avatar
Bo-Chun Chen committed
    # Save error message if the task was failed
    if not success:
        errmsg = msg.get('errmsg', '')
        if errmsg:
            current['errmsg'].append(f"{task_name}: {errmsg}")

    # Define message that's going to be published
    message = {
        'username': username,
        'uid': current['uid'],
        'gid': current['gid'],
        'email': current['email'],
        'reason': current['reason'],
        'fullname': current['fullname']
    }

Bo-Chun Chen's avatar
Bo-Chun Chen committed
    try:
        if task_name in current['request']:
            current['request'][task_name] = success
            routing_key = 'verify.' + username
Bo-Chun Chen's avatar
Bo-Chun Chen committed
            # Terminate the process if failed
            if not success:
                terminated = True
                routing_key = 'complete.' + username
                message['success'] = False
                message['errmsg'] = current['errmsg']
Bo-Chun Chen's avatar
Bo-Chun Chen committed
            send = True
            current['waiting'] = {'git_commit', 'dir_verify', 'subscribe_mail_list'}
Bo-Chun Chen's avatar
Bo-Chun Chen committed
            logger.debug(f'Request level {task_name}? {success}')
Bo-Chun Chen's avatar
Bo-Chun Chen committed

Bo-Chun Chen's avatar
Bo-Chun Chen committed
        elif task_name in current['verify']:
            current['verify'][task_name] = success
            current['waiting'].discard(task_name)
Bo-Chun Chen's avatar
Bo-Chun Chen committed
            routing_key = 'notify.' + username
            if not current['waiting']:
                send = True
                current['waiting'] = {'notify_user'}
            # Terminate if dir_verify failed and all agents has responsed
            if send and not current['verify']['dir_verify']:
                terminated = True
                routing_key = 'complete.' + username
                message['success'] = False
                message['errmsg'] = current['errmsg']
Bo-Chun Chen's avatar
Bo-Chun Chen committed
            logger.debug(f'Verify level {task_name}? {success}')
Bo-Chun Chen's avatar
Bo-Chun Chen committed

        elif task_name in current['notify']:
            current['waiting'].discard(task_name)
            routing_key = 'complete.' + username
            message['success'] = success
            message['errmsg'] = current['errmsg']

Bo-Chun Chen's avatar
Bo-Chun Chen committed
            send = True

Bo-Chun Chen's avatar
Bo-Chun Chen committed
            # The whole creation process has completed
            completed = True
Bo-Chun Chen's avatar
Bo-Chun Chen committed
            logger.debug(f'Notify level {task_name}? {success}')
Bo-Chun Chen's avatar
Bo-Chun Chen committed

    except Exception as exception:
        logger.error('', exc_info=True)
Bo-Chun Chen's avatar
Bo-Chun Chen committed
    if send:
        # Send trigger message
Bo-Chun Chen's avatar
Bo-Chun Chen committed
        rc_rmq.publish_msg({
Bo-Chun Chen's avatar
Bo-Chun Chen committed
            'routing_key': routing_key,
            'msg': message
Bo-Chun Chen's avatar
Bo-Chun Chen committed
        logger.debug(f"Trigger message '{routing_key}' sent")

        # Acknowledge all message from last level
        for tag in current['delivery_tags']:
            ch.basic_ack(tag)
        current['delivery_tags'] = []

        logger.debug('Previous level messages acknowledged')

Bo-Chun Chen's avatar
Bo-Chun Chen committed
    # Send report to admin after all tasks confirmed or terminated
    if completed or terminated:
        notify_admin(username, current)

Bo-Chun Chen's avatar
Bo-Chun Chen committed
logger.info(f'Start listening to queue: {task}')
Bo-Chun Chen's avatar
Bo-Chun Chen committed
rc_rmq.start_consume({
    'queue': task,
    'routing_key': "confirm.*",
    'cb': task_manager
})

Bo-Chun Chen's avatar
Bo-Chun Chen committed
logger.info('Disconnected')
Bo-Chun Chen's avatar
Bo-Chun Chen committed
rc_rmq.disconnect()