diff --git a/README.md b/README.md index df4209656d1e0616c589282240d77d7c0b40eeb3..5d7b9155b49e6906eaa6451a4660fce3536a5c73 100644 --- a/README.md +++ b/README.md @@ -41,7 +41,10 @@ def callback_function(ch, method, properties, body): # start consume messagre from queue with callback function rc_rmq.start_consume({ 'queue': 'queue_name', + 'routing_key: 'your_key', 'cb': callback_function }) +# don't forget to close connection +rc_rmq.disconnect() ``` diff --git a/agent_template.py b/agent_template.py new file mode 100644 index 0000000000000000000000000000000000000000..c6e722338bad04999621bd7dfd47a41d2b962d28 --- /dev/null +++ b/agent_template.py @@ -0,0 +1,32 @@ +#!/usr/bin/env python +import sys +import json +from rc_rmq import RCRMQ + +task = 'task_name' + +# Instantiate rabbitmq object +rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'}) + +# Define your callback function +def on_message(ch, method, properties, body): + + # Retrieve routing key + routing_key = method.routing_key + + # Retrieve message + msg = json.loads(body) + + # Do Something + print('[{}]: Callback called.'.format(task)) + + # Acknowledge message + ch.basic_ack(delivery_tag=method.delivery_tag) + + +print("Start listening to queue: {}".format(task)) +rc_rmq.start_consume({ + 'queue': task, # Define your Queue name + 'routing_key': "#", # Define your routing key + 'cb': on_message # Pass in callback function you just define +}) diff --git a/create_account.py b/create_account.py new file mode 100755 index 0000000000000000000000000000000000000000..b316cd0bca978d77f27a51ea067a3634c675049b --- /dev/null +++ b/create_account.py @@ -0,0 +1,25 @@ +#!/usr/bin/env python +import sys +import rc_util + +if len(sys.argv) < 2: + print("Usage: {} USERNAME [EMAIL] [FULL_NAME] [REASON]".format(sys.argv[0]), file=sys.stderr) + exit(1) + +domain = 'uab.edu' +user_name = sys.argv[1] +email = sys.argv[2] if len(sys.argv) >= 3 else '' +full_name = sys.argv[3] if len(sys.argv) >= 4 else '' +reason = sys.argv[4] if len(sys.argv) >= 5 else '' + +if email == '': + if '@' in user_name: + email = user_name + else: + email = user_name + '@' + domain + +rc_util.add_account(user_name, email=email, full=full_name, reason=reason) +print("Account requested for user: {}".format(user_name)) + +print("Waiting for confirmation...") +rc_util.consume(user_name) diff --git a/ohpc_account_create.py b/ohpc_account_create.py index ee69364cfc859143ca6a560ba43469851b1f8d9c..4504702f8d8f8d611b8db1ab4b52811b08ec6ade 100644 --- a/ohpc_account_create.py +++ b/ohpc_account_create.py @@ -1,63 +1,51 @@ #!/usr/bin/env python -import pika # python client import sys -import rabbit_config as rcfg -import socket -import subprocess -import time import json +import subprocess from pwd import getpwnam +from rc_rmq import RCRMQ -hostname = socket.gethostname().split(".", 1)[0] -connect_host = rcfg.Server if hostname != rcfg.Server else "localhost" -queue_name = "ohpc_account_create" -duration = 2 - -# Set up credentials to connect to RabbitMQ server -credentials = pika.PlainCredentials(rcfg.User, rcfg.Password) -parameters = pika.ConnectionParameters(connect_host, - rcfg.Port, - rcfg.VHost, - credentials) - -# Establish connection to RabbitMQ server -connection = pika.BlockingConnection(parameters) -channel = connection.channel() - -print("connection established. Listening for messages:") +task = "ohpc_account" -# create exchange to pass messages -channel.exchange_declare(exchange=rcfg.Exchange, exchange_type='direct') - -# creates a random name for the newly generated queue -result = channel.queue_declare(queue=queue_name, exclusive=False) - -channel.queue_bind(exchange=rcfg.Exchange, queue=queue_name, routing_key=queue_name) +# Instantiate rabbitmq object +rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'}) def ohpc_account_create(ch, method, properties, body): msg = json.loads(body) print("Message received {}".format(msg)) username = msg['username'] + success = False try: subprocess.call(["sudo", "useradd", username]) - print("User {} has been added to {}".format(username, hostname)) + print("[{}]: User {} has been added".format(task, username)) + success = True except: - print("Failed to create user") + e = sys.exc_info()[0] + print("[{}]: Error: {}".format(task, e)) - channel.basic_ack(delivery_tag=method.delivery_tag) + ch.basic_ack(delivery_tag=method.delivery_tag) msg['uid'] = getpwnam(username).pw_uid msg['gid'] = getpwnam(username).pw_gid - channel.basic_publish(exchange=rcfg.Exchange, routing_key='ood_account_create', body=json.dumps(msg)) - - -# ingest messages -channel.basic_consume(queue=queue_name, on_message_callback=ohpc_account_create) - -# initiate message ingestion -try: - channel.start_consuming() -except KeyboardInterrupt: - print("Disconnecting from broker.") - channel.stop_consuming() -connection.close() + # send confirm message + rc_rmq.publish_msg({ + 'routing_key': 'confirm.' + username, + 'msg': { + 'task': task, + 'success': success + } + }) + + if success: + # send create message to other agent + rc_rmq.publish_msg({ + 'routing_key': 'create.' + username, + 'msg': msg + }) + +print("Start Listening to queue: {}".format(task)) +rc_rmq.start_consume({ + 'queue': task, + 'routing_key': 'request.*', + 'cb': ohpc_account_create +}) diff --git a/ood_account_create.py b/ood_account_create.py index 2abaa23429cd41e4565a25e2055ec18920713328..1ff9cc9fd8a61df98ae1473c20e01480f6e4aa90 100644 --- a/ood_account_create.py +++ b/ood_account_create.py @@ -1,37 +1,13 @@ #!/usr/bin/env python -import pika # python client import sys -import rabbit_config as rcfg -import socket -import subprocess -import time import json +import subprocess +from rc_rmq import RCRMQ -hostname = socket.gethostname().split(".", 1)[0] -connect_host = rcfg.Server if hostname != rcfg.Server else "localhost" -queue_name = "ood_account_create" -duration = 2 - -# Set up credentials to connect to RabbitMQ server -credentials = pika.PlainCredentials(rcfg.User, rcfg.Password) -parameters = pika.ConnectionParameters(connect_host, - rcfg.Port, - rcfg.VHost, - credentials) - -# Establish connection to RabbitMQ server -connection = pika.BlockingConnection(parameters) -channel = connection.channel() - -print("connection established. Listening for messages:") - -# create exchange to pass messages -channel.exchange_declare(exchange=rcfg.Exchange, exchange_type='direct') - -# creates a random name for the newly generated queue -result = channel.queue_declare(queue=queue_name, exclusive=False) +task = 'ood_account' -channel.queue_bind(exchange=rcfg.Exchange, queue=queue_name, routing_key=queue_name) +# Instantiate rabbitmq object +rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'}) def ood_account_create(ch, method, properties, body): msg = json.loads(body) @@ -39,26 +15,30 @@ def ood_account_create(ch, method, properties, body): username = msg['username'] user_uid = str(msg['uid']) user_gid = str(msg['gid']) + success = False try: subprocess.call(["sudo", "groupadd", "-r", "-g", user_gid, username]) subprocess.call(["sudo", "useradd", "-u", user_uid, "-g", user_gid, username]) - print("User {} has been added to {}".format(username, hostname)) + print("[{}]: User {} has been added".format(task, username)) + success = True except: - print("Failed to create user") - - channel.basic_ack(delivery_tag=method.delivery_tag) - - channel.basic_publish(exchange=rcfg.Exchange, routing_key='slurm_add_account', body=json.dumps(msg)) - - -# ingest messages -channel.basic_consume(queue=queue_name, on_message_callback=ood_account_create) - -# initiate message ingestion -try: - channel.start_consuming() -except KeyboardInterrupt: - print("Disconnecting from broker.") - channel.stop_consuming() - -connection.close() + e = sys.exc_info()[0] + print("[{}]: Error: {}".format(task, e)) + + ch.basic_ack(delivery_tag=method.delivery_tag) + + # send confirm message + rc_rmq.publish_msg({ + 'routing_key': 'confirm.' + username, + 'msg': { + 'task': task, + 'success': success + } + }) + +print("Start listening to queue: {}".format(task)) +rc_rmq.start_consume({ + 'queue': task, + 'routing_key': "create.*", + 'cb': ood_account_create +}) diff --git a/rc_rmq.py b/rc_rmq.py index f3102d496b09e7ad7a325eaf53c1672d96856288..37afa3f44f872454a3b96fccf2d5a6a538fe01cf 100644 --- a/rc_rmq.py +++ b/rc_rmq.py @@ -15,8 +15,9 @@ class RCRMQ(object): QUEUE = None DURABLE = True ROUTING_KEY = None + DEBUG = False - def __init__(self, config=None): + def __init__(self, config=None, debug=False): if config: if 'exchange' in config: self.EXCHANGE = config['exchange'] @@ -30,6 +31,23 @@ class RCRMQ(object): self.PASSWORD = rcfg.Password self.VHOST = rcfg.VHost self.PORT = rcfg.Port + self.DEBUG = debug + + if self.DEBUG: + print(""" + Created RabbitMQ instance with: + Exchange name: {}, + Exchange type: {}, + Host: {}, + User: {}, + VHost: {}, + Port: {} + """.format(self.EXCHANGE, self.EXCHANGE_TYPE, self.HOST, self.USER, self.VHOST, self.PORT)) + + self._consumer_tag = None + self._connection = None + self._consuming = False + self._channel = None self._parameters = pika.ConnectionParameters( self.HOST, self.PORT, @@ -37,22 +55,26 @@ class RCRMQ(object): pika.PlainCredentials(self.USER, self.PASSWORD)) def connect(self): + if self.DEBUG: + print("Connecting...\n" + "Exchange: " + self.EXCHANGE + " Exchange type: " + self.EXCHANGE_TYPE) + self._connection = pika.BlockingConnection(self._parameters) self._channel = self._connection.channel() self._channel.exchange_declare( - exchange=self.EXCHANGE, + exchange=self.EXCHANGE, exchange_type=self.EXCHANGE_TYPE, durable=True) - if self.QUEUE is not None: - self._channel.queue_declare(queue=self.QUEUE, durable=self.DURABLE) - self._channel.queue_bind(exchange=self.EXCHANGE, - queue=self.QUEUE, - routing_key=self.ROUTING_KEY) + def bind_queue(self): + self._channel.queue_declare(queue=self.QUEUE, durable=self.DURABLE) + self._channel.queue_bind(exchange=self.EXCHANGE, + queue=self.QUEUE, + routing_key=self.ROUTING_KEY) def disconnect(self): self._channel.close() self._connection.close() + self._connection = None def delete_queue(self): self._channel.queue_delete(self.QUEUE) @@ -61,14 +83,13 @@ class RCRMQ(object): if 'routing_key' in obj: self.ROUTING_KEY = obj['routing_key'] - self.connect() + if self._connection is None: + self.connect() self._channel.basic_publish(exchange=self.EXCHANGE, routing_key=self.ROUTING_KEY, body=json.dumps(obj['msg'])) - self.disconnect() - def start_consume(self, obj): if 'queue' in obj: self.QUEUE = obj['queue'] @@ -76,15 +97,20 @@ class RCRMQ(object): if 'durable' in obj: self.DURABLE = obj['durable'] - self.connect() + if self.DEBUG: + print("Queue: " + self.QUEUE + "\nRouting_key: " + self.ROUTING_KEY) + + if self._connection is None: + self.connect() + + self.bind_queue() self._consumer_tag = self._channel.basic_consume(self.QUEUE,obj['cb']) + self._consuming = True try: self._channel.start_consuming() except KeyboardInterrupt: self._channel.stop_consuming() - self.disconnect() - def stop_consume(self): self._channel.basic_cancel(self._consumer_tag) diff --git a/rc_util.py b/rc_util.py index 63acbbe37dcf2f2a1700ac98793a52a010d8e34e..0e7c4c1e6ec7d0cba4cc7370cf35370efa7f7354 100644 --- a/rc_util.py +++ b/rc_util.py @@ -1,43 +1,72 @@ +import logging +import argparse from rc_rmq import RCRMQ import json -rc_rmq = RCRMQ({'exchange': 'Register'}) -confirm_rmq = RCRMQ({'exchange': 'Confirm'}) -tasks = {'ohpc_account': False, 'ohpc_homedir': False, 'ood_account': False, 'slurm_account': False} +rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'}) +tasks = {'ohpc_account': None, 'ood_account': None, 'slurm_account': None} +logger_fmt = '%(asctime)s [%(module)s] - %(message)s' -def add_account(username, full='', reason=''): +def add_account(username, email, full='', reason=''): rc_rmq.publish_msg({ - 'routing_key': 'ohpc_account', + 'routing_key': 'request.' + username, 'msg': { "username": username, + "email": email, "fullname": full, "reason": reason } }) + rc_rmq.disconnect() def worker(ch, method, properties, body): msg = json.loads(body) task = msg['task'] - print("get msg: {}".format(task)) tasks[task] = msg['success'] + print("Got msg: {}({})".format(msg['task'], msg['success'])) # Check if all tasks are done done = True for key, status in tasks.items(): if not status: print("{} is not done yet.".format(key)) - done = False + done = False if done: - confirm_rmq.stop_consume() - confirm_rmq.delete_queue() - -def consume(username, callback, debug=False): + rc_rmq.stop_consume() + rc_rmq.delete_queue() + +def consume(username, callback=worker, debug=False): if debug: sleep(5) else: - confirm_rmq.start_consume({ + rc_rmq.start_consume({ 'queue': username, + 'routing_key': 'confirm.' + username, 'cb': callback }) - + rc_rmq.disconnect() + return { 'success' : True } + +def get_args(): + # Parse arguments + parser = argparse.ArgumentParser() + parser.add_argument('-v', '--verbose', action='store_true', help='verbose output') + parser.add_argument('-n', '--dry-run', action='store_true', help='enable dry run mode') + return parser.parse_args() + +def get_logger(args=None): + if args is None: + args = get_args() + + logger_lvl = logging.WARNING + + if args.verbose: + logger_lvl = logging.DEBUG + + if args.dry_run: + logger_lvl = logging.INFO + + logging.basicConfig(format=logger_fmt, level=logger_lvl) + return logging.getLogger(__name__) + diff --git a/slurm_agent.py b/slurm_agent.py index fde0949db0c57b42f16c9c04632140fc599aa2dc..e07af9bd233b952aa385848a4504859f31a22177 100755 --- a/slurm_agent.py +++ b/slurm_agent.py @@ -1,65 +1,42 @@ #!/usr/bin/env python -import pika # python client import sys -import rabbit_config as rcfg -import socket -import subprocess -import time import json +import subprocess +from rc_rmq import RCRMQ -hostname = socket.gethostname().split(".", 1)[0] -connect_host = rcfg.Server if hostname != rcfg.Server else "localhost" -queue_name = "slurm_add_account" -duration = 2 - -# Set up credentials to connect to RabbitMQ server -credentials = pika.PlainCredentials(rcfg.User, rcfg.Password) -parameters = pika.ConnectionParameters(connect_host, - rcfg.Port, - rcfg.VHost, - credentials) - -# Establish connection to RabbitMQ server -connection = pika.BlockingConnection(parameters) -channel = connection.channel() - -print("connection established. Listening for messages:") - -# create exchange to pass messages -channel.exchange_declare(exchange=rcfg.Exchange, exchange_type='direct') - -# creates a random name for the newly generated queue -result = channel.queue_declare(queue=queue_name, exclusive=False) +task = 'slurm_account' -channel.queue_bind(exchange=rcfg.Exchange, queue=queue_name, routing_key=queue_name) +# Instantiate rabbitmq object +rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'}) def slurm_account_create(ch, method, properties, body): msg = json.loads(body) print("Message received {}".format(msg)) username = msg['username'] + success = False try: subprocess.call(["sudo", "sacctmgr", "add", "account", username, "-i", "Descripition: Add user"]) - subprocess.call(["sudo", "sacctmgr", "add", "user", username, "account="+username, "-i"]) + subprocess.call(["sudo", "sacctmgr", "add", "user", username, "account=" + username, "-i"]) print("SLURM account for user {} has been added".format(username)) + success = True except: - print("Failed to create user") - - channel.basic_ack(delivery_tag=method.delivery_tag) - - channel.basic_publish(exchange=rcfg.Exchange, routing_key=username, body=json.dumps(msg)) - - -# ingest messages -channel.basic_consume(queue=queue_name, on_message_callback=slurm_account_create) - -# initiate message ingestion -try: - channel.start_consuming() -except KeyboardInterrupt: - print("Disconnecting from broker.") - channel.stop_consuming() - -connection.close() - - - + e = sys.exc_info()[0] + print("[{}]: Error: {}".format(task, e)) + + ch.basic_ack(delivery_tag=method.delivery_tag) + + # send confirm message + rc_rmq.publish_msg({ + 'routing_key': 'confirm.' + username, + 'msg': { + 'task': task, + 'success': success + } + }) + +print("Start listening to queue: {}".format(task)) +rc_rmq.start_consume({ + 'queue': task, + 'routing_key': "create.*", + 'cb': slurm_account_create +})