Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • rc/rabbitmq_agents
  • louistw/rabbitmq_agents
  • krish94/rabbitmq_agents
  • dwheel7/rabbitmq_agents
4 results
Show changes
Commits on Source (441)
Showing
with 1153 additions and 109 deletions
name: Linting
on: [push, pull_request]
jobs:
build:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Set up Python 3.6
uses: actions/setup-python@v2
with:
# Semantic version range syntax or exact version of a Python version
python-version: '3.11'
# Optional - x64 or x86 architecture, defaults to x64
architecture: 'x64'
- name: find trailing whitespace
uses: harupy/find-trailing-whitespace@master
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install flake8
#if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
- name: Lint with flake8
run: |
# stop the build if there are Python syntax errors or undefined names
#flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
flake8 . --count --max-line-length=79 --show-source --statistics
# exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
#flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
include:
- template: Code-Quality.gitlab-ci.yml
code_quality:
tags:
- dind
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v3.2.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
- id: check-yaml
- repo: https://github.com/psf/black
rev: 23.3.0
hooks:
- id: black
- repo: https://github.com/pycqa/flake8
rev: 3.9.1
hooks:
- id: flake8
# rabbitmq_agents
This repo keeps different rabbitmq agents that help in account creation on OHPC system.
# RabbitMQ User Registration Agents
It has 2 branches ```develop``` and ```production``` , that houses agents based on where they are launched
---
## Using RCRMQ class
## Overview
This project automates user registration workflows at UAB using **RabbitMQ** to route tasks between systems like the web interface, CLI tools, databases, and email services. It ensures tasks like assigning user IDs, validating data, and sending notifications happen in sequence without manual intervention.
- First, rename `rabbit_config.py.example` to `rabbit_config.py`
---
- Modify config file, at least the `Password` needs to be your own passwod
## Key Components
### 1. **The `RegUsr` Exchange**
- **Type**: Topic exchange (routes messages using `routing_key` patterns).
- **Purpose**: Acts as the central hub for all registration-related messages.
- In your code:
### 2. **Core Scripts**
- **`self_reg_app` (Web UI)**: Starts the process by sending a `request<queuename>` message with user data.
- **`create_account.py` (CLI)**: Triggers backend tasks (e.g., UID/GID assignment, email subscriptions).
### 3. **Queues & Their Jobs**
| Queue Name | What It Does |
|--------------------------|-----------------------------------------------------------------------------|
| `get next uid gid` | Reserves a unique UID/GID for the user (uses SQLite to track IDs). |
| `subscribe mail list` | Adds the user’s email to mailing lists (e.g., department announcements). |
| `git commit` | Logs configuration changes to Git (e.g., new user added). |
| `notify user` | Sends emails/SMS to users (e.g., "Your account is ready"). |
| `task_manager` | Coordinates tasks like retrying failed steps or updating logs. |
### 4. **Data Flow**
1. A user submits details via the **Web UI** (`self_reg_app`).
2. A `request<queuename>` message is sent to `RegUsr` with fields:
```json
{ "username", "queuename", "email", "fullname", "reason" }
3. The system:
- Assigns UID/GID via SQLite (`get next uid gid` queue).
- Validates data with a `verify<queuename>` message.
- Sends a `completed<queuename>` message with success/failure status.
- Notifies the user and logs the event.
----------
## Setup & Usage
### Prerequisites
* Launch an alma9 instance on openstack, ssh into it and run the following commands
*Before proceeding, set these environment variables:*
```
export RABBITMQ_USER="reggie" # RabbitMQ admin username
export RABBITMQ_PASSWORD="secure_password" # RabbitMQ admin password
export CELERY_VHOST="adduser" # Celery-dedicated vhost
export CELERY_USER="celery_user" # Celery service username
export CELERY_PASSWORD="celery_pass" # Celery service password
```
### 1.1 Configure Package Repositories
# Import security keys
```
sudo rpm --import https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc
sudo rpm --import https://packagecloud.io/rabbitmq/erlang/gpgkey
sudo rpm --import https://packagecloud.io/rabbitmq/rabbitmq-server/gpgkey
```
# Erlang Repository (AlmaLinux 8)
```
sudo tee /etc/yum.repos.d/rabbitmq_erlang.repo <<'EOL'
[rabbitmq_erlang]
name=RabbitMQ Erlang
baseurl=https://packagecloud.io/rabbitmq/erlang/el/8/$basearch
gpgcheck=1
gpgkey=https://packagecloud.io/rabbitmq/erlang/gpgkey
https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc
enabled=1
EOL
```
# RabbitMQ Server Repository
```
sudo tee /etc/yum.repos.d/rabbitmq_server.repo <<'EOL'
[rabbitmq_server]
name=RabbitMQ Server
baseurl=https://packagecloud.io/rabbitmq/rabbitmq-server/el/8/$basearch
gpgcheck=1
gpgkey=https://packagecloud.io/rabbitmq/rabbitmq-server/gpgkey
enabled=1
EOL
```
### 1.2 Install Packages
```
sudo yum install -y socat logrotate
sudo yum install -y erlang-25.3.2 rabbitmq-server-3.12.6
sudo systemctl enable --now rabbitmq-server
```
# import the class
from rc_rmq import RCRMQ
# instantiate an instance
rc_rmq = RCRMQ({'exchange': 'RegUsr'})
# publish a message to message queue
rc_rmq.publish_msg({
'routing_key': 'your_key',
'msg': {
'type': 'warning',
'content': 'this is warning'
}
})
# to consume message from a queue
# you have to first define callback function
# with parameters: channel, method, properties, body
def callback_function(ch, method, properties, body):
msg = json.loads(body)
print("get msg: {}".format(msg['username')
## 2. RabbitMQ Configuration
### 2.1 Create Admin User & Permissions
```
sudo rabbitmqctl add_user $RABBITMQ_USER $RABBITMQ_PASSWORD
sudo rabbitmqctl set_user_tags $RABBITMQ_USER administrator
sudo rabbitmqctl set_permissions -p / $RABBITMQ_USER ".*" ".*" ".*"
```
# this will stop the consumer
rc_rmq.stop_consumer()
# start consume messagre from queue with callback function
rc_rmq.start_consume({
'queue': 'queue_name',
'routing_key: 'your_key',
'cb': callback_function
})
### 2.2 Configure Celery Environment
# don't forget to close connection
rc_rmq.disconnect()
```
sudo rabbitmqctl add_vhost $CELERY_VHOST
sudo rabbitmqctl add_user $CELERY_USER $CELERY_PASSWORD
sudo rabbitmqctl set_permissions -p $CELERY_VHOST $CELERY_USER ".*" ".*" ".*"
```
## 3. Agent Service Setup
### 3.1 Deploy Agent Codebase
```
AGENT_DIR="/cm/shared/rabbitmq_agents"
sudo git clone https://gitlab.rc.uab.edu/rc/rabbitmq_agents.git $AGENT_DIR
cd $AGENT_DIR
sudo git checkout main # Or specific commit/tag
```
### 3.2 Configure Python Environment
```
sudo python3 -m venv $AGENT_DIR/venv
sudo $AGENT_DIR/venv/bin/pip install -r $AGENT_DIR/requirements.txt
```
### 3.3 Create Agent Configuration
Create `$AGENT_DIR/rabbit_config.py` with:
```
# rabbit_config.py
host = 'localhost' # Or your cluster host
port = 5672
vhost = '$CELERY_VHOST' # Use actual vhost name
user = '$CELERY_USER' # Celery username
password = '$CELERY_PASSWORD'
```
### 3.4 Systemd Service Setup for All Agents
OHPC Account Agent:
```
sudo tee /etc/systemd/system/ohpc_account_agent.service <<EOL
[Unit]
After=rabbitmq-server.service
[Service]
Type=simple
StartLimitInterval=0
Restart=on-failure
User=root
WorkingDirectory=/cm/shared/rabbitmq_agents/dev_rmq_agents
Environment="PYTHONPATH=/cm/shared/rabbitmq_agents/"
Environment="PATH=/cm/shared/rabbitmq_agents/dev_rmq_agents/venv/bin:/usr/local/bin:/usr/bin:/usr/local/sbin:/usr/sbin"
ExecStart=/cm/shared/rabbitmq_agents/venv/bin/python -u ohpc_account_create.py
[Install]
WantedBy=multi-user.target
EOL
```
OOD Account Agent:
```
sudo tee /etc/systemd/system/ood_account_agent.service <<EOL
[Unit]
After=rabbitmq-server.service
[Service]
Type=simple
StartLimitInterval=0
Restart=on-failure
User=root
WorkingDirectory=/cm/shared/rabbitmq_agents/dev_rmq_agents
Environment="PYTHONPATH=/cm/shared/rabbitmq_agents/"
Environment="PATH=/cm/shared/rabbitmq_agents/dev_rmq_agents/venv/bin:/usr/local/bin:/usr/bin:/usr/local/sbin:/usr/sbin"
ExecStart=/cm/shared/rabbitmq_agents/venv/bin/python -u ood_account_create.py
[Install]
WantedBy=multi-user.target
EOL
```
Slurm Agent:
```
sudo tee /etc/systemd/system/slurm_agent.service <<EOL
[Unit]
After=rabbitmq-server.service
[Service]
Type=simple
StartLimitInterval=0
Restart=on-failure
User=root
WorkingDirectory=/cm/shared/rabbitmq_agents/dev_rmq_agents
Environment="PYTHONPATH=/cm/shared/rabbitmq_agents/"
Environment="PATH=/cm/shared/rabbitmq_agents/dev_rmq_agents/venv/bin:/usr/local/bin:/usr/bin:/usr/local/sbin:/usr/sbin"
ExecStart=/cm/shared/rabbitmq_agents/venv/bin/python -u slurm_agent.py
[Install]
WantedBy=multi-user.target
EOL
```
## 4. Network Configuration
### Disable Firewall (Testing Only)
```
sudo systemctl stop firewalld
sudo systemctl disable firewalld
```
## 5. Validation & Testing
```
# Verify Services
systemctl is-active rabbitmq-server ood_account_agent
# Check RabbitMQ Users
sudo rabbitmqctl list_users
# Inspect Agent Logs
journalctl -u ood_account_agent -f --since "5m ago"
```
## Error Handling
- Failures (e.g., duplicate email) are reported in the `completed<queuename>` message’s `errmsg` field.
- The `user reg event logger` tracks all registration attempts. Check logs at `/var/log/user_reg.log`.
account-creation-flow.png

269 KiB

#!/usr/bin/env python3
import json
import argparse
import signal
import rc_util
from rc_rmq import RCRMQ
import rabbit_config as rcfg
parser = argparse.ArgumentParser(
description="Account management driver script"
)
parser.add_argument("username", help="Username that should be locked/unlocked")
parser.add_argument(
"state",
choices=["ok", "hold", "certification", "pre_certification"],
help="Choose from states (ok,hold,certification,pre_certification)",
)
parser.add_argument(
"-s",
"--service",
nargs="+",
default="all",
choices=["ssh", "newjobs", "expiration", "all"],
help="List one or more services to be blocked (default: %(default)s)",
)
parser.add_argument(
"-v", "--verbose", action="store_true", help="verbose output"
)
parser.add_argument(
"-n", "--dry-run", action="store_true", help="enable dry run mode"
)
args = parser.parse_args()
timeout = 60
queuename = rc_util.encode_name(args.username)
username = args.username
state = args.state
service = args.service
# Instantiate rabbitmq object
rc_rmq = RCRMQ({"exchange": rcfg.Exchange, "exchange_type": "topic"})
msg = {}
msg["username"] = username
msg["state"] = state
msg["service"] = service
msg["queuename"] = queuename
msg["updated_by"], msg["host"] = rc_util.get_caller_info()
msg["interface"] = "CLI"
# publish msg with acctmgr.{uname} routing key.
rc_rmq.publish_msg(
{
"routing_key": f"acctmgr.request.{queuename}",
"msg": msg,
}
)
def timeout_handler(signum, frame):
print("Process timeout, there's some issue with agents")
rc_rmq.stop_consume()
def callback(ch, method, properties, body):
msg = json.loads(body)
username = msg["username"]
if msg["success"]:
print(
f"Account for {username} has been {msg['action']}ed.\n "
"Updating the user state in DB"
)
else:
print(
f"There's some issue in account management agents for {username}"
)
errmsg = msg.get("errmsg", [])
for err in errmsg:
print(err)
ch.basic_ack(delivery_tag=method.delivery_tag)
rc_rmq.stop_consume()
rc_rmq.delete_queue(queuename)
print(f"Request {username} account state set to {state}.")
# Set initial timeout timer
signal.signal(signal.SIGALRM, timeout_handler)
signal.setitimer(signal.ITIMER_REAL, timeout)
print("Waiting for completion...")
rc_rmq.start_consume(
{
"queue": queuename,
"routing_key": f"certified.{queuename}",
"cb": callback,
}
)
#!/usr/bin/env python
import sys
import json
from rc_rmq import RCRMQ
task = 'task_name'
task = "task_name"
# Instantiate rabbitmq object
rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'})
rc_rmq = RCRMQ({"exchange": "RegUsr", "exchange_type": "topic"})
# Define your callback function
def on_message(ch, method, properties, body):
# Retrieve routing key
routing_key = method.routing_key
print(routing_key)
# Retrieve message
msg = json.loads(body)
print(msg)
# Do Something
print('[{}]: Callback called.'.format(task))
print("[{}]: Callback called.".format(task))
# Acknowledge message
ch.basic_ack(delivery_tag=method.delivery_tag)
print("Start listening to queue: {}".format(task))
rc_rmq.start_consume({
'queue': task, # Define your Queue name
'routing_key': "#", # Define your routing key
'cb': on_message # Pass in callback function you just define
})
rc_rmq.start_consume(
{
"queue": task, # Define your Queue name
"routing_key": "#", # Define your routing key
"cb": on_message, # Pass in callback function you just define
}
)
#!/usr/bin/env python3
import sys
import json
import rc_util
import argparse
import signal
if len(sys.argv) < 2:
print("Usage: {} USERNAME [EMAIL] [FULL_NAME] [REASON]".format(sys.argv[0]), file=sys.stderr)
exit(1)
parser = argparse.ArgumentParser()
parser.add_argument("username", help="username that will be created")
parser.add_argument("email", nargs="?", default="", help="User's email")
parser.add_argument(
"full_name", nargs="?", default="", help="User's full name"
)
parser.add_argument(
"reason", nargs="?", default="", help="Reason of requesting"
)
parser.add_argument("--domain", default="localhost", help="domain of email")
parser.add_argument(
"-v", "--verbose", action="store_true", help="verbose output"
)
parser.add_argument(
"-n", "--dry-run", action="store_true", help="enable dry run mode"
)
args = parser.parse_args()
domain = 'uab.edu'
user_name = sys.argv[1]
email = sys.argv[2] if len(sys.argv) >= 3 else ''
full_name = sys.argv[3] if len(sys.argv) >= 4 else ''
reason = sys.argv[4] if len(sys.argv) >= 5 else ''
timeout = 60
if email == '':
if '@' in user_name:
email = user_name
queuename = rc_util.encode_name(args.username)
updated_by, host = rc_util.get_caller_info()
if args.email == "":
args.email = args.username
if "@" not in args.email:
args.email = args.username + "@" + args.domain
def timeout_handler(signum, frame):
print("Process timeout, there's might some issue with agents")
rc_util.rc_rmq.disconnect()
def callback(channel, method, properties, body):
msg = json.loads(body)
username = msg["username"]
if msg["success"]:
print(f"Account for {username} has been created.")
else:
email = user_name + '@' + domain
print(f"There's some issue while creating account for {username}")
errmsg = msg.get("errmsg", [])
for err in errmsg:
print(err)
rc_util.rc_rmq.disconnect()
rc_util.add_account(
args.username,
queuename=queuename,
email=args.email,
full=args.full_name,
reason=args.reason,
updated_by=updated_by,
host=host,
)
print(f"Account for {args.username} requested.")
rc_util.add_account(user_name, email=email, full=full_name, reason=reason)
print("Account requested for user: {}".format(user_name))
# Set initial timeout timer
signal.signal(signal.SIGALRM, timeout_handler)
signal.setitimer(signal.ITIMER_REAL, timeout)
print("Waiting for confirmation...")
rc_util.consume(user_name)
print("Waiting for completion...")
rc_util.consume(
queuename,
routing_key=f"complete.{queuename}",
exclusive=True,
callback=callback,
)
......@@ -8,44 +8,40 @@ from rc_rmq import RCRMQ
task = "ohpc_account"
# Instantiate rabbitmq object
rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'})
rc_rmq = RCRMQ({"exchange": "RegUsr", "exchange_type": "topic"})
def ohpc_account_create(ch, method, properties, body):
msg = json.loads(body)
print("Message received {}".format(msg))
username = msg['username']
username = msg["username"]
success = False
try:
subprocess.call(["sudo", "useradd", username])
print("[{}]: User {} has been added".format(task, username))
success = True
except:
except Exception:
e = sys.exc_info()[0]
print("[{}]: Error: {}".format(task, e))
ch.basic_ack(delivery_tag=method.delivery_tag)
msg['uid'] = getpwnam(username).pw_uid
msg['gid'] = getpwnam(username).pw_gid
msg["uid"] = getpwnam(username).pw_uid
msg["gid"] = getpwnam(username).pw_gid
# send confirm message
rc_rmq.publish_msg({
'routing_key': 'confirm.' + username,
'msg': {
'task': task,
'success': success
rc_rmq.publish_msg(
{
"routing_key": "confirm." + username,
"msg": {"task": task, "success": success},
}
})
)
if success:
# send create message to other agent
rc_rmq.publish_msg({
'routing_key': 'create.' + username,
'msg': msg
})
rc_rmq.publish_msg({"routing_key": "create." + username, "msg": msg})
print("Start Listening to queue: {}".format(task))
rc_rmq.start_consume({
'queue': task,
'routing_key': 'request.*',
'cb': ohpc_account_create
})
rc_rmq.start_consume(
{"queue": task, "routing_key": "request.*", "cb": ohpc_account_create}
)
......@@ -4,41 +4,42 @@ import json
import subprocess
from rc_rmq import RCRMQ
task = 'ood_account'
task = "ood_account"
# Instantiate rabbitmq object
rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'})
rc_rmq = RCRMQ({"exchange": "RegUsr", "exchange_type": "topic"})
def ood_account_create(ch, method, properties, body):
msg = json.loads(body)
print("Message received {}".format(msg))
username = msg['username']
user_uid = str(msg['uid'])
user_gid = str(msg['gid'])
username = msg["username"]
user_uid = str(msg["uid"])
user_gid = str(msg["gid"])
success = False
try:
subprocess.call(["sudo", "groupadd", "-r", "-g", user_gid, username])
subprocess.call(["sudo", "useradd", "-u", user_uid, "-g", user_gid, username])
subprocess.call(
["sudo", "useradd", "-u", user_uid, "-g", user_gid, username]
)
print("[{}]: User {} has been added".format(task, username))
success = True
except:
except Exception:
e = sys.exc_info()[0]
print("[{}]: Error: {}".format(task, e))
ch.basic_ack(delivery_tag=method.delivery_tag)
# send confirm message
rc_rmq.publish_msg({
'routing_key': 'confirm.' + username,
'msg': {
'task': task,
'success': success
rc_rmq.publish_msg(
{
"routing_key": "confirm." + username,
"msg": {"task": task, "success": success},
}
})
)
print("Start listening to queue: {}".format(task))
rc_rmq.start_consume({
'queue': task,
'routing_key': "create.*",
'cb': ood_account_create
})
rc_rmq.start_consume(
{"queue": task, "routing_key": "create.*", "cb": ood_account_create}
)
......@@ -4,39 +4,58 @@ import json
import subprocess
from rc_rmq import RCRMQ
task = 'slurm_account'
task = "slurm_account"
# Instantiate rabbitmq object
rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'})
rc_rmq = RCRMQ({"exchange": "RegUsr", "exchange_type": "topic"})
def slurm_account_create(ch, method, properties, body):
msg = json.loads(body)
print("Message received {}".format(msg))
username = msg['username']
username = msg["username"]
success = False
try:
subprocess.call(["sudo", "sacctmgr", "add", "account", username, "-i", "Descripition: Add user"])
subprocess.call(["sudo", "sacctmgr", "add", "user", username, "account=" + username, "-i"])
subprocess.call(
[
"sudo",
"sacctmgr",
"add",
"account",
username,
"-i",
"Descripition: Add user",
]
)
subprocess.call(
[
"sudo",
"sacctmgr",
"add",
"user",
username,
"account=" + username,
"-i",
]
)
print("SLURM account for user {} has been added".format(username))
success = True
except:
except Exception:
e = sys.exc_info()[0]
print("[{}]: Error: {}".format(task, e))
ch.basic_ack(delivery_tag=method.delivery_tag)
# send confirm message
rc_rmq.publish_msg({
'routing_key': 'confirm.' + username,
'msg': {
'task': task,
'success': success
rc_rmq.publish_msg(
{
"routing_key": "confirm." + username,
"msg": {"task": task, "success": success},
}
})
)
print("Start listening to queue: {}".format(task))
rc_rmq.start_consume({
'queue': task,
'routing_key': "create.*",
'cb': slurm_account_create
})
rc_rmq.start_consume(
{"queue": task, "routing_key": "create.*", "cb": slurm_account_create}
)
......@@ -15,7 +15,7 @@ user_name = sys.argv[2]
message = {
"username": user_name,
"fullname": "Full Name",
"reason": "Reason1, Reason2."
"reason": "Reason1, Reason2.",
}
hostname = socket.gethostname().split(".", 1)[0]
......@@ -23,33 +23,39 @@ connect_host = rcfg.Server if hostname != rcfg.Server else "localhost"
# Set up credentials to connect to RabbitMQ server
credentials = pika.PlainCredentials(rcfg.User, rcfg.Password)
parameters = pika.ConnectionParameters(connect_host,
rcfg.Port,
rcfg.VHost,
credentials)
parameters = pika.ConnectionParameters(
connect_host, rcfg.Port, rcfg.VHost, credentials
)
# Establish connection to RabbitMQ server
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.exchange_declare(exchange=rcfg.Exchange, exchange_type='direct')
channel.exchange_declare(exchange=rcfg.Exchange, exchange_type="direct")
channel.basic_publish(exchange=rcfg.Exchange, routing_key=node, body=json.dumps(message))
channel.basic_publish(
exchange=rcfg.Exchange, routing_key=node, body=json.dumps(message)
)
print(" [x] Sent {}: {}".format(node, json.dumps(message)))
# creates a named queue
result = channel.queue_declare(queue=user_name, exclusive=False, durable=True)
# bind the queue with exchange
channel.queue_bind(exchange=rcfg.Exchange, queue=user_name, routing_key=user_name)
channel.queue_bind(
exchange=rcfg.Exchange, queue=user_name, routing_key=user_name
)
def work(ch, method, properties, body):
msg = json.loads(body)
print("Received message from {}: \n\t{}".format(method.routing_key, msg))
#queue_unbind(queue, exchange=None, routing_key=None, arguments=None, callback=None)
channel.queue_delete(method.routing_key)
# ingest messages, and assume delivered via auto_ack
channel.basic_consume(queue=sys.argv[2], on_message_callback=work, auto_ack=True)
channel.basic_consume(
queue=sys.argv[2], on_message_callback=work, auto_ack=True
)
print("Subscribing to queue: {}".format(sys.argv[2]))
# initiate message ingestion
......
#!/usr/bin/env python3
import argparse
import grp
import json
import pika
import pwd
import rabbit_config as rcfg
import sys
import uuid
from rc_rmq import RCRMQ
from rc_util import get_caller_info, timeout
# Instantiate rabbitmq object
rc_rmq = RCRMQ({"exchange": rcfg.Exchange, "exchange_type": "topic"})
def user_exists(username):
try:
pwd.getpwnam(username)
except KeyError:
return False
return True
def group_exists(groupname):
try:
grp.getgrnam(groupname)
except KeyError:
return False
return True
@timeout(rcfg.Function_timeout)
def manage_group(op, usernames, groupname, debug=False):
callback_queue = rc_rmq.bind_queue(exclusive=True)
rpc_queue = f"group_member.{op}"
corr_id = str(uuid.uuid4())
status = dict.fromkeys(usernames, False)
response = 0
interface = "CLI"
updated_by, host = get_caller_info()
def handler(ch, method, properties, body):
if debug:
print("Message received:")
print(body)
nonlocal corr_id
nonlocal status
nonlocal response
msg = json.loads(body)
username = msg["username"]
if properties.correlation_id == corr_id:
status[username] = msg["success"]
response += 1
if not msg["success"]:
print(f"{username}: Something's wrong, please try again.")
if len(status) == response:
rc_rmq.stop_consume()
rc_rmq.disconnect()
if debug:
print(f"Adding user(s) {', '.join(usernames)} to group {groupname}")
print(f"Callback queue: {callback_queue}, correlation_id: {corr_id}")
for user in usernames:
rc_rmq.publish_msg(
{
"routing_key": rpc_queue,
"props": pika.BasicProperties(
correlation_id=corr_id, reply_to=callback_queue
),
"msg": {
"groups": {f"{op}": [f"{groupname}"]},
"username": user,
"host": host,
"updated_by": updated_by,
"interface": interface,
},
}
)
rc_rmq.start_consume(
{
"queue": callback_queue,
"exclusive": True,
"bind": False,
"cb": handler,
}
)
print("Done")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Group management script")
parser.add_argument("--debug", action="store_true", help="Debug mode")
parser.add_argument(
"-d",
"--delete",
action="store_true",
help="Delete the user(s) from the group",
)
parser.add_argument(
"-g", "--group", required=True, help="The Group to add the user(s)"
)
parser.add_argument(
"users",
metavar="USER",
nargs="+",
help="User(s) to be add to the group",
)
args = parser.parse_args()
exist_users = []
miss = False
# Check if all of the users exist
for user in args.users:
if not user_exists(user):
print(f"{user} does not exist.", file=sys.stderr)
miss = True
else:
exist_users.append(user)
# Check if the group exists
if not group_exists(args.group):
print(f"{args.group} does not exist.", file=sys.stderr)
miss = True
if miss:
print("A user and/or group does not exist.", file=sys.stderr)
print("Abort.", file=sys.stderr)
exit(1)
elif exist_users:
op = "remove" if args.delete else "add"
manage_group(op, exist_users, args.group, args.debug)
else:
print("No user to change", file=sys.stderr)
print("Abort.", file=sys.stderr)
sys.exit(1)
#!/usr/bin/env python3
import argparse
import dataset
import sys
import subprocess
import rabbit_config as rcfg
import rc_util
from datetime import datetime
parser = argparse.ArgumentParser()
parser.add_argument("-f", "--force", action="store_true", help="force update")
parser.add_argument(
"--dry-run", action="store_true", help="enable dry run mode"
)
args = parser.parse_args()
default_state = "ok"
today = datetime.now()
updated_by, host = rc_util.get_caller_info()
# Chunk size for insert into db
size = 1000
db = dataset.connect(f"sqlite:///{rcfg.db_path}/user_reg.db")
table = db["user_state"]
if table.__len__() > 0 and not args.force:
print("table user_state not empty, abort.")
sys.exit()
# Getting user list
users = subprocess.run(
["ls", "/data/user"], stdout=subprocess.PIPE, encoding="UTF-8"
).stdout.split()
# Update user_state table
# Insert many
if len(users) > 50:
start = 0
while start < len(users):
end = start + size if start + size < len(users) else len(users)
data = [
dict(
username=user,
state=default_state,
date=today,
updated_by=updated_by,
host=host,
)
for user in users[start:end]
]
if args.dry_run:
print(f"Table insert many from {start} to {end - 1}")
else:
table.insert_many(data, chunk_size=size)
start = end
# Insert one by one
else:
for user in users:
if args.dry_run:
print(f"Table insert user: {user}, state: {default_state}")
else:
table.insert(
{
"username": user,
"state": default_state,
"date": today,
"updated_by": updated_by,
"host": host,
}
)
import rabbit_config as rcfg
Head = f"""From: {rcfg.Sender_alias} <{rcfg.Sender}>
To: <{{{{ to }}}}>
Subject: {rcfg.Subject}
"""
Body = f"""
Hi {{{{ username }}}}
Your account has been set up with:
============================
User ID: {{{{ username }}}}
============================
If you have any questions, please visit:
{rcfg.Info_url}
or email at {rcfg.Admin_email}
Cheers,
"""
Whole_mail = Head + Body
UserReportHead = f"""From: {rcfg.Sender_alias} <{rcfg.Sender}>
To: <{rcfg.Admin_email}>
Subject: [{{{{ result }}}}]RC Account Creation Report: {{{{ fullname }}}}, {{{{ username }}}} """
BEGIN TRANSACTION;
-- Create new table with updated_by column
CREATE TABLE groups_temp (
id INTEGER PRIMARY KEY,
user TEXT,
"group" TEXT,
operation INTEGER,
date DATETIME,
host TEXT,
updated_by TEXT,
interface TEXT
);
-- Copy all entries from old table
INSERT INTO groups_temp(user,"group",operation,date,host,updated_by,interface)
SELECT user,"group",operation,date,host,executed_by,interface
FROM groups;
-- Drop old table
DROP TABLE groups;
-- Rename new table
ALTER TABLE groups_temp
RENAME TO groups;
COMMIT;
#!/usr/bin/env python3
import json
import rc_util
import argparse
import signal
import uuid
import pika
import rc_util
from rc_rmq import RCRMQ
import rabbit_config as rcfg
import time
task = "acctmgr"
# Instantiate rabbitmq object
rc_rmq = RCRMQ({"exchange": rcfg.Exchange, "exchange_type": "topic"})
tracking = {}
def manage_acct(ch, method, properties, body):
msg = json.loads(body)
op = method.routing_key.split(".")[1]
username = msg["username"]
state = msg["state"]
service = msg["service"]
queuename = msg["queuename"]
try:
if username in tracking:
current = tracking[username]
else:
current = tracking[username] = {}
if op == 'request':
if state == 'hold' or state == 'certification':
msg["action"] = "lock"
elif state == 'ok' or state == 'pre_certification':
msg["action"] = "unlock"
else:
print("Invalid state provided. Check the help menu.")
if service == 'all':
current["new_jobs"] = None
current["expire_account"] = None
current["ssh_access"] = None
# send a broadcast message to all agents
rc_rmq.publish_msg(
{
"routing_key": f"{msg['action']}.{queuename}",
"msg": msg,
}
)
else:
for each_service in service:
current[each_service] = None
rc_rmq.publish_msg(
{
"routing_key": f"{each_service}.{queuename}",
"msg": msg
}
)
elif op == 'done':
# Check if each task/agent returned success
current[msg["task"]] = msg["success"]
done = True
for task in current.keys():
if current[task] is None:
done = False
if done:
rc_util.update_state(
username, state, msg.get("updated_by"), msg.get("host")
)
# Send done msg to account_manager.py
rc_rmq.publish_msg(
{
"routing_key": f'certified.{queuename}',
"msg": msg,
}
)
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception:
msg["errmsg"] = "Exception raised in account manager workflow agent , check the logs for stack trace"
logger.error("", exc_info=True)
rc_rmq.bind_queue(queue=task, routing_key='acctmgr.request.*', durable=True)
rc_rmq.bind_queue(queue=task, routing_key='acctmgr.done.*', durable=True)
print("Waiting for completion...")
rc_rmq.start_consume(
{"queue": task, "cb": manage_acct}
)
#!/usr/bin/env python
import os
import json
import rc_util
from pathlib import Path
from rc_rmq import RCRMQ
import rabbit_config as rcfg
task = "dir_verify"
dirs = rcfg.User_dirs
args = rc_util.get_args()
logger = rc_util.get_logger(args)
# Instantiate rabbitmq object
rc_rmq = RCRMQ({"exchange": "RegUsr", "exchange_type": "topic"})
def dir_verify(ch, method, properties, body):
msg = json.loads(body)
username = msg["username"]
msg["task"] = task
msg["success"] = True
missing_dirs = []
try:
for d in dirs:
path = Path(d) / msg["username"]
if args.dry_run:
logger.info(f"Checking dirs: {path}")
else:
if not path.exists():
# check if dirs exist and record any missing dirs
missing_dirs.append(path)
msg["success"] = False
msg["errmsg"] = f"Error: missing dirs {missing_dirs}"
logger.info(f"{path} does not exist")
else:
# check existing dirs for correct ownership and permissions
status = os.stat(path)
mask = oct(status.st_mode)[-3:]
uid = str(status.st_uid)
gid = str(status.st_gid)
if mask != "700" or uid != msg["uid"] or gid != msg["gid"]:
msg["success"] = False
msg["errmsg"] = (
f"Error: dir {path} permissions or ownership are"
" wrong"
)
except Exception:
msg["success"] = False
msg["errmsg"] = "Exception raised, check the logs for stack trace"
logger.error("", exc_info=True)
# send confirm message
rc_rmq.publish_msg(
{"routing_key": "confirm." + msg["queuename"], "msg": msg}
)
logger.debug(f"User {username} confirmation sent")
ch.basic_ack(delivery_tag=method.delivery_tag)
logger.info(f"Start listening to queue: {task}")
rc_rmq.start_consume(
{"queue": task, "routing_key": "verify.*", "cb": dir_verify}
)
logger.info("Disconnected")
rc_rmq.disconnect()
#!/bin/sh
username="$1"
path_to_db="/cm/shared/rabbitmq_agents/prod_rmq_agents/.agent_db/user_reg.db"
usage() {
echo "Usage: $0 USERNAME"
}
if [[ "$EUID" -ne 0 ]]; then
echo "This script must be run as root!"
exit 1
fi
if [ -z "$username" ]; then
usage
exit 1
fi
if id "$username" &>/dev/null; then
echo "Deleting user: ${username}"
echo "Clean PUN process on loginnode"
ssh login001 "/opt/ood/nginx_stage/sbin/nginx_stage nginx_clean --force --user $username"
echo "Remove user via cmsh"
cmsh -c "user use ${username}; remove -d; commit;"
echo "Remove user from sqlite db users table"
sqlite3 $path_to_db "delete from users where username=\"$username\""
echo "Remove user from sqlite db user_state table"
sqlite3 $path_to_db "delete from user_state where username=\"$username\""
echo "Remove user from sacctmgr"
sacctmgr -i delete user $username
sacctmgr -i delete account $username
echo "Remove /data/user"
rm -rf "/data/user/${username}"
echo "Remove /data/scratch"
rm -rf "/data/scratch/${username}"
else
echo "user: ${username} not found."
exit 1
fi
#!/usr/bin/env python
import os
import json
import pika
import rc_util
from os import popen
from pathlib import Path
from rc_rmq import RCRMQ
import rabbit_config as rcfg
from datetime import date, timedelta
task = "expire_account"
args = rc_util.get_args()
logger = rc_util.get_logger(args)
# Instantiate rabbitmq object
rc_rmq = RCRMQ({"exchange": rcfg.Exchange, "exchange_type": "topic"})
def expire_account(ch, method, properties, body):
msg = json.loads(body)
username = msg["username"]
action = msg["action"]
msg["task"] = task
queuename = msg["queuename"]
yesterday = date.today() - timedelta(days=1)
try:
expire_account_cmd = f'/cm/local/apps/cmd/bin/cmsh -n -c "user;use {username}; set expirationdate {yesterday}; commit;"'
unexpire_account_cmd = f'/cm/local/apps/cmd/bin/cmsh -n -c "user;use {username}; set expirationdate 2037/12/31; commit;"'
if action == 'lock':
block_ssh = popen(expire_account_cmd).read().rstrip()
elif action == 'unlock':
unblock_ssh = popen(unexpire_account_cmd).read().rstrip()
msg["success"] = True
logger.info(f"ssh expiration set to yesterday for user {username}")
except Exception:
msg["success"] = False
msg["errmsg"] = "Exception raised, while expiring user's ssh access, check the logs for stack trace"
logger.error("", exc_info=True)
# send response to callback queue with it's correlation ID
rc_rmq.publish_msg(
{"routing_key": f'acctmgr.done.{queuename}',
"msg": msg}
)
logger.debug(f"User {username} confirmation sent for {action}ing {task}")
ch.basic_ack(delivery_tag=method.delivery_tag)
logger.info(f"Start listening to queue: {task}")
rc_rmq.bind_queue(queue=task, routing_key='lock.*', durable=True)
rc_rmq.bind_queue(queue=task, routing_key='unlock.*', durable=True)
rc_rmq.bind_queue(queue=task, routing_key='expiration.*', durable=True)
rc_rmq.start_consume(
{"queue": task, "cb": expire_account}
)
logger.info("Disconnected")
rc_rmq.disconnect()