Skip to content
Snippets Groups Projects

Feat aup

Open Bo-Chun Chen requested to merge github/fork/diedpigs/feat-aup into feat-cod-rmq
6 files
+ 79
65
Compare changes
  • Side-by-side
  • Inline
Files
6
#!/usr/bin/env python3
#!/usr/bin/env python3
import json
import json
import rc_util
import rc_util
import argparse
import signal
import uuid
import pika
import rc_util
from rc_rmq import RCRMQ
from rc_rmq import RCRMQ
import rabbit_config as rcfg
import rabbit_config as rcfg
import time
task = "acctmgr"
task = "acctmgr"
@@ -17,6 +11,7 @@ rc_rmq = RCRMQ({"exchange": rcfg.Exchange, "exchange_type": "topic"})
@@ -17,6 +11,7 @@ rc_rmq = RCRMQ({"exchange": rcfg.Exchange, "exchange_type": "topic"})
tracking = {}
tracking = {}
 
def manage_acct(ch, method, properties, body):
def manage_acct(ch, method, properties, body):
msg = json.loads(body)
msg = json.loads(body)
op = method.routing_key.split(".")[1]
op = method.routing_key.split(".")[1]
@@ -28,38 +23,34 @@ def manage_acct(ch, method, properties, body):
@@ -28,38 +23,34 @@ def manage_acct(ch, method, properties, body):
if username in tracking:
if username in tracking:
current = tracking[username]
current = tracking[username]
else:
else:
current = tracking[username] = {}
current = tracking[username] = {}
if op == 'request':
if op == "request":
if state == 'blocked' or state == 'certification':
if state == "blocked" or state == "certification":
msg["action"] = "lock"
msg["action"] = "lock"
elif state == 'ok':
elif state == "ok":
msg["action"] = "unlock"
msg["action"] = "unlock"
else:
else:
print("Invalid state provided. Check the help menu.")
print("Invalid state provided. Check the help menu.")
if service == 'all':
if service == "all":
current["new_jobs"] = None
current["new_jobs"] = None
current["expire_account"] = None
current["expire_account"] = None
# send a broadcast message to all agents
# send a broadcast message to all agents
rc_rmq.publish_msg(
rc_rmq.publish_msg(
{
{
"routing_key": f"{msg['action']}.{queuename}",
"routing_key": f"{msg['action']}.{queuename}",
"msg": msg,
"msg": msg,
}
}
)
)
else:
else:
for each_service in service:
for each_service in service:
current[each_service] = None
current[each_service] = None
rc_rmq.publish_msg(
rc_rmq.publish_msg(
{
{"routing_key": f"{each_service}.{queuename}", "msg": msg}
"routing_key": f"{each_service}.{queuename}",
"msg": msg
}
)
)
elif op == "done":
elif op == 'done':
# Check if each task/agent returned success
# Check if each task/agent returned success
current[msg["task"]] = msg["success"]
current[msg["task"]] = msg["success"]
@@ -71,22 +62,20 @@ def manage_acct(ch, method, properties, body):
@@ -71,22 +62,20 @@ def manage_acct(ch, method, properties, body):
if done:
if done:
rc_util.update_state(username, state)
rc_util.update_state(username, state)
# Send done msg to account_manager.py
# Send done msg to account_manager.py
rc_rmq.publish_msg(
rc_rmq.publish_msg(
{
{
"routing_key": f'certified.{queuename}',
"routing_key": f"certified.{queuename}",
"msg": msg,
"msg": msg,
}
}
)
)
ch.basic_ack(delivery_tag=method.delivery_tag)
ch.basic_ack(delivery_tag=method.delivery_tag)
rc_rmq.bind_queue(queue=task, routing_key='acctmgr.request.*', durable=True)
rc_rmq.bind_queue(queue=task, routing_key='acctmgr.done.*', durable=True)
print("Waiting for completion...")
rc_rmq.bind_queue(queue=task, routing_key="acctmgr.request.*", durable=True)
rc_rmq.start_consume(
rc_rmq.bind_queue(queue=task, routing_key="acctmgr.done.*", durable=True)
{"queue": task, "cb": manage_acct}
)
 
print("Waiting for completion...")
 
rc_rmq.start_consume({"queue": task, "cb": manage_acct})
Loading