Skip to content
Snippets Groups Projects

Handle errors in acctmgr

2 files
+ 56
51
Compare changes
  • Side-by-side
  • Inline
Files
2
@@ -10,7 +10,7 @@ from rc_rmq import RCRMQ
@@ -10,7 +10,7 @@ from rc_rmq import RCRMQ
import rabbit_config as rcfg
import rabbit_config as rcfg
import time
import time
task = "acctmgr"
task = "acctmgr-workflow"
# Instantiate rabbitmq object
# Instantiate rabbitmq object
rc_rmq = RCRMQ({"exchange": rcfg.Exchange, "exchange_type": "topic"})
rc_rmq = RCRMQ({"exchange": rcfg.Exchange, "exchange_type": "topic"})
@@ -25,62 +25,67 @@ def manage_acct(ch, method, properties, body):
@@ -25,62 +25,67 @@ def manage_acct(ch, method, properties, body):
service = msg["service"]
service = msg["service"]
queuename = msg["queuename"]
queuename = msg["queuename"]
if username in tracking:
try:
current = tracking[username]
if username in tracking:
else:
current = tracking[username]
current = tracking[username] = {}
if op == 'request':
if state == 'blocked' or state == 'certification':
msg["action"] = "lock"
elif state == 'ok':
msg["action"] = "unlock"
else:
else:
print("Invalid state provided. Check the help menu.")
current = tracking[username] = {}
 
 
if op == 'request':
 
if state == 'blocked' or state == 'certification':
 
msg["action"] = "lock"
 
elif state == 'ok':
 
msg["action"] = "unlock"
 
else:
 
print("Invalid state provided. Check the help menu.")
 
 
if service == 'all':
 
current["new_jobs"] = None
 
current["expire_account"] = None
 
# send a broadcast message to all agents
 
rc_rmq.publish_msg(
 
{
 
"routing_key": f"{msg['action']}.{queuename}",
 
"msg": msg,
 
}
 
)
 
else:
 
for each_service in service:
 
current[each_service] = None
 
rc_rmq.publish_msg(
 
{
 
"routing_key": f"{each_service}.{queuename}",
 
"msg": msg
 
}
 
)
 
 
 
elif op == 'done':
 
# Check if each task/agent returned success
 
current[msg["task"]] = msg["success"]
 
 
done = True
 
 
for task in current.keys():
 
if current[task] is None:
 
done = False
 
 
if done:
 
rc_util.update_state(username, state)
if service == 'all':
# Send done msg to account_manager.py
current["new_jobs"] = None
current["expire_account"] = None
# send a broadcast message to all agents
rc_rmq.publish_msg(
{
"routing_key": f"{msg['action']}.{queuename}",
"msg": msg,
}
)
else:
for each_service in service:
current[each_service] = None
rc_rmq.publish_msg(
rc_rmq.publish_msg(
{
{
"routing_key": f"{each_service}.{queuename}",
"routing_key": f'certified.{queuename}',
"msg": msg
"msg": msg,
}
}
)
)
elif op == 'done':
# Check if each task/agent returned success
current[msg["task"]] = msg["success"]
done = True
for task in current.keys():
if current[task] is None:
done = False
if done:
rc_util.update_state(username, state)
# Send done msg to account_manager.py
ch.basic_ack(delivery_tag=method.delivery_tag)
rc_rmq.publish_msg(
{
"routing_key": f'certified.{queuename}',
"msg": msg,
}
)
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception:
 
msg["errmsg"] = "Exception raised in account manager workflow agent , check the logs for stack trace"
 
logger.error("", exc_info=True)
rc_rmq.bind_queue(queue=task, routing_key='acctmgr.request.*', durable=True)
rc_rmq.bind_queue(queue=task, routing_key='acctmgr.request.*', durable=True)
rc_rmq.bind_queue(queue=task, routing_key='acctmgr.done.*', durable=True)
rc_rmq.bind_queue(queue=task, routing_key='acctmgr.done.*', durable=True)
Loading