From 7ec758f52718a0295a8d7b772453f65efc89c358 Mon Sep 17 00:00:00 2001 From: "Bo-Chun Louis Chen(VM)" <louistw@uab.edu> Date: Mon, 16 Mar 2020 21:45:02 +0000 Subject: [PATCH] Rewrite slurm account agent --- slurm_agent.py | 66 ++++++++++++++++++-------------------------------- 1 file changed, 23 insertions(+), 43 deletions(-) diff --git a/slurm_agent.py b/slurm_agent.py index fde0949..acbe891 100755 --- a/slurm_agent.py +++ b/slurm_agent.py @@ -6,60 +6,40 @@ import socket import subprocess import time import json +from rc_rmq import RCRMQ -hostname = socket.gethostname().split(".", 1)[0] -connect_host = rcfg.Server if hostname != rcfg.Server else "localhost" -queue_name = "slurm_add_account" -duration = 2 +task = 'slurm_account' -# Set up credentials to connect to RabbitMQ server -credentials = pika.PlainCredentials(rcfg.User, rcfg.Password) -parameters = pika.ConnectionParameters(connect_host, - rcfg.Port, - rcfg.VHost, - credentials) - -# Establish connection to RabbitMQ server -connection = pika.BlockingConnection(parameters) -channel = connection.channel() - -print("connection established. Listening for messages:") - -# create exchange to pass messages -channel.exchange_declare(exchange=rcfg.Exchange, exchange_type='direct') - -# creates a random name for the newly generated queue -result = channel.queue_declare(queue=queue_name, exclusive=False) - -channel.queue_bind(exchange=rcfg.Exchange, queue=queue_name, routing_key=queue_name) +# Instantiate rabbitmq object +rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'}) def slurm_account_create(ch, method, properties, body): msg = json.loads(body) print("Message received {}".format(msg)) username = msg['username'] + success = False try: subprocess.call(["sudo", "sacctmgr", "add", "account", username, "-i", "Descripition: Add user"]) subprocess.call(["sudo", "sacctmgr", "add", "user", username, "account="+username, "-i"]) print("SLURM account for user {} has been added".format(username)) + success = True except: print("Failed to create user") - channel.basic_ack(delivery_tag=method.delivery_tag) - - channel.basic_publish(exchange=rcfg.Exchange, routing_key=username, body=json.dumps(msg)) - - -# ingest messages -channel.basic_consume(queue=queue_name, on_message_callback=slurm_account_create) - -# initiate message ingestion -try: - channel.start_consuming() -except KeyboardInterrupt: - print("Disconnecting from broker.") - channel.stop_consuming() - -connection.close() - - - + ch.basic_ack(delivery_tag=method.delivery_tag) + + # send confirm message + rc_rmq.publish_msg({ + 'routing_key': 'confirm.' + username, + 'msg': { + 'task': task, + 'success': success + } + }) + +print("Start listening to queue: {}".format(task)) +rc_rmq.start_consume({ + 'queue': task, + 'routing_key': "create.*", + 'cb': slurm_account_create +}) -- GitLab