Skip to content
Snippets Groups Projects
task_manager.py 3.52 KiB
Newer Older
Bo-Chun Chen's avatar
Bo-Chun Chen committed
#!/usr/bin/env python
import sys
import json
Bo-Chun Chen's avatar
Bo-Chun Chen committed
import rc_util
Bo-Chun Chen's avatar
Bo-Chun Chen committed
from rc_rmq import RCRMQ
from datetime import datetime

task = 'task_manager'

Bo-Chun Chen's avatar
Bo-Chun Chen committed
args = rc_util.get_args()
logger = rc_util.get_logger(args)

Bo-Chun Chen's avatar
Bo-Chun Chen committed
record = {
    'uid': -1,
    'gid': -1,
    'email': '',
Bo-Chun Chen's avatar
Bo-Chun Chen committed
    'fullname': '',
Bo-Chun Chen's avatar
Bo-Chun Chen committed
    'last_update': datetime.now(),
    'request': {
Bo-Chun Chen's avatar
Bo-Chun Chen committed
        'get_next_uid_gid': None
Bo-Chun Chen's avatar
Bo-Chun Chen committed
    },
    'create': {
Bo-Chun Chen's avatar
Bo-Chun Chen committed
        'subscribe_mail_list': None,
        'bright_account': None
Bo-Chun Chen's avatar
Bo-Chun Chen committed
    },
    'verify': {
        'git_commit': None,
        'dir_verify': None
    },
    'notify': {
        'notify_user': None
    'delivery_tags': None
Bo-Chun Chen's avatar
Bo-Chun Chen committed
}

# Currently tracking users
tracking = {}

# Instantiate rabbitmq object
rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'})

def task_manager(ch, method, properties, body):
    msg = json.loads(body)
    username = method.routing_key.split('.')[1]
    task_name = msg['task']
    done = success = msg['success']
Bo-Chun Chen's avatar
Bo-Chun Chen committed

    if username not in tracking:
        current = tracking[username] = record.copy()
        current['delivery_tags'] = []
Bo-Chun Chen's avatar
Bo-Chun Chen committed
        current['uid'] = msg.get('uid', -1)
        current['gid'] = msg.get('gid', -1)
        current['email'] = msg.get('email', '')
Bo-Chun Chen's avatar
Bo-Chun Chen committed
        current['fullname'] = msg.get('fullname', '')
Bo-Chun Chen's avatar
Bo-Chun Chen committed

        logger.debug(f'Tracking user {username}')
    else:
        current = tracking[username]
    # Save the delivery tags for future use
    current['delivery_tags'].append(method.delivery_tag)

Bo-Chun Chen's avatar
Bo-Chun Chen committed
    try:
        if task_name in current['request']:
            current['request'][task_name] = success
            routing_key = 'create.' + username
            done = success

Bo-Chun Chen's avatar
Bo-Chun Chen committed
            logger.debug(f'Request level task(s) done?{done}')

Bo-Chun Chen's avatar
Bo-Chun Chen committed
        elif task_name in current['create']:
            current['create'][task_name] = success
            routing_key = 'verify.' + username
            done = True
            for status in current['create'].values():
                if status is not True:
                    done = False

Bo-Chun Chen's avatar
Bo-Chun Chen committed
            logger.debug(f'Create level task(s) done?{done}')

Bo-Chun Chen's avatar
Bo-Chun Chen committed
        elif task_name in current['verify']:
            current['verify'][task_name] = success
            routing_key = 'notify.' + username
            done = True
            for status in current['verify'].values():
                if status is not True:
                    done = False

Bo-Chun Chen's avatar
Bo-Chun Chen committed
            logger.debug(f'Verify level task(s) done?{done}')

        elif task_name in current['notify']:
            current['verify'][task_name] = success
            routing_key = 'complete.' + username
            done = success

Bo-Chun Chen's avatar
Bo-Chun Chen committed
            logger.debug(f'Notify level task(s) done?{done}')

    except Exception as exception:
        logger.error('', exc_info=True)
Bo-Chun Chen's avatar
Bo-Chun Chen committed

    if done:
        # Send trigger message
Bo-Chun Chen's avatar
Bo-Chun Chen committed
        rc_rmq.publish_msg({
Bo-Chun Chen's avatar
Bo-Chun Chen committed
            'routing_key': routing_key,
Bo-Chun Chen's avatar
Bo-Chun Chen committed
            'msg': {
                'username': username,
Bo-Chun Chen's avatar
Bo-Chun Chen committed
                'fullname': current['fullname'],
Bo-Chun Chen's avatar
Bo-Chun Chen committed
                'email': current['email'],
                'uid': current['uid'],
                'gid': current['gid']
            }
        })

Bo-Chun Chen's avatar
Bo-Chun Chen committed
        logger.debug(f"Trigger message '{routing_key}' sent")

        # Acknowledge all message from last level
        for tag in current['delivery_tags']:
            ch.basic_ack(tag)
        current['delivery_tags'] = []

        logger.debug('Previous level messages acknowledged')

Bo-Chun Chen's avatar
Bo-Chun Chen committed
logger.info(f'Start listening to queue: {task}')
Bo-Chun Chen's avatar
Bo-Chun Chen committed
rc_rmq.start_consume({
    'queue': task,
    'routing_key': "confirm.*",
    'cb': task_manager
})

Bo-Chun Chen's avatar
Bo-Chun Chen committed
logger.info('Disconnected')
Bo-Chun Chen's avatar
Bo-Chun Chen committed
rc_rmq.disconnect()