Skip to content
Snippets Groups Projects
tasks.py 6.1 KiB
Newer Older
Bo-Chun Chen's avatar
Bo-Chun Chen committed
"""
This Python module defines Celery tasks for the following functions:
    * Account creation
    * Account certification
"""
# standard imports
import json
import sys
import time

# third-party imports
from celery import Celery
from flask_socketio import SocketIO

# local imports
import app_vars

sys.path.append(app_vars.rabbitmq_agents_loc)

# pylint: disable=wrong-import-position

import rc_util  # noqa: E402

# pylint: enable=wrong-import-position
Bo-Chun Chen's avatar
Bo-Chun Chen committed

# Celery application for this module; the app name and broker URL are read
# from app_vars.
broker_url = app_vars.broker_url
celery = Celery(app_vars.celery_app, broker=broker_url)

# SocketIO handle bound to the message queue configured in app_vars; the
# callbacks and tasks below use it to emit events to a given room.
socketio = SocketIO(message_queue=app_vars.message_queue)
# NOTE(review): not referenced by any active code in this file (only the
# commented-out timeout handler hints at its use); presumably seconds - confirm.
timeout = 30
def gen_f(room):
    """
    This function defines the callback function for creating account process.
    Inputs:
        room: Room/session ID for the flask session, so that communications
            are going to the right session.
    Output:
        RabbitMQ callback function.
    """

    def callback(channel, method, properties, body):
        """
        This function defines the RabbitMQ callback function executed after
        account creation process.
        Inputs:
            channel: channel over which the RabbitMQ communication is
                happening.
            method: Defines meta information regarding the message delivery.
            properties: user-defined properties on the message
            body: JSON-encoded message that is passed throughout account
                creation process across multiple agents. It must carry the
                "username", "queuename" and "success" keys; "errmsg" is
                optional.
        Output:
            Send appropriate message to the frontend, for account creation
            success or failure. And delete the queue.
        """
        # The payload arrives serialized; decode it before reading any field.
        msg = json.loads(body)
        username = msg["username"]
        queuename = msg["queuename"]

        if msg["success"]:
            print(f"Account for {username} has been created.")
            send_msg("account ready", room)
        else:
            print(f"There's some issue while creating account for {username}")
            errmsg = msg.get("errmsg", [])
            # Mirror certify_gen_f: log each reported error before forwarding
            # the list to the frontend.
            for err in errmsg:
                print(err)
            socketio.emit("account error", errmsg, room=room)

        # NOTE(review): stop_consume() is mirrored from certify_gen_f so the
        # waiting consumer is released once the reply is handled - confirm
        # this matches the intended flow for account creation.
        rc_util.rc_rmq.stop_consume()
        rc_util.rc_rmq.delete_queue(queuename)

    return callback
Krish Moodbidri's avatar
Krish Moodbidri committed
def certify_gen_f(room):
    """
    This function defines the callback function for certifying account process.
    Inputs:
        room: Room/session ID for the flask session, so that communications are
            going to the right session.
    Output:
        RabbitMQ callback function.
    """

    def callback(channel, method, properties, body):
        """
        This function defines the RabbitMQ callback function executed after
        account certification process.
        Inputs:
            channel: channel over which the RabbitMQ communication is happening
            method: Defines meta information regarding the message delivery.
            properties: user-defined properties on the message
            body: JSON-encoded message that is passed throughout account
                certification process across multiple agents. It must carry
                the "username", "queuename" and "success" keys; "errmsg" is
                optional.
        Output:
            Send appropriate message to the frontend, for account certification
            success or failure. Then stop consuming and delete the queue.
        """
        msg = json.loads(body)
        username = msg["username"]
        queuename = msg["queuename"]

        if msg["success"]:
            print(f"Account for {username} has been certified.")
            send_msg("certified", room)
        else:
            print(
                f"There's some issue while certifying account for {username}"
            )
            errmsg = msg.get("errmsg", [])
            # Log every reported error, then forward the whole list to the
            # frontend in a single event.
            for err in errmsg:
                print(err)
            socketio.emit("certify error", errmsg, room=room)

        # Runs on both the success and the failure path: the reply has been
        # handled, so stop consuming and remove the per-user queue.
        rc_util.rc_rmq.stop_consume()
        rc_util.rc_rmq.delete_queue(queuename)

    return callback

def send_msg(event, room):
    """
    This function is used to send messages via socketio
    Input:
        event: string name of the socketio event to emit
        room: room/session ID the event is emitted to
    Output:
        None; the event is emitted to the given room
    """
    # NOTE(review): the function header and body were absent from this file;
    # they are reconstructed from the docstring and the two-argument call
    # sites (event name, room) - confirm against the intended implementation.
    socketio.emit(event, room=room)

# def timeout_handler(signum, frame):
#    print("Process timeout, there's might some issue with agents")
#    socketio.emit("account error", errmsg, room=room)
#    rc_util.rc_rmq.stop_consume()
#    rc_util.rc_rmq.delete_queue()

Bo-Chun Chen's avatar
Bo-Chun Chen committed
def celery_create_account(msg, session):
    """
    This function is used to create account for new users
    Input:
        msg: dict of user attributes; the "username", "email", "fullname"
            and "reason" keys are required.
        session: room/session ID that progress events are emitted to.
    Output:
        None. The outcome is reported to the room by the callback built
        with gen_f(room) once the completion message is consumed.
    Raises:
        KeyError: if a required key is missing from msg.
    """
    # Bind the room before it is used below; without this the function
    # failed with NameError on the first send_msg call.
    room = session
    username = msg["username"]
    email = msg["email"]
    fullname = msg["fullname"]
    reason = msg["reason"]
    #    aup= msg['aup']
    queuename = rc_util.encode_name(username)
    updated_by = f"{username}"
    host = app_vars.app_host

    print(
        time.strftime("%m-%d-%Y_%H:%M:%S")
        + "\tUser "
        + username
        + " added to queue"
    )
    send_msg("creating account", room)
    rc_util.add_account(
        username, queuename, email, fullname, reason, updated_by, host
    )
    print("sent account info")
    print("Waiting for completion...")
    # Consume the per-user queue for the completion message; the callback
    # notifies the frontend and cleans up the queue.
    rc_util.consume(
        queuename, routing_key=f"complete.{queuename}", callback=gen_f(room)
    )

Bo-Chun Chen's avatar
Bo-Chun Chen committed
def celery_certify_account(msg, session):
    """
    This function is used to certify account for existing users
    Input:
        msg: dict of user attributes; only the "username" key is read.
        session: room/session ID that progress events are emitted to.
    Output:
        None. The outcome is reported to the room by the callback built
        with certify_gen_f(room) once the certification message is consumed.
    Raises:
        KeyError: if "username" is missing from msg.
    """
    room = session
    username = msg["username"]
    queuename = rc_util.encode_name(username)
    updated_by = f"{username}"
    host = app_vars.app_host

    print(
        "CERTIFY : "
        + time.strftime("%m-%d-%Y_%H:%M:%S")
        + "\tUser "
        + username
        + " added to queue"
    )
    send_msg("certifying account", room)
    print(username)
    rc_util.certify_account(username, queuename, "ok", "all", updated_by, host)
    print("sent account info")
    print("Waiting for certification...")
    # Consume the per-user queue for the certification message; the callback
    # notifies the frontend, stops consuming and deletes the queue.
    rc_util.consume(
        queuename,
        routing_key=f"certified.{queuename}",
        callback=certify_gen_f(room),
    )