Skip to content
Snippets Groups Projects
tasks.py 3.14 KiB
Newer Older
root's avatar
root committed
import json
import signal
import sys
import time

from celery import Celery
from flask_socketio import SocketIO
root's avatar
root committed

# Module-level wiring: the Celery app and the Socket.IO handle are both
# configured from attributes of `vars`.
# NOTE(review): `vars` is not imported in the visible part of this file;
# presumably it is a project settings module -- confirm the import exists,
# otherwise this name resolves to the builtin `vars` function.
broker_url = vars.broker_url
celery = Celery(vars.celery_app, broker=broker_url)
# Socket.IO object created with only a message queue; the code below uses it
# solely to emit events to a room.
socketio = SocketIO(message_queue=vars.message_queue)
# NOTE(review): not referenced in the visible code; presumably the number of
# seconds to wait before `timeout_handler` fires -- confirm against the caller.
timeout = 30
def gen_f(room):
    """Build the consumer callback that reports an account-creation result.

    Args:
        room: Socket.IO room that receives the outcome event.

    Returns:
        A callable taking ``(channel, method, properties, body)`` that is
        bound to ``room``.
    """
    def callback(channel, method, properties, body):
        # ``body`` is a JSON document carrying 'username', 'queuename',
        # 'success' and, on failure, an optional 'errmsg' list.
        msg = json.loads(body)
        username = msg['username']
        queuename = msg['queuename']

        if msg['success']:
            print(f'Account for {username} has been created.')
            send_msg('account ready', room)
        else:
            print(f"There's some issue while creating account for {username}")
            errmsg = msg.get('errmsg', [])
            for err in errmsg:
                print(err)
            socketio.emit('account error', errmsg, room=room)

        # One reply per request: stop consuming and drop the reply queue.
        rc_util.rc_rmq.stop_consume()
        rc_util.rc_rmq.delete_queue(queuename)

    # Without this return the factory yields None and the consumer is
    # registered with no callback at all.
    return callback
Krish Moodbidri's avatar
Krish Moodbidri committed
def certify_gen_f(room):
    """Build the consumer callback that reports an account-certification result.

    Args:
        room: Socket.IO room that receives the outcome event.

    Returns:
        A callable taking ``(channel, method, properties, body)`` that is
        bound to ``room``.
    """
    def callback(channel, method, properties, body):
        # ``body`` is a JSON document carrying 'username', 'queuename',
        # 'success' and, on failure, an optional 'errmsg' list.
        msg = json.loads(body)
        username = msg['username']
        queuename = msg['queuename']

        if msg['success']:
            print(f'Account for {username} has been certified.')
            send_msg('certified', room)
        else:
            print(f"There's some issue while certifying account for {username}")
            errmsg = msg.get('errmsg', [])
            for err in errmsg:
                print(err)
            socketio.emit('certify error', errmsg, room=room)

        # One reply per request: stop consuming and drop the reply queue.
        rc_util.rc_rmq.stop_consume()
        rc_util.rc_rmq.delete_queue(queuename)

    return callback

Krish Moodbidri's avatar
Krish Moodbidri committed
def timeout_handler(signum, frame):
    """Report a timeout and tear down the RabbitMQ consumer.

    The ``(signum, frame)`` signature matches what the ``signal`` module
    passes to a handler; neither argument is used.

    Args:
        signum: Signal number that triggered the handler (unused).
        frame: Stack frame that was interrupted (unused).
    """
    notice = "Process timeout, there's might some issue with agents"
    print(notice)

    # The handler is given neither a room nor an error payload. Look both up
    # as module globals (which is how the bare names used to resolve) but
    # tolerate their absence, so a missing name can no longer raise NameError
    # and skip the cleanup below.
    module_state = globals()
    room = module_state.get('room')
    errmsg = module_state.get('errmsg', [notice])
    if room is not None:
        # Only notify when a target room is known, so the error is never
        # sent without a specific recipient.
        socketio.emit('account error', errmsg, room=room)

    rc_util.rc_rmq.stop_consume()
    # NOTE(review): the other call sites pass the queue name to delete_queue;
    # confirm that the no-argument form is supported.
    rc_util.rc_rmq.delete_queue()
@celery.task
def celery_create_account(json, session):
    """Queue an account-creation request and wait for its completion message.

    Registered as a Celery task, like ``celery_certify_account``; the task
    object stays directly callable with the same arguments.

    Args:
        json: Mapping with the keys 'username', 'email', 'fullname' and
            'reason'. The parameter shadows the ``json`` module inside this
            function only.
        session: Identifier used as the Socket.IO room for status events.

    Raises:
        KeyError: If any of the required keys is missing from ``json``.
    """
    # The room must be bound before it is handed to the callback factory.
    room = session
    username = json['username']
    email = json['email']
    fullname = json['fullname']
    reason = json['reason']
    queuename = rc_util.encode_name(username)

    print(time.strftime("%m-%d-%Y_%H:%M:%S") + '\tUser ' + username + ' added to queue')
    rc_util.add_account(username, queuename, email, fullname, reason)
    print('sent account info')
    print('Waiting for completion...')
    # Consume from the per-user reply queue; the callback reports to `room`.
    rc_util.consume(queuename, routing_key=f'complete.{queuename}', callback=gen_f(room))

@celery.task
def celery_certify_account(json, session):
    """Queue an account-certification request and wait for its reply.

    Args:
        json: Mapping that must contain 'username'. The parameter shadows
            the ``json`` module inside this function only.
        session: Identifier used as the Socket.IO room for status events.

    Raises:
        KeyError: If 'username' is missing from ``json``.
    """
    room = session
    username = json['username']
    queuename = rc_util.encode_name(username)

    print("CERTIFY : " + time.strftime("%m-%d-%Y_%H:%M:%S") + '\tUser ' + username + ' added to queue')
    send_msg('certifying account', room)
    print(username)
    # NOTE(review): the meaning of the literal 'ok' and 'all' arguments is
    # defined by rc_util.certify_account -- confirm there.
    rc_util.certify_account(username, queuename, 'ok', 'all')
    print('sent account info')
    print('Waiting for certification...')
    # Consume from the per-user reply queue; the callback reports to `room`.
    rc_util.consume(queuename, routing_key=f'certified.{queuename}', callback=certify_gen_f(room))