Skip to content
Snippets Groups Projects

Rewrite agents

Closed Bo-Chun Chen requested to merge github/fork/diedpigs/feat-rewrite-agents into develop
4 files
+ 98
138
Compare changes
  • Side-by-side
  • Inline
Files
4
+ 20
45
#!/usr/bin/env python
import pika # python client
import sys
import rabbit_config as rcfg
import socket
import subprocess
import time
import json
from pwd import getpwnam
from rc_rmq import RCRMQ
hostname = socket.gethostname().split(".", 1)[0]
connect_host = rcfg.Server if hostname != rcfg.Server else "localhost"
queue_name = "ohpc_account_create"
duration = 2
task = 'ohpc_account'
# Set up credentials to connect to RabbitMQ server
credentials = pika.PlainCredentials(rcfg.User, rcfg.Password)
parameters = pika.ConnectionParameters(connect_host,
rcfg.Port,
rcfg.VHost,
credentials)
# Establish connection to RabbitMQ server
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
print("connection established. Listening for messages:")
# create exchange to pass messages
channel.exchange_declare(exchange=rcfg.Exchange, exchange_type='direct')
# creates a random name for the newly generated queue
result = channel.queue_declare(queue=queue_name, exclusive=False)
channel.queue_bind(exchange=rcfg.Exchange, queue=queue_name, routing_key=queue_name)
# Instantiate rabbitmq object
rc_rmq = RCRMQ({'exchange': 'Register'})
confirm_rmq = RCRMQ({'exchange': 'Confirm'})
fanout_rmq = RCRMQ({'exchange': 'Create', 'exchange_type': 'fanout'})
def ohpc_account_create(ch, method, properties, body):
msg = json.loads(body)
print("Message received {}".format(msg))
username = msg['username']
success = False
try:
subprocess.call(["sudo", "useradd", username])
print("User {} has been added to {}".format(username, hostname))
subprocess.call(["sudo", "useradd", "-M", username])
print("User {} has been added".format(username))
success = True
except:
print("Failed to create user")
e = sys.exc_info()[0]
print("Error: {}".format(e))
channel.basic_ack(delivery_tag=method.delivery_tag)
ch.basic_ack(delivery_tag=method.delivery_tag)
msg['uid'] = getpwnam(username).pw_uid
msg['gid'] = getpwnam(username).pw_gid
channel.basic_publish(exchange=rcfg.Exchange, routing_key='ood_account_create', body=json.dumps(msg))
# ingest messages
channel.basic_consume(queue=queue_name, on_message_callback=ohpc_account_create)
# initiate message ingestion
try:
channel.start_consuming()
except KeyboardInterrupt:
print("Disconnecting from broker.")
channel.stop_consuming()
fanout_rmq.publish_msg({ 'routing_key': 'ohpc_create', 'msg': msg })
confirm_rmq.publish_msg({ 'routing_key': username, 'msg': { 'task': task, 'success': success }})
connection.close()
print("Start listening to queue '{}' in exchange 'Register'.".format(task))
rc_rmq.start_consume({
'queue': task,
'cb': ohpc_account_create
})
Loading