Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • rc/rabbitmq_agents
  • louistw/rabbitmq_agents
  • krish94/rabbitmq_agents
  • dwheel7/rabbitmq_agents
4 results
Show changes
Commits on Source (74)
Showing
with 710 additions and 115 deletions
......@@ -14,7 +14,7 @@ jobs:
uses: actions/setup-python@v2
with:
# Semantic version range syntax or exact version of a Python version
python-version: '3.6'
python-version: '3.11'
# Optional - x64 or x86 architecture, defaults to x64
architecture: 'x64'
- name: find trailing whitespace
......
include:
- template: Code-Quality.gitlab-ci.yml
code_quality:
tags:
- dind
......@@ -6,7 +6,7 @@ repos:
- id: end-of-file-fixer
- id: check-yaml
- repo: https://github.com/psf/black
rev: 21.5b0
rev: 23.3.0
hooks:
- id: black
- repo: https://github.com/pycqa/flake8
......
# rabbitmq_agents
This repo keeps different rabbitmq agents that help in account creation on OHPC system.
# RabbitMQ User Registration Agents
It has 2 branches ```develop``` and ```production``` , that houses agents based on where they are launched
---
## Using RCRMQ class
## Overview
This project automates user registration workflows at UAB using **RabbitMQ** to route tasks between systems like the web interface, CLI tools, databases, and email services. It ensures tasks like assigning user IDs, validating data, and sending notifications happen in sequence without manual intervention.
- First, rename `rabbit_config.py.example` to `rabbit_config.py`
---
- Modify config file, at least the `Password` needs to be your own passwod
## Key Components
### 1. **The `RegUsr` Exchange**
- **Type**: Topic exchange (routes messages using `routing_key` patterns).
- **Purpose**: Acts as the central hub for all registration-related messages.
- In your code:
### 2. **Core Scripts**
- **`self_reg_app` (Web UI)**: Starts the process by sending a `request<queuename>` message with user data.
- **`create_account.py` (CLI)**: Triggers backend tasks (e.g., UID/GID assignment, email subscriptions).
### 3. **Queues & Their Jobs**
| Queue Name | What It Does |
|--------------------------|-----------------------------------------------------------------------------|
| `get next uid gid` | Reserves a unique UID/GID for the user (uses SQLite to track IDs). |
| `subscribe mail list` | Adds the user’s email to mailing lists (e.g., department announcements). |
| `git commit` | Logs configuration changes to Git (e.g., new user added). |
| `notify user` | Sends emails/SMS to users (e.g., "Your account is ready"). |
| `task_manager` | Coordinates tasks like retrying failed steps or updating logs. |
### 4. **Data Flow**
1. A user submits details via the **Web UI** (`self_reg_app`).
2. A `request<queuename>` message is sent to `RegUsr` with fields:
```json
{ "username", "queuename", "email", "fullname", "reason" }
3. The system:
- Assigns UID/GID via SQLite (`get next uid gid` queue).
- Validates data with a `verify<queuename>` message.
- Sends a `completed<queuename>` message with success/failure status.
- Notifies the user and logs the event.
----------
## Setup & Usage
### Prerequisites
* Launch an alma9 instance on openstack, ssh into it and run the following commands
*Before proceeding, set these environment variables:*
```
export RABBITMQ_USER="reggie" # RabbitMQ admin username
export RABBITMQ_PASSWORD="secure_password" # RabbitMQ admin password
export CELERY_VHOST="adduser" # Celery-dedicated vhost
export CELERY_USER="celery_user" # Celery service username
export CELERY_PASSWORD="celery_pass" # Celery service password
```
### 1.1 Configure Package Repositories
# Import security keys
```
sudo rpm --import https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc
sudo rpm --import https://packagecloud.io/rabbitmq/erlang/gpgkey
sudo rpm --import https://packagecloud.io/rabbitmq/rabbitmq-server/gpgkey
```
# Erlang Repository (AlmaLinux 8)
```
sudo tee /etc/yum.repos.d/rabbitmq_erlang.repo <<'EOL'
[rabbitmq_erlang]
name=RabbitMQ Erlang
baseurl=https://packagecloud.io/rabbitmq/erlang/el/8/$basearch
gpgcheck=1
gpgkey=https://packagecloud.io/rabbitmq/erlang/gpgkey
https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc
enabled=1
EOL
```
# RabbitMQ Server Repository
```
sudo tee /etc/yum.repos.d/rabbitmq_server.repo <<'EOL'
[rabbitmq_server]
name=RabbitMQ Server
baseurl=https://packagecloud.io/rabbitmq/rabbitmq-server/el/8/$basearch
gpgcheck=1
gpgkey=https://packagecloud.io/rabbitmq/rabbitmq-server/gpgkey
enabled=1
EOL
```
### 1.2 Install Packages
```
sudo yum install -y socat logrotate
sudo yum install -y erlang-25.3.2 rabbitmq-server-3.12.6
sudo systemctl enable --now rabbitmq-server
```
# import the class
from rc_rmq import RCRMQ
# instantiate an instance
rc_rmq = RCRMQ({'exchange': 'RegUsr'})
# publish a message to message queue
rc_rmq.publish_msg({
'routing_key': 'your_key',
'msg': {
'type': 'warning',
'content': 'this is warning'
}
})
# to consume message from a queue
# you have to first define callback function
# with parameters: channel, method, properties, body
def callback_function(ch, method, properties, body):
msg = json.loads(body)
print("get msg: {}".format(msg['username')
## 2. RabbitMQ Configuration
### 2.1 Create Admin User & Permissions
```
sudo rabbitmqctl add_user $RABBITMQ_USER $RABBITMQ_PASSWORD
sudo rabbitmqctl set_user_tags $RABBITMQ_USER administrator
sudo rabbitmqctl set_permissions -p / $RABBITMQ_USER ".*" ".*" ".*"
```
# this will stop the consumer
rc_rmq.stop_consumer()
# start consume messagre from queue with callback function
rc_rmq.start_consume({
'queue': 'queue_name',
'routing_key: 'your_key',
'cb': callback_function
})
### 2.2 Configure Celery Environment
# don't forget to close connection
rc_rmq.disconnect()
```
sudo rabbitmqctl add_vhost $CELERY_VHOST
sudo rabbitmqctl add_user $CELERY_USER $CELERY_PASSWORD
sudo rabbitmqctl set_permissions -p $CELERY_VHOST $CELERY_USER ".*" ".*" ".*"
```
## 3. Agent Service Setup
### 3.1 Deploy Agent Codebase
```
AGENT_DIR="/cm/shared/rabbitmq_agents"
sudo git clone https://gitlab.rc.uab.edu/rc/rabbitmq_agents.git $AGENT_DIR
cd $AGENT_DIR
sudo git checkout main # Or specific commit/tag
```
### 3.2 Configure Python Environment
```
sudo python3 -m venv $AGENT_DIR/venv
sudo $AGENT_DIR/venv/bin/pip install -r $AGENT_DIR/requirements.txt
```
### 3.3 Create Agent Configuration
Create `$AGENT_DIR/rabbit_config.py` with:
```
# rabbit_config.py
host = 'localhost' # Or your cluster host
port = 5672
vhost = '$CELERY_VHOST' # Use actual vhost name
user = '$CELERY_USER' # Celery username
password = '$CELERY_PASSWORD'
```
### 3.4 Systemd Service Setup for All Agents
OHPC Account Agent:
```
sudo tee /etc/systemd/system/ohpc_account_agent.service <<EOL
[Unit]
After=rabbitmq-server.service
[Service]
Type=simple
StartLimitInterval=0
Restart=on-failure
User=root
WorkingDirectory=/cm/shared/rabbitmq_agents/dev_rmq_agents
Environment="PYTHONPATH=/cm/shared/rabbitmq_agents/"
Environment="PATH=/cm/shared/rabbitmq_agents/dev_rmq_agents/venv/bin:/usr/local/bin:/usr/bin:/usr/local/sbin:/usr/sbin"
ExecStart=/cm/shared/rabbitmq_agents/venv/bin/python -u ohpc_account_create.py
[Install]
WantedBy=multi-user.target
EOL
```
OOD Account Agent:
```
sudo tee /etc/systemd/system/ood_account_agent.service <<EOL
[Unit]
After=rabbitmq-server.service
[Service]
Type=simple
StartLimitInterval=0
Restart=on-failure
User=root
WorkingDirectory=/cm/shared/rabbitmq_agents/dev_rmq_agents
Environment="PYTHONPATH=/cm/shared/rabbitmq_agents/"
Environment="PATH=/cm/shared/rabbitmq_agents/dev_rmq_agents/venv/bin:/usr/local/bin:/usr/bin:/usr/local/sbin:/usr/sbin"
ExecStart=/cm/shared/rabbitmq_agents/venv/bin/python -u ood_account_create.py
[Install]
WantedBy=multi-user.target
EOL
```
Slurm Agent:
```
sudo tee /etc/systemd/system/slurm_agent.service <<EOL
[Unit]
After=rabbitmq-server.service
[Service]
Type=simple
StartLimitInterval=0
Restart=on-failure
User=root
WorkingDirectory=/cm/shared/rabbitmq_agents/dev_rmq_agents
Environment="PYTHONPATH=/cm/shared/rabbitmq_agents/"
Environment="PATH=/cm/shared/rabbitmq_agents/dev_rmq_agents/venv/bin:/usr/local/bin:/usr/bin:/usr/local/sbin:/usr/sbin"
ExecStart=/cm/shared/rabbitmq_agents/venv/bin/python -u slurm_agent.py
[Install]
WantedBy=multi-user.target
EOL
```
## 4. Network Configuration
### Disable Firewall (Testing Only)
```
sudo systemctl stop firewalld
sudo systemctl disable firewalld
```
## 5. Validation & Testing
```
# Verify Services
systemctl is-active rabbitmq-server ood_account_agent
# Check RabbitMQ Users
sudo rabbitmqctl list_users
# Inspect Agent Logs
journalctl -u ood_account_agent -f --since "5m ago"
```
## Error Handling
- Failures (e.g., duplicate email) are reported in the `completed<queuename>` message’s `errmsg` field.
- The `user reg event logger` tracks all registration attempts. Check logs at `/var/log/user_reg.log`.
account-creation-flow.png

269 KiB

#!/usr/bin/env python3
import json
import rc_util
import argparse
import signal
import uuid
import pika
import rc_util
from rc_rmq import RCRMQ
import rabbit_config as rcfg
import time
parser = argparse.ArgumentParser(description = "Account management driver script")
parser.add_argument(
"username", help="Username that should be locked/unlocked")
parser = argparse.ArgumentParser(
description="Account management driver script"
)
parser.add_argument("username", help="Username that should be locked/unlocked")
parser.add_argument(
"state", choices=['ok', 'hold', 'certification', 'pre_certification'], help="Choose from states (ok,hold,certification,pre_certification)")
"state",
choices=["ok", "hold", "certification", "pre_certification"],
help="Choose from states (ok,hold,certification,pre_certification)",
)
parser.add_argument(
"-s", "--service", nargs='+', default='all', choices=['ssh', 'newjobs', 'expiration', 'all'], help="List one or more services to be blocked (default: %(default)s)")
"-s",
"--service",
nargs="+",
default="all",
choices=["ssh", "newjobs", "expiration", "all"],
help="List one or more services to be blocked (default: %(default)s)",
)
parser.add_argument(
"-v", "--verbose", action="store_true", help="verbose output")
"-v", "--verbose", action="store_true", help="verbose output"
)
parser.add_argument(
"-n", "--dry-run", action="store_true", help="enable dry run mode"
)
......@@ -41,11 +48,12 @@ msg["state"] = state
msg["service"] = service
msg["queuename"] = queuename
msg["updated_by"], msg["host"] = rc_util.get_caller_info()
msg["interface"] = "CLI"
# publish msg with acctmgr.{uname} routing key.
rc_rmq.publish_msg(
{
"routing_key": f'acctmgr.request.{queuename}',
"routing_key": f"acctmgr.request.{queuename}",
"msg": msg,
}
)
......@@ -61,18 +69,23 @@ def callback(ch, method, properties, body):
username = msg["username"]
if msg["success"]:
print(f"Account for {username} has been {msg['action']}ed.\n Updating the user state in DB")
print(
f"Account for {username} has been {msg['action']}ed.\n "
"Updating the user state in DB"
)
else:
print(f"There's some issue in account management agents for {username}")
print(
f"There's some issue in account management agents for {username}"
)
errmsg = msg.get("errmsg", [])
for err in errmsg:
print(err)
ch.basic_ack(delivery_tag=method.delivery_tag)
rc_rmq.stop_consume()
rc_rmq.delete_queue(queuename)
print(f"Request {username} account state set to {state}.")
# Set initial timeout timer
......@@ -83,7 +96,7 @@ print("Waiting for completion...")
rc_rmq.start_consume(
{
"queue": queuename,
"routing_key": f'certified.{queuename}',
"routing_key": f"certified.{queuename}",
"cb": callback,
}
)
#!/usr/bin/env python3
import argparse
import grp
import json
import pika
import pwd
import rabbit_config as rcfg
import sys
import uuid
from rc_rmq import RCRMQ
from rc_util import get_caller_info, timeout
# Instantiate rabbitmq object
rc_rmq = RCRMQ({"exchange": rcfg.Exchange, "exchange_type": "topic"})
def user_exists(username):
try:
pwd.getpwnam(username)
except KeyError:
return False
return True
def group_exists(groupname):
try:
grp.getgrnam(groupname)
except KeyError:
return False
return True
@timeout(rcfg.Function_timeout)
def manage_group(op, usernames, groupname, debug=False):
callback_queue = rc_rmq.bind_queue(exclusive=True)
rpc_queue = f"group_member.{op}"
corr_id = str(uuid.uuid4())
status = dict.fromkeys(usernames, False)
response = 0
interface = "CLI"
updated_by, host = get_caller_info()
def handler(ch, method, properties, body):
if debug:
print("Message received:")
print(body)
nonlocal corr_id
nonlocal status
nonlocal response
msg = json.loads(body)
username = msg["username"]
if properties.correlation_id == corr_id:
status[username] = msg["success"]
response += 1
if not msg["success"]:
print(f"{username}: Something's wrong, please try again.")
if len(status) == response:
rc_rmq.stop_consume()
rc_rmq.disconnect()
if debug:
print(f"Adding user(s) {', '.join(usernames)} to group {groupname}")
print(f"Callback queue: {callback_queue}, correlation_id: {corr_id}")
for user in usernames:
rc_rmq.publish_msg(
{
"routing_key": rpc_queue,
"props": pika.BasicProperties(
correlation_id=corr_id, reply_to=callback_queue
),
"msg": {
"groups": {f"{op}": [f"{groupname}"]},
"username": user,
"host": host,
"updated_by": updated_by,
"interface": interface,
},
}
)
rc_rmq.start_consume(
{
"queue": callback_queue,
"exclusive": True,
"bind": False,
"cb": handler,
}
)
print("Done")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Group management script")
parser.add_argument("--debug", action="store_true", help="Debug mode")
parser.add_argument(
"-d",
"--delete",
action="store_true",
help="Delete the user(s) from the group",
)
parser.add_argument(
"-g", "--group", required=True, help="The Group to add the user(s)"
)
parser.add_argument(
"users",
metavar="USER",
nargs="+",
help="User(s) to be add to the group",
)
args = parser.parse_args()
exist_users = []
miss = False
# Check if all of the users exist
for user in args.users:
if not user_exists(user):
print(f"{user} does not exist.", file=sys.stderr)
miss = True
else:
exist_users.append(user)
# Check if the group exists
if not group_exists(args.group):
print(f"{args.group} does not exist.", file=sys.stderr)
miss = True
if miss:
print("A user and/or group does not exist.", file=sys.stderr)
print("Abort.", file=sys.stderr)
exit(1)
elif exist_users:
op = "remove" if args.delete else "add"
manage_group(op, exist_users, args.group, args.debug)
else:
print("No user to change", file=sys.stderr)
print("Abort.", file=sys.stderr)
sys.exit(1)
BEGIN TRANSACTION;
-- Create new table with updated_by column
CREATE TABLE groups_temp (
id INTEGER PRIMARY KEY,
user TEXT,
"group" TEXT,
operation INTEGER,
date DATETIME,
host TEXT,
updated_by TEXT,
interface TEXT
);
-- Copy all entries from old table
INSERT INTO groups_temp(user,"group",operation,date,host,updated_by,interface)
SELECT user,"group",operation,date,host,executed_by,interface
FROM groups;
-- Drop old table
DROP TABLE groups;
-- Rename new table
ALTER TABLE groups_temp
RENAME TO groups;
COMMIT;
#!/usr/bin/env python
import os
import dataset
import json
import pika
import shlex
import rc_util
from subprocess import Popen,PIPE
from pathlib import Path
from datetime import datetime
from subprocess import Popen, PIPE
from rc_rmq import RCRMQ
import rabbit_config as rcfg
......@@ -14,85 +14,142 @@ task = "group_member"
args = rc_util.get_args()
logger = rc_util.get_logger(args)
# Initialize db
db = dataset.connect(f"sqlite:///{rcfg.db_path}/user_reg.db")
table = db["groups"]
# Instantiate rabbitmq object
rc_rmq = RCRMQ({"exchange": rcfg.Exchange, "exchange_type": "topic"})
def insert_db(operation, groupname, msg):
if operation == "remove":
op = 0
elif operation == "add":
op = 1
# SQL insert
table.insert(
{
"user": msg["username"],
"group": groupname,
"operation": op,
"date": datetime.now(),
"host": msg["host"],
"updated_by": msg["updated_by"],
"interface": msg.get("interface", ""),
}
)
def group_member(ch, method, properties, body):
"""
Properties:
correlation_id (str): The UUID for the request.
reply_to (str): The RabbitMQ queue name for reply to send to.
Message(body):
username (str): The user to be added/removed from groups.
groups (dict): A dictionary with `add` or `remove` key.
add (list): A list of groups to be added for the user.
remove (list): A list of groups to be removed for the user.
updated_by (str): The user who request the change.
host (str): Hostname where the request comes from.
interface (str): whether it's from CLI or WebUI.
Returns:
status (bool): Whether or not the operation executed successfully.
errmsg (str): Detailed error message if operation failed.
task (str): The task name of the agent who handle the message.
"""
msg = json.loads(body)
username = msg["username"]
action = msg["action"]
msg["task"] = task
state = msg["state"]
try:
if 'remove' in msg["groups"]:
if "remove" in msg["groups"]:
for each_group in msg["groups"]["remove"]:
logger.debug(f'Removing user {username} from group {each_group}')
logger.debug(
f"Removing user {username} from group {each_group}"
)
if str(rcfg.bright_cm_version).split(".")[0] == "8":
grp_remove_user_cmd = f'/cm/local/apps/cmd/bin/cmsh -n -c "group; removefrom {each_group} groupmembers {username}; commit;"'
grp_remove_user_cmd = (
'/cm/local/apps/cmd/bin/cmsh -n -c "group; removefrom'
f' {each_group} groupmembers {username}; commit;"'
)
else:
grp_remove_user_cmd = f'/cm/local/apps/cmd/bin/cmsh -n -c "group; removefrom {each_group} members {username}; commit;"'
logger.info(f'Running command: {grp_remove_user_cmd}')
proc = Popen(shlex.split(grp_remove_user_cmd), stdout=PIPE, stderr=PIPE)
out,err = proc.communicate()
logger.debug(f'Result: {err}')
logger.info(f'User {username} is removed from {each_group} group')
if 'add' in msg["groups"]:
grp_remove_user_cmd = (
'/cm/local/apps/cmd/bin/cmsh -n -c "group; removefrom'
f' {each_group} members {username}; commit;"'
)
logger.info(f"Running command: {grp_remove_user_cmd}")
proc = Popen(
shlex.split(grp_remove_user_cmd), stdout=PIPE, stderr=PIPE
)
out, err = proc.communicate()
logger.debug(f"Result: {err}")
logger.info(
f"User {username} is removed from {each_group} group"
)
insert_db("remove", each_group, msg)
if "add" in msg["groups"]:
for each_group in msg["groups"]["add"]:
logger.debug(f'Adding user {username} to group {each_group}')
logger.debug(f"Adding user {username} to group {each_group}")
if str(rcfg.bright_cm_version).split(".")[0] == "8":
grp_add_user_cmd = f'/cm/local/apps/cmd/bin/cmsh -n -c "group; append {each_group} groupmembers {username}; commit;"'
grp_add_user_cmd = (
'/cm/local/apps/cmd/bin/cmsh -n -c "group; append'
f' {each_group} groupmembers {username}; commit;"'
)
else:
grp_add_user_cmd = f'/cm/local/apps/cmd/bin/cmsh -n -c "group; append {each_group} members {username}; commit;"'
logger.info(f'Running command: {grp_add_user_cmd}')
proc = Popen(shlex.split(grp_add_user_cmd), stdout=PIPE, stderr=PIPE)
out,err = proc.communicate()
logger.debug(f'Result: {err}')
logger.info(f'User {username} is added to {each_group} group')
grp_add_user_cmd = (
'/cm/local/apps/cmd/bin/cmsh -n -c "group; append'
f' {each_group} members {username}; commit;"'
)
logger.info(f"Running command: {grp_add_user_cmd}")
proc = Popen(
shlex.split(grp_add_user_cmd), stdout=PIPE, stderr=PIPE
)
out, err = proc.communicate()
logger.debug(f"Result: {err}")
logger.info(f"User {username} is added to {each_group} group")
insert_db("add", each_group, msg)
msg["success"] = True
except Exception:
msg["success"] = False
msg["errmsg"] = "Exception raised, while adding user to group {groupname}, check the logs for stack trace"
msg["errmsg"] = (
"Exception raised, while adding user to group {groupname}, check"
" the logs for stack trace"
)
logger.error("", exc_info=True)
corr_id = properties.correlation_id
reply_to = properties.reply_to
logger.debug(f'corr_id: {corr_id} \n reply_to: {reply_to}')
logger.debug(f"corr_id: {corr_id} \n reply_to: {reply_to}")
# send response to the callback queue
if reply_to:
props = pika.BasicProperties(correlation_id=corr_id)
logger.debug("Sending confirmation back to reply_to")
rc_rmq.publish_msg(
{
"routing_key": reply_to,
"props": props,
"msg": msg
}
{"routing_key": reply_to, "props": props, "msg": msg}
)
else:
print("Error: no reply_to")
logger.debug(f'User {username} confirmation sent from {task}')
logger.debug(f"User {username} confirmation sent from {task}")
ch.basic_ack(delivery_tag=method.delivery_tag)
logger.info(f"Start listening to queue: {task}")
rc_rmq.bind_queue(queue=task, routing_key='group_member.*', durable=True)
rc_rmq.bind_queue(queue=task, routing_key="group_member.*", durable=True)
rc_rmq.start_consume(
{"queue": task, "cb": group_member}
)
rc_rmq.start_consume({"queue": task, "cb": group_member})
logger.info("Disconnected")
rc_rmq.disconnect()
rc_rmq.disconnect()
......@@ -70,7 +70,7 @@ def notify_user(ch, method, properties, body):
else:
errmsg = "Sending email to user"
smtp = smtplib.SMTP(rcfg.Mail_server)
smtp.sendmail(rcfg.Sender, receivers, message)
smtp.sendmail(rcfg.Sender_notification, receivers, message)
logger.debug(f"Email sent to: {user_email}")
......
......@@ -51,33 +51,33 @@ def ssh_access(ch, method, properties, body):
proc = Popen(['/usr/bin/groups', username], stdout=PIPE, stderr=PIPE)
out,err = proc.communicate()
user_group_list = out.decode().strip().split(":")[1].split()
lock_groups = rcfg.lock_groups
user_groups = out.decode().strip().split(":")[1].split()
state_groups = rcfg.state_groups
"""
Filter the lock group a user is in and assign to spl
lambda function returns common elements between two lists. For all
the true values by returned lambda function for common elements
corresponding values are included as a list by filter function.
"""
spl_groups = list(filter(lambda x:x in list(lock_groups.values()),user_group_list))
user_state_groups = list(filter(lambda x:x in list(rcfg.state_groups.values()),user_groups))
# Depending on state add user to the group corresponding to state.
# Remove user from lock_groups they are already part of.
# Remove user from user_state_groups they are already part of.
# eg: {"groups": { "add":[a,b,c], "remove":[d,e,f] }
if state == 'certification':
msg["groups"]["add"] = [lock_groups[state]]
msg["groups"]["remove"] = spl_groups
msg["groups"]["add"] = [state_groups[state]]
msg["groups"]["remove"] = user_state_groups
elif state == 'hold':
msg["groups"]["add"] = [lock_groups[state]]
msg["groups"]["remove"] = spl_groups
msg["groups"]["add"] = [state_groups[state]]
msg["groups"]["remove"] = user_state_groups
elif state == 'pre_certification':
msg["groups"]["add"] = [lock_groups[state]]
msg["groups"]["remove"] = spl_groups
msg["groups"]["add"] = [state_groups[state]]
msg["groups"]["remove"] = user_state_groups
elif state == 'ok':
msg["groups"]["remove"] = spl_groups
msg["groups"]["remove"] = user_state_groups
# send a message to group_member.py agent
logger.debug(f"sending msg to group agent: {msg}")
......
......@@ -27,7 +27,7 @@ def mail_list_subscription(ch, method, properties, body):
fullname = msg["fullname"]
email = msg["email"]
mail_list_admin = rcfg.Sender
mail_list_admin = rcfg.Sender_subscription
mail_list = rcfg.Mail_list
mail_list_bcc = rcfg.Mail_list_bcc
server = rcfg.Mail_server
......
......@@ -266,13 +266,25 @@ def task_manager(ch, method, properties, body):
# Send trigger message
rc_rmq.publish_msg({"routing_key": routing_key, "msg": message})
if task_name == "create_account" and success and rcfg.default_groups:
rc_rmq.publish_msg(
{
"routing_key": "group_member.add",
"msg": {
"groups": {"add": rcfg.default_groups},
"username": username,
"host": msg.get("host"),
"updated_by": msg.get("updated_by"),
"interface": msg.get("interface"),
},
}
)
logger.debug(f"Trigger message '{routing_key}' sent")
logger.debug("Previous level messages acknowledged")
# Send report to admin
if completed or terminated:
notify_admin(username, current)
update_db(username, {"reported": True})
......@@ -295,7 +307,6 @@ def timeout_handler(signum, frame):
delta = current_time - tracking[user]["last_update"]
if delta.seconds > timeout:
rc_rmq.publish_msg(
{
"routing_key": "complete." + user,
......
......@@ -10,15 +10,12 @@ rc_rmq = RCRMQ({"exchange": "RegUsr", "exchange_type": "topic"})
# Define your callback function
def log_user_reg_events(ch, method, properties, body):
# Retrieve message
msg = json.loads(body)
# Retrieve routing key
routing_key = method.routing_key
action = routing_key.split(".")[0]
user = routing_key.split(".")[1]
print(f"Got a {action} message for {user} with routing key: {routing_key}")
print(f"Got a message with routing key: {routing_key}")
print(msg)
# Acknowledge message
......
[tool.black]
line-length=79
target-version=['py36']
line-length = 79
target-version = ['py36']
preview = true
[tool.pylint.main]
disable = ["invalid-name", "import-error", "unused-argument", "broad-except"]
ignore = ["config.py", "tests.py"]
[tool.pylint.format]
max-line-length = 79
......@@ -25,6 +25,8 @@ db_path = ".agent_db"
Mail_server = 'localhost'
Admin_email = 'root@localhost'
Sender = 'ROOT@LOCALHOST'
Sender_notification = 'NOTIFY@LOCALHOST'
Sender_subscription = 'SUBSCRIBE_EMAIL@LOCALHOST'
Sender_alias = 'Services'
Subject = 'New User Account'
Info_url = 'https://www.google.com'
......
......@@ -3,3 +3,4 @@ dataset==1.3.1
Jinja2==2.11.2
sh==1.12.14
pre-commit==2.12.1
greenlet==1.1.3
#!/bin/bash
usage() {
echo "Usage: $0 -d USERNAME to disable a user"
echo "Usage: $0 -e USERNAME to re-enable a user"
}
if [[ "$EUID" -ne 0 ]]; then
echo "This script must be run as root!"
exit 1
fi
if [ "$#" -eq 0 ]; then
usage
exit 1
fi
while getopts ':d:e:' OPTION; do
case $OPTION in
d)
mv /var/spool/cron/$OPTARG /var/spool/cron/$OPTARG.disabled
echo $OPTARG >>/etc/cron.deny
;;
e)
mv /var/spool/cron/$OPTARG.disabled /var/spool/cron/$OPTARG
sed -i -e "/$OPTARG/d" /etc/cron.deny
;;
esac
done
#!/bin/bash
force=false
username=''
usage() {
echo "Usage: $0 -u USERNAME to run graceful shutdown of processes"
echo "Usage: $0 -u USERNAME -k to run forced shutdown of processes"
}
if [[ "$EUID" -ne 0 ]]; then
echo "This script must be run as root!"
exit 1
fi
while [ $OPTIND -le "$#" ]; do
if getopts k option; then
case $option in
k)
force=true
;;
esac
else
username=("${!OPTIND}")
((OPTIND++))
fi
done
if [ -z "$username" ]; then
usage
exit 1
fi
if [ "$username" = "root" ]; then
echo "Username cannot be root"
exit 1
fi
userId=$(id -u $username)
if [ "$force" = true ]; then
echo "Performing SIGKILL on processes belonging to $username"
pkill -9 -u $userId
else
echo "Performing SIGTERM on processes belonging to $username"
pkill -u $userId
fi
#!/bin/bash
group_options=(gpfs4 gpfs5)
gpfs4_home="/gpfs4/data/user/home"
gpfs5_home="/gpfs5/data/user/home"
user=$1
group_to=$2
if [[ -z "${group_to}" ]]; then
echo "Usage: $0 USER TARGET_GROUP"
exit 1
elif [[ ! " ${group_options[*]} " =~ [[:space:]]${group_to}[[:space:]] ]]; then
echo "Invalid target group"
echo "Available options: ${group_options[*]}, got ${group_to}"
exit 1
fi
if ! getent passwd "$user" > /dev/null 2>&1; then
echo "The user $user does not exist"
exit 1
fi
cd /cm/shared/rabbitmq_agents || exit
source venv/bin/activate
if [[ "$group_to" == "gpfs4" ]]; then
group_from=gpfs5
dir_from="$gpfs5_home/$user/"
dir_to="$gpfs4_home/$user"
else
group_from=gpfs4
dir_from="$gpfs4_home/$user/"
dir_to="$gpfs5_home/$user"
fi
if [[ -d "/$group_from/data/user/home/$user" ]]; then
./account_manager.py "$user" hold
rsync -a --delete "$dir_from" "$dir_to"
./group_manager.py "$user" -g "$group_to"
./group_manager.py "$user" -d -g "$group_from"
./account_manager.py "$user" ok
else
echo User home directory does not exist.
exit 1
fi