""" This python module defines celery tasks for following fucntions: * Account creation * Account certification """ # standard imports import json import sys import time # third-party imports from celery import Celery from flask_socketio import SocketIO # local imports import app_vars sys.path.append(app_vars.rabbitmq_agents_loc) # pylint: disable=wrong-import-position import rc_util # noqa: E402 # pylint: enable=wrong-import-position broker_url = app_vars.broker_url celery = Celery(app_vars.celery_app, broker=broker_url) socketio = SocketIO(message_queue=app_vars.message_queue) timeout = 30 def gen_f(room): """ This function defines the callback function for creating account process. Inputs: room: Room/session ID for the flask session, so that communications are going to the right session. Output: RabbitMQ callback function. """ def callback(channel, method, properties, body): """ This function defines the RabbitMQ callback function executed after account creation process. Inputs: channel: channel over which the RabbitMQ communication is happening. method: Defines meta information regarding the message delivery. properties: user-defined properties on the message body: Message that is passed throughout account creation process across multiple agents. Output: Send appropriate message to the frontend, for account creation success or failure. And delete the queue. """ msg = json.loads(body) username = msg["username"] queuename = msg["queuename"] if msg["success"]: print(f"Account for {username} has been created.") send_msg("account ready", room) else: print(f"There's some issue while creating account for {username}") errmsg = msg.get("errmsg", []) for err in errmsg: print(err) socketio.emit("account error", errmsg, room=room) rc_util.rc_rmq.stop_consume() rc_util.rc_rmq.delete_queue(queuename) return callback def certify_gen_f(room): """ This function defines the callback function for certifying account process. Inputs: room: Room/session ID for the flask session, so that communications are going to the right session. Output: RabbitMQ callback function. """ def callback(channel, method, properties, body): """ This function defines the RabbitMQ callback function executed after account certification process. Inputs: channel: channel over which the RabbitMQ communication is happening method: Defines meta information regarding the message delivery. properties: user-defined properties on the message body: Message that is passed throughout account creation process across multiple agents. Output: Send appropriate message to the frontend, for account certification success or failure. And delete the queue. """ msg = json.loads(body) username = msg["username"] queuename = msg["queuename"] if msg["success"]: print(f"Account for {username} has been certified.") send_msg("certified", room) else: print( f"There's some issue while certifying account for {username}" ) errmsg = msg.get("errmsg", []) for err in errmsg: print(err) socketio.emit("certify error", errmsg, room=room) rc_util.rc_rmq.stop_consume() rc_util.rc_rmq.delete_queue(queuename) return callback def send_msg(event, room): """ This function is used to send messages via socketio Input: string event, room: Output: string: emit event to room """ socketio.emit(event, room=room) # def timeout_handler(signum, frame): # print("Process timeout, there's might some issue with agents") # socketio.emit("account error", errmsg, room=room) # rc_util.rc_rmq.stop_consume() # rc_util.rc_rmq.delete_queue() @celery.task def celery_create_account(msg, session): """ This function is used to create account for new users Input: msg: json object of all user attributes and session/room Output: gen_f(room): callback to check for success or failure """ room = session username = msg["username"] email = msg["email"] fullname = msg["fullname"] reason = msg["reason"] # aup= msg['aup'] queuename = rc_util.encode_name(username) updated_by = f"{username}" host = app_vars.app_host print( time.strftime("%m-%d-%Y_%H:%M:%S") + "\tUser " + username + " added to queue" ) send_msg("creating account", room) print(username) rc_util.add_account( username, queuename, email, fullname, reason, updated_by, host ) print("sent account info") print("Waiting for completion...") rc_util.consume( queuename, routing_key=f"complete.{queuename}", callback=gen_f(room) ) @celery.task def celery_certify_account(msg, session): """ This function is used to certify account for existing users Input: msg: json object of all user attributes and session/room Output: gen_f(room): callback to check for success or failure """ room = session username = msg["username"] queuename = rc_util.encode_name(username) updated_by = f"{username}" host = app_vars.app_host print( "CERTIFY : " + time.strftime("%m-%d-%Y_%H:%M:%S") + "\tUser " + username + " added to queue" ) send_msg("certifying account", room) print(username) rc_util.certify_account(username, queuename, "ok", "all", updated_by, host) print("sent account info") print("Waiting for certification...") rc_util.consume( queuename, routing_key=f"certified.{queuename}", callback=certify_gen_f(room), )