diff --git a/README.md b/README.md index fe62ac78a55aa42f8c2593cd9824e638e33910cf..df4209656d1e0616c589282240d77d7c0b40eeb3 100644 --- a/README.md +++ b/README.md @@ -1 +1,47 @@ -# rabbitmq_agents \ No newline at end of file +# rabbitmq_agents + +This repo keeps different rabbitmq agents that help in account creation on OHPC system. + +It has 2 branches ```develop``` and ```production``` , that houses agents based on where they are launched + +## Using RCRMQ class + +- First, rename `rabbit_config.py.example` to `rabbit_config.py` + +- Modify config file, at least the `Password` needs to be your own passwod + +- In your code: + +``` +# import the class +from rc_rmq import RCRMQ + +# instantiate an instance +rc_rmq = RCRMQ({'exchange': 'RegUsr'}) + +# publish a message to message queue +rc_rmq.publish_msg({ + 'routing_key': 'your_key', + 'msg': { + 'type': 'warning', + 'content': 'this is warning' + } +}) + +# to consume message from a queue +# you have to first define callback function +# with parameters: channel, method, properties, body +def callback_function(ch, method, properties, body): + msg = json.loads(body) + print("get msg: {}".format(msg['username') + + # this will stop the consumer + rc_rmq.stop_consumer() + +# start consume messagre from queue with callback function +rc_rmq.start_consume({ + 'queue': 'queue_name', + 'cb': callback_function +}) + +``` diff --git a/flask_producer.py b/flask_producer.py new file mode 100755 index 0000000000000000000000000000000000000000..dc3c44634745278fa426821d2ea0a2ecc9e61e3f --- /dev/null +++ b/flask_producer.py @@ -0,0 +1,61 @@ +#!/usr/bin/env python +import pika +import sys +import socket +import json +import rabbit_config as rcfg + +if len(sys.argv) < 3: + sys.stderr.write("Usage: {} TAG USERNAME ".format(sys.argv[0])) + exit(1) + +node = sys.argv[1] +user_name = sys.argv[2] + +message = { + "username": user_name, + "fullname": "Full Name", + "reason": "Reason1, Reason2." +} + +hostname = socket.gethostname().split(".", 1)[0] +connect_host = rcfg.Server if hostname != rcfg.Server else "localhost" + +# Set up credentials to connect to RabbitMQ server +credentials = pika.PlainCredentials(rcfg.User, rcfg.Password) +parameters = pika.ConnectionParameters(connect_host, + rcfg.Port, + rcfg.VHost, + credentials) + +# Establish connection to RabbitMQ server +connection = pika.BlockingConnection(parameters) +channel = connection.channel() +channel.exchange_declare(exchange=rcfg.Exchange, exchange_type='direct') + +channel.basic_publish(exchange=rcfg.Exchange, routing_key=node, body=json.dumps(message)) +print(" [x] Sent {}: {}".format(node, json.dumps(message))) + +# creates a named queue +result = channel.queue_declare(queue=user_name, exclusive=False, durable=True) + +# bind the queue with exchange +channel.queue_bind(exchange=rcfg.Exchange, queue=user_name, routing_key=user_name) + +def work(ch, method, properties, body): + msg = json.loads(body) + print("Received message from {}: \n\t{}".format(method.routing_key, msg)) + #queue_unbind(queue, exchange=None, routing_key=None, arguments=None, callback=None) + channel.queue_delete(method.routing_key) + +# ingest messages, and assume delivered via auto_ack +channel.basic_consume(queue=sys.argv[2], on_message_callback=work, auto_ack=True) +print("Subscribing to queue: {}".format(sys.argv[2])) + +# initiate message ingestion +try: + channel.start_consuming() +except KeyboardInterrupt: + print("Disconnecting from broker.") + channel.stop_consuming() +connection.close() diff --git a/ohpc_account_create.py b/ohpc_account_create.py new file mode 100644 index 0000000000000000000000000000000000000000..ee69364cfc859143ca6a560ba43469851b1f8d9c --- /dev/null +++ b/ohpc_account_create.py @@ -0,0 +1,63 @@ +#!/usr/bin/env python +import pika # python client +import sys +import rabbit_config as rcfg +import socket +import subprocess +import time +import json +from pwd import getpwnam + +hostname = socket.gethostname().split(".", 1)[0] +connect_host = rcfg.Server if hostname != rcfg.Server else "localhost" +queue_name = "ohpc_account_create" +duration = 2 + +# Set up credentials to connect to RabbitMQ server +credentials = pika.PlainCredentials(rcfg.User, rcfg.Password) +parameters = pika.ConnectionParameters(connect_host, + rcfg.Port, + rcfg.VHost, + credentials) + +# Establish connection to RabbitMQ server +connection = pika.BlockingConnection(parameters) +channel = connection.channel() + +print("connection established. Listening for messages:") + +# create exchange to pass messages +channel.exchange_declare(exchange=rcfg.Exchange, exchange_type='direct') + +# creates a random name for the newly generated queue +result = channel.queue_declare(queue=queue_name, exclusive=False) + +channel.queue_bind(exchange=rcfg.Exchange, queue=queue_name, routing_key=queue_name) + +def ohpc_account_create(ch, method, properties, body): + msg = json.loads(body) + print("Message received {}".format(msg)) + username = msg['username'] + try: + subprocess.call(["sudo", "useradd", username]) + print("User {} has been added to {}".format(username, hostname)) + except: + print("Failed to create user") + + channel.basic_ack(delivery_tag=method.delivery_tag) + msg['uid'] = getpwnam(username).pw_uid + msg['gid'] = getpwnam(username).pw_gid + channel.basic_publish(exchange=rcfg.Exchange, routing_key='ood_account_create', body=json.dumps(msg)) + + +# ingest messages +channel.basic_consume(queue=queue_name, on_message_callback=ohpc_account_create) + +# initiate message ingestion +try: + channel.start_consuming() +except KeyboardInterrupt: + print("Disconnecting from broker.") + channel.stop_consuming() + +connection.close() diff --git a/ood_account_create.py b/ood_account_create.py new file mode 100644 index 0000000000000000000000000000000000000000..2abaa23429cd41e4565a25e2055ec18920713328 --- /dev/null +++ b/ood_account_create.py @@ -0,0 +1,64 @@ +#!/usr/bin/env python +import pika # python client +import sys +import rabbit_config as rcfg +import socket +import subprocess +import time +import json + +hostname = socket.gethostname().split(".", 1)[0] +connect_host = rcfg.Server if hostname != rcfg.Server else "localhost" +queue_name = "ood_account_create" +duration = 2 + +# Set up credentials to connect to RabbitMQ server +credentials = pika.PlainCredentials(rcfg.User, rcfg.Password) +parameters = pika.ConnectionParameters(connect_host, + rcfg.Port, + rcfg.VHost, + credentials) + +# Establish connection to RabbitMQ server +connection = pika.BlockingConnection(parameters) +channel = connection.channel() + +print("connection established. Listening for messages:") + +# create exchange to pass messages +channel.exchange_declare(exchange=rcfg.Exchange, exchange_type='direct') + +# creates a random name for the newly generated queue +result = channel.queue_declare(queue=queue_name, exclusive=False) + +channel.queue_bind(exchange=rcfg.Exchange, queue=queue_name, routing_key=queue_name) + +def ood_account_create(ch, method, properties, body): + msg = json.loads(body) + print("Message received {}".format(msg)) + username = msg['username'] + user_uid = str(msg['uid']) + user_gid = str(msg['gid']) + try: + subprocess.call(["sudo", "groupadd", "-r", "-g", user_gid, username]) + subprocess.call(["sudo", "useradd", "-u", user_uid, "-g", user_gid, username]) + print("User {} has been added to {}".format(username, hostname)) + except: + print("Failed to create user") + + channel.basic_ack(delivery_tag=method.delivery_tag) + + channel.basic_publish(exchange=rcfg.Exchange, routing_key='slurm_add_account', body=json.dumps(msg)) + + +# ingest messages +channel.basic_consume(queue=queue_name, on_message_callback=ood_account_create) + +# initiate message ingestion +try: + channel.start_consuming() +except KeyboardInterrupt: + print("Disconnecting from broker.") + channel.stop_consuming() + +connection.close() diff --git a/rabbit_config.py.example b/rabbit_config.py.example new file mode 100644 index 0000000000000000000000000000000000000000..5643bb163e71c667dffdb6890c854b84d3810596 --- /dev/null +++ b/rabbit_config.py.example @@ -0,0 +1,6 @@ +Exchange = 'RegUsr' +User = 'reggie' +Password = 'CHANGE_IT_TO_YOUR_OWN_PASSWORD' +VHost = '/' +Server = 'ohpc' +Port = 5672 diff --git a/rc_rmq.py b/rc_rmq.py new file mode 100644 index 0000000000000000000000000000000000000000..f3102d496b09e7ad7a325eaf53c1672d96856288 --- /dev/null +++ b/rc_rmq.py @@ -0,0 +1,90 @@ +import json +import pika +import socket +import rabbit_config as rcfg + +class RCRMQ(object): + + USER = 'guest' + PASSWORD = 'guest' + HOST = 'localhost' + PORT = 5672 + VHOST = '/' + EXCHANGE = '' + EXCHANGE_TYPE = 'direct' + QUEUE = None + DURABLE = True + ROUTING_KEY = None + + def __init__(self, config=None): + if config: + if 'exchange' in config: + self.EXCHANGE = config['exchange'] + if 'exchange_type' in config: + self.EXCHANGE_TYPE = config['exchange_type'] + + hostname = socket.gethostname().split(".", 1)[0] + + self.HOST = rcfg.Server if hostname != rcfg.Server else "localhost" + self.USER = rcfg.User + self.PASSWORD = rcfg.Password + self.VHOST = rcfg.VHost + self.PORT = rcfg.Port + self._parameters = pika.ConnectionParameters( + self.HOST, + self.PORT, + self.VHOST, + pika.PlainCredentials(self.USER, self.PASSWORD)) + + def connect(self): + self._connection = pika.BlockingConnection(self._parameters) + self._channel = self._connection.channel() + self._channel.exchange_declare( + exchange=self.EXCHANGE, + exchange_type=self.EXCHANGE_TYPE, + durable=True) + + if self.QUEUE is not None: + self._channel.queue_declare(queue=self.QUEUE, durable=self.DURABLE) + self._channel.queue_bind(exchange=self.EXCHANGE, + queue=self.QUEUE, + routing_key=self.ROUTING_KEY) + + def disconnect(self): + self._channel.close() + self._connection.close() + + def delete_queue(self): + self._channel.queue_delete(self.QUEUE) + + def publish_msg(self, obj): + if 'routing_key' in obj: + self.ROUTING_KEY = obj['routing_key'] + + self.connect() + + self._channel.basic_publish(exchange=self.EXCHANGE, + routing_key=self.ROUTING_KEY, + body=json.dumps(obj['msg'])) + + self.disconnect() + + def start_consume(self, obj): + if 'queue' in obj: + self.QUEUE = obj['queue'] + self.ROUTING_KEY = obj['routing_key'] if 'routing_key' in obj else self.QUEUE + if 'durable' in obj: + self.DURABLE = obj['durable'] + + self.connect() + + self._consumer_tag = self._channel.basic_consume(self.QUEUE,obj['cb']) + try: + self._channel.start_consuming() + except KeyboardInterrupt: + self._channel.stop_consuming() + + self.disconnect() + + def stop_consume(self): + self._channel.basic_cancel(self._consumer_tag) diff --git a/rc_util.py b/rc_util.py new file mode 100644 index 0000000000000000000000000000000000000000..63acbbe37dcf2f2a1700ac98793a52a010d8e34e --- /dev/null +++ b/rc_util.py @@ -0,0 +1,43 @@ +from rc_rmq import RCRMQ +import json + +rc_rmq = RCRMQ({'exchange': 'Register'}) +confirm_rmq = RCRMQ({'exchange': 'Confirm'}) +tasks = {'ohpc_account': False, 'ohpc_homedir': False, 'ood_account': False, 'slurm_account': False} + +def add_account(username, full='', reason=''): + rc_rmq.publish_msg({ + 'routing_key': 'ohpc_account', + 'msg': { + "username": username, + "fullname": full, + "reason": reason + } + }) + +def worker(ch, method, properties, body): + msg = json.loads(body) + task = msg['task'] + print("get msg: {}".format(task)) + tasks[task] = msg['success'] + + # Check if all tasks are done + done = True + for key, status in tasks.items(): + if not status: + print("{} is not done yet.".format(key)) + done = False + if done: + confirm_rmq.stop_consume() + confirm_rmq.delete_queue() + +def consume(username, callback, debug=False): + if debug: + sleep(5) + else: + confirm_rmq.start_consume({ + 'queue': username, + 'cb': callback + }) + + return { 'success' : True } diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000000000000000000000000000000000000..becc2ce1d48131ef086f528906361c6b94cd5487 --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +pika==1.1.0 diff --git a/slurm_agent.py b/slurm_agent.py new file mode 100755 index 0000000000000000000000000000000000000000..fde0949db0c57b42f16c9c04632140fc599aa2dc --- /dev/null +++ b/slurm_agent.py @@ -0,0 +1,65 @@ +#!/usr/bin/env python +import pika # python client +import sys +import rabbit_config as rcfg +import socket +import subprocess +import time +import json + +hostname = socket.gethostname().split(".", 1)[0] +connect_host = rcfg.Server if hostname != rcfg.Server else "localhost" +queue_name = "slurm_add_account" +duration = 2 + +# Set up credentials to connect to RabbitMQ server +credentials = pika.PlainCredentials(rcfg.User, rcfg.Password) +parameters = pika.ConnectionParameters(connect_host, + rcfg.Port, + rcfg.VHost, + credentials) + +# Establish connection to RabbitMQ server +connection = pika.BlockingConnection(parameters) +channel = connection.channel() + +print("connection established. Listening for messages:") + +# create exchange to pass messages +channel.exchange_declare(exchange=rcfg.Exchange, exchange_type='direct') + +# creates a random name for the newly generated queue +result = channel.queue_declare(queue=queue_name, exclusive=False) + +channel.queue_bind(exchange=rcfg.Exchange, queue=queue_name, routing_key=queue_name) + +def slurm_account_create(ch, method, properties, body): + msg = json.loads(body) + print("Message received {}".format(msg)) + username = msg['username'] + try: + subprocess.call(["sudo", "sacctmgr", "add", "account", username, "-i", "Descripition: Add user"]) + subprocess.call(["sudo", "sacctmgr", "add", "user", username, "account="+username, "-i"]) + print("SLURM account for user {} has been added".format(username)) + except: + print("Failed to create user") + + channel.basic_ack(delivery_tag=method.delivery_tag) + + channel.basic_publish(exchange=rcfg.Exchange, routing_key=username, body=json.dumps(msg)) + + +# ingest messages +channel.basic_consume(queue=queue_name, on_message_callback=slurm_account_create) + +# initiate message ingestion +try: + channel.start_consuming() +except KeyboardInterrupt: + print("Disconnecting from broker.") + channel.stop_consuming() + +connection.close() + + +