Skip to content
Snippets Groups Projects
Unverified Commit 9877c0ba authored by Ravi Tripathi's avatar Ravi Tripathi Committed by GitHub
Browse files

Merge pull request #18 from diedpigs/feat-redesign-mq

Redesign RabbitMQ - This PR adds a fan-out architecture for rabbitmq_agents 
parents 41552582 6210465b
No related branches found
No related tags found
No related merge requests found
...@@ -41,7 +41,10 @@ def callback_function(ch, method, properties, body): ...@@ -41,7 +41,10 @@ def callback_function(ch, method, properties, body):
# start consume messagre from queue with callback function # start consume messagre from queue with callback function
rc_rmq.start_consume({ rc_rmq.start_consume({
'queue': 'queue_name', 'queue': 'queue_name',
'routing_key: 'your_key',
'cb': callback_function 'cb': callback_function
}) })
# don't forget to close connection
rc_rmq.disconnect()
``` ```
#!/usr/bin/env python
import sys
import json
from rc_rmq import RCRMQ
task = 'task_name'
# Instantiate rabbitmq object
rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'})
# Define your callback function
def on_message(ch, method, properties, body):
# Retrieve routing key
routing_key = method.routing_key
# Retrieve message
msg = json.loads(body)
# Do Something
print('[{}]: Callback called.'.format(task))
# Acknowledge message
ch.basic_ack(delivery_tag=method.delivery_tag)
print("Start listening to queue: {}".format(task))
rc_rmq.start_consume({
'queue': task, # Define your Queue name
'routing_key': "#", # Define your routing key
'cb': on_message # Pass in callback function you just define
})
#!/usr/bin/env python
import sys
import rc_util
if len(sys.argv) < 2:
print("Usage: {} USERNAME [EMAIL] [FULL_NAME] [REASON]".format(sys.argv[0]), file=sys.stderr)
exit(1)
domain = 'uab.edu'
user_name = sys.argv[1]
email = sys.argv[2] if len(sys.argv) >= 3 else ''
full_name = sys.argv[3] if len(sys.argv) >= 4 else ''
reason = sys.argv[4] if len(sys.argv) >= 5 else ''
if email == '':
if '@' in user_name:
email = user_name
else:
email = user_name + '@' + domain
rc_util.add_account(user_name, email=email, full=full_name, reason=reason)
print("Account requested for user: {}".format(user_name))
print("Waiting for confirmation...")
rc_util.consume(user_name)
#!/usr/bin/env python #!/usr/bin/env python
import pika # python client
import sys import sys
import rabbit_config as rcfg
import socket
import subprocess
import time
import json import json
import subprocess
from pwd import getpwnam from pwd import getpwnam
from rc_rmq import RCRMQ
hostname = socket.gethostname().split(".", 1)[0] task = "ohpc_account"
connect_host = rcfg.Server if hostname != rcfg.Server else "localhost"
queue_name = "ohpc_account_create"
duration = 2
# Set up credentials to connect to RabbitMQ server
credentials = pika.PlainCredentials(rcfg.User, rcfg.Password)
parameters = pika.ConnectionParameters(connect_host,
rcfg.Port,
rcfg.VHost,
credentials)
# Establish connection to RabbitMQ server
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
print("connection established. Listening for messages:")
# create exchange to pass messages # Instantiate rabbitmq object
channel.exchange_declare(exchange=rcfg.Exchange, exchange_type='direct') rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'})
# creates a random name for the newly generated queue
result = channel.queue_declare(queue=queue_name, exclusive=False)
channel.queue_bind(exchange=rcfg.Exchange, queue=queue_name, routing_key=queue_name)
def ohpc_account_create(ch, method, properties, body): def ohpc_account_create(ch, method, properties, body):
msg = json.loads(body) msg = json.loads(body)
print("Message received {}".format(msg)) print("Message received {}".format(msg))
username = msg['username'] username = msg['username']
success = False
try: try:
subprocess.call(["sudo", "useradd", username]) subprocess.call(["sudo", "useradd", username])
print("User {} has been added to {}".format(username, hostname)) print("[{}]: User {} has been added".format(task, username))
success = True
except: except:
print("Failed to create user") e = sys.exc_info()[0]
print("[{}]: Error: {}".format(task, e))
channel.basic_ack(delivery_tag=method.delivery_tag) ch.basic_ack(delivery_tag=method.delivery_tag)
msg['uid'] = getpwnam(username).pw_uid msg['uid'] = getpwnam(username).pw_uid
msg['gid'] = getpwnam(username).pw_gid msg['gid'] = getpwnam(username).pw_gid
channel.basic_publish(exchange=rcfg.Exchange, routing_key='ood_account_create', body=json.dumps(msg))
# ingest messages
channel.basic_consume(queue=queue_name, on_message_callback=ohpc_account_create)
# initiate message ingestion
try:
channel.start_consuming()
except KeyboardInterrupt:
print("Disconnecting from broker.")
channel.stop_consuming()
connection.close() # send confirm message
rc_rmq.publish_msg({
'routing_key': 'confirm.' + username,
'msg': {
'task': task,
'success': success
}
})
if success:
# send create message to other agent
rc_rmq.publish_msg({
'routing_key': 'create.' + username,
'msg': msg
})
print("Start Listening to queue: {}".format(task))
rc_rmq.start_consume({
'queue': task,
'routing_key': 'request.*',
'cb': ohpc_account_create
})
#!/usr/bin/env python #!/usr/bin/env python
import pika # python client
import sys import sys
import rabbit_config as rcfg
import socket
import subprocess
import time
import json import json
import subprocess
from rc_rmq import RCRMQ
hostname = socket.gethostname().split(".", 1)[0] task = 'ood_account'
connect_host = rcfg.Server if hostname != rcfg.Server else "localhost"
queue_name = "ood_account_create"
duration = 2
# Set up credentials to connect to RabbitMQ server
credentials = pika.PlainCredentials(rcfg.User, rcfg.Password)
parameters = pika.ConnectionParameters(connect_host,
rcfg.Port,
rcfg.VHost,
credentials)
# Establish connection to RabbitMQ server
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
print("connection established. Listening for messages:")
# create exchange to pass messages
channel.exchange_declare(exchange=rcfg.Exchange, exchange_type='direct')
# creates a random name for the newly generated queue
result = channel.queue_declare(queue=queue_name, exclusive=False)
channel.queue_bind(exchange=rcfg.Exchange, queue=queue_name, routing_key=queue_name) # Instantiate rabbitmq object
rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'})
def ood_account_create(ch, method, properties, body): def ood_account_create(ch, method, properties, body):
msg = json.loads(body) msg = json.loads(body)
...@@ -39,26 +15,30 @@ def ood_account_create(ch, method, properties, body): ...@@ -39,26 +15,30 @@ def ood_account_create(ch, method, properties, body):
username = msg['username'] username = msg['username']
user_uid = str(msg['uid']) user_uid = str(msg['uid'])
user_gid = str(msg['gid']) user_gid = str(msg['gid'])
success = False
try: try:
subprocess.call(["sudo", "groupadd", "-r", "-g", user_gid, username]) subprocess.call(["sudo", "groupadd", "-r", "-g", user_gid, username])
subprocess.call(["sudo", "useradd", "-u", user_uid, "-g", user_gid, username]) subprocess.call(["sudo", "useradd", "-u", user_uid, "-g", user_gid, username])
print("User {} has been added to {}".format(username, hostname)) print("[{}]: User {} has been added".format(task, username))
success = True
except: except:
print("Failed to create user") e = sys.exc_info()[0]
print("[{}]: Error: {}".format(task, e))
channel.basic_ack(delivery_tag=method.delivery_tag)
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_publish(exchange=rcfg.Exchange, routing_key='slurm_add_account', body=json.dumps(msg))
# send confirm message
rc_rmq.publish_msg({
# ingest messages 'routing_key': 'confirm.' + username,
channel.basic_consume(queue=queue_name, on_message_callback=ood_account_create) 'msg': {
'task': task,
# initiate message ingestion 'success': success
try: }
channel.start_consuming() })
except KeyboardInterrupt:
print("Disconnecting from broker.") print("Start listening to queue: {}".format(task))
channel.stop_consuming() rc_rmq.start_consume({
'queue': task,
connection.close() 'routing_key': "create.*",
'cb': ood_account_create
})
...@@ -15,8 +15,9 @@ class RCRMQ(object): ...@@ -15,8 +15,9 @@ class RCRMQ(object):
QUEUE = None QUEUE = None
DURABLE = True DURABLE = True
ROUTING_KEY = None ROUTING_KEY = None
DEBUG = False
def __init__(self, config=None): def __init__(self, config=None, debug=False):
if config: if config:
if 'exchange' in config: if 'exchange' in config:
self.EXCHANGE = config['exchange'] self.EXCHANGE = config['exchange']
...@@ -30,6 +31,23 @@ class RCRMQ(object): ...@@ -30,6 +31,23 @@ class RCRMQ(object):
self.PASSWORD = rcfg.Password self.PASSWORD = rcfg.Password
self.VHOST = rcfg.VHost self.VHOST = rcfg.VHost
self.PORT = rcfg.Port self.PORT = rcfg.Port
self.DEBUG = debug
if self.DEBUG:
print("""
Created RabbitMQ instance with:
Exchange name: {},
Exchange type: {},
Host: {},
User: {},
VHost: {},
Port: {}
""".format(self.EXCHANGE, self.EXCHANGE_TYPE, self.HOST, self.USER, self.VHOST, self.PORT))
self._consumer_tag = None
self._connection = None
self._consuming = False
self._channel = None
self._parameters = pika.ConnectionParameters( self._parameters = pika.ConnectionParameters(
self.HOST, self.HOST,
self.PORT, self.PORT,
...@@ -37,22 +55,26 @@ class RCRMQ(object): ...@@ -37,22 +55,26 @@ class RCRMQ(object):
pika.PlainCredentials(self.USER, self.PASSWORD)) pika.PlainCredentials(self.USER, self.PASSWORD))
def connect(self): def connect(self):
if self.DEBUG:
print("Connecting...\n" + "Exchange: " + self.EXCHANGE + " Exchange type: " + self.EXCHANGE_TYPE)
self._connection = pika.BlockingConnection(self._parameters) self._connection = pika.BlockingConnection(self._parameters)
self._channel = self._connection.channel() self._channel = self._connection.channel()
self._channel.exchange_declare( self._channel.exchange_declare(
exchange=self.EXCHANGE, exchange=self.EXCHANGE,
exchange_type=self.EXCHANGE_TYPE, exchange_type=self.EXCHANGE_TYPE,
durable=True) durable=True)
if self.QUEUE is not None: def bind_queue(self):
self._channel.queue_declare(queue=self.QUEUE, durable=self.DURABLE) self._channel.queue_declare(queue=self.QUEUE, durable=self.DURABLE)
self._channel.queue_bind(exchange=self.EXCHANGE, self._channel.queue_bind(exchange=self.EXCHANGE,
queue=self.QUEUE, queue=self.QUEUE,
routing_key=self.ROUTING_KEY) routing_key=self.ROUTING_KEY)
def disconnect(self): def disconnect(self):
self._channel.close() self._channel.close()
self._connection.close() self._connection.close()
self._connection = None
def delete_queue(self): def delete_queue(self):
self._channel.queue_delete(self.QUEUE) self._channel.queue_delete(self.QUEUE)
...@@ -61,14 +83,13 @@ class RCRMQ(object): ...@@ -61,14 +83,13 @@ class RCRMQ(object):
if 'routing_key' in obj: if 'routing_key' in obj:
self.ROUTING_KEY = obj['routing_key'] self.ROUTING_KEY = obj['routing_key']
self.connect() if self._connection is None:
self.connect()
self._channel.basic_publish(exchange=self.EXCHANGE, self._channel.basic_publish(exchange=self.EXCHANGE,
routing_key=self.ROUTING_KEY, routing_key=self.ROUTING_KEY,
body=json.dumps(obj['msg'])) body=json.dumps(obj['msg']))
self.disconnect()
def start_consume(self, obj): def start_consume(self, obj):
if 'queue' in obj: if 'queue' in obj:
self.QUEUE = obj['queue'] self.QUEUE = obj['queue']
...@@ -76,15 +97,20 @@ class RCRMQ(object): ...@@ -76,15 +97,20 @@ class RCRMQ(object):
if 'durable' in obj: if 'durable' in obj:
self.DURABLE = obj['durable'] self.DURABLE = obj['durable']
self.connect() if self.DEBUG:
print("Queue: " + self.QUEUE + "\nRouting_key: " + self.ROUTING_KEY)
if self._connection is None:
self.connect()
self.bind_queue()
self._consumer_tag = self._channel.basic_consume(self.QUEUE,obj['cb']) self._consumer_tag = self._channel.basic_consume(self.QUEUE,obj['cb'])
self._consuming = True
try: try:
self._channel.start_consuming() self._channel.start_consuming()
except KeyboardInterrupt: except KeyboardInterrupt:
self._channel.stop_consuming() self._channel.stop_consuming()
self.disconnect()
def stop_consume(self): def stop_consume(self):
self._channel.basic_cancel(self._consumer_tag) self._channel.basic_cancel(self._consumer_tag)
import logging
import argparse
from rc_rmq import RCRMQ from rc_rmq import RCRMQ
import json import json
rc_rmq = RCRMQ({'exchange': 'Register'}) rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'})
confirm_rmq = RCRMQ({'exchange': 'Confirm'}) tasks = {'ohpc_account': None, 'ood_account': None, 'slurm_account': None}
tasks = {'ohpc_account': False, 'ohpc_homedir': False, 'ood_account': False, 'slurm_account': False} logger_fmt = '%(asctime)s [%(module)s] - %(message)s'
def add_account(username, full='', reason=''): def add_account(username, email, full='', reason=''):
rc_rmq.publish_msg({ rc_rmq.publish_msg({
'routing_key': 'ohpc_account', 'routing_key': 'request.' + username,
'msg': { 'msg': {
"username": username, "username": username,
"email": email,
"fullname": full, "fullname": full,
"reason": reason "reason": reason
} }
}) })
rc_rmq.disconnect()
def worker(ch, method, properties, body): def worker(ch, method, properties, body):
msg = json.loads(body) msg = json.loads(body)
task = msg['task'] task = msg['task']
print("get msg: {}".format(task))
tasks[task] = msg['success'] tasks[task] = msg['success']
print("Got msg: {}({})".format(msg['task'], msg['success']))
# Check if all tasks are done # Check if all tasks are done
done = True done = True
for key, status in tasks.items(): for key, status in tasks.items():
if not status: if not status:
print("{} is not done yet.".format(key)) print("{} is not done yet.".format(key))
done = False done = False
if done: if done:
confirm_rmq.stop_consume() rc_rmq.stop_consume()
confirm_rmq.delete_queue() rc_rmq.delete_queue()
def consume(username, callback, debug=False): def consume(username, callback=worker, debug=False):
if debug: if debug:
sleep(5) sleep(5)
else: else:
confirm_rmq.start_consume({ rc_rmq.start_consume({
'queue': username, 'queue': username,
'routing_key': 'confirm.' + username,
'cb': callback 'cb': callback
}) })
rc_rmq.disconnect()
return { 'success' : True } return { 'success' : True }
def get_args():
# Parse arguments
parser = argparse.ArgumentParser()
parser.add_argument('-v', '--verbose', action='store_true', help='verbose output')
parser.add_argument('-n', '--dry-run', action='store_true', help='enable dry run mode')
return parser.parse_args()
def get_logger(args=None):
if args is None:
args = get_args()
logger_lvl = logging.WARNING
if args.verbose:
logger_lvl = logging.DEBUG
if args.dry_run:
logger_lvl = logging.INFO
logging.basicConfig(format=logger_fmt, level=logger_lvl)
return logging.getLogger(__name__)
#!/usr/bin/env python #!/usr/bin/env python
import pika # python client
import sys import sys
import rabbit_config as rcfg
import socket
import subprocess
import time
import json import json
import subprocess
from rc_rmq import RCRMQ
hostname = socket.gethostname().split(".", 1)[0] task = 'slurm_account'
connect_host = rcfg.Server if hostname != rcfg.Server else "localhost"
queue_name = "slurm_add_account"
duration = 2
# Set up credentials to connect to RabbitMQ server
credentials = pika.PlainCredentials(rcfg.User, rcfg.Password)
parameters = pika.ConnectionParameters(connect_host,
rcfg.Port,
rcfg.VHost,
credentials)
# Establish connection to RabbitMQ server
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
print("connection established. Listening for messages:")
# create exchange to pass messages
channel.exchange_declare(exchange=rcfg.Exchange, exchange_type='direct')
# creates a random name for the newly generated queue
result = channel.queue_declare(queue=queue_name, exclusive=False)
channel.queue_bind(exchange=rcfg.Exchange, queue=queue_name, routing_key=queue_name) # Instantiate rabbitmq object
rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'})
def slurm_account_create(ch, method, properties, body): def slurm_account_create(ch, method, properties, body):
msg = json.loads(body) msg = json.loads(body)
print("Message received {}".format(msg)) print("Message received {}".format(msg))
username = msg['username'] username = msg['username']
success = False
try: try:
subprocess.call(["sudo", "sacctmgr", "add", "account", username, "-i", "Descripition: Add user"]) subprocess.call(["sudo", "sacctmgr", "add", "account", username, "-i", "Descripition: Add user"])
subprocess.call(["sudo", "sacctmgr", "add", "user", username, "account="+username, "-i"]) subprocess.call(["sudo", "sacctmgr", "add", "user", username, "account=" + username, "-i"])
print("SLURM account for user {} has been added".format(username)) print("SLURM account for user {} has been added".format(username))
success = True
except: except:
print("Failed to create user") e = sys.exc_info()[0]
print("[{}]: Error: {}".format(task, e))
channel.basic_ack(delivery_tag=method.delivery_tag)
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_publish(exchange=rcfg.Exchange, routing_key=username, body=json.dumps(msg))
# send confirm message
rc_rmq.publish_msg({
# ingest messages 'routing_key': 'confirm.' + username,
channel.basic_consume(queue=queue_name, on_message_callback=slurm_account_create) 'msg': {
'task': task,
# initiate message ingestion 'success': success
try: }
channel.start_consuming() })
except KeyboardInterrupt:
print("Disconnecting from broker.") print("Start listening to queue: {}".format(task))
channel.stop_consuming() rc_rmq.start_consume({
'queue': task,
connection.close() 'routing_key': "create.*",
'cb': slurm_account_create
})
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment