Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • rc/rabbitmq_agents
  • louistw/rabbitmq_agents
  • krish94/rabbitmq_agents
  • dwheel7/rabbitmq_agents
4 results
Show changes
Commits on Source (40)
# rabbitmq_agents
This repo keeps different rabbitmq agents that help in account creation on OHPC system. # RabbitMQ User Registration Agents
It has 2 branches ```develop``` and ```production``` , that houses agents based on where they are launched ---
## Using RCRMQ class ## Overview
This project automates user registration workflows at UAB using **RabbitMQ** to route tasks between systems like the web interface, CLI tools, databases, and email services. It ensures tasks like assigning user IDs, validating data, and sending notifications happen in sequence without manual intervention.
- First, rename `rabbit_config.py.example` to `rabbit_config.py` ---
- Modify config file, at least the `Password` needs to be your own passwod ## Key Components
### 1. **The `RegUsr` Exchange**
- **Type**: Topic exchange (routes messages using `routing_key` patterns).
- **Purpose**: Acts as the central hub for all registration-related messages.
- In your code: ### 2. **Core Scripts**
- **`self_reg_app` (Web UI)**: Starts the process by sending a `request<queuename>` message with user data.
- **`create_account.py` (CLI)**: Triggers backend tasks (e.g., UID/GID assignment, email subscriptions).
### 3. **Queues & Their Jobs**
| Queue Name | What It Does |
|--------------------------|-----------------------------------------------------------------------------|
| `get next uid gid` | Reserves a unique UID/GID for the user (uses SQLite to track IDs). |
| `subscribe mail list` | Adds the user’s email to mailing lists (e.g., department announcements). |
| `git commit` | Logs configuration changes to Git (e.g., new user added). |
| `notify user` | Sends emails/SMS to users (e.g., "Your account is ready"). |
| `task_manager` | Coordinates tasks like retrying failed steps or updating logs. |
### 4. **Data Flow**
1. A user submits details via the **Web UI** (`self_reg_app`).
2. A `request<queuename>` message is sent to `RegUsr` with fields:
```json
{ "username", "queuename", "email", "fullname", "reason" }
3. The system:
- Assigns UID/GID via SQLite (`get next uid gid` queue).
- Validates data with a `verify<queuename>` message.
- Sends a `completed<queuename>` message with success/failure status.
- Notifies the user and logs the event.
----------
## Setup & Usage
### Prerequisites
* Launch an alma9 instance on openstack, ssh into it and run the following commands
*Before proceeding, set these environment variables:*
```
export RABBITMQ_USER="reggie" # RabbitMQ admin username
export RABBITMQ_PASSWORD="secure_password" # RabbitMQ admin password
export CELERY_VHOST="adduser" # Celery-dedicated vhost
export CELERY_USER="celery_user" # Celery service username
export CELERY_PASSWORD="celery_pass" # Celery service password
```
### 1.1 Configure Package Repositories
# Import security keys
```
sudo rpm --import https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc
sudo rpm --import https://packagecloud.io/rabbitmq/erlang/gpgkey
sudo rpm --import https://packagecloud.io/rabbitmq/rabbitmq-server/gpgkey
```
# Erlang Repository (AlmaLinux 8)
```
sudo tee /etc/yum.repos.d/rabbitmq_erlang.repo <<'EOL'
[rabbitmq_erlang]
name=RabbitMQ Erlang
baseurl=https://packagecloud.io/rabbitmq/erlang/el/8/$basearch
gpgcheck=1
gpgkey=https://packagecloud.io/rabbitmq/erlang/gpgkey
https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc
enabled=1
EOL
```
# RabbitMQ Server Repository
```
sudo tee /etc/yum.repos.d/rabbitmq_server.repo <<'EOL'
[rabbitmq_server]
name=RabbitMQ Server
baseurl=https://packagecloud.io/rabbitmq/rabbitmq-server/el/8/$basearch
gpgcheck=1
gpgkey=https://packagecloud.io/rabbitmq/rabbitmq-server/gpgkey
enabled=1
EOL
```
### 1.2 Install Packages
```
sudo yum install -y socat logrotate
sudo yum install -y erlang-25.3.2 rabbitmq-server-3.12.6
sudo systemctl enable --now rabbitmq-server
```
## 2. RabbitMQ Configuration
### 2.1 Create Admin User & Permissions
```
sudo rabbitmqctl add_user $RABBITMQ_USER $RABBITMQ_PASSWORD
sudo rabbitmqctl set_user_tags $RABBITMQ_USER administrator
sudo rabbitmqctl set_permissions -p / $RABBITMQ_USER ".*" ".*" ".*"
```
### 2.2 Configure Celery Environment
```
sudo rabbitmqctl add_vhost $CELERY_VHOST
sudo rabbitmqctl add_user $CELERY_USER $CELERY_PASSWORD
sudo rabbitmqctl set_permissions -p $CELERY_VHOST $CELERY_USER ".*" ".*" ".*"
```
## 3. Agent Service Setup
### 3.1 Deploy Agent Codebase
``` ```
# import the class AGENT_DIR="/cm/shared/rabbitmq_agents"
from rc_rmq import RCRMQ sudo git clone https://gitlab.rc.uab.edu/rc/rabbitmq_agents.git $AGENT_DIR
cd $AGENT_DIR
sudo git checkout main # Or specific commit/tag
```
### 3.2 Configure Python Environment
```
sudo python3 -m venv $AGENT_DIR/venv
sudo $AGENT_DIR/venv/bin/pip install -r $AGENT_DIR/requirements.txt
```
# instantiate an instance ### 3.3 Create Agent Configuration
rc_rmq = RCRMQ({'exchange': 'RegUsr'})
# publish a message to message queue Create `$AGENT_DIR/rabbit_config.py` with:
rc_rmq.publish_msg({
'routing_key': 'your_key',
'msg': {
'type': 'warning',
'content': 'this is warning'
}
})
# to consume message from a queue ```
# you have to first define callback function # rabbit_config.py
# with parameters: channel, method, properties, body host = 'localhost' # Or your cluster host
def callback_function(ch, method, properties, body): port = 5672
msg = json.loads(body) vhost = '$CELERY_VHOST' # Use actual vhost name
print("get msg: {}".format(msg['username') user = '$CELERY_USER' # Celery username
password = '$CELERY_PASSWORD'
```
# this will stop the consumer ### 3.4 Systemd Service Setup for All Agents
rc_rmq.stop_consumer() OHPC Account Agent:
# start consume messagre from queue with callback function ```
rc_rmq.start_consume({ sudo tee /etc/systemd/system/ohpc_account_agent.service <<EOL
'queue': 'queue_name', [Unit]
'routing_key: 'your_key', After=rabbitmq-server.service
'cb': callback_function
})
# don't forget to close connection [Service]
rc_rmq.disconnect() Type=simple
StartLimitInterval=0
Restart=on-failure
User=root
WorkingDirectory=/cm/shared/rabbitmq_agents/dev_rmq_agents
Environment="PYTHONPATH=/cm/shared/rabbitmq_agents/"
Environment="PATH=/cm/shared/rabbitmq_agents/dev_rmq_agents/venv/bin:/usr/local/bin:/usr/bin:/usr/local/sbin:/usr/sbin"
ExecStart=/cm/shared/rabbitmq_agents/venv/bin/python -u ohpc_account_create.py
[Install]
WantedBy=multi-user.target
EOL
``` ```
### Account creation flowchart OOD Account Agent:
![Account creation flowchart](./account-creation-flow.png)
\ No newline at end of file ```
sudo tee /etc/systemd/system/ood_account_agent.service <<EOL
[Unit]
After=rabbitmq-server.service
[Service]
Type=simple
StartLimitInterval=0
Restart=on-failure
User=root
WorkingDirectory=/cm/shared/rabbitmq_agents/dev_rmq_agents
Environment="PYTHONPATH=/cm/shared/rabbitmq_agents/"
Environment="PATH=/cm/shared/rabbitmq_agents/dev_rmq_agents/venv/bin:/usr/local/bin:/usr/bin:/usr/local/sbin:/usr/sbin"
ExecStart=/cm/shared/rabbitmq_agents/venv/bin/python -u ood_account_create.py
[Install]
WantedBy=multi-user.target
EOL
```
Slurm Agent:
```
sudo tee /etc/systemd/system/slurm_agent.service <<EOL
[Unit]
After=rabbitmq-server.service
[Service]
Type=simple
StartLimitInterval=0
Restart=on-failure
User=root
WorkingDirectory=/cm/shared/rabbitmq_agents/dev_rmq_agents
Environment="PYTHONPATH=/cm/shared/rabbitmq_agents/"
Environment="PATH=/cm/shared/rabbitmq_agents/dev_rmq_agents/venv/bin:/usr/local/bin:/usr/bin:/usr/local/sbin:/usr/sbin"
ExecStart=/cm/shared/rabbitmq_agents/venv/bin/python -u slurm_agent.py
[Install]
WantedBy=multi-user.target
EOL
```
## 4. Network Configuration
### Disable Firewall (Testing Only)
```
sudo systemctl stop firewalld
sudo systemctl disable firewalld
```
## 5. Validation & Testing
```
# Verify Services
systemctl is-active rabbitmq-server ood_account_agent
# Check RabbitMQ Users
sudo rabbitmqctl list_users
# Inspect Agent Logs
journalctl -u ood_account_agent -f --since "5m ago"
```
## Error Handling
- Failures (e.g., duplicate email) are reported in the `completed<queuename>` message’s `errmsg` field.
- The `user reg event logger` tracks all registration attempts. Check logs at `/var/log/user_reg.log`.
#!/usr/bin/env python3 #!/usr/bin/env python3
import json import json
import rc_util
import argparse import argparse
import signal import signal
import uuid
import pika
import rc_util import rc_util
from rc_rmq import RCRMQ from rc_rmq import RCRMQ
import rabbit_config as rcfg import rabbit_config as rcfg
import time
parser = argparse.ArgumentParser(description = "Account management driver script") parser = argparse.ArgumentParser(
parser.add_argument( description="Account management driver script"
"username", help="Username that should be locked/unlocked") )
parser.add_argument("username", help="Username that should be locked/unlocked")
parser.add_argument( parser.add_argument(
"state", choices=['ok', 'hold', 'certification', 'pre_certification'], help="Choose from states (ok,hold,certification,pre_certification)") "state",
choices=["ok", "hold", "certification", "pre_certification"],
help="Choose from states (ok,hold,certification,pre_certification)",
)
parser.add_argument( parser.add_argument(
"-s", "--service", nargs='+', default='all', choices=['ssh', 'newjobs', 'expiration', 'all'], help="List one or more services to be blocked (default: %(default)s)") "-s",
"--service",
nargs="+",
default="all",
choices=["ssh", "newjobs", "expiration", "all"],
help="List one or more services to be blocked (default: %(default)s)",
)
parser.add_argument( parser.add_argument(
"-v", "--verbose", action="store_true", help="verbose output") "-v", "--verbose", action="store_true", help="verbose output"
)
parser.add_argument( parser.add_argument(
"-n", "--dry-run", action="store_true", help="enable dry run mode" "-n", "--dry-run", action="store_true", help="enable dry run mode"
) )
...@@ -41,11 +48,12 @@ msg["state"] = state ...@@ -41,11 +48,12 @@ msg["state"] = state
msg["service"] = service msg["service"] = service
msg["queuename"] = queuename msg["queuename"] = queuename
msg["updated_by"], msg["host"] = rc_util.get_caller_info() msg["updated_by"], msg["host"] = rc_util.get_caller_info()
msg["interface"] = "CLI"
# publish msg with acctmgr.{uname} routing key. # publish msg with acctmgr.{uname} routing key.
rc_rmq.publish_msg( rc_rmq.publish_msg(
{ {
"routing_key": f'acctmgr.request.{queuename}', "routing_key": f"acctmgr.request.{queuename}",
"msg": msg, "msg": msg,
} }
) )
...@@ -61,18 +69,23 @@ def callback(ch, method, properties, body): ...@@ -61,18 +69,23 @@ def callback(ch, method, properties, body):
username = msg["username"] username = msg["username"]
if msg["success"]: if msg["success"]:
print(f"Account for {username} has been {msg['action']}ed.\n Updating the user state in DB") print(
f"Account for {username} has been {msg['action']}ed.\n "
"Updating the user state in DB"
)
else: else:
print(f"There's some issue in account management agents for {username}") print(
f"There's some issue in account management agents for {username}"
)
errmsg = msg.get("errmsg", []) errmsg = msg.get("errmsg", [])
for err in errmsg: for err in errmsg:
print(err) print(err)
ch.basic_ack(delivery_tag=method.delivery_tag) ch.basic_ack(delivery_tag=method.delivery_tag)
rc_rmq.stop_consume() rc_rmq.stop_consume()
rc_rmq.delete_queue(queuename) rc_rmq.delete_queue(queuename)
print(f"Request {username} account state set to {state}.") print(f"Request {username} account state set to {state}.")
# Set initial timeout timer # Set initial timeout timer
...@@ -83,7 +96,7 @@ print("Waiting for completion...") ...@@ -83,7 +96,7 @@ print("Waiting for completion...")
rc_rmq.start_consume( rc_rmq.start_consume(
{ {
"queue": queuename, "queue": queuename,
"routing_key": f'certified.{queuename}', "routing_key": f"certified.{queuename}",
"cb": callback, "cb": callback,
} }
) )
#!/usr/bin/env python3
import argparse
import grp
import json
import pika
import pwd
import rabbit_config as rcfg
import sys
import uuid
from rc_rmq import RCRMQ
from rc_util import get_caller_info, timeout
# Instantiate rabbitmq object
rc_rmq = RCRMQ({"exchange": rcfg.Exchange, "exchange_type": "topic"})
def user_exists(username):
try:
pwd.getpwnam(username)
except KeyError:
return False
return True
def group_exists(groupname):
try:
grp.getgrnam(groupname)
except KeyError:
return False
return True
@timeout(rcfg.Function_timeout)
def manage_group(op, usernames, groupname, debug=False):
callback_queue = rc_rmq.bind_queue(exclusive=True)
rpc_queue = f"group_member.{op}"
corr_id = str(uuid.uuid4())
status = dict.fromkeys(usernames, False)
response = 0
interface = "CLI"
updated_by, host = get_caller_info()
def handler(ch, method, properties, body):
if debug:
print("Message received:")
print(body)
nonlocal corr_id
nonlocal status
nonlocal response
msg = json.loads(body)
username = msg["username"]
if properties.correlation_id == corr_id:
status[username] = msg["success"]
response += 1
if not msg["success"]:
print(f"{username}: Something's wrong, please try again.")
if len(status) == response:
rc_rmq.stop_consume()
rc_rmq.disconnect()
if debug:
print(f"Adding user(s) {', '.join(usernames)} to group {groupname}")
print(f"Callback queue: {callback_queue}, correlation_id: {corr_id}")
for user in usernames:
rc_rmq.publish_msg(
{
"routing_key": rpc_queue,
"props": pika.BasicProperties(
correlation_id=corr_id, reply_to=callback_queue
),
"msg": {
"groups": {f"{op}": [f"{groupname}"]},
"username": user,
"host": host,
"updated_by": updated_by,
"interface": interface,
},
}
)
rc_rmq.start_consume(
{
"queue": callback_queue,
"exclusive": True,
"bind": False,
"cb": handler,
}
)
print("Done")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Group management script")
parser.add_argument("--debug", action="store_true", help="Debug mode")
parser.add_argument(
"-d",
"--delete",
action="store_true",
help="Delete the user(s) from the group",
)
parser.add_argument(
"-g", "--group", required=True, help="The Group to add the user(s)"
)
parser.add_argument(
"users",
metavar="USER",
nargs="+",
help="User(s) to be add to the group",
)
args = parser.parse_args()
exist_users = []
miss = False
# Check if all of the users exist
for user in args.users:
if not user_exists(user):
print(f"{user} does not exist.", file=sys.stderr)
miss = True
else:
exist_users.append(user)
# Check if the group exists
if not group_exists(args.group):
print(f"{args.group} does not exist.", file=sys.stderr)
miss = True
if miss:
print("A user and/or group does not exist.", file=sys.stderr)
print("Abort.", file=sys.stderr)
exit(1)
elif exist_users:
op = "remove" if args.delete else "add"
manage_group(op, exist_users, args.group, args.debug)
else:
print("No user to change", file=sys.stderr)
print("Abort.", file=sys.stderr)
sys.exit(1)
BEGIN TRANSACTION;
-- Create new table with updated_by column
CREATE TABLE groups_temp (
id INTEGER PRIMARY KEY,
user TEXT,
"group" TEXT,
operation INTEGER,
date DATETIME,
host TEXT,
updated_by TEXT,
interface TEXT
);
-- Copy all entries from old table
INSERT INTO groups_temp(user,"group",operation,date,host,updated_by,interface)
SELECT user,"group",operation,date,host,executed_by,interface
FROM groups;
-- Drop old table
DROP TABLE groups;
-- Rename new table
ALTER TABLE groups_temp
RENAME TO groups;
COMMIT;
#!/usr/bin/env python #!/usr/bin/env python
import os import dataset
import json import json
import pika import pika
import shlex import shlex
import rc_util import rc_util
from subprocess import Popen,PIPE from datetime import datetime
from pathlib import Path from subprocess import Popen, PIPE
from rc_rmq import RCRMQ from rc_rmq import RCRMQ
import rabbit_config as rcfg import rabbit_config as rcfg
...@@ -14,85 +14,142 @@ task = "group_member" ...@@ -14,85 +14,142 @@ task = "group_member"
args = rc_util.get_args() args = rc_util.get_args()
logger = rc_util.get_logger(args) logger = rc_util.get_logger(args)
# Initialize db
db = dataset.connect(f"sqlite:///{rcfg.db_path}/user_reg.db")
table = db["groups"]
# Instantiate rabbitmq object # Instantiate rabbitmq object
rc_rmq = RCRMQ({"exchange": rcfg.Exchange, "exchange_type": "topic"}) rc_rmq = RCRMQ({"exchange": rcfg.Exchange, "exchange_type": "topic"})
def insert_db(operation, groupname, msg):
if operation == "remove":
op = 0
elif operation == "add":
op = 1
# SQL insert
table.insert(
{
"user": msg["username"],
"group": groupname,
"operation": op,
"date": datetime.now(),
"host": msg["host"],
"updated_by": msg["updated_by"],
"interface": msg.get("interface", ""),
}
)
def group_member(ch, method, properties, body): def group_member(ch, method, properties, body):
"""
Properties:
correlation_id (str): The UUID for the request.
reply_to (str): The RabbitMQ queue name for reply to send to.
Message(body):
username (str): The user to be added/removed from groups.
groups (dict): A dictionary with `add` or `remove` key.
add (list): A list of groups to be added for the user.
remove (list): A list of groups to be removed for the user.
updated_by (str): The user who request the change.
host (str): Hostname where the request comes from.
interface (str): whether it's from CLI or WebUI.
Returns:
status (bool): Whether or not the operation executed successfully.
errmsg (str): Detailed error message if operation failed.
task (str): The task name of the agent who handle the message.
"""
msg = json.loads(body) msg = json.loads(body)
username = msg["username"] username = msg["username"]
action = msg["action"]
msg["task"] = task msg["task"] = task
state = msg["state"]
try: try:
if 'remove' in msg["groups"]: if "remove" in msg["groups"]:
for each_group in msg["groups"]["remove"]: for each_group in msg["groups"]["remove"]:
logger.debug(f'Removing user {username} from group {each_group}') logger.debug(
f"Removing user {username} from group {each_group}"
)
if str(rcfg.bright_cm_version).split(".")[0] == "8": if str(rcfg.bright_cm_version).split(".")[0] == "8":
grp_remove_user_cmd = f'/cm/local/apps/cmd/bin/cmsh -n -c "group; removefrom {each_group} groupmembers {username}; commit;"' grp_remove_user_cmd = (
'/cm/local/apps/cmd/bin/cmsh -n -c "group; removefrom'
f' {each_group} groupmembers {username}; commit;"'
)
else: else:
grp_remove_user_cmd = f'/cm/local/apps/cmd/bin/cmsh -n -c "group; removefrom {each_group} members {username}; commit;"' grp_remove_user_cmd = (
'/cm/local/apps/cmd/bin/cmsh -n -c "group; removefrom'
logger.info(f'Running command: {grp_remove_user_cmd}') f' {each_group} members {username}; commit;"'
proc = Popen(shlex.split(grp_remove_user_cmd), stdout=PIPE, stderr=PIPE) )
out,err = proc.communicate()
logger.debug(f'Result: {err}') logger.info(f"Running command: {grp_remove_user_cmd}")
logger.info(f'User {username} is removed from {each_group} group') proc = Popen(
shlex.split(grp_remove_user_cmd), stdout=PIPE, stderr=PIPE
if 'add' in msg["groups"]: )
out, err = proc.communicate()
logger.debug(f"Result: {err}")
logger.info(
f"User {username} is removed from {each_group} group"
)
insert_db("remove", each_group, msg)
if "add" in msg["groups"]:
for each_group in msg["groups"]["add"]: for each_group in msg["groups"]["add"]:
logger.debug(f'Adding user {username} to group {each_group}') logger.debug(f"Adding user {username} to group {each_group}")
if str(rcfg.bright_cm_version).split(".")[0] == "8": if str(rcfg.bright_cm_version).split(".")[0] == "8":
grp_add_user_cmd = f'/cm/local/apps/cmd/bin/cmsh -n -c "group; append {each_group} groupmembers {username}; commit;"' grp_add_user_cmd = (
'/cm/local/apps/cmd/bin/cmsh -n -c "group; append'
f' {each_group} groupmembers {username}; commit;"'
)
else: else:
grp_add_user_cmd = f'/cm/local/apps/cmd/bin/cmsh -n -c "group; append {each_group} members {username}; commit;"' grp_add_user_cmd = (
'/cm/local/apps/cmd/bin/cmsh -n -c "group; append'
logger.info(f'Running command: {grp_add_user_cmd}') f' {each_group} members {username}; commit;"'
proc = Popen(shlex.split(grp_add_user_cmd), stdout=PIPE, stderr=PIPE) )
out,err = proc.communicate()
logger.debug(f'Result: {err}') logger.info(f"Running command: {grp_add_user_cmd}")
logger.info(f'User {username} is added to {each_group} group') proc = Popen(
shlex.split(grp_add_user_cmd), stdout=PIPE, stderr=PIPE
)
out, err = proc.communicate()
logger.debug(f"Result: {err}")
logger.info(f"User {username} is added to {each_group} group")
insert_db("add", each_group, msg)
msg["success"] = True msg["success"] = True
except Exception: except Exception:
msg["success"] = False msg["success"] = False
msg["errmsg"] = "Exception raised, while adding user to group {groupname}, check the logs for stack trace" msg["errmsg"] = (
"Exception raised, while adding user to group {groupname}, check"
" the logs for stack trace"
)
logger.error("", exc_info=True) logger.error("", exc_info=True)
corr_id = properties.correlation_id corr_id = properties.correlation_id
reply_to = properties.reply_to reply_to = properties.reply_to
logger.debug(f'corr_id: {corr_id} \n reply_to: {reply_to}') logger.debug(f"corr_id: {corr_id} \n reply_to: {reply_to}")
# send response to the callback queue # send response to the callback queue
if reply_to: if reply_to:
props = pika.BasicProperties(correlation_id=corr_id) props = pika.BasicProperties(correlation_id=corr_id)
logger.debug("Sending confirmation back to reply_to") logger.debug("Sending confirmation back to reply_to")
rc_rmq.publish_msg( rc_rmq.publish_msg(
{ {"routing_key": reply_to, "props": props, "msg": msg}
"routing_key": reply_to,
"props": props,
"msg": msg
}
) )
else: else:
print("Error: no reply_to") print("Error: no reply_to")
logger.debug(f'User {username} confirmation sent from {task}') logger.debug(f"User {username} confirmation sent from {task}")
ch.basic_ack(delivery_tag=method.delivery_tag) ch.basic_ack(delivery_tag=method.delivery_tag)
logger.info(f"Start listening to queue: {task}") logger.info(f"Start listening to queue: {task}")
rc_rmq.bind_queue(queue=task, routing_key='group_member.*', durable=True) rc_rmq.bind_queue(queue=task, routing_key="group_member.*", durable=True)
rc_rmq.start_consume( rc_rmq.start_consume({"queue": task, "cb": group_member})
{"queue": task, "cb": group_member}
)
logger.info("Disconnected") logger.info("Disconnected")
rc_rmq.disconnect() rc_rmq.disconnect()
...@@ -266,13 +266,25 @@ def task_manager(ch, method, properties, body): ...@@ -266,13 +266,25 @@ def task_manager(ch, method, properties, body):
# Send trigger message # Send trigger message
rc_rmq.publish_msg({"routing_key": routing_key, "msg": message}) rc_rmq.publish_msg({"routing_key": routing_key, "msg": message})
if task_name == "create_account" and success and rcfg.default_groups:
rc_rmq.publish_msg(
{
"routing_key": "group_member.add",
"msg": {
"groups": {"add": rcfg.default_groups},
"username": username,
"host": msg.get("host"),
"updated_by": msg.get("updated_by"),
"interface": msg.get("interface"),
},
}
)
logger.debug(f"Trigger message '{routing_key}' sent") logger.debug(f"Trigger message '{routing_key}' sent")
logger.debug("Previous level messages acknowledged") logger.debug("Previous level messages acknowledged")
# Send report to admin # Send report to admin
if completed or terminated: if completed or terminated:
notify_admin(username, current) notify_admin(username, current)
update_db(username, {"reported": True}) update_db(username, {"reported": True})
...@@ -295,7 +307,6 @@ def timeout_handler(signum, frame): ...@@ -295,7 +307,6 @@ def timeout_handler(signum, frame):
delta = current_time - tracking[user]["last_update"] delta = current_time - tracking[user]["last_update"]
if delta.seconds > timeout: if delta.seconds > timeout:
rc_rmq.publish_msg( rc_rmq.publish_msg(
{ {
"routing_key": "complete." + user, "routing_key": "complete." + user,
......
...@@ -10,15 +10,12 @@ rc_rmq = RCRMQ({"exchange": "RegUsr", "exchange_type": "topic"}) ...@@ -10,15 +10,12 @@ rc_rmq = RCRMQ({"exchange": "RegUsr", "exchange_type": "topic"})
# Define your callback function # Define your callback function
def log_user_reg_events(ch, method, properties, body): def log_user_reg_events(ch, method, properties, body):
# Retrieve message # Retrieve message
msg = json.loads(body) msg = json.loads(body)
# Retrieve routing key # Retrieve routing key
routing_key = method.routing_key routing_key = method.routing_key
action = routing_key.split(".")[0] print(f"Got a message with routing key: {routing_key}")
user = routing_key.split(".")[1]
print(f"Got a {action} message for {user} with routing key: {routing_key}")
print(msg) print(msg)
# Acknowledge message # Acknowledge message
......
[tool.black] [tool.black]
line-length=79 line-length = 79
target-version=['py36'] target-version = ['py36']
preview = true
[tool.pylint.main]
disable = ["invalid-name", "import-error", "unused-argument", "broad-except"]
ignore = ["config.py", "tests.py"]
[tool.pylint.format]
max-line-length = 79
#!/bin/bash
group_options=(gpfs4 gpfs5)
gpfs4_home="/gpfs4/data/user/home"
gpfs5_home="/gpfs5/data/user/home"
user=$1
group_to=$2
if [[ -z "${group_to}" ]]; then
echo "Usage: $0 USER TARGET_GROUP"
exit 1
elif [[ ! " ${group_options[*]} " =~ [[:space:]]${group_to}[[:space:]] ]]; then
echo "Invalid target group"
echo "Available options: ${group_options[*]}, got ${group_to}"
exit 1
fi
if ! getent passwd "$user" > /dev/null 2>&1; then
echo "The user $user does not exist"
exit 1
fi
cd /cm/shared/rabbitmq_agents || exit
source venv/bin/activate
if [[ "$group_to" == "gpfs4" ]]; then
group_from=gpfs5
dir_from="$gpfs5_home/$user/"
dir_to="$gpfs4_home/$user"
else
group_from=gpfs4
dir_from="$gpfs4_home/$user/"
dir_to="$gpfs5_home/$user"
fi
if [[ -d "/$group_from/data/user/home/$user" ]]; then
./account_manager.py "$user" hold
rsync -a --delete "$dir_from" "$dir_to"
./group_manager.py "$user" -g "$group_to"
./group_manager.py "$user" -d -g "$group_from"
./account_manager.py "$user" ok
else
echo User home directory does not exist.
exit 1
fi