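# NOTE(review): the words "Newer" / "Older" that stood here were page-navigation
# residue from a web extraction of this file, not part of the program.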
from jinja2 import Template
# Command-line options and the logger both come from the project's rc_util helpers.
args = rc_util.get_args()
logger = rc_util.get_logger(args)

# SQLite database under .agent_db/ and the 'users' table used by this agent.
db = dataset.connect('sqlite:///.agent_db/user_reg.db')
table = db['users']
# Template for one user's in-memory tracking state. task_manager deep-copies it
# for every newly seen username, so the mutable defaults below are never shared.
# Keys mirror what the rest of this module reads and writes:
#   uid/gid/email/reason/fullname - identity fields copied from the DB row or message
#   last_update - refreshed on every message; compared against the timeout
#   errmsg      - list of "<task>: <error>" strings collected from failed tasks
#   waiting     - set of task names whose confirmation is still outstanding
#   request/verify/notify - per-stage task status (None until a result arrives)
#   reported    - whether an admin report has already been sent for this user
record = {
    'uid': -1,
    'gid': -1,
    'email': '',
    'reason': '',
    'fullname': '',
    'last_update': datetime.now(),
    'errmsg': [],
    'waiting': set(),
    'request': {
        'create_account': None,
    },
    'verify': {
        'git_commit': None,
        'dir_verify': None,
        'subscribe_mail_list': None,
    },
    'notify': {
        'notify_user': None,
    },
    'reported': False,
}
# In-flight users, keyed by username; each value is a per-user copy of `record`.
tracking = {}

# RabbitMQ helper configured for the 'RegUsr' topic exchange.
rc_rmq = RCRMQ({
    'exchange': 'RegUsr',
    'exchange_type': 'topic',
})
def notify_admin(username, user_record):
    """Send (or, in dry-run mode, only log) the user-creation report.

    The report is addressed to both the user's email and the configured
    admin address.

    Args:
        username: Account name the report is about.
        user_record: Tracking record for the user. Read keys: 'email',
            'fullname', 'reported', 'uid', 'gid', 'errmsg' and the
            'request' / 'verify' / 'notify' task-status dicts.
    """
    receivers = [user_record['email'], mail_cfg.Admin_email]

    message = Template(mail_cfg.UserReportHead).render(
        username=username, fullname=user_record['fullname'])
    if user_record['reported']:
        message += ' (Duplicate)'

    request = user_record['request']
    verify = user_record['verify']
    notify = user_record['notify']
    report_lines = [
        f"User Creation Report for user {username}",
        f"uid: {user_record['uid']}, gid: {user_record['gid']}",
        "Tasks:",
        f"'create_account': {request['create_account']}",
        f"'git_commit': {verify['git_commit']}",
        f"'dir_verify': {verify['dir_verify']}",
        f"'subscribe_mail_list': {verify['subscribe_mail_list']}",
        f"'notify_user': {notify['notify_user']}",
    ]
    message += " \n\n" + "\n".join(report_lines) + "\n"

    if user_record['errmsg']:
        message += "\nError(s):\n"
        message += "".join(f"{err}\n" for err in user_record['errmsg'])

    if args.dry_run:
        # Dry run: describe the SMTP calls instead of performing them.
        logger.info(f'smtp = smtplib.SMTP({mail_cfg.Server})')
        logger.info(f'smtp.sendmail({mail_cfg.Sender}, {receivers}, message)')
        logger.info(message)
    else:
        # The context manager closes the SMTP session even if sendmail raises.
        with smtplib.SMTP(mail_cfg.Server) as smtp:
            smtp.sendmail(mail_cfg.Sender, receivers, message)
        logger.debug(f'User report sent to: {receivers}')
def insert_db(username, msg):
    """Insert a users-table row for ``username`` unless one already exists.

    Args:
        username: Account name; used as the lookup key.
        msg: Decoded message dict. 'uid', 'gid', 'email', 'reason' and
            'fullname' are copied when present, otherwise defaults are used.
    """
    # Search username in db; an existing row is left untouched.
    if table.find_one(username=username):
        return

    # SQL insert. Every task column starts as None (no result yet).
    table.insert({
        'username': username,
        'uid': msg.get('uid', -1),
        'gid': msg.get('gid', -1),
        'email': msg.get('email', ''),
        'reason': msg.get('reason', ''),
        'fullname': msg.get('fullname', ''),
        'create_account': None,
        'git_commit': None,
        'dir_verify': None,
        'subscribe_mail_list': None,
        'notify_user': None,
        # task_manager reads 'reported' back from the row when restoring a
        # user, so the field must exist from the first insert.
        'reported': False,
        'last_update': datetime.now(),
    })
def update_db(username, data):
    """Apply ``data`` to the users-table row identified by ``username``.

    Args:
        username: Value of the 'username' key column passed to table.update.
        data: Mapping of column names to new values; merged over the
            username entry, so its keys take precedence.
    """
    row = {'username': username}
    row.update(data)
    table.update(row, ['username'])
def task_manager(ch, method, properties, body):
    """Handle one task-result message and advance that user's workflow.

    The username is taken from the second dot-separated part of the routing
    key. On the first message for a user, state is restored from the users
    table or created from the module-level ``record`` template, which must
    provide 'request', 'verify', 'notify', 'waiting', 'errmsg' and
    'reported'. The task result is stored in the table, and depending on the
    stage the task belongs to, a trigger message for the next stage (or a
    'complete.' message) is published. When the workflow completes or is
    terminated, the admin report is sent and the user is dropped from
    ``tracking``. The incoming message is always acknowledged at the end.

    Args:
        ch: Channel object; only ``basic_ack`` is called on it.
        method: Delivery metadata; ``routing_key`` and ``delivery_tag`` are read.
        properties: Unused.
        body: JSON document with at least 'task' and 'success'; 'errmsg' and,
            for a new user, 'uid'/'gid'/'email'/'reason'/'fullname' are read
            when present.
    """
    verify_tasks = ('git_commit', 'dir_verify', 'subscribe_mail_list')

    msg = json.loads(body)
    username = method.routing_key.split('.')[1]
    task_name = msg['task']
    success = msg['success']
    send = completed = terminated = False
    # Pre-bound so the publish/log step below can never hit an unbound name.
    routing_key = ''

    if username in tracking:
        current = tracking[username]
    else:
        user_db = table.find_one(username=username)
        current = tracking[username] = copy.deepcopy(record)

        # Identity fields come from the DB row when there is one, otherwise
        # from the message, using the same defaults as insert_db.
        identity_defaults = {
            'uid': -1,
            'gid': -1,
            'email': '',
            'reason': '',
            'fullname': '',
        }
        for field, default in identity_defaults.items():
            if user_db:
                current[field] = user_db[field]
            else:
                current[field] = msg.get(field, default)

        if user_db:
            # Restore task status
            current['request']['create_account'] = user_db['create_account']
            for t in verify_tasks:
                current['verify'][t] = user_db[t]
            current['notify']['notify_user'] = user_db['notify_user']
            # Older rows may have no 'reported' value; treat that as False.
            current['reported'] = bool(user_db.get('reported'))

            # Verify tasks without a stored result are still outstanding.
            for t in verify_tasks:
                if user_db[t] is None:
                    current['waiting'].add(t)

            if not current['waiting'] and user_db['notify_user'] is None:
                current['waiting'].add('notify_user')

            logger.debug(f'Loaded user {username} from DB')
        else:
            insert_db(username, msg)
            logger.debug(f'Tracking user {username}')

    current['last_update'] = datetime.now()

    # Update Database
    update_db(username, {
        task_name: success,
        'last_update': current['last_update'],
    })

    # Save error message if the task was failed
    if not success:
        errmsg = msg.get('errmsg', '')
        if errmsg:
            current['errmsg'].append(f"{task_name}: {errmsg}")

    # Define message that's going to be published
    message = {
        'username': username,
        'uid': current['uid'],
        'gid': current['gid'],
        'email': current['email'],
        'reason': current['reason'],
        'fullname': current['fullname'],
    }

    try:
        if task_name in current['request']:
            current['request'][task_name] = success
            routing_key = 'verify.' + username

            # Terminate the process if failed
            if not success:
                terminated = True
                routing_key = 'complete.' + username
                message['success'] = False
                message['errmsg'] = current['errmsg']

            # NOTE(review): inferred - a request result always leads to a
            # published message ('verify.' or 'complete.'); confirm.
            send = True
            current['waiting'] = set(verify_tasks)

        elif task_name in current['verify']:
            current['verify'][task_name] = success
            # This task has answered; without removing it the waiting set
            # could never become empty.
            current['waiting'].discard(task_name)
            # NOTE(review): inferred from the '<stage>.<username>' pattern of
            # the other routing keys; confirm the notify agent's binding.
            routing_key = 'notify.' + username

            if not current['waiting']:
                send = True
                current['waiting'] = {'notify_user'}

            # Terminate if dir_verify failed and all agents has responsed
            if send and not current['verify']['dir_verify']:
                terminated = True
                routing_key = 'complete.' + username
                message['success'] = False
                message['errmsg'] = current['errmsg']

        elif task_name in current['notify']:
            current['notify'][task_name] = success
            routing_key = 'complete.' + username
            message['success'] = success
            message['errmsg'] = current['errmsg']

            # The whole creation process has completed
            # NOTE(review): inferred - the 'complete.' message is published.
            send = True
            completed = True

    except Exception:
        logger.error('', exc_info=True)

    if send:
        # Same payload shape as the publish call in timeout_handler.
        rc_rmq.publish_msg({
            'routing_key': routing_key,
            'msg': message,
        })
        logger.debug(f"Trigger message '{routing_key}' sent")
        logger.debug('Previous level messages acknowledged')

    # Report to the admin only once the workflow has ended either way.
    if completed or terminated:
        notify_admin(username, current)
        update_db(username, {'reported': True})
        # The timeout handler may already have dropped this user.
        tracking.pop(username, None)
        logger.debug('Admin report sent')

    # Acknowledge message
    ch.basic_ack(method.delivery_tag)
def timeout_handler(signum, frame):
    """SIGALRM handler: fail and report users with no update within ``timeout``.

    For every tracked user whose 'last_update' is older than ``timeout``
    seconds, publish a failed 'complete.<user>' message naming the tasks
    still being waited on, send the admin report, mark the user as reported
    in the database and stop tracking them.

    Args:
        signum: Signal number (unused).
        frame: Current stack frame (unused).
    """
    current_time = datetime.now()

    # Iterate over a snapshot because entries are removed inside the loop.
    for user in tuple(tracking):
        elapsed = current_time - tracking[user]['last_update']
        # total_seconds() covers the whole interval; .seconds alone wraps
        # around once the gap exceeds a day.
        if elapsed.total_seconds() <= timeout:
            continue

        # Sorted so the error text does not depend on set iteration order.
        pending = ', '.join(sorted(tracking[user]['waiting']))
        rc_rmq.publish_msg({
            'routing_key': 'complete.' + user,
            'msg': {
                'username': user,
                'success': False,
                'errmsg': ["Timeout on " + pending],
            },
        })

        notify_admin(user, tracking[user])
        update_db(user, {'reported': True})
        tracking.pop(user)
# Install the SIGALRM handler and arm a real-time interval timer: first expiry
# after `timeout` seconds, then again every `timeout` seconds.
signal.signal(signal.SIGALRM, timeout_handler)
signal.setitimer(signal.ITIMER_REAL, timeout, timeout)

# Start consuming "confirm.*" messages on this agent's queue with task_manager
# as the callback.
rc_rmq.start_consume(dict(
    queue=task,
    routing_key="confirm.*",
    cb=task_manager,
))