diff --git a/prod_rmq_agents/acct_mgmt_workflow.py b/prod_rmq_agents/acct_mgmt_workflow.py index 9463aa98a4466e2ced87075cd08dae74b8bc5ff6..ac89b6dee2b4c38e3bf89d58246f764640fc8c1c 100755 --- a/prod_rmq_agents/acct_mgmt_workflow.py +++ b/prod_rmq_agents/acct_mgmt_workflow.py @@ -42,6 +42,7 @@ def manage_acct(ch, method, properties, body): if service == 'all': current["new_jobs"] = None current["expire_account"] = None + current["ssh_access"] = None # send a broadcast message to all agents rc_rmq.publish_msg( { diff --git a/prod_rmq_agents/group_member.py b/prod_rmq_agents/group_member.py new file mode 100644 index 0000000000000000000000000000000000000000..cda7e5ac0bdf97f4ef953cc4b3fde4951b65143f --- /dev/null +++ b/prod_rmq_agents/group_member.py @@ -0,0 +1,98 @@ +#!/usr/bin/env python +import os +import json +import pika +import shlex +import rc_util +from subprocess import Popen,PIPE +from pathlib import Path +from rc_rmq import RCRMQ +import rabbit_config as rcfg + +task = "group_member" + +args = rc_util.get_args() +logger = rc_util.get_logger(args) + +# Instantiate rabbitmq object +rc_rmq = RCRMQ({"exchange": rcfg.Exchange, "exchange_type": "topic"}) + + +def group_member(ch, method, properties, body): + msg = json.loads(body) + username = msg["username"] + action = msg["action"] + msg["task"] = task + state = msg["state"] + + try: + if 'remove' in msg["groups"]: + for each_group in msg["groups"]["remove"]: + logger.debug(f'Removing user {username} from group {each_group}') + if str(rcfg.bright_cm_version).split(".")[0] == "8": + grp_remove_user_cmd = f'/cm/local/apps/cmd/bin/cmsh -n -c "group; removefrom {each_group} groupmembers {username}; commit;"' + else: + grp_remove_user_cmd = f'/cm/local/apps/cmd/bin/cmsh -n -c "group; removefrom {each_group} members {username}; commit;"' + + logger.info(f'Running command: {grp_remove_user_cmd}') + proc = Popen(shlex.split(grp_remove_user_cmd), stdout=PIPE, stderr=PIPE) + out,err = proc.communicate() + logger.debug(f'Result: {err}') + logger.info(f'User {username} is removed from {each_group} group') + + if 'add' in msg["groups"]: + for each_group in msg["groups"]["add"]: + logger.debug(f'Adding user {username} to group {each_group}') + if str(rcfg.bright_cm_version).split(".")[0] == "8": + grp_add_user_cmd = f'/cm/local/apps/cmd/bin/cmsh -n -c "group; append {each_group} groupmembers {username}; commit;"' + else: + grp_add_user_cmd = f'/cm/local/apps/cmd/bin/cmsh -n -c "group; append {each_group} members {username}; commit;"' + + logger.info(f'Running command: {grp_add_user_cmd}') + proc = Popen(shlex.split(grp_add_user_cmd), stdout=PIPE, stderr=PIPE) + out,err = proc.communicate() + logger.debug(f'Result: {err}') + logger.info(f'User {username} is added to {each_group} group') + + + msg["success"] = True + + except Exception: + msg["success"] = False + msg["errmsg"] = "Exception raised, while adding user to group {groupname}, check the logs for stack trace" + logger.error("", exc_info=True) + + + corr_id = properties.correlation_id + reply_to = properties.reply_to + + logger.debug(f'corr_id: {corr_id} \n reply_to: {reply_to}') + # send response to the callback queue + if reply_to: + props = pika.BasicProperties(correlation_id=corr_id) + logger.debug("Sending confirmation back to reply_to") + rc_rmq.publish_msg( + { + "routing_key": reply_to, + "props": props, + "msg": msg + } + ) + else: + print("Error: no reply_to") + + logger.debug(f'User {username} confirmation sent from {task}') + + ch.basic_ack(delivery_tag=method.delivery_tag) + + +logger.info(f"Start listening to queue: {task}") +rc_rmq.bind_queue(queue=task, routing_key='group_member.*', durable=True) + +rc_rmq.start_consume( + {"queue": task, "cb": group_member} +) + +logger.info("Disconnected") +rc_rmq.disconnect() + diff --git a/prod_rmq_agents/ssh_access.py b/prod_rmq_agents/ssh_access.py index e289e579ab618a8ccc3f66ad070db22c1016f48f..947d66a89077d4eb327955beaa1b1561a33ef2c0 100644 --- a/prod_rmq_agents/ssh_access.py +++ b/prod_rmq_agents/ssh_access.py @@ -2,8 +2,9 @@ import os import json import pika +import uuid import rc_util -from os import popen +from subprocess import Popen,PIPE from pathlib import Path from rc_rmq import RCRMQ import rabbit_config as rcfg @@ -19,44 +20,92 @@ rc_rmq = RCRMQ({"exchange": rcfg.Exchange, "exchange_type": "topic"}) def ssh_access(ch, method, properties, body): msg = json.loads(body) + routing_key = method.routing_key username = msg["username"] action = msg["action"] msg["task"] = task - corr_id = properties.correlation_id - reply_to = properties.reply_to + queuename = msg["queuename"] + state = msg["state"] + global corr_id try: - block_ssh_cmd = f'/cm/local/apps/cmd/bin/cmsh -n -c "group; use nossh; append members {username}; commit;"' - unblock_ssh_cmd = f'/cm/local/apps/cmd/bin/cmsh -n -c "group; use nossh; removefrom members {username}; commit;"' - - if action == 'lock': - block_ssh = popen(block_ssh_cmd).read().rstrip() - elif action == 'unlock': - unblock_ssh = popen(unblock_ssh_cmd).read().rstrip() - - msg["success"] = True - logger.info(f"User {username} is added to nossh group") - + # check if it's a response from group_member_agent + if routing_key == task: + logger.debug(f"corr_id sent by group_member agent: {properties.correlation_id}") + if corr_id == properties.correlation_id: + logger.debug(f'group_member agent confirmation msg["success"]: {msg["success"]}') + # forward confirmation response to acct_mgmt_workflow agent + rc_rmq.publish_msg( + { + "routing_key": f'acctmgr.done.{queuename}', + "msg": msg + } + ) + logger.debug(f'User {username} confirmation sent for {action}ing {task}') + + else: + corr_id = str(uuid.uuid4()) + logger.debug(f'corr_id generated: {corr_id}') + msg["groups"] = {} + + proc = Popen(['/usr/bin/groups', username], stdout=PIPE, stderr=PIPE) + out,err = proc.communicate() + + user_groups = out.decode().strip().split(":")[1].split() + state_groups = rcfg.state_groups + """ + Filter the lock group a user is in and assign to spl + lambda function returns common elements between two lists. For all + the true values by returned lambda function for common elements + corresponding values are included as a list by filter function. + """ + user_state_groups = list(filter(lambda x:x in list(rcfg.state_groups.values()),user_groups)) + + # Depending on state add user to the group corresponding to state. + # Remove user from user_state_groups they are already part of. + # eg: {"groups": { "add":[a,b,c], "remove":[d,e,f] } + if state == 'certification': + msg["groups"]["add"] = [state_groups[state]] + msg["groups"]["remove"] = user_state_groups + + elif state == 'hold': + msg["groups"]["add"] = [state_groups[state]] + msg["groups"]["remove"] = user_state_groups + + elif state == 'pre_certification': + msg["groups"]["add"] = [state_groups[state]] + msg["groups"]["remove"] = user_state_groups + + elif state == 'ok': + msg["groups"]["remove"] = user_state_groups + + # send a message to group_member.py agent + logger.debug(f"sending msg to group agent: {msg}") + rc_rmq.publish_msg( + { + "routing_key": f'group_member.{queuename}', + "props": pika.BasicProperties( + correlation_id = corr_id, + reply_to = task, + ), + "msg": msg + } + ) + logger.info(f"Request sent to add/remove user {username} to/from spl groups") + except Exception: msg["success"] = False - msg["errmsg"] = "Exception raised, while blocking user's ssh access, check the logs for stack trace" + msg["errmsg"] = "Exception raised in ssh_access agent, check the logs for stack trace" logger.error("", exc_info=True) - # send response to callback queue with it's correlation ID - rc_rmq.publish_msg( - { - "routing_key": f'acctmgr.done.{queuename}', - "msg": msg - } - ) - - logger.debug(f"User {username} confirmation sent for {action}ing {task}") - ch.basic_ack(delivery_tag=method.delivery_tag) logger.info(f"Start listening to queue: {task}") +rc_rmq.bind_queue(queue=task, routing_key='lock.*', durable=True) +rc_rmq.bind_queue(queue=task, routing_key='unlock.*', durable=True) rc_rmq.bind_queue(queue=task, routing_key='ssh.*', durable=True) +rc_rmq.bind_queue(queue=task, routing_key=task, durable=True) rc_rmq.start_consume( {"queue": task, "cb": ssh_access}