From 521a74864f98a3cf82455bd65361044fba60fc87 Mon Sep 17 00:00:00 2001 From: Mitchell Moore <mmoo97@uab.edu> Date: Tue, 21 Jan 2020 08:43:38 -0600 Subject: [PATCH] Created a base_consumer script to test out procucer calls from flask run.py. Updated Readme and requiremnts - Add base consumer script - Update comment - Create multiple consumer files - add basic ood consumer - add basic ohpc consumer - add basic manager consumer/remove sys imports - complete base producer for ohpc/ood - Update readme - Fix readme format - Add supervisor to requirements - Add comments and display routing to manager - Link tutorial in README - Quick update with consumers - Allow robust command line args - Fix connection parameters for cluster - Update README.md --- README.md | 54 ++++++++++++++++++++++++++++++++++++++++++++++-- base_consumer.py | 39 ++++++++++++++++++++++++++++++++++ requirements.txt | 1 + run.py | 15 +++++++++----- test_producer.py | 23 +++++++++++++++++++++ 5 files changed, 125 insertions(+), 7 deletions(-) create mode 100644 base_consumer.py create mode 100644 test_producer.py diff --git a/README.md b/README.md index d48fc6b..a58d15f 100644 --- a/README.md +++ b/README.md @@ -4,18 +4,68 @@ To clone this repo use the command: ``` $ git clone https://gitlab.rc.uab.edu/mmoo97/flask_user_reg.git - ``` ## Prerequisites +### Clone Repository - Ensure `pip` is installed (see: https://packaging.python.org/guides/installing-using-pip-and-virtualenv/ ). + - Check if installed by typing `$ pip` for Mac/Linux or `$ py` for Windows. + - Mac/Linux: Install pip using `$ python -m pip install --user --upgrade pip`. + - Windows: Install pip using `$ py -m pip install --upgrade pip` - Ensure you have created a [virtual environment](https://packaging.python.org/guides/installing-using-pip-and-virtual-environments) called `venv` setup within the cloned project. - - Note, this project requires a virtual environment running python2 (2.7) + - Note, this project requires a virtual environment running python2 (2.7.x) - Ensure Flask and other dependencies are installed using the following commands: ``` $ cd ~/your/repo/path/flask_user_reg +$ git checkout version-1b-openstack-rabbitmq $ source venv/bin/activate $ pip install -r requirements.txt ``` - Note, to install flask in your own `$HOME` use `pip install --user Flask`. +### Install RabbitMQ +(Reference: [here](https://www.rabbitmq.com/tutorials/tutorial-four-python.html)) +- Install RabbitMQ server on the host machine. ([Installation Guide](https://www.rabbitmq.com/download.html)) + - #### Mac/Linux + - `$ brew update` + - `$ brew install rabbitmq` + - it is recommended that you add the following line to your `.bash_profile`: + `export PATH=$PATH:/usr/local/opt/rabbitmq/sbin`. + - Start server using the command `$ rabbitmq-server`. (Note, this implementation assumes RabbitMQ is running on localhost on standard port 5672) + - #### Windows + - Download the installer from [here](https://github.com/rabbitmq/rabbitmq-server/releases) and run. + - Post install, the server should be running. To check, run `$ rabbitmqctl.bat status`. + - #### CentOS 7 + - First, import signing key using `$ rpm --import https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc` + - Next, install Erlang using `$ yum install https://dl.fedoraproject.org/pub/epel/epel-release-latest-7.noarch.rpm` + - Finally, install RabbitMq using `$ yum install rabbitmq-server-3.8.2-1.el8.noarch.rpm` + - The server is not started as a daemon by default when the RabbitMQ server package is installed. To start the + daemon by default when the system boots, as an administrator run: `$ chkconfig rabbitmq-server on`. + - As an administrator, start and stop the server as usual: <br/>`/sbin/service rabbitmq-server start` + <br/> `/sbin/service rabbitmq-server stop` + - Add the user "reggie" using the command `$ rabbitmqctl add_user reggie reggie`. + - The current configuration assumes the following username password combination. To change the password, type + `$ rabbitmqctl change_password reggie <new_password>`. + - Note that rabbitmqctl may require sudo permissions and that changing the password will require a password + change in the credentials within `test_producer.py` and `base_consumer.py` as well. + +## Test RabbitMQ +For a simple example on the functionality of RabbitMQ, do the following: +- Open up a new ssh terminal and ssh into your ohpc node and, in a separate window, ssh into your ood node. +- Once in, ensure your rabbitmq server is running using the command ` ` +- Additionally, ensure you have a rabbitmq user configured with the username and password as `reggie`. +- + +``` +# Run consumer on ohpc node +$ python base_consumer.py ohpc +``` + You will notice that the script is awaiting a message to display. + To produce a message, run the following on the ood node terminal session: + ``` +$ python test_producer.py ohpc +``` +You should now see that the message has been sent and displayed on the ohpc node. +- **Note,** that the `test_producer.py` script is identical to the code within the `ingest_data()` function in `run.py`. + + \ No newline at end of file diff --git a/base_consumer.py b/base_consumer.py new file mode 100644 index 0000000..ebfe9dd --- /dev/null +++ b/base_consumer.py @@ -0,0 +1,39 @@ +#!/usr/bin/env python +import pika # python client +import sys + +credentials = pika.PlainCredentials('reggie', 'reggie') +parameters = pika.ConnectionParameters('ood', + 5672, + '/', + credentials) +connection = pika.BlockingConnection(parameters) + +channel = connection.channel() + +channel.exchange_declare(exchange='direct_logs', exchange_type='direct') # create exchange to pass messages + +result = channel.queue_declare(queue='', exclusive=True) +queue_name = result.method.queue # creates a random name for the newly generated queue + +nodes = sys.argv[1:] +if not nodes: + sys.stderr.write("Usage: %s [ood] [ohpc] [manager]\n" % sys.argv[0]) + sys.exit(1) + +for node in nodes: + channel.queue_bind( + exchange='direct_logs', queue=queue_name, routing_key=node) # combine exchange, queue, and define routing name + +print(' [*] Waiting for logs. To exit press CTRL+C') + + +def callback(ch, method, properties, body): + print(" [x] %r:%r" % (method.routing_key, body)) + print('[%r] User creation task is done.' % method.routing_key) + + +channel.basic_consume( + queue=queue_name, on_message_callback=callback, auto_ack=True) # ingest messages, and assume delivered via auto_ack + +channel.start_consuming() # initiate message ingestion diff --git a/requirements.txt b/requirements.txt index 86d3ce5..ad23ea5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -47,6 +47,7 @@ six==1.13.0 SQLAlchemy==1.3.11 SQLAlchemy-Utils==0.35.0 stevedore==1.31.0 +supervisor==4.1.0 urllib3==1.25.7 URLObject==2.4.3 virtualenv==16.7.7 diff --git a/run.py b/run.py index d671071..ce01ce7 100644 --- a/run.py +++ b/run.py @@ -51,20 +51,25 @@ def handle_my_custom_event(json, methods=['GET', 'POST']): def ingest_data(json, methods=['GET', 'POST']): print (time.strftime("%m-%d-%Y_%H:%M:%S") + '\tQueue request received: ' + str(json)) + # Begin RabbitMQ process. connection = pika.BlockingConnection( pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', exchange_type='direct') - severity = sys.argv[1] if len(sys.argv) > 1 else 'info' - message = ' '.join(sys.argv[2:]) or 'Hello World!' + message = "Hey there" # todo: account info goes here + + channel.basic_publish( + exchange='direct_logs', routing_key="ohpc", body=message) + print(" [x] Sent %r:%r" % ("ohpc", message)) + channel.basic_publish( - exchange='direct_logs', routing_key=severity, body=message) - print(" [x] Sent %r:%r" % (severity, message)) + exchange='direct_logs', routing_key="ood", body=message) + print(" [x] Sent %r:%r" % ("ood", message)) connection.close() - # Todo: Make the remaining protion of this method in a RabbitMQ consumer + # Todo: Make the remaining portion of this method in a RabbitMQ consumer # try: # fullname = json["fullname"] diff --git a/test_producer.py b/test_producer.py new file mode 100644 index 0000000..ce69cf6 --- /dev/null +++ b/test_producer.py @@ -0,0 +1,23 @@ +import pika +import sys + +# Begin RabbitMQ process. +credentials = pika.PlainCredentials('reggie', 'reggie') +parameters = pika.ConnectionParameters('ood', + 5672, + '/', + credentials) + +connection = pika.BlockingConnection(parameters) + +channel = connection.channel() + +channel.exchange_declare(exchange='direct_logs', exchange_type='direct') + +node = sys.argv[1] if len(sys.argv) > 1 else 'info' +message = ' '.join(sys.argv[2:]) or 'Hello World!' +channel.basic_publish( + exchange='direct_logs', routing_key=node, body=message) +print(" [x] Sent %r:%r" % (node, message)) + +connection.close() -- GitLab