Skip to content
Snippets Groups Projects
task_manager.py 8.63 KiB
Newer Older
Bo-Chun Chen's avatar
Bo-Chun Chen committed
#!/usr/bin/env python
import sys
import copy
Bo-Chun Chen's avatar
Bo-Chun Chen committed
import json
Bo-Chun Chen's avatar
Bo-Chun Chen committed
import signal
Bo-Chun Chen's avatar
Bo-Chun Chen committed
import dataset
Bo-Chun Chen's avatar
Bo-Chun Chen committed
import rc_util
Bo-Chun Chen's avatar
Bo-Chun Chen committed
import smtplib
Bo-Chun Chen's avatar
Bo-Chun Chen committed
from rc_rmq import RCRMQ
Bo-Chun Chen's avatar
Bo-Chun Chen committed
from datetime import datetime
Bo-Chun Chen's avatar
Bo-Chun Chen committed
import mail_config as mail_cfg
Bo-Chun Chen's avatar
Bo-Chun Chen committed

task = 'task_manager'
Bo-Chun Chen's avatar
Bo-Chun Chen committed
timeout = 30
Bo-Chun Chen's avatar
Bo-Chun Chen committed
args = rc_util.get_args()
logger = rc_util.get_logger(args)

Bo-Chun Chen's avatar
Bo-Chun Chen committed
db = dataset.connect(f'sqlite:///.agent_db/user_reg.db')
table = db['users']

Bo-Chun Chen's avatar
Bo-Chun Chen committed
record = {
    'uid': -1,
    'gid': -1,
    'email': '',
Bo-Chun Chen's avatar
Bo-Chun Chen committed
    'reason': '',
Bo-Chun Chen's avatar
Bo-Chun Chen committed
    'fullname': '',
Bo-Chun Chen's avatar
Bo-Chun Chen committed
    'last_update': datetime.now(),
    'errmsg': None,
    'waiting': set(),
Bo-Chun Chen's avatar
Bo-Chun Chen committed
    'request': {
Bo-Chun Chen's avatar
Bo-Chun Chen committed
    },
    'verify': {
        'git_commit': None,
        'dir_verify': None,
        'subscribe_mail_list': None
    },
    'notify': {
        'notify_user': None
    },
    'reported': False
Bo-Chun Chen's avatar
Bo-Chun Chen committed
}

# Currently tracking users
tracking = {}

# Instantiate rabbitmq object
rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'})

def notify_admin(username, user_record):
    receivers = [user_record['email'], mail_cfg.Admin_email]
    message = Template(mail_cfg.UserReportHead).render(username=username, fullname=user_record['fullname'])
    if user_record['reported']:
        message += ' (Duplicate)'
    message += f""" \n
    User Creation Report for user {username}
    uid: {user_record["uid"]}, gid: {user_record["gid"]}
    Tasks:
    'create_account':      {user_record["request"]["create_account"]}
    'git_commit':          {user_record["verify"]["git_commit"]}
    'dir_verify':          {user_record["verify"]["dir_verify"]}
    'subscribe_mail_list': {user_record["verify"]["subscribe_mail_list"]}
    'notify_user':         {user_record["notify"]["notify_user"]}
Bo-Chun Chen's avatar
Bo-Chun Chen committed
    """
    if user_record['errmsg']:
        message += """

        Error(s):
        """
        for msg in user_record['errmsg']:
            message += msg + "\n"
Bo-Chun Chen's avatar
Bo-Chun Chen committed

    if args.dry_run:
        logger.info(f'smtp = smtplib.SMTP({mail_cfg.Server})')
        logger.info(f'smtp.sendmail({mail_cfg.Sender}, {mail_cfg.Admin_email}, message)')
        logger.info(message)

    else:
        smtp = smtplib.SMTP(mail_cfg.Server)
        smtp.sendmail(mail_cfg.Sender, receivers, message)

        logger.debug(f'User report sent to: {mail_cfg.Admin_email}')


Bo-Chun Chen's avatar
Bo-Chun Chen committed
def insert_db(username, msg):
    # Search username in db
    record = table.find_one(username=username)

    if not record:
        # SQL insert
        table.insert({
          'username': username,
          'uid': msg.get('uid', -1),
          'gid': msg.get('gid', -1),
          'email': msg.get('email', ''),
          'reason': msg.get('reason', ''),
          'fullname': msg.get('fullname', ''),
          'create_account': None,
          'git_commit': None,
          'dir_verify': None,
          'subscribe_mail_list': None,
          'notify_user': None,
Bo-Chun Chen's avatar
Bo-Chun Chen committed
          'sent': None,
          'reported': False,
Bo-Chun Chen's avatar
Bo-Chun Chen committed
          'last_update': datetime.now()
        })


def update_db(username, data):
    obj = { 'username': username, **data }
    table.update(obj, ['username'])


Bo-Chun Chen's avatar
Bo-Chun Chen committed
def task_manager(ch, method, properties, body):
    msg = json.loads(body)
    username = method.routing_key.split('.')[1]
    task_name = msg['task']
Bo-Chun Chen's avatar
Bo-Chun Chen committed
    success = msg['success']
    send = completed = terminated = False
Bo-Chun Chen's avatar
Bo-Chun Chen committed
    if username in tracking:
        current = tracking[username]

    else:
        user_db = table.find_one(username=username)

        current = tracking[username] = copy.deepcopy(record)
        current['errmsg'] = []
        current['uid'] = user_db['uid'] if user_db else msg['uid']
        current['gid'] = user_db['gid'] if user_db else msg['gid']
        current['email'] = user_db['email'] if user_db else msg['email']
        current['reason'] = user_db['reason'] if user_db else msg['reason']
        current['fullname'] = user_db['fullname'] if user_db else msg['fullname']

        if user_db:
            # Restore task status
            current['request']['create_account'] = user_db['create_account']
            current['verify']['git_commit'] = user_db['git_commit']
            current['verify']['dir_verify'] = user_db['dir_verify']
            current['verify']['subscribe_mail_list'] = user_db['subscribe_mail_list']
            current['notify']['notify_user'] = user_db['notify_user']

            current['reported'] = user_db['reported']

            for t in ['git_commit', 'dir_verify', 'subscribe_mail_list']:
                if user_db[t] is None:
                    current['waiting'].add(t)

            if not current['waiting'] and user_db['notify_user'] is None:
                current['waiting'].add('notify_user')

            logger.debug(f'Loaded user {username} from DB')

        else:
            insert_db(username, msg)

            logger.debug(f'Tracking user {username}')
    current['last_update'] = datetime.now()

    # Update Database
    update_db(username, {
        task_name: success,
        'last_update': current['last_update']}
    )

Bo-Chun Chen's avatar
Bo-Chun Chen committed
    # Save error message if the task was failed
    if not success:
        errmsg = msg.get('errmsg', '')
        if errmsg:
            current['errmsg'].append(f"{task_name}: {errmsg}")

    # Define message that's going to be published
    message = {
        'username': username,
        'uid': current['uid'],
        'gid': current['gid'],
        'email': current['email'],
        'reason': current['reason'],
        'fullname': current['fullname']
    }

Bo-Chun Chen's avatar
Bo-Chun Chen committed
    try:
        if task_name in current['request']:
            current['request'][task_name] = success
            routing_key = 'verify.' + username
Bo-Chun Chen's avatar
Bo-Chun Chen committed
            # Terminate the process if failed
            if not success:
                terminated = True
                routing_key = 'complete.' + username
                message['success'] = False
                message['errmsg'] = current['errmsg']
Bo-Chun Chen's avatar
Bo-Chun Chen committed
            send = True
            current['waiting'] = {'git_commit', 'dir_verify', 'subscribe_mail_list'}
Bo-Chun Chen's avatar
Bo-Chun Chen committed
            logger.debug(f'Request level {task_name}? {success}')
Bo-Chun Chen's avatar
Bo-Chun Chen committed

Bo-Chun Chen's avatar
Bo-Chun Chen committed
        elif task_name in current['verify']:
            current['verify'][task_name] = success
            current['waiting'].discard(task_name)
Bo-Chun Chen's avatar
Bo-Chun Chen committed
            routing_key = 'notify.' + username
            if not current['waiting']:
                send = True
                current['waiting'] = {'notify_user'}
            # Terminate if dir_verify failed and all agents has responsed
            if send and not current['verify']['dir_verify']:
                terminated = True
                routing_key = 'complete.' + username
                message['success'] = False
                message['errmsg'] = current['errmsg']
Bo-Chun Chen's avatar
Bo-Chun Chen committed
            logger.debug(f'Verify level {task_name}? {success}')
Bo-Chun Chen's avatar
Bo-Chun Chen committed

        elif task_name in current['notify']:
            current['waiting'].discard(task_name)
            routing_key = 'complete.' + username
            message['success'] = success
            message['errmsg'] = current['errmsg']

Bo-Chun Chen's avatar
Bo-Chun Chen committed
            send = True

Bo-Chun Chen's avatar
Bo-Chun Chen committed
            # The whole creation process has completed
            completed = True
Bo-Chun Chen's avatar
Bo-Chun Chen committed
            logger.debug(f'Notify level {task_name}? {success}')
Bo-Chun Chen's avatar
Bo-Chun Chen committed

    except Exception as exception:
        logger.error('', exc_info=True)
Bo-Chun Chen's avatar
Bo-Chun Chen committed
    if send:
        # Send trigger message
Bo-Chun Chen's avatar
Bo-Chun Chen committed
        rc_rmq.publish_msg({
Bo-Chun Chen's avatar
Bo-Chun Chen committed
            'routing_key': routing_key,
            'msg': message
Bo-Chun Chen's avatar
Bo-Chun Chen committed
        logger.debug(f"Trigger message '{routing_key}' sent")

        logger.debug('Previous level messages acknowledged')

    # Send report to admin
Bo-Chun Chen's avatar
Bo-Chun Chen committed
    if completed or terminated:
        notify_admin(username, current)
        update_db(username, {'reported': True})

        tracking.pop(username)

        logger.debug('Admin report sent')

    # Acknowledge message
    ch.basic_ack(method.delivery_tag)

Bo-Chun Chen's avatar
Bo-Chun Chen committed
def timeout_handler(signum, frame):
    current_time = datetime.now()
    for user in tuple(tracking):
        delta = current_time - tracking[user]['last_update']
Bo-Chun Chen's avatar
Bo-Chun Chen committed

        if delta.seconds > timeout:

            rc_rmq.publish_msg({
                'routing_key': 'complete.' + user,
                'msg': {
                    'username': user,
                    'success': False,
                    'errmsg': ["Timeout on " + ', '.join(tracking[user]['waiting'])]
                }
            })

            notify_admin(user, tracking[user])

            update_db(user, {'reported': True})

Bo-Chun Chen's avatar
Bo-Chun Chen committed
            tracking.pop(user)


# Set initial timeout timer
signal.signal(signal.SIGALRM, timeout_handler)
signal.setitimer(signal.ITIMER_REAL, timeout, timeout)

Bo-Chun Chen's avatar
Bo-Chun Chen committed
logger.info(f'Start listening to queue: {task}')
Bo-Chun Chen's avatar
Bo-Chun Chen committed
rc_rmq.start_consume({
    'queue': task,
    'routing_key': "confirm.*",
    'cb': task_manager
})

Bo-Chun Chen's avatar
Bo-Chun Chen committed
logger.info('Disconnected')
Bo-Chun Chen's avatar
Bo-Chun Chen committed
rc_rmq.disconnect()