Skip to content
Snippets Groups Projects
Commit 9f499aec authored by Bo-Chun Chen's avatar Bo-Chun Chen
Browse files

Utilize rc_util

parent 4a853f16
No related branches found
No related tags found
8 merge requests!147Merge previous default branch feat-cod-rmq into main,!85kill nginx process running under user from login node,!51Fix acct create wait,!44WIP: Update Feat task manager to match new messaging structure,!39WIP:Feat cod rmq,!38WIP: Feat cod rmq,!34WIP: Update task manager agent,!25Feature task manager
#!/usr/bin/env python
import sys
import json
import rc_util
from rc_rmq import RCRMQ
from datetime import datetime
task = 'task_manager'
args = rc_util.get_args()
logger = rc_util.get_logger(args)
record = {
'uid': -1,
'gid': -1,
......@@ -46,6 +50,8 @@ def task_manager(ch, method, properties, body):
current['uid'] = msg.get('uid', -1)
current['gid'] = msg.get('gid', -1)
current['email'] = msg.get('email', '')
logger.debug(f'Tracking user {username}')
else:
current = tracking[username]
......@@ -61,6 +67,8 @@ def task_manager(ch, method, properties, body):
routing_key = 'create.' + username
done = success
logger.debug(f'Request level task(s) done?{done}')
elif task_name in current['create']:
current['create'][task_name] = success
routing_key = 'verify.' + username
......@@ -69,6 +77,8 @@ def task_manager(ch, method, properties, body):
if status is not True:
done = False
logger.debug(f'Create level task(s) done?{done}')
elif task_name in current['verify']:
current['verify'][task_name] = success
routing_key = 'notify.' + username
......@@ -77,14 +87,17 @@ def task_manager(ch, method, properties, body):
if status is not True:
done = False
logger.debug(f'Verify level task(s) done?{done}')
elif task_name in current['notify']:
current['verify'][task_name] = success
routing_key = 'complete.' + username
done = success
except:
e = sys.exc_info()[0]
print("[{}]: Error: {}".format(task, e))
logger.debug(f'Notify level task(s) done?{done}')
except Exception as exception:
logger.error('', exc_info=True)
if done:
# acknowledge all message from last level
......@@ -92,6 +105,8 @@ def task_manager(ch, method, properties, body):
ch.basic_ack(tag)
current['delivery_tags'] = []
logger.debug('Previous level messages acknowledged')
# send trigger message
rc_rmq.publish_msg({
'routing_key': routing_key,
......@@ -103,13 +118,15 @@ def task_manager(ch, method, properties, body):
}
})
logger.debug(f"Trigger message '{routing_key}' sent")
print("Start listening to queue: {}".format(task))
logger.info(f'Start listening to queue: {task}')
rc_rmq.start_consume({
'queue': task,
'routing_key': "confirm.*",
'cb': task_manager
})
print("Disconnected")
logger.info('Disconnected')
rc_rmq.disconnect()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment