import vars import sys import json import time import signal from celery import Celery from flask_socketio import SocketIO sys.path.append(vars.rabbitmq_agents_loc) import rc_util broker_url = vars.broker_url celery = Celery(vars.celery_app, broker=broker_url) socketio = SocketIO(message_queue=vars.message_queue) timeout = 30 def gen_f(room): def callback(channel, method, properties, body): msg = json.loads(body) username = msg['username'] queuename = msg['queuename'] if msg['success']: print(f'Account for {username} has been created.') send_msg('account ready', room) else: print(f"There's some issue while creating account for {username}") errmsg = msg.get('errmsg', []) for err in errmsg: print(err) socketio.emit('account error', errmsg, room= room) rc_util.rc_rmq.stop_consume() rc_util.rc_rmq.delete_queue(queuename) return callback def certify_gen_f(room): def callback(channel, method, properties, body): msg = json.loads(body) username = msg['username'] queuename = msg['queuename'] if msg['success']: print(f'Account for {username} has been certified.') send_msg('certified', room) else: print(f"There's some issue while certifying account for {username}") errmsg = msg.get('errmsg', []) for err in errmsg: print(err) socketio.emit('certify error', errmsg, room= room) rc_util.rc_rmq.stop_consume() rc_util.rc_rmq.delete_queue(queuename) return callback def send_msg(event, room): socketio.emit(event, room=room) def timeout_handler(signum, frame): print("Process timeout, there's might some issue with agents") socketio.emit('account error', errmsg, room= room) rc_util.rc_rmq.stop_consume() rc_util.rc_rmq.delete_queue() @celery.task def celery_create_account(json, session): room = session username= json['username'] email= json['email'] fullname= json['fullname'] reason= json['reason'] queuename= rc_util.encode_name(username) updated_by= f'{username}' host= vars.app_host print(time.strftime("%m-%d-%Y_%H:%M:%S") + '\tUser ' + username + ' added to queue') send_msg('creating account', room) print(username) rc_util.add_account(username, queuename, email, fullname, reason, updated_by, host) print('sent account info') print('Waiting for completion...') rc_util.consume(queuename, routing_key=f'complete.{queuename}', callback=gen_f(room)) @celery.task def celery_certify_account(json, session): room = session username= json['username'] email= json['email'] fullname= json['fullname'] queuename= rc_util.encode_name(username) updated_by= f'{username}' host= vars.app_host print("CERTIFY : "+time.strftime("%m-%d-%Y_%H:%M:%S") + '\tUser ' + username + ' added to queue') send_msg('certifying account', room) print(username) rc_util.certify_account(username, queuename, 'ok', 'all', updated_by, host) print('sent account info') print('Waiting for certification...') rc_util.consume(queuename, routing_key=f'certified.{queuename}', callback=certify_gen_f(room))