Newer
Older
"""
This Python module defines celery tasks for the following functions:
* Account creation
* Account certification
"""
# standard imports
import json
import sys
import time

# third-party imports
from celery import Celery
from flask_socketio import SocketIO

# local imports
import app_vars

sys.path.append(app_vars.rabbitmq_agents_loc)
# pylint: disable=wrong-import-position
import rc_util  # noqa: E402
Mitchell Moore
committed
Mitchell Moore
committed
# Broker URL for the Celery application, read from the app_vars settings.
broker_url = app_vars.broker_url
# Celery application object; the tasks in this module are registered on it
# through the @celery.task decorator.
celery = Celery(app_vars.celery_app, broker=broker_url)
# SocketIO handle configured with the message queue from app_vars; the
# functions below use it to emit events to a given room.
socketio = SocketIO(message_queue=app_vars.message_queue)
Mitchell Moore
committed

Krish Moodbidri
committed
def gen_f(room):
    """
    Build the RabbitMQ callback used at the end of the account creation
    process.

    Inputs:
        room: Room/session ID for the flask session, so that communications
              are going to the right session.
    Output:
        RabbitMQ callback function bound to the given room.
    """

    def callback(channel, method, properties, body):
        """
        RabbitMQ callback executed after the account creation process.

        Inputs:
            channel: channel over which the RabbitMQ communication is
                     happening (unused here).
            method: meta information regarding the message delivery
                    (unused here).
            properties: user-defined properties on the message (unused here).
            body: JSON-encoded message passed throughout the account creation
                  process; must contain "username", "queuename" and
                  "success", and may contain "errmsg".
        Output:
            None. Emits "account ready" or "account error" to the room, then
            calls rc_util.rc_rmq.stop_consume() and deletes the queue named
            in the message.
        """
        msg = json.loads(body)
        username = msg["username"]
        queuename = msg["queuename"]

        if msg["success"]:
            print(f"Account for {username} has been created.")
            send_msg("account ready", room)
        else:
            print(f"There's some issue while creating account for {username}")
            # A failure message may omit "errmsg"; fall back to an empty
            # list so the frontend still receives the error event.
            errmsg = msg.get("errmsg", [])
            for err in errmsg:
                print(err)
            socketio.emit("account error", errmsg, room=room)

        # Runs on both the success and the failure path.
        rc_util.rc_rmq.stop_consume()
        rc_util.rc_rmq.delete_queue(queuename)

    return callback
def certify_gen_f(room):
    """
    Build the RabbitMQ callback used at the end of the account certification
    process.

    Inputs:
        room: Room/session ID for the flask session, so that communications
              are going to the right session.
    Output:
        RabbitMQ callback function bound to the given room.
    """

    def callback(channel, method, properties, body):
        """
        RabbitMQ callback executed after the account certification process.

        Inputs:
            channel: channel over which the RabbitMQ communication is
                     happening (unused here).
            method: meta information regarding the message delivery
                    (unused here).
            properties: user-defined properties on the message (unused here).
            body: JSON-encoded message passed throughout the certification
                  process; must contain "username", "queuename" and
                  "success", and may contain "errmsg".
        Output:
            None. Emits "certified" or "certify error" to the room, then
            stops consuming and deletes the queue named in the message.
        """
        msg = json.loads(body)
        username = msg["username"]
        queuename = msg["queuename"]

        if msg["success"]:
            print(f"Account for {username} has been certified.")
            send_msg("certified", room)
        else:
            print(
                f"There's some issue while certifying account for {username}"
            )
            # A failure message may omit "errmsg"; fall back to an empty
            # list so the frontend still receives the error event.
            errmsg = msg.get("errmsg", [])
            for err in errmsg:
                print(err)
            socketio.emit("certify error", errmsg, room=room)

        # Same teardown as the account creation callback: stop consuming
        # first, then remove the per-user queue.
        rc_util.rc_rmq.stop_consume()
        rc_util.rc_rmq.delete_queue(queuename)

    return callback
Ravi Tripathi
committed
def send_msg(event, room):
    """
    Emit a socketio event without a payload to a single room.

    Input:
        event: name of the socketio event to emit.
        room: room/session ID that should receive the event.
    Output:
        None; the event is emitted through the module-level socketio object.
    """
    socketio.emit(event, room=room)
Mitchell Moore
committed
# def timeout_handler(signum, frame):
# print("Process timeout, there's might some issue with agents")
# socketio.emit("account error", errmsg, room=room)
# rc_util.rc_rmq.stop_consume()
# rc_util.rc_rmq.delete_queue()
Mitchell Moore
committed
Mitchell Moore
committed
@celery.task
def celery_create_account(msg, session):
    """
    Celery task that requests an account for a new user and waits for the
    result.

    Input:
        msg: dict of user attributes; "username", "email", "fullname" and
             "reason" are read from it.
        session: room/session ID used to address socketio events.
    Output:
        None. The outcome is reported to the room by the callback built
        with gen_f(session) once the completion message is consumed.
    """
    username, email, fullname, reason = (
        msg[key] for key in ("username", "email", "fullname", "reason")
    )
    # aup= msg['aup']
    queuename = rc_util.encode_name(username)
    requester = f"{username}"
    origin_host = app_vars.app_host

    timestamp = time.strftime("%m-%d-%Y_%H:%M:%S")
    print(timestamp + "\tUser " + username + " added to queue")
    send_msg("creating account", session)

    print(username)
    rc_util.add_account(
        username, queuename, email, fullname, reason, requester, origin_host
    )
    print("sent account info")

    print("Waiting for completion...")
    on_complete = gen_f(session)
    rc_util.consume(
        queuename, routing_key=f"complete.{queuename}", callback=on_complete
    )
@celery.task
def celery_certify_account(msg, session):
    """
    Celery task that requests certification of an existing user's account
    and waits for the result.

    Input:
        msg: dict of user attributes; only "username" is read from it.
        session: room/session ID used to address socketio events.
    Output:
        None. The outcome is reported to the room by the callback built
        with certify_gen_f(room) once the certification message is consumed.
    """
    # The session ID doubles as the socketio room for this request.
    room = session
    username = msg["username"]
    queuename = rc_util.encode_name(username)
    updated_by = f"{username}"
    host = app_vars.app_host
    print(
        "CERTIFY : "
        + time.strftime("%m-%d-%Y_%H:%M:%S")
        + "\tUser "
        + username
        + " added to queue"
    )
    send_msg("certifying account", room)
    rc_util.certify_account(username, queuename, "ok", "all", updated_by, host)
    print("sent account info")
    print("Waiting for certification...")
    rc_util.consume(
        queuename,
        routing_key=f"certified.{queuename}",
        callback=certify_gen_f(room),
    )