Skip to content
Snippets Groups Projects
Unverified Commit 7aae9271 authored by Bo-Chun Chen's avatar Bo-Chun Chen Committed by GitHub
Browse files

Update code regarding rcrmq class change (#116)

* Update consume to accept new feature

* Move disconnect to callback

* Made use of exclusive queue

* Disconnect in message handlers

Since we are using exclusive queue, simply disconnect from it will
remove the queue
parent d459bcd2
No related branches found
No related tags found
1 merge request!147Merge previous default branch feat-cod-rmq into main
...@@ -34,7 +34,7 @@ if args.email == "": ...@@ -34,7 +34,7 @@ if args.email == "":
def timeout_handler(signum, frame): def timeout_handler(signum, frame):
print("Process timeout, there's might some issue with agents") print("Process timeout, there's might some issue with agents")
rc_util.rc_rmq.stop_consume() rc_util.rc_rmq.disconnect()
def callback(channel, method, properties, body): def callback(channel, method, properties, body):
...@@ -49,8 +49,7 @@ def callback(channel, method, properties, body): ...@@ -49,8 +49,7 @@ def callback(channel, method, properties, body):
for err in errmsg: for err in errmsg:
print(err) print(err)
rc_util.rc_rmq.stop_consume() rc_util.rc_rmq.disconnect()
rc_util.rc_rmq.delete_queue()
rc_util.add_account( rc_util.add_account(
...@@ -68,5 +67,8 @@ signal.setitimer(signal.ITIMER_REAL, timeout) ...@@ -68,5 +67,8 @@ signal.setitimer(signal.ITIMER_REAL, timeout)
print("Waiting for completion...") print("Waiting for completion...")
rc_util.consume( rc_util.consume(
queuename, routing_key=f"complete.{queuename}", callback=callback queuename,
routing_key=f"complete.{queuename}",
exclusive=True,
callback=callback,
) )
...@@ -47,11 +47,18 @@ def worker(ch, method, properties, body): ...@@ -47,11 +47,18 @@ def worker(ch, method, properties, body):
for err in errmsg: for err in errmsg:
print(err) print(err)
rc_rmq.stop_consume() rc_rmq.disconnect()
rc_rmq.delete_queue()
def consume(queuename, routing_key="", callback=worker, debug=False): def consume(
queuename,
routing_key="",
callback=worker,
bind=True,
durable=True,
exclusive=False,
debug=False,
):
if routing_key == "": if routing_key == "":
routing_key = "complete." + queuename routing_key = "complete." + queuename
...@@ -62,10 +69,12 @@ def consume(queuename, routing_key="", callback=worker, debug=False): ...@@ -62,10 +69,12 @@ def consume(queuename, routing_key="", callback=worker, debug=False):
{ {
"queue": queuename, "queue": queuename,
"routing_key": routing_key, "routing_key": routing_key,
"bind": bind,
"durable": durable,
"exclusive": exclusive,
"cb": callback, "cb": callback,
} }
) )
rc_rmq.disconnect()
return {"success": True} return {"success": True}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment