diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000000000000000000000000000000000000..c786d1bc0fd7e67779e94063058c626e1f973785 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,15 @@ +repos: + - repo: https://github.com/pre-commit/pre-commit-hooks + rev: v3.2.0 + hooks: + - id: trailing-whitespace + - id: end-of-file-fixer + - id: check-yaml + - repo: https://github.com/psf/black + rev: 21.5b0 + hooks: + - id: black + - repo: https://github.com/pycqa/flake8 + rev: 3.9.1 + hooks: + - id: flake8 diff --git a/agent_template.py b/agent_template.py index c6e722338bad04999621bd7dfd47a41d2b962d28..3e09cfd651d7339c028d0ff6939048d249972a17 100644 --- a/agent_template.py +++ b/agent_template.py @@ -3,10 +3,10 @@ import sys import json from rc_rmq import RCRMQ -task = 'task_name' +task = "task_name" # Instantiate rabbitmq object -rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'}) +rc_rmq = RCRMQ({"exchange": "RegUsr", "exchange_type": "topic"}) # Define your callback function def on_message(ch, method, properties, body): @@ -18,15 +18,17 @@ def on_message(ch, method, properties, body): msg = json.loads(body) # Do Something - print('[{}]: Callback called.'.format(task)) + print("[{}]: Callback called.".format(task)) # Acknowledge message ch.basic_ack(delivery_tag=method.delivery_tag) print("Start listening to queue: {}".format(task)) -rc_rmq.start_consume({ - 'queue': task, # Define your Queue name - 'routing_key': "#", # Define your routing key - 'cb': on_message # Pass in callback function you just define -}) +rc_rmq.start_consume( + { + "queue": task, # Define your Queue name + "routing_key": "#", # Define your routing key + "cb": on_message, # Pass in callback function you just define + } +) diff --git a/dev_rmq_agents/ohpc_account_create.py b/dev_rmq_agents/ohpc_account_create.py index 4504702f8d8f8d611b8db1ab4b52811b08ec6ade..206c4045094814751adc18876bf51d797911fd53 100644 --- a/dev_rmq_agents/ohpc_account_create.py +++ b/dev_rmq_agents/ohpc_account_create.py @@ -8,12 +8,13 @@ from rc_rmq import RCRMQ task = "ohpc_account" # Instantiate rabbitmq object -rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'}) +rc_rmq = RCRMQ({"exchange": "RegUsr", "exchange_type": "topic"}) + def ohpc_account_create(ch, method, properties, body): msg = json.loads(body) print("Message received {}".format(msg)) - username = msg['username'] + username = msg["username"] success = False try: subprocess.call(["sudo", "useradd", username]) @@ -24,28 +25,25 @@ def ohpc_account_create(ch, method, properties, body): print("[{}]: Error: {}".format(task, e)) ch.basic_ack(delivery_tag=method.delivery_tag) - msg['uid'] = getpwnam(username).pw_uid - msg['gid'] = getpwnam(username).pw_gid + msg["uid"] = getpwnam(username).pw_uid + msg["gid"] = getpwnam(username).pw_gid # send confirm message - rc_rmq.publish_msg({ - 'routing_key': 'confirm.' + username, - 'msg': { - 'task': task, - 'success': success + rc_rmq.publish_msg( + { + "routing_key": "confirm." + username, + "msg": {"task": task, "success": success}, } - }) + ) if success: # send create message to other agent - rc_rmq.publish_msg({ - 'routing_key': 'create.' + username, - 'msg': msg - }) + rc_rmq.publish_msg( + {"routing_key": "create." + username, "msg": msg} + ) + print("Start Listening to queue: {}".format(task)) -rc_rmq.start_consume({ - 'queue': task, - 'routing_key': 'request.*', - 'cb': ohpc_account_create -}) +rc_rmq.start_consume( + {"queue": task, "routing_key": "request.*", "cb": ohpc_account_create} +) diff --git a/dev_rmq_agents/ood_account_create.py b/dev_rmq_agents/ood_account_create.py index 1ff9cc9fd8a61df98ae1473c20e01480f6e4aa90..339819ec86044ae180dfe1ecb40345ee6f3d5904 100644 --- a/dev_rmq_agents/ood_account_create.py +++ b/dev_rmq_agents/ood_account_create.py @@ -4,21 +4,26 @@ import json import subprocess from rc_rmq import RCRMQ -task = 'ood_account' +task = "ood_account" # Instantiate rabbitmq object -rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'}) +rc_rmq = RCRMQ({"exchange": "RegUsr", "exchange_type": "topic"}) + def ood_account_create(ch, method, properties, body): msg = json.loads(body) print("Message received {}".format(msg)) - username = msg['username'] - user_uid = str(msg['uid']) - user_gid = str(msg['gid']) + username = msg["username"] + user_uid = str(msg["uid"]) + user_gid = str(msg["gid"]) success = False try: - subprocess.call(["sudo", "groupadd", "-r", "-g", user_gid, username]) - subprocess.call(["sudo", "useradd", "-u", user_uid, "-g", user_gid, username]) + subprocess.call( + ["sudo", "groupadd", "-r", "-g", user_gid, username] + ) + subprocess.call( + ["sudo", "useradd", "-u", user_uid, "-g", user_gid, username] + ) print("[{}]: User {} has been added".format(task, username)) success = True except: @@ -28,17 +33,15 @@ def ood_account_create(ch, method, properties, body): ch.basic_ack(delivery_tag=method.delivery_tag) # send confirm message - rc_rmq.publish_msg({ - 'routing_key': 'confirm.' + username, - 'msg': { - 'task': task, - 'success': success + rc_rmq.publish_msg( + { + "routing_key": "confirm." + username, + "msg": {"task": task, "success": success}, } - }) + ) + print("Start listening to queue: {}".format(task)) -rc_rmq.start_consume({ - 'queue': task, - 'routing_key': "create.*", - 'cb': ood_account_create -}) +rc_rmq.start_consume( + {"queue": task, "routing_key": "create.*", "cb": ood_account_create} +) diff --git a/dev_rmq_agents/slurm_agent.py b/dev_rmq_agents/slurm_agent.py index e07af9bd233b952aa385848a4504859f31a22177..eacc44c92ee61de611c6576cc0642adf6ed7992a 100755 --- a/dev_rmq_agents/slurm_agent.py +++ b/dev_rmq_agents/slurm_agent.py @@ -4,19 +4,40 @@ import json import subprocess from rc_rmq import RCRMQ -task = 'slurm_account' +task = "slurm_account" # Instantiate rabbitmq object -rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'}) +rc_rmq = RCRMQ({"exchange": "RegUsr", "exchange_type": "topic"}) + def slurm_account_create(ch, method, properties, body): msg = json.loads(body) print("Message received {}".format(msg)) - username = msg['username'] + username = msg["username"] success = False try: - subprocess.call(["sudo", "sacctmgr", "add", "account", username, "-i", "Descripition: Add user"]) - subprocess.call(["sudo", "sacctmgr", "add", "user", username, "account=" + username, "-i"]) + subprocess.call( + [ + "sudo", + "sacctmgr", + "add", + "account", + username, + "-i", + "Descripition: Add user", + ] + ) + subprocess.call( + [ + "sudo", + "sacctmgr", + "add", + "user", + username, + "account=" + username, + "-i", + ] + ) print("SLURM account for user {} has been added".format(username)) success = True except: @@ -26,17 +47,15 @@ def slurm_account_create(ch, method, properties, body): ch.basic_ack(delivery_tag=method.delivery_tag) # send confirm message - rc_rmq.publish_msg({ - 'routing_key': 'confirm.' + username, - 'msg': { - 'task': task, - 'success': success + rc_rmq.publish_msg( + { + "routing_key": "confirm." + username, + "msg": {"task": task, "success": success}, } - }) + ) + print("Start listening to queue: {}".format(task)) -rc_rmq.start_consume({ - 'queue': task, - 'routing_key': "create.*", - 'cb': slurm_account_create -}) +rc_rmq.start_consume( + {"queue": task, "routing_key": "create.*", "cb": slurm_account_create} +) diff --git a/flask_producer.py b/flask_producer.py index dc3c44634745278fa426821d2ea0a2ecc9e61e3f..1ef993ad630fb5cc1a0ba25a72b69c8a9e619471 100755 --- a/flask_producer.py +++ b/flask_producer.py @@ -15,7 +15,7 @@ user_name = sys.argv[2] message = { "username": user_name, "fullname": "Full Name", - "reason": "Reason1, Reason2." + "reason": "Reason1, Reason2.", } hostname = socket.gethostname().split(".", 1)[0] @@ -23,33 +23,44 @@ connect_host = rcfg.Server if hostname != rcfg.Server else "localhost" # Set up credentials to connect to RabbitMQ server credentials = pika.PlainCredentials(rcfg.User, rcfg.Password) -parameters = pika.ConnectionParameters(connect_host, - rcfg.Port, - rcfg.VHost, - credentials) +parameters = pika.ConnectionParameters( + connect_host, rcfg.Port, rcfg.VHost, credentials +) # Establish connection to RabbitMQ server connection = pika.BlockingConnection(parameters) channel = connection.channel() -channel.exchange_declare(exchange=rcfg.Exchange, exchange_type='direct') +channel.exchange_declare(exchange=rcfg.Exchange, exchange_type="direct") -channel.basic_publish(exchange=rcfg.Exchange, routing_key=node, body=json.dumps(message)) +channel.basic_publish( + exchange=rcfg.Exchange, routing_key=node, body=json.dumps(message) +) print(" [x] Sent {}: {}".format(node, json.dumps(message))) # creates a named queue -result = channel.queue_declare(queue=user_name, exclusive=False, durable=True) +result = channel.queue_declare( + queue=user_name, exclusive=False, durable=True +) # bind the queue with exchange -channel.queue_bind(exchange=rcfg.Exchange, queue=user_name, routing_key=user_name) +channel.queue_bind( + exchange=rcfg.Exchange, queue=user_name, routing_key=user_name +) + def work(ch, method, properties, body): msg = json.loads(body) - print("Received message from {}: \n\t{}".format(method.routing_key, msg)) - #queue_unbind(queue, exchange=None, routing_key=None, arguments=None, callback=None) + print( + "Received message from {}: \n\t{}".format(method.routing_key, msg) + ) + # queue_unbind(queue, exchange=None, routing_key=None, arguments=None, callback=None) channel.queue_delete(method.routing_key) + # ingest messages, and assume delivered via auto_ack -channel.basic_consume(queue=sys.argv[2], on_message_callback=work, auto_ack=True) +channel.basic_consume( + queue=sys.argv[2], on_message_callback=work, auto_ack=True +) print("Subscribing to queue: {}".format(sys.argv[2])) # initiate message ingestion diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000000000000000000000000000000000000..1d15ad7d9a23fece8d2321b3f09abba072fa4c6e --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,3 @@ +[tool.black] +line-length=79 +target-version=['py36'] diff --git a/rc_rmq.py b/rc_rmq.py index 37afa3f44f872454a3b96fccf2d5a6a538fe01cf..daa61c237e77662b86238b29c18e2afe5ba6c001 100644 --- a/rc_rmq.py +++ b/rc_rmq.py @@ -3,15 +3,16 @@ import pika import socket import rabbit_config as rcfg + class RCRMQ(object): - USER = 'guest' - PASSWORD = 'guest' - HOST = 'localhost' + USER = "guest" + PASSWORD = "guest" + HOST = "localhost" PORT = 5672 - VHOST = '/' - EXCHANGE = '' - EXCHANGE_TYPE = 'direct' + VHOST = "/" + EXCHANGE = "" + EXCHANGE_TYPE = "direct" QUEUE = None DURABLE = True ROUTING_KEY = None @@ -19,10 +20,10 @@ class RCRMQ(object): def __init__(self, config=None, debug=False): if config: - if 'exchange' in config: - self.EXCHANGE = config['exchange'] - if 'exchange_type' in config: - self.EXCHANGE_TYPE = config['exchange_type'] + if "exchange" in config: + self.EXCHANGE = config["exchange"] + if "exchange_type" in config: + self.EXCHANGE_TYPE = config["exchange_type"] hostname = socket.gethostname().split(".", 1)[0] @@ -34,7 +35,8 @@ class RCRMQ(object): self.DEBUG = debug if self.DEBUG: - print(""" + print( + """ Created RabbitMQ instance with: Exchange name: {}, Exchange type: {}, @@ -42,34 +44,52 @@ class RCRMQ(object): User: {}, VHost: {}, Port: {} - """.format(self.EXCHANGE, self.EXCHANGE_TYPE, self.HOST, self.USER, self.VHOST, self.PORT)) + """.format( + self.EXCHANGE, + self.EXCHANGE_TYPE, + self.HOST, + self.USER, + self.VHOST, + self.PORT, + ) + ) self._consumer_tag = None self._connection = None self._consuming = False self._channel = None self._parameters = pika.ConnectionParameters( - self.HOST, - self.PORT, - self.VHOST, - pika.PlainCredentials(self.USER, self.PASSWORD)) + self.HOST, + self.PORT, + self.VHOST, + pika.PlainCredentials(self.USER, self.PASSWORD), + ) def connect(self): if self.DEBUG: - print("Connecting...\n" + "Exchange: " + self.EXCHANGE + " Exchange type: " + self.EXCHANGE_TYPE) + print( + "Connecting...\n" + + "Exchange: " + + self.EXCHANGE + + " Exchange type: " + + self.EXCHANGE_TYPE + ) self._connection = pika.BlockingConnection(self._parameters) self._channel = self._connection.channel() self._channel.exchange_declare( - exchange=self.EXCHANGE, - exchange_type=self.EXCHANGE_TYPE, - durable=True) + exchange=self.EXCHANGE, + exchange_type=self.EXCHANGE_TYPE, + durable=True, + ) def bind_queue(self): self._channel.queue_declare(queue=self.QUEUE, durable=self.DURABLE) - self._channel.queue_bind(exchange=self.EXCHANGE, - queue=self.QUEUE, - routing_key=self.ROUTING_KEY) + self._channel.queue_bind( + exchange=self.EXCHANGE, + queue=self.QUEUE, + routing_key=self.ROUTING_KEY, + ) def disconnect(self): self._channel.close() @@ -80,32 +100,43 @@ class RCRMQ(object): self._channel.queue_delete(self.QUEUE) def publish_msg(self, obj): - if 'routing_key' in obj: - self.ROUTING_KEY = obj['routing_key'] + if "routing_key" in obj: + self.ROUTING_KEY = obj["routing_key"] if self._connection is None: self.connect() - self._channel.basic_publish(exchange=self.EXCHANGE, - routing_key=self.ROUTING_KEY, - body=json.dumps(obj['msg'])) + self._channel.basic_publish( + exchange=self.EXCHANGE, + routing_key=self.ROUTING_KEY, + body=json.dumps(obj["msg"]), + ) def start_consume(self, obj): - if 'queue' in obj: - self.QUEUE = obj['queue'] - self.ROUTING_KEY = obj['routing_key'] if 'routing_key' in obj else self.QUEUE - if 'durable' in obj: - self.DURABLE = obj['durable'] + if "queue" in obj: + self.QUEUE = obj["queue"] + self.ROUTING_KEY = ( + obj["routing_key"] if "routing_key" in obj else self.QUEUE + ) + if "durable" in obj: + self.DURABLE = obj["durable"] if self.DEBUG: - print("Queue: " + self.QUEUE + "\nRouting_key: " + self.ROUTING_KEY) + print( + "Queue: " + + self.QUEUE + + "\nRouting_key: " + + self.ROUTING_KEY + ) if self._connection is None: self.connect() self.bind_queue() - self._consumer_tag = self._channel.basic_consume(self.QUEUE,obj['cb']) + self._consumer_tag = self._channel.basic_consume( + self.QUEUE, obj["cb"] + ) self._consuming = True try: self._channel.start_consuming() diff --git a/rc_util.py b/rc_util.py index 8caba5d04bb432294993706a59d71ca81bab0e6e..d3cb3b613c6fd18408ff68392c39ecc7706d5e1e 100644 --- a/rc_util.py +++ b/rc_util.py @@ -29,7 +29,7 @@ def add_account(username, queuename, email, full="", reason=""): } ) rc_rmq.disconnect() - + def worker(ch, method, properties, body): msg = json.loads(body) diff --git a/requirements.txt b/requirements.txt index 562cb606a776a6080f5acab0f53490d766dd2a2b..e62954e164da5deb8aec7508d6a940265f40d2da 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,4 +2,5 @@ pika==1.1.0 pyldap==3.0.0.post1 dataset==1.3.1 Jinja2==2.11.2 -sh==1.12.14 \ No newline at end of file +sh==1.12.14 +pre-commit==2.12.1