Skip to content
Snippets Groups Projects
Commit aaf7b81e authored by Eesaan Atluri's avatar Eesaan Atluri
Browse files

Add exception handling to acctmr workflow agent

parent dec2f974
No related branches found
No related tags found
2 merge requests!147Merge previous default branch feat-cod-rmq into main,!129Add exception handling to acctmr workflow agent
......@@ -25,64 +25,69 @@ def manage_acct(ch, method, properties, body):
service = msg["service"]
queuename = msg["queuename"]
if username in tracking:
current = tracking[username]
else:
current = tracking[username] = {}
if op == 'request':
if state == 'hold' or state == 'certification':
msg["action"] = "lock"
elif state == 'ok':
msg["action"] = "unlock"
try:
if username in tracking:
current = tracking[username]
else:
print("Invalid state provided. Check the help menu.")
if service == 'all':
current["new_jobs"] = None
current["expire_account"] = None
# send a broadcast message to all agents
rc_rmq.publish_msg(
{
"routing_key": f"{msg['action']}.{queuename}",
"msg": msg,
}
)
else:
for each_service in service:
current[each_service] = None
current = tracking[username] = {}
if op == 'request':
if state == 'blocked' or state == 'certification':
msg["action"] = "lock"
elif state == 'ok':
msg["action"] = "unlock"
else:
print("Invalid state provided. Check the help menu.")
if service == 'all':
current["new_jobs"] = None
current["expire_account"] = None
# send a broadcast message to all agents
rc_rmq.publish_msg(
{
"routing_key": f"{each_service}.{queuename}",
"msg": msg
"routing_key": f"{msg['action']}.{queuename}",
"msg": msg,
}
)
else:
for each_service in service:
current[each_service] = None
rc_rmq.publish_msg(
{
"routing_key": f"{each_service}.{queuename}",
"msg": msg
}
)
elif op == 'done':
# Check if each task/agent returned success
current[msg["task"]] = msg["success"]
done = True
for task in current.keys():
if current[task] is None:
done = False
if done:
rc_util.update_state(
username, state, msg.get("updated_by"), msg.get("host")
)
# Send done msg to account_manager.py
rc_rmq.publish_msg(
{
"routing_key": f'certified.{queuename}',
"msg": msg,
}
)
elif op == 'done':
# Check if each task/agent returned success
current[msg["task"]] = msg["success"]
done = True
for task in current.keys():
if current[task] is None:
done = False
if done:
rc_util.update_state(
username, state, msg.get("updated_by"), msg.get("host")
)
# Send done msg to account_manager.py
rc_rmq.publish_msg(
{
"routing_key": f'certified.{queuename}',
"msg": msg,
}
)
ch.basic_ack(delivery_tag=method.delivery_tag)
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception:
msg["errmsg"] = "Exception raised in account manager workflow agent , check the logs for stack trace"
logger.error("", exc_info=True)
rc_rmq.bind_queue(queue=task, routing_key='acctmgr.request.*', durable=True)
rc_rmq.bind_queue(queue=task, routing_key='acctmgr.done.*', durable=True)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment