Skip to content
Snippets Groups Projects
Commit 8d63ca44 authored by Ravi Tripathi's avatar Ravi Tripathi
Browse files

Merge branch 'develop'

parents 33944125 74c77bec
No related branches found
No related tags found
1 merge request!38WIP: Feat cod rmq
......@@ -41,7 +41,10 @@ def callback_function(ch, method, properties, body):
# start consume messagre from queue with callback function
rc_rmq.start_consume({
'queue': 'queue_name',
'routing_key: 'your_key',
'cb': callback_function
})
# don't forget to close connection
rc_rmq.disconnect()
```
#!/usr/bin/env python
import sys
import json
from rc_rmq import RCRMQ
task = 'task_name'
# Instantiate rabbitmq object
rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'})
# Define your callback function
def on_message(ch, method, properties, body):
# Retrieve routing key
routing_key = method.routing_key
# Retrieve message
msg = json.loads(body)
# Do Something
print('[{}]: Callback called.'.format(task))
# Acknowledge message
ch.basic_ack(delivery_tag=method.delivery_tag)
print("Start listening to queue: {}".format(task))
rc_rmq.start_consume({
'queue': task, # Define your Queue name
'routing_key': "#", # Define your routing key
'cb': on_message # Pass in callback function you just define
})
#!/usr/bin/env python3
import sys
import rc_util
if len(sys.argv) < 2:
print("Usage: {} USERNAME [EMAIL] [FULL_NAME] [REASON]".format(sys.argv[0]), file=sys.stderr)
exit(1)
domain = 'uab.edu'
user_name = sys.argv[1]
email = sys.argv[2] if len(sys.argv) >= 3 else ''
full_name = sys.argv[3] if len(sys.argv) >= 4 else ''
reason = sys.argv[4] if len(sys.argv) >= 5 else ''
if email == '':
if '@' in user_name:
email = user_name
else:
email = user_name + '@' + domain
rc_util.add_account(user_name, email=email, full=full_name, reason=reason)
print("Account requested for user: {}".format(user_name))
print("Waiting for confirmation...")
rc_util.consume(user_name)
#!/usr/bin/env python
import pika # python client
import sys
import rabbit_config as rcfg
import socket
import subprocess
import time
import json
import subprocess
from pwd import getpwnam
from rc_rmq import RCRMQ
hostname = socket.gethostname().split(".", 1)[0]
connect_host = rcfg.Server if hostname != rcfg.Server else "localhost"
queue_name = "ohpc_account_create"
duration = 2
# Set up credentials to connect to RabbitMQ server
credentials = pika.PlainCredentials(rcfg.User, rcfg.Password)
parameters = pika.ConnectionParameters(connect_host,
rcfg.Port,
rcfg.VHost,
credentials)
# Establish connection to RabbitMQ server
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
print("connection established. Listening for messages:")
task = "ohpc_account"
# create exchange to pass messages
channel.exchange_declare(exchange=rcfg.Exchange, exchange_type='direct')
# creates a random name for the newly generated queue
result = channel.queue_declare(queue=queue_name, exclusive=False)
channel.queue_bind(exchange=rcfg.Exchange, queue=queue_name, routing_key=queue_name)
# Instantiate rabbitmq object
rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'})
def ohpc_account_create(ch, method, properties, body):
msg = json.loads(body)
print("Message received {}".format(msg))
username = msg['username']
success = False
try:
subprocess.call(["sudo", "useradd", username])
print("User {} has been added to {}".format(username, hostname))
print("[{}]: User {} has been added".format(task, username))
success = True
except:
print("Failed to create user")
e = sys.exc_info()[0]
print("[{}]: Error: {}".format(task, e))
channel.basic_ack(delivery_tag=method.delivery_tag)
ch.basic_ack(delivery_tag=method.delivery_tag)
msg['uid'] = getpwnam(username).pw_uid
msg['gid'] = getpwnam(username).pw_gid
channel.basic_publish(exchange=rcfg.Exchange, routing_key='ood_account_create', body=json.dumps(msg))
# ingest messages
channel.basic_consume(queue=queue_name, on_message_callback=ohpc_account_create)
# initiate message ingestion
try:
channel.start_consuming()
except KeyboardInterrupt:
print("Disconnecting from broker.")
channel.stop_consuming()
connection.close()
# send confirm message
rc_rmq.publish_msg({
'routing_key': 'confirm.' + username,
'msg': {
'task': task,
'success': success
}
})
if success:
# send create message to other agent
rc_rmq.publish_msg({
'routing_key': 'create.' + username,
'msg': msg
})
print("Start Listening to queue: {}".format(task))
rc_rmq.start_consume({
'queue': task,
'routing_key': 'request.*',
'cb': ohpc_account_create
})
#!/usr/bin/env python
import pika # python client
import sys
import rabbit_config as rcfg
import socket
import subprocess
import time
import json
import subprocess
from rc_rmq import RCRMQ
hostname = socket.gethostname().split(".", 1)[0]
connect_host = rcfg.Server if hostname != rcfg.Server else "localhost"
queue_name = "ood_account_create"
duration = 2
# Set up credentials to connect to RabbitMQ server
credentials = pika.PlainCredentials(rcfg.User, rcfg.Password)
parameters = pika.ConnectionParameters(connect_host,
rcfg.Port,
rcfg.VHost,
credentials)
# Establish connection to RabbitMQ server
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
print("connection established. Listening for messages:")
# create exchange to pass messages
channel.exchange_declare(exchange=rcfg.Exchange, exchange_type='direct')
# creates a random name for the newly generated queue
result = channel.queue_declare(queue=queue_name, exclusive=False)
task = 'ood_account'
channel.queue_bind(exchange=rcfg.Exchange, queue=queue_name, routing_key=queue_name)
# Instantiate rabbitmq object
rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'})
def ood_account_create(ch, method, properties, body):
msg = json.loads(body)
......@@ -39,26 +15,30 @@ def ood_account_create(ch, method, properties, body):
username = msg['username']
user_uid = str(msg['uid'])
user_gid = str(msg['gid'])
success = False
try:
subprocess.call(["sudo", "groupadd", "-r", "-g", user_gid, username])
subprocess.call(["sudo", "useradd", "-u", user_uid, "-g", user_gid, username])
print("User {} has been added to {}".format(username, hostname))
print("[{}]: User {} has been added".format(task, username))
success = True
except:
print("Failed to create user")
channel.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_publish(exchange=rcfg.Exchange, routing_key='slurm_add_account', body=json.dumps(msg))
# ingest messages
channel.basic_consume(queue=queue_name, on_message_callback=ood_account_create)
# initiate message ingestion
try:
channel.start_consuming()
except KeyboardInterrupt:
print("Disconnecting from broker.")
channel.stop_consuming()
connection.close()
e = sys.exc_info()[0]
print("[{}]: Error: {}".format(task, e))
ch.basic_ack(delivery_tag=method.delivery_tag)
# send confirm message
rc_rmq.publish_msg({
'routing_key': 'confirm.' + username,
'msg': {
'task': task,
'success': success
}
})
print("Start listening to queue: {}".format(task))
rc_rmq.start_consume({
'queue': task,
'routing_key': "create.*",
'cb': ood_account_create
})
......@@ -15,8 +15,9 @@ class RCRMQ(object):
QUEUE = None
DURABLE = True
ROUTING_KEY = None
DEBUG = False
def __init__(self, config=None):
def __init__(self, config=None, debug=False):
if config:
if 'exchange' in config:
self.EXCHANGE = config['exchange']
......@@ -30,6 +31,23 @@ class RCRMQ(object):
self.PASSWORD = rcfg.Password
self.VHOST = rcfg.VHost
self.PORT = rcfg.Port
self.DEBUG = debug
if self.DEBUG:
print("""
Created RabbitMQ instance with:
Exchange name: {},
Exchange type: {},
Host: {},
User: {},
VHost: {},
Port: {}
""".format(self.EXCHANGE, self.EXCHANGE_TYPE, self.HOST, self.USER, self.VHOST, self.PORT))
self._consumer_tag = None
self._connection = None
self._consuming = False
self._channel = None
self._parameters = pika.ConnectionParameters(
self.HOST,
self.PORT,
......@@ -37,22 +55,26 @@ class RCRMQ(object):
pika.PlainCredentials(self.USER, self.PASSWORD))
def connect(self):
if self.DEBUG:
print("Connecting...\n" + "Exchange: " + self.EXCHANGE + " Exchange type: " + self.EXCHANGE_TYPE)
self._connection = pika.BlockingConnection(self._parameters)
self._channel = self._connection.channel()
self._channel.exchange_declare(
exchange=self.EXCHANGE,
exchange=self.EXCHANGE,
exchange_type=self.EXCHANGE_TYPE,
durable=True)
if self.QUEUE is not None:
self._channel.queue_declare(queue=self.QUEUE, durable=self.DURABLE)
self._channel.queue_bind(exchange=self.EXCHANGE,
queue=self.QUEUE,
routing_key=self.ROUTING_KEY)
def bind_queue(self):
self._channel.queue_declare(queue=self.QUEUE, durable=self.DURABLE)
self._channel.queue_bind(exchange=self.EXCHANGE,
queue=self.QUEUE,
routing_key=self.ROUTING_KEY)
def disconnect(self):
self._channel.close()
self._connection.close()
self._connection = None
def delete_queue(self):
self._channel.queue_delete(self.QUEUE)
......@@ -61,14 +83,13 @@ class RCRMQ(object):
if 'routing_key' in obj:
self.ROUTING_KEY = obj['routing_key']
self.connect()
if self._connection is None:
self.connect()
self._channel.basic_publish(exchange=self.EXCHANGE,
routing_key=self.ROUTING_KEY,
body=json.dumps(obj['msg']))
self.disconnect()
def start_consume(self, obj):
if 'queue' in obj:
self.QUEUE = obj['queue']
......@@ -76,15 +97,20 @@ class RCRMQ(object):
if 'durable' in obj:
self.DURABLE = obj['durable']
self.connect()
if self.DEBUG:
print("Queue: " + self.QUEUE + "\nRouting_key: " + self.ROUTING_KEY)
if self._connection is None:
self.connect()
self.bind_queue()
self._consumer_tag = self._channel.basic_consume(self.QUEUE,obj['cb'])
self._consuming = True
try:
self._channel.start_consuming()
except KeyboardInterrupt:
self._channel.stop_consuming()
self.disconnect()
def stop_consume(self):
self._channel.basic_cancel(self._consumer_tag)
import logging
import argparse
from rc_rmq import RCRMQ
import json
rc_rmq = RCRMQ({'exchange': 'Register'})
confirm_rmq = RCRMQ({'exchange': 'Confirm'})
tasks = {'ohpc_account': False, 'ohpc_homedir': False, 'ood_account': False, 'slurm_account': False}
rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'})
tasks = {'ohpc_account': None, 'ood_account': None, 'slurm_account': None}
logger_fmt = '%(asctime)s [%(module)s] - %(message)s'
def add_account(username, full='', reason=''):
def add_account(username, email, full='', reason=''):
rc_rmq.publish_msg({
'routing_key': 'ohpc_account',
'routing_key': 'request.' + username,
'msg': {
"username": username,
"email": email,
"fullname": full,
"reason": reason
}
})
rc_rmq.disconnect()
def worker(ch, method, properties, body):
msg = json.loads(body)
task = msg['task']
print("get msg: {}".format(task))
tasks[task] = msg['success']
print("Got msg: {}({})".format(msg['task'], msg['success']))
# Check if all tasks are done
done = True
for key, status in tasks.items():
if not status:
print("{} is not done yet.".format(key))
done = False
done = False
if done:
confirm_rmq.stop_consume()
confirm_rmq.delete_queue()
def consume(username, callback, debug=False):
rc_rmq.stop_consume()
rc_rmq.delete_queue()
def consume(username, callback=worker, debug=False):
if debug:
sleep(5)
else:
confirm_rmq.start_consume({
rc_rmq.start_consume({
'queue': username,
'routing_key': 'confirm.' + username,
'cb': callback
})
rc_rmq.disconnect()
return { 'success' : True }
def get_args():
# Parse arguments
parser = argparse.ArgumentParser()
parser.add_argument('-v', '--verbose', action='store_true', help='verbose output')
parser.add_argument('-n', '--dry-run', action='store_true', help='enable dry run mode')
return parser.parse_args()
def get_logger(args=None):
if args is None:
args = get_args()
logger_lvl = logging.WARNING
if args.verbose:
logger_lvl = logging.DEBUG
if args.dry_run:
logger_lvl = logging.INFO
logging.basicConfig(format=logger_fmt, level=logger_lvl)
return logging.getLogger(__name__)
#!/usr/bin/env python
import pika # python client
import sys
import rabbit_config as rcfg
import socket
import subprocess
import time
import json
import subprocess
from rc_rmq import RCRMQ
hostname = socket.gethostname().split(".", 1)[0]
connect_host = rcfg.Server if hostname != rcfg.Server else "localhost"
queue_name = "slurm_add_account"
duration = 2
# Set up credentials to connect to RabbitMQ server
credentials = pika.PlainCredentials(rcfg.User, rcfg.Password)
parameters = pika.ConnectionParameters(connect_host,
rcfg.Port,
rcfg.VHost,
credentials)
# Establish connection to RabbitMQ server
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
print("connection established. Listening for messages:")
# create exchange to pass messages
channel.exchange_declare(exchange=rcfg.Exchange, exchange_type='direct')
# creates a random name for the newly generated queue
result = channel.queue_declare(queue=queue_name, exclusive=False)
task = 'slurm_account'
channel.queue_bind(exchange=rcfg.Exchange, queue=queue_name, routing_key=queue_name)
# Instantiate rabbitmq object
rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'})
def slurm_account_create(ch, method, properties, body):
msg = json.loads(body)
print("Message received {}".format(msg))
username = msg['username']
success = False
try:
subprocess.call(["sudo", "sacctmgr", "add", "account", username, "-i", "Descripition: Add user"])
subprocess.call(["sudo", "sacctmgr", "add", "user", username, "account="+username, "-i"])
subprocess.call(["sudo", "sacctmgr", "add", "user", username, "account=" + username, "-i"])
print("SLURM account for user {} has been added".format(username))
success = True
except:
print("Failed to create user")
channel.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_publish(exchange=rcfg.Exchange, routing_key=username, body=json.dumps(msg))
# ingest messages
channel.basic_consume(queue=queue_name, on_message_callback=slurm_account_create)
# initiate message ingestion
try:
channel.start_consuming()
except KeyboardInterrupt:
print("Disconnecting from broker.")
channel.stop_consuming()
connection.close()
e = sys.exc_info()[0]
print("[{}]: Error: {}".format(task, e))
ch.basic_ack(delivery_tag=method.delivery_tag)
# send confirm message
rc_rmq.publish_msg({
'routing_key': 'confirm.' + username,
'msg': {
'task': task,
'success': success
}
})
print("Start listening to queue: {}".format(task))
rc_rmq.start_consume({
'queue': task,
'routing_key': "create.*",
'cb': slurm_account_create
})
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment