diff --git a/README.md b/README.md index 45d4efbd67f980604363350fa6e696aca5a6b674..df4209656d1e0616c589282240d77d7c0b40eeb3 100644 --- a/README.md +++ b/README.md @@ -3,3 +3,45 @@ This repo keeps different rabbitmq agents that help in account creation on OHPC system. It has 2 branches ```develop``` and ```production``` , that houses agents based on where they are launched + +## Using RCRMQ class + +- First, rename `rabbit_config.py.example` to `rabbit_config.py` + +- Modify config file, at least the `Password` needs to be your own passwod + +- In your code: + +``` +# import the class +from rc_rmq import RCRMQ + +# instantiate an instance +rc_rmq = RCRMQ({'exchange': 'RegUsr'}) + +# publish a message to message queue +rc_rmq.publish_msg({ + 'routing_key': 'your_key', + 'msg': { + 'type': 'warning', + 'content': 'this is warning' + } +}) + +# to consume message from a queue +# you have to first define callback function +# with parameters: channel, method, properties, body +def callback_function(ch, method, properties, body): + msg = json.loads(body) + print("get msg: {}".format(msg['username') + + # this will stop the consumer + rc_rmq.stop_consumer() + +# start consume messagre from queue with callback function +rc_rmq.start_consume({ + 'queue': 'queue_name', + 'cb': callback_function +}) + +``` diff --git a/rc_rmq.py b/rc_rmq.py new file mode 100644 index 0000000000000000000000000000000000000000..f3102d496b09e7ad7a325eaf53c1672d96856288 --- /dev/null +++ b/rc_rmq.py @@ -0,0 +1,90 @@ +import json +import pika +import socket +import rabbit_config as rcfg + +class RCRMQ(object): + + USER = 'guest' + PASSWORD = 'guest' + HOST = 'localhost' + PORT = 5672 + VHOST = '/' + EXCHANGE = '' + EXCHANGE_TYPE = 'direct' + QUEUE = None + DURABLE = True + ROUTING_KEY = None + + def __init__(self, config=None): + if config: + if 'exchange' in config: + self.EXCHANGE = config['exchange'] + if 'exchange_type' in config: + self.EXCHANGE_TYPE = config['exchange_type'] + + hostname = socket.gethostname().split(".", 1)[0] + + self.HOST = rcfg.Server if hostname != rcfg.Server else "localhost" + self.USER = rcfg.User + self.PASSWORD = rcfg.Password + self.VHOST = rcfg.VHost + self.PORT = rcfg.Port + self._parameters = pika.ConnectionParameters( + self.HOST, + self.PORT, + self.VHOST, + pika.PlainCredentials(self.USER, self.PASSWORD)) + + def connect(self): + self._connection = pika.BlockingConnection(self._parameters) + self._channel = self._connection.channel() + self._channel.exchange_declare( + exchange=self.EXCHANGE, + exchange_type=self.EXCHANGE_TYPE, + durable=True) + + if self.QUEUE is not None: + self._channel.queue_declare(queue=self.QUEUE, durable=self.DURABLE) + self._channel.queue_bind(exchange=self.EXCHANGE, + queue=self.QUEUE, + routing_key=self.ROUTING_KEY) + + def disconnect(self): + self._channel.close() + self._connection.close() + + def delete_queue(self): + self._channel.queue_delete(self.QUEUE) + + def publish_msg(self, obj): + if 'routing_key' in obj: + self.ROUTING_KEY = obj['routing_key'] + + self.connect() + + self._channel.basic_publish(exchange=self.EXCHANGE, + routing_key=self.ROUTING_KEY, + body=json.dumps(obj['msg'])) + + self.disconnect() + + def start_consume(self, obj): + if 'queue' in obj: + self.QUEUE = obj['queue'] + self.ROUTING_KEY = obj['routing_key'] if 'routing_key' in obj else self.QUEUE + if 'durable' in obj: + self.DURABLE = obj['durable'] + + self.connect() + + self._consumer_tag = self._channel.basic_consume(self.QUEUE,obj['cb']) + try: + self._channel.start_consuming() + except KeyboardInterrupt: + self._channel.stop_consuming() + + self.disconnect() + + def stop_consume(self): + self._channel.basic_cancel(self._consumer_tag) diff --git a/rc_util.py b/rc_util.py new file mode 100644 index 0000000000000000000000000000000000000000..63acbbe37dcf2f2a1700ac98793a52a010d8e34e --- /dev/null +++ b/rc_util.py @@ -0,0 +1,43 @@ +from rc_rmq import RCRMQ +import json + +rc_rmq = RCRMQ({'exchange': 'Register'}) +confirm_rmq = RCRMQ({'exchange': 'Confirm'}) +tasks = {'ohpc_account': False, 'ohpc_homedir': False, 'ood_account': False, 'slurm_account': False} + +def add_account(username, full='', reason=''): + rc_rmq.publish_msg({ + 'routing_key': 'ohpc_account', + 'msg': { + "username": username, + "fullname": full, + "reason": reason + } + }) + +def worker(ch, method, properties, body): + msg = json.loads(body) + task = msg['task'] + print("get msg: {}".format(task)) + tasks[task] = msg['success'] + + # Check if all tasks are done + done = True + for key, status in tasks.items(): + if not status: + print("{} is not done yet.".format(key)) + done = False + if done: + confirm_rmq.stop_consume() + confirm_rmq.delete_queue() + +def consume(username, callback, debug=False): + if debug: + sleep(5) + else: + confirm_rmq.start_consume({ + 'queue': username, + 'cb': callback + }) + + return { 'success' : True }