diff --git a/.github/workflows/linting.yaml b/.github/workflows/linting.yaml new file mode 100644 index 0000000000000000000000000000000000000000..4e9c69a121ef0b5ed7a94af4575a272ee98a5028 --- /dev/null +++ b/.github/workflows/linting.yaml @@ -0,0 +1,33 @@ +name: Linting + +on: [push, pull_request] + +jobs: + build: + + runs-on: ubuntu-latest + + steps: + - name: Checkout + uses: actions/checkout@v2 + - name: Set up Python 3.6 + uses: actions/setup-python@v2 + with: + # Semantic version range syntax or exact version of a Python version + python-version: '3.6' + # Optional - x64 or x86 architecture, defaults to x64 + architecture: 'x64' + - name: find trailing whitespace + uses: harupy/find-trailing-whitespace@master + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install flake8 + #if [ -f requirements.txt ]; then pip install -r requirements.txt; fi + - name: Lint with flake8 + run: | + # stop the build if there are Python syntax errors or undefined names + #flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics + flake8 . --count --max-line-length=79 --show-source --statistics + # exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide + #flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000000000000000000000000000000000000..c786d1bc0fd7e67779e94063058c626e1f973785 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,15 @@ +repos: + - repo: https://github.com/pre-commit/pre-commit-hooks + rev: v3.2.0 + hooks: + - id: trailing-whitespace + - id: end-of-file-fixer + - id: check-yaml + - repo: https://github.com/psf/black + rev: 21.5b0 + hooks: + - id: black + - repo: https://github.com/pycqa/flake8 + rev: 3.9.1 + hooks: + - id: flake8 diff --git a/agent_template.py b/agent_template.py index c6e722338bad04999621bd7dfd47a41d2b962d28..3e09cfd651d7339c028d0ff6939048d249972a17 100644 --- a/agent_template.py +++ b/agent_template.py @@ -3,10 +3,10 @@ import sys import json from rc_rmq import RCRMQ -task = 'task_name' +task = "task_name" # Instantiate rabbitmq object -rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'}) +rc_rmq = RCRMQ({"exchange": "RegUsr", "exchange_type": "topic"}) # Define your callback function def on_message(ch, method, properties, body): @@ -18,15 +18,17 @@ def on_message(ch, method, properties, body): msg = json.loads(body) # Do Something - print('[{}]: Callback called.'.format(task)) + print("[{}]: Callback called.".format(task)) # Acknowledge message ch.basic_ack(delivery_tag=method.delivery_tag) print("Start listening to queue: {}".format(task)) -rc_rmq.start_consume({ - 'queue': task, # Define your Queue name - 'routing_key': "#", # Define your routing key - 'cb': on_message # Pass in callback function you just define -}) +rc_rmq.start_consume( + { + "queue": task, # Define your Queue name + "routing_key": "#", # Define your routing key + "cb": on_message, # Pass in callback function you just define + } +) diff --git a/create_account.py b/create_account.py index d46906ff10aa105ffa2688a4406881ea85692840..6869f98918653e01d15f587d89d0cc31024e639f 100755 --- a/create_account.py +++ b/create_account.py @@ -6,21 +6,34 @@ import argparse import signal parser = argparse.ArgumentParser() -parser.add_argument('username', help='username that will be created') -parser.add_argument('email', nargs='?', default='', help="User's email") -parser.add_argument('full_name', nargs='?', default='', help="User's full name") -parser.add_argument('reason', nargs='?', default='', help='Reason of requesting') -parser.add_argument('--domain', default='localhost', help='domain of email') -parser.add_argument('-v', '--verbose', action='store_true', help='verbose output') -parser.add_argument('-n', '--dry-run', action='store_true', help='enable dry run mode') +parser.add_argument("username", help="username that will be created") +parser.add_argument("email", nargs="?", default="", help="User's email") +parser.add_argument( + "full_name", nargs="?", default="", help="User's full name" +) +parser.add_argument( + "reason", nargs="?", default="", help="Reason of requesting" +) +parser.add_argument( + "--domain", default="localhost", help="domain of email" +) +parser.add_argument( + "-v", "--verbose", action="store_true", help="verbose output" +) +parser.add_argument( + "-n", "--dry-run", action="store_true", help="enable dry run mode" +) args = parser.parse_args() timeout = 60 -if args.email == '': +queuename = rc_util.encode_name(args.username) + +if args.email == "": args.email = args.username - if '@' not in args.email: - args.email = args.username + '@' + args.domain + if "@" not in args.email: + args.email = args.username + "@" + args.domain + def timeout_handler(signum, frame): print("Process timeout, there's might some issue with agents") @@ -29,13 +42,13 @@ def timeout_handler(signum, frame): def callback(channel, method, properties, body): msg = json.loads(body) - username = msg['username'] + username = msg["username"] - if msg['success']: - print(f'Account for {username} has been created.') + if msg["success"]: + print(f"Account for {username} has been created.") else: print(f"There's some issue while creating account for {username}") - errmsg = msg.get('errmsg', []) + errmsg = msg.get("errmsg", []) for err in errmsg: print(err) @@ -43,12 +56,20 @@ def callback(channel, method, properties, body): rc_util.rc_rmq.delete_queue() -rc_util.add_account(args.username, email=args.email, full=args.full_name, reason=args.reason) -print(f'Account for {args.username} requested.') +rc_util.add_account( + args.username, + queuename=queuename, + email=args.email, + full=args.full_name, + reason=args.reason, +) +print(f"Account for {args.username} requested.") # Set initial timeout timer signal.signal(signal.SIGALRM, timeout_handler) signal.setitimer(signal.ITIMER_REAL, timeout) -print('Waiting for completion...') -rc_util.consume(args.username, routing_key=f'complete.{args.username}', callback=callback) +print("Waiting for completion...") +rc_util.consume( + queuename, routing_key=f"complete.{queuename}", callback=callback +) diff --git a/dev_rmq_agents/ohpc_account_create.py b/dev_rmq_agents/ohpc_account_create.py index 4504702f8d8f8d611b8db1ab4b52811b08ec6ade..206c4045094814751adc18876bf51d797911fd53 100644 --- a/dev_rmq_agents/ohpc_account_create.py +++ b/dev_rmq_agents/ohpc_account_create.py @@ -8,12 +8,13 @@ from rc_rmq import RCRMQ task = "ohpc_account" # Instantiate rabbitmq object -rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'}) +rc_rmq = RCRMQ({"exchange": "RegUsr", "exchange_type": "topic"}) + def ohpc_account_create(ch, method, properties, body): msg = json.loads(body) print("Message received {}".format(msg)) - username = msg['username'] + username = msg["username"] success = False try: subprocess.call(["sudo", "useradd", username]) @@ -24,28 +25,25 @@ def ohpc_account_create(ch, method, properties, body): print("[{}]: Error: {}".format(task, e)) ch.basic_ack(delivery_tag=method.delivery_tag) - msg['uid'] = getpwnam(username).pw_uid - msg['gid'] = getpwnam(username).pw_gid + msg["uid"] = getpwnam(username).pw_uid + msg["gid"] = getpwnam(username).pw_gid # send confirm message - rc_rmq.publish_msg({ - 'routing_key': 'confirm.' + username, - 'msg': { - 'task': task, - 'success': success + rc_rmq.publish_msg( + { + "routing_key": "confirm." + username, + "msg": {"task": task, "success": success}, } - }) + ) if success: # send create message to other agent - rc_rmq.publish_msg({ - 'routing_key': 'create.' + username, - 'msg': msg - }) + rc_rmq.publish_msg( + {"routing_key": "create." + username, "msg": msg} + ) + print("Start Listening to queue: {}".format(task)) -rc_rmq.start_consume({ - 'queue': task, - 'routing_key': 'request.*', - 'cb': ohpc_account_create -}) +rc_rmq.start_consume( + {"queue": task, "routing_key": "request.*", "cb": ohpc_account_create} +) diff --git a/dev_rmq_agents/ood_account_create.py b/dev_rmq_agents/ood_account_create.py index 1ff9cc9fd8a61df98ae1473c20e01480f6e4aa90..339819ec86044ae180dfe1ecb40345ee6f3d5904 100644 --- a/dev_rmq_agents/ood_account_create.py +++ b/dev_rmq_agents/ood_account_create.py @@ -4,21 +4,26 @@ import json import subprocess from rc_rmq import RCRMQ -task = 'ood_account' +task = "ood_account" # Instantiate rabbitmq object -rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'}) +rc_rmq = RCRMQ({"exchange": "RegUsr", "exchange_type": "topic"}) + def ood_account_create(ch, method, properties, body): msg = json.loads(body) print("Message received {}".format(msg)) - username = msg['username'] - user_uid = str(msg['uid']) - user_gid = str(msg['gid']) + username = msg["username"] + user_uid = str(msg["uid"]) + user_gid = str(msg["gid"]) success = False try: - subprocess.call(["sudo", "groupadd", "-r", "-g", user_gid, username]) - subprocess.call(["sudo", "useradd", "-u", user_uid, "-g", user_gid, username]) + subprocess.call( + ["sudo", "groupadd", "-r", "-g", user_gid, username] + ) + subprocess.call( + ["sudo", "useradd", "-u", user_uid, "-g", user_gid, username] + ) print("[{}]: User {} has been added".format(task, username)) success = True except: @@ -28,17 +33,15 @@ def ood_account_create(ch, method, properties, body): ch.basic_ack(delivery_tag=method.delivery_tag) # send confirm message - rc_rmq.publish_msg({ - 'routing_key': 'confirm.' + username, - 'msg': { - 'task': task, - 'success': success + rc_rmq.publish_msg( + { + "routing_key": "confirm." + username, + "msg": {"task": task, "success": success}, } - }) + ) + print("Start listening to queue: {}".format(task)) -rc_rmq.start_consume({ - 'queue': task, - 'routing_key': "create.*", - 'cb': ood_account_create -}) +rc_rmq.start_consume( + {"queue": task, "routing_key": "create.*", "cb": ood_account_create} +) diff --git a/dev_rmq_agents/slurm_agent.py b/dev_rmq_agents/slurm_agent.py index e07af9bd233b952aa385848a4504859f31a22177..eacc44c92ee61de611c6576cc0642adf6ed7992a 100755 --- a/dev_rmq_agents/slurm_agent.py +++ b/dev_rmq_agents/slurm_agent.py @@ -4,19 +4,40 @@ import json import subprocess from rc_rmq import RCRMQ -task = 'slurm_account' +task = "slurm_account" # Instantiate rabbitmq object -rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'}) +rc_rmq = RCRMQ({"exchange": "RegUsr", "exchange_type": "topic"}) + def slurm_account_create(ch, method, properties, body): msg = json.loads(body) print("Message received {}".format(msg)) - username = msg['username'] + username = msg["username"] success = False try: - subprocess.call(["sudo", "sacctmgr", "add", "account", username, "-i", "Descripition: Add user"]) - subprocess.call(["sudo", "sacctmgr", "add", "user", username, "account=" + username, "-i"]) + subprocess.call( + [ + "sudo", + "sacctmgr", + "add", + "account", + username, + "-i", + "Descripition: Add user", + ] + ) + subprocess.call( + [ + "sudo", + "sacctmgr", + "add", + "user", + username, + "account=" + username, + "-i", + ] + ) print("SLURM account for user {} has been added".format(username)) success = True except: @@ -26,17 +47,15 @@ def slurm_account_create(ch, method, properties, body): ch.basic_ack(delivery_tag=method.delivery_tag) # send confirm message - rc_rmq.publish_msg({ - 'routing_key': 'confirm.' + username, - 'msg': { - 'task': task, - 'success': success + rc_rmq.publish_msg( + { + "routing_key": "confirm." + username, + "msg": {"task": task, "success": success}, } - }) + ) + print("Start listening to queue: {}".format(task)) -rc_rmq.start_consume({ - 'queue': task, - 'routing_key': "create.*", - 'cb': slurm_account_create -}) +rc_rmq.start_consume( + {"queue": task, "routing_key": "create.*", "cb": slurm_account_create} +) diff --git a/flask_producer.py b/flask_producer.py index dc3c44634745278fa426821d2ea0a2ecc9e61e3f..1ef993ad630fb5cc1a0ba25a72b69c8a9e619471 100755 --- a/flask_producer.py +++ b/flask_producer.py @@ -15,7 +15,7 @@ user_name = sys.argv[2] message = { "username": user_name, "fullname": "Full Name", - "reason": "Reason1, Reason2." + "reason": "Reason1, Reason2.", } hostname = socket.gethostname().split(".", 1)[0] @@ -23,33 +23,44 @@ connect_host = rcfg.Server if hostname != rcfg.Server else "localhost" # Set up credentials to connect to RabbitMQ server credentials = pika.PlainCredentials(rcfg.User, rcfg.Password) -parameters = pika.ConnectionParameters(connect_host, - rcfg.Port, - rcfg.VHost, - credentials) +parameters = pika.ConnectionParameters( + connect_host, rcfg.Port, rcfg.VHost, credentials +) # Establish connection to RabbitMQ server connection = pika.BlockingConnection(parameters) channel = connection.channel() -channel.exchange_declare(exchange=rcfg.Exchange, exchange_type='direct') +channel.exchange_declare(exchange=rcfg.Exchange, exchange_type="direct") -channel.basic_publish(exchange=rcfg.Exchange, routing_key=node, body=json.dumps(message)) +channel.basic_publish( + exchange=rcfg.Exchange, routing_key=node, body=json.dumps(message) +) print(" [x] Sent {}: {}".format(node, json.dumps(message))) # creates a named queue -result = channel.queue_declare(queue=user_name, exclusive=False, durable=True) +result = channel.queue_declare( + queue=user_name, exclusive=False, durable=True +) # bind the queue with exchange -channel.queue_bind(exchange=rcfg.Exchange, queue=user_name, routing_key=user_name) +channel.queue_bind( + exchange=rcfg.Exchange, queue=user_name, routing_key=user_name +) + def work(ch, method, properties, body): msg = json.loads(body) - print("Received message from {}: \n\t{}".format(method.routing_key, msg)) - #queue_unbind(queue, exchange=None, routing_key=None, arguments=None, callback=None) + print( + "Received message from {}: \n\t{}".format(method.routing_key, msg) + ) + # queue_unbind(queue, exchange=None, routing_key=None, arguments=None, callback=None) channel.queue_delete(method.routing_key) + # ingest messages, and assume delivered via auto_ack -channel.basic_consume(queue=sys.argv[2], on_message_callback=work, auto_ack=True) +channel.basic_consume( + queue=sys.argv[2], on_message_callback=work, auto_ack=True +) print("Subscribing to queue: {}".format(sys.argv[2])) # initiate message ingestion diff --git a/prod_rmq_agents/dir_verify.py b/prod_rmq_agents/dir_verify.py index 156db1cff7f62ac3dea884614f37d716b6846acf..fd2fa8842e891e80674c75e1d6f6a96440c0b1ef 100644 --- a/prod_rmq_agents/dir_verify.py +++ b/prod_rmq_agents/dir_verify.py @@ -8,69 +8,73 @@ from pathlib import Path from rc_rmq import RCRMQ import rabbit_config as rcfg -task = 'dir_verify' +task = "dir_verify" dirs = rcfg.User_dirs args = rc_util.get_args() logger = rc_util.get_logger(args) # Instantiate rabbitmq object -rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'}) +rc_rmq = RCRMQ({"exchange": "RegUsr", "exchange_type": "topic"}) + def dir_verify(ch, method, properties, body): msg = json.loads(body) - username = msg['username'] - msg['task'] = task - msg['success'] = True + username = msg["username"] + msg["task"] = task + msg["success"] = True missing_dirs = [] try: for d in dirs: - path = Path(d) / msg['username'] + path = Path(d) / msg["username"] if args.dry_run: - logger.info(f'Checking dirs: {path}') + logger.info(f"Checking dirs: {path}") else: if not path.exists(): # check if dirs exist and record any missing dirs missing_dirs.append(path) - msg['success'] = False - msg['errmsg'] = f"Error: missing dirs {missing_dirs}" - logger.info(f'{path} does not exist') + msg["success"] = False + msg["errmsg"] = f"Error: missing dirs {missing_dirs}" + logger.info(f"{path} does not exist") else: # check existing dirs for correct ownership and permissions status = os.stat(path) mask = oct(status.st_mode)[-3:] uid = str(status.st_uid) gid = str(status.st_gid) - if mask!='700' or uid!=msg['uid'] or gid!=msg['gid']: - msg['success'] = False - msg['errmsg'] = f"Error: dir {path} permissions or ownership are wrong" + if ( + mask != "700" + or uid != msg["uid"] + or gid != msg["gid"] + ): + msg["success"] = False + msg[ + "errmsg" + ] = f"Error: dir {path} permissions or ownership are wrong" except Exception as exception: - msg['success'] = False - msg['errmsg'] = "Exception raised, check the logs for stack trace" - logger.error('', exc_info=True) + msg["success"] = False + msg["errmsg"] = "Exception raised, check the logs for stack trace" + logger.error("", exc_info=True) # send confirm message - rc_rmq.publish_msg({ - 'routing_key': 'confirm.' + username, - 'msg': msg - }) + rc_rmq.publish_msg( + {"routing_key": "confirm." + msg["queuename"], "msg": msg} + ) - logger.debug(f'User {username} confirmation sent') + logger.debug(f"User {username} confirmation sent") ch.basic_ack(delivery_tag=method.delivery_tag) -logger.info(f'Start listening to queue: {task}') -rc_rmq.start_consume({ - 'queue': task, - 'routing_key': "verify.*", - 'cb': dir_verify -}) +logger.info(f"Start listening to queue: {task}") +rc_rmq.start_consume( + {"queue": task, "routing_key": "verify.*", "cb": dir_verify} +) -logger.info('Disconnected') +logger.info("Disconnected") rc_rmq.disconnect() diff --git a/prod_rmq_agents/get-next-uid-gid.py b/prod_rmq_agents/get-next-uid-gid.py index 565501b7cf7dc1e371718ad01992e73fe516ce8c..5fbd7e4de50c9edbb4889b654ac4360e06dd9765 100644 --- a/prod_rmq_agents/get-next-uid-gid.py +++ b/prod_rmq_agents/get-next-uid-gid.py @@ -9,36 +9,41 @@ import rc_util from os import popen from rc_rmq import RCRMQ import rabbit_config as rcfg +from subprocess import run +import shlex -task = 'create_account' +task = "create_account" # Instantiate rabbitmq object -rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'}) +rc_rmq = RCRMQ({"exchange": "RegUsr", "exchange_type": "topic"}) args = rc_util.get_args() # Logger logger = rc_util.get_logger() -#Account creation +# Account creation + + def create_account(msg): - logger.info(f'Account creation request received: {msg}') - username = msg['username'] - uid = msg['uid'] - email = msg['email'] - fullname = msg['fullname'] - msg['success'] = False + logger.info(f"Account creation request received: {msg}") + username = msg["username"] + uid = msg["uid"] + email = msg["email"] + fullname = msg["fullname"] + msg["success"] = False - # Bright command to create user - cmd = '/cm/local/apps/cmd/bin/cmsh -c ' + # Bright command to create user + cmd = "/cm/local/apps/cmd/bin/cmsh -c " cmd += f'"user; add {username}; set id {uid}; set email {email}; set commonname \\"{fullname}\\"; ' cmd += 'commit;"' if not args.dry_run: - popen(cmd) + run(shlex.split(cmd)) time.sleep(rcfg.Delay) - logger.info(f'Bright command to create user:{cmd}') + logger.info(f"Bright command to create user:{cmd}") + # Define your callback function def resolve_uid_gid(ch, method, properties, body): @@ -46,8 +51,8 @@ def resolve_uid_gid(ch, method, properties, body): # Retrieve message msg = json.loads(body) logger.info("Received {}".format(msg)) - username = msg['username'] - msg['success'] = False + username = msg["username"] + msg["success"] = False # Determine next available UID try: @@ -56,45 +61,45 @@ def resolve_uid_gid(ch, method, properties, body): if user_exists: logger.info("The user, {} already exists".format(username)) - msg['uid'] = user_exists.split(':')[2] - msg['gid'] = user_exists.split(':')[3] + msg["uid"] = user_exists.split(":")[2] + msg["gid"] = user_exists.split(":")[3] else: cmd_uid = "/usr/bin/getent passwd | \ - awk -F: '($3>10000) && ($3<20000) && ($3>maxuid) { maxuid=$3; } END { print maxuid+1; }'" - msg['uid'] = popen(cmd_uid).read().rstrip() + awk -F: 'BEGIN { maxuid=10000 } ($3>10000) && ($3<20000) && ($3>maxuid) { maxuid=$3; } END { print maxuid+1; }'" + msg["uid"] = popen(cmd_uid).read().rstrip() logger.info(f"UID query: {cmd_uid}") cmd_gid = "/usr/bin/getent group | \ - awk -F: '($3>10000) && ($3<20000) && ($3>maxgid) { maxgid=$3; } END { print maxgid+1; }'" - msg['gid'] = popen(cmd_gid).read().rstrip() + awk -F: 'BEGIN { maxgid=10000 } ($3>10000) && ($3<20000) && ($3>maxgid) { maxgid=$3; } END { print maxgid+1; }'" + msg["gid"] = popen(cmd_gid).read().rstrip() logger.info(f"GID query: {cmd_gid}") create_account(msg) - msg['task'] = task - msg['success'] = True + msg["task"] = task + msg["success"] = True except Exception as exception: - msg['success'] = False - msg['errmsg'] = f"Exception raised during account creation, check logs for stack trace" - logger.error('', exc_info=True) + msg["success"] = False + msg[ + "errmsg" + ] = f"Exception raised during account creation, check logs for stack trace" + logger.error("", exc_info=True) # Acknowledge message ch.basic_ack(delivery_tag=method.delivery_tag) # Send confirm message - logger.debug('rc_rmq.publish_msg()') - rc_rmq.publish_msg({ - 'routing_key': 'confirm.' + username, - 'msg': msg - }) - logger.info('confirmation sent') + logger.debug("rc_rmq.publish_msg()") + rc_rmq.publish_msg( + {"routing_key": "confirm." + msg["queuename"], "msg": msg} + ) + logger.info("confirmation sent") + logger.info("Start listening to queue: {}".format(task)) -rc_rmq.start_consume({ - 'queue': task, - 'routing_key': "request.*", - 'cb': resolve_uid_gid -}) +rc_rmq.start_consume( + {"queue": task, "routing_key": "request.*", "cb": resolve_uid_gid} +) logger.info("Disconnected") rc_rmq.disconnect() diff --git a/prod_rmq_agents/git_commit.py b/prod_rmq_agents/git_commit.py index ce6e35ada357065ee6419ce49ac1ebc1d87e5823..37aa34b014262a9e1b347db5f67d3ae1d0614a98 100644 --- a/prod_rmq_agents/git_commit.py +++ b/prod_rmq_agents/git_commit.py @@ -7,97 +7,127 @@ import rc_util from rc_rmq import RCRMQ import rabbit_config as rmq_cfg -task = 'git_commit' +task = "git_commit" # Instantiate rabbitmq object -rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'}) +rc_rmq = RCRMQ({"exchange": "RegUsr", "exchange_type": "topic"}) # Define some location repo_location = os.path.expanduser(rmq_cfg.rc_users_ldap_repo_loc) -users_dir = repo_location + '/users' -groups_dir = repo_location + '/groups' +users_dir = repo_location + "/users" +groups_dir = repo_location + "/groups" args = rc_util.get_args() logger = rc_util.get_logger(args) if not args.dry_run: - git = sh.git.bake('--git-dir', repo_location+'/.git', '--work-tree', repo_location) - ldapsearch = sh.Command('ldapsearch') + git = sh.git.bake( + "--git-dir", repo_location + "/.git", "--work-tree", repo_location + ) + ldapsearch = sh.Command("ldapsearch") else: - git = sh.echo.bake('--git-dir', repo_location+'/.git', '--work-tree', repo_location) - ldapsearch = sh.echo.bake('ldapsearch') + git = sh.echo.bake( + "--git-dir", repo_location + "/.git", "--work-tree", repo_location + ) + ldapsearch = sh.echo.bake("ldapsearch") + def git_commit(ch, method, properties, body): msg = json.loads(body) - username = msg['username'] - msg['task'] = task - msg['success'] = False - branch_name = 'issue-add-users-' + username.lower() - user_ldif = users_dir + f'/{username}.ldif' - group_ldif = groups_dir + f'/{username}.ldif' + username = msg["username"] + msg["task"] = task + msg["success"] = False + branch_name = "issue-add-users-" + username.lower() + user_ldif = users_dir + f"/{username}.ldif" + group_ldif = groups_dir + f"/{username}.ldif" logger.info("Received: %s", msg) logger.debug("branch_name: %s", branch_name) try: - logger.debug('git checkout master') - git.checkout('master') - logger.debug('git pull') + logger.debug("git checkout master") + git.checkout("master") + logger.debug("git pull") git.pull() - branch_exists = git.branch('--list', branch_name) + branch_exists = git.branch("--list", branch_name) if not branch_exists: - logger.debug('git checkout -b %s', branch_name) - git.checkout('-b', branch_name) - logger.debug("open(%s, 'w'), open(%s, 'w')", user_ldif, group_ldif) - with open(user_ldif, 'w') as ldif_u,\ - open(group_ldif, 'w') as ldif_g: - logger.debug(f"ldapsearch -LLL -x -h ldapserver -b 'dc=cm,dc=cluster' uid={username} > {user_ldif}") - ldapsearch('-LLL', '-x', '-h', 'ldapserver', '-b', "dc=cm,dc=cluster", f"uid={username}", _out=ldif_u) - logger.debug(f"ldapsearch -LLL -x -h ldapserver -b 'ou=Group,dc=cm,dc=cluster' cn={username} > {group_ldif}") - ldapsearch('-LLL', '-x', '-h', 'ldapserver', '-b', "ou=Group,dc=cm,dc=cluster", f"cn={username}", _out=ldif_g) - logger.info('user ldif files generated.') - - logger.debug('git add %s', user_ldif) + logger.debug("git checkout -b %s", branch_name) + git.checkout("-b", branch_name) + logger.debug( + "open(%s, 'w'), open(%s, 'w')", user_ldif, group_ldif + ) + with open(user_ldif, "w") as ldif_u, open( + group_ldif, "w" + ) as ldif_g: + logger.debug( + f"ldapsearch -LLL -x -H ldaps://ldapserver -b 'dc=cm,dc=cluster' uid={username} > {user_ldif}" + ) + ldapsearch( + "-LLL", + "-x", + "-H", + "ldaps://ldapserver", + "-b", + "dc=cm,dc=cluster", + f"uid={username}", + _out=ldif_u, + ) + logger.debug( + f"ldapsearch -LLL -x -H ldapserver -b 'ou=Group,dc=cm,dc=cluster' cn={username} > {group_ldif}" + ) + ldapsearch( + "-LLL", + "-x", + "-H", + "ldaps://ldapserver", + "-b", + "ou=Group,dc=cm,dc=cluster", + f"cn={username}", + _out=ldif_g, + ) + logger.info("user ldif files generated.") + + logger.debug("git add %s", user_ldif) + git.add(user_ldif) - logger.debug('git add %s', group_ldif) + logger.debug("git add %s", group_ldif) git.add(group_ldif) - logger.debug("git commit -m 'Added new cheaha user: %s'", username) + logger.debug( + "git commit -m 'Added new cheaha user: %s'", username + ) git.commit(m="Added new cheaha user: " + username) - logger.debug('git checkout master') - git.checkout('master') + logger.debug("git checkout master") + git.checkout("master") - logger.debug('git merge %s --no-ff --no-edit', branch_name) - git.merge(branch_name, '--no-ff', '--no-edit') - logger.debug('git push origin master') - git.push('origin', 'master') + logger.debug("git merge %s --no-ff --no-edit", branch_name) + git.merge(branch_name, "--no-ff", "--no-edit") + logger.debug("git push origin master") + git.push("origin", "master") # merge with gitlab api - logger.info('Added ldif files and committed to git repo') + logger.info("Added ldif files and committed to git repo") - msg['success'] = True + msg["success"] = True except Exception as exception: - logger.error('', exc_info=True) + logger.error("", exc_info=True) # Send confirm message - logger.debug('rc_rmq.publish_msge()') - rc_rmq.publish_msg({ - 'routing_key': 'confirm.' + username, - 'msg': msg - }) - logger.info('confirmation sent') + logger.debug("rc_rmq.publish_msge()") + rc_rmq.publish_msg( + {"routing_key": "confirm." + msg["queuename"], "msg": msg} + ) + logger.info("confirmation sent") # Acknowledge message - logger.debug('ch.basic_ack()') + logger.debug("ch.basic_ack()") ch.basic_ack(delivery_tag=method.delivery_tag) logger.info("Start listening to queue: %s", task) -rc_rmq.start_consume({ - 'queue': task, - 'routing_key': "verify.*", - 'cb': git_commit -}) +rc_rmq.start_consume( + {"queue": task, "routing_key": "verify.*", "cb": git_commit} +) logger.info("Disconnected") rc_rmq.disconnect() diff --git a/prod_rmq_agents/notify_user.py b/prod_rmq_agents/notify_user.py index 431692b19a5b9d307c38e21c5e99adc1bb9210e6..92b442458a4c04d7695b32da897432f6225181f5 100644 --- a/prod_rmq_agents/notify_user.py +++ b/prod_rmq_agents/notify_user.py @@ -9,24 +9,25 @@ from jinja2 import Template from datetime import datetime import rabbit_config as rcfg import mail_config as mail_cfg -task = 'notify_user' + +task = "notify_user" args = rc_util.get_args() logger = rc_util.get_logger(args) -db = dataset.connect(f'sqlite:///.agent_db/user_reg.db') -table = db['users'] +db = dataset.connect(f"sqlite:///.agent_db/user_reg.db") +table = db["users"] # Instantiate rabbitmq object -rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'}) +rc_rmq = RCRMQ({"exchange": "RegUsr", "exchange_type": "topic"}) # Email instruction to user def notify_user(ch, method, properties, body): msg = json.loads(body) - username = msg['username'] - user_email = msg['email'] - msg['task'] = task - msg['success'] = False + username = msg["username"] + user_email = msg["email"] + msg["task"] = task + msg["success"] = False errmsg = "" try: @@ -34,71 +35,77 @@ def notify_user(ch, method, properties, body): # Search username in database record = table.find_one(username=username) - if record['sent'] is not None: - errmsg = 'Updating database counter' + if record["sent"] is not None: + errmsg = "Updating database counter" # Update counter - count = record['count'] + count = record["count"] if args.dry_run: - logger.info('Update counter in database') + logger.info("Update counter in database") else: - table.update({ - 'username': username, - 'count': count + 1 - }, ['username']) + table.update( + {"username": username, "count": count + 1}, + ["username"], + ) - logger.debug(f'User {username} counter updated to {count + 1}') + logger.debug(f"User {username} counter updated to {count + 1}") else: # Send email to user receivers = [user_email, rcfg.Admin_email] - message = Template(mail_cfg.Whole_mail).render(username=username, to=user_email) + message = Template(mail_cfg.Whole_mail).render( + username=username, to=user_email + ) if args.dry_run: - logger.info(f'smtp = smtplib.SMTP({rcfg.Mail_server})') - logger.info(f'smtp.sendmail({rcfg.Sender}, {receivers}, message)') - logger.info(f"table.update({{'username': {username}, 'count': 1, 'sent_at': datetime.now()}}, ['username'])") + logger.info(f"smtp = smtplib.SMTP({rcfg.Mail_server})") + logger.info( + f"smtp.sendmail({rcfg.Sender}, {receivers}, message)" + ) + logger.info( + f"table.update({{'username': {username}, 'count': 1, 'sent_at': datetime.now()}}, ['username'])" + ) else: - errmsg = 'Sending email to user' + errmsg = "Sending email to user" smtp = smtplib.SMTP(rcfg.Mail_server) smtp.sendmail(rcfg.Sender, receivers, message) - logger.debug(f'Email sent to: {user_email}') + logger.debug(f"Email sent to: {user_email}") - errmsg = 'Updating database email sent time' - table.update({ - 'username': username, - 'count': 1, - 'sent': datetime.now() - }, ['username']) + errmsg = "Updating database email sent time" + table.update( + { + "username": username, + "count": 1, + "sent": datetime.now(), + }, + ["username"], + ) - logger.debug(f'User {username} inserted into database') + logger.debug(f"User {username} inserted into database") - msg['success'] = True + msg["success"] = True except Exception as exception: - logger.error('', exc_info=True) - msg['errmsg'] = errmsg if errmsg else 'Unexpected error' + logger.error("", exc_info=True) + msg["errmsg"] = errmsg if errmsg else "Unexpected error" # Send confirm message - rc_rmq.publish_msg({ - 'routing_key': 'confirm.' + username, - 'msg': msg - }) + rc_rmq.publish_msg( + {"routing_key": "confirm." + msg["queuename"], "msg": msg} + ) - logger.debug(f'User {username} confirmation sent') + logger.debug(f"User {username} confirmation sent") # Acknowledge the message ch.basic_ack(delivery_tag=method.delivery_tag) if __name__ == "__main__": - logger.info(f'Start listening to queue: {task}') - rc_rmq.start_consume({ - 'queue': task, - 'routing_key': "notify.*", - 'cb': notify_user - }) - - logger.info('Disconnected') + logger.info(f"Start listening to queue: {task}") + rc_rmq.start_consume( + {"queue": task, "routing_key": "notify.*", "cb": notify_user} + ) + + logger.info("Disconnected") rc_rmq.disconnect() diff --git a/prod_rmq_agents/subscribe_mail_lists.py b/prod_rmq_agents/subscribe_mail_lists.py index 236d0b5b9e6d64b0c0e4732b616796fd85718d8f..fcbc8c8e9c93c41ff66e569a83a8b475d72a534f 100644 --- a/prod_rmq_agents/subscribe_mail_lists.py +++ b/prod_rmq_agents/subscribe_mail_lists.py @@ -9,44 +9,45 @@ from email.message import EmailMessage from rc_rmq import RCRMQ import rabbit_config as rcfg -task = 'subscribe_mail_list' +task = "subscribe_mail_list" # Instantiate rabbitmq object -rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'}) +rc_rmq = RCRMQ({"exchange": "RegUsr", "exchange_type": "topic"}) # Parse arguments args = rc_util.get_args() # Logger -logger = rc_util.get_logger()# Define your callback function +logger = rc_util.get_logger() # Define your callback function + def mail_list_subscription(ch, method, properties, body): # Retrieve message msg = json.loads(body) logger.info("Received msg {}".format(msg)) - username = msg['username'] - fullname = msg['fullname'] - email = msg['email'] + username = msg["username"] + fullname = msg["fullname"] + email = msg["email"] mail_list_admin = rcfg.Sender mail_list = rcfg.Mail_list mail_list_bcc = rcfg.Mail_list_bcc server = rcfg.Mail_server - listserv_cmd = f'QUIET ADD hpc-announce {email} {fullname} \ - \nQUIET ADD hpc-users {email} {fullname}' + listserv_cmd = f"QUIET ADD hpc-announce {email} {fullname} \ + \nQUIET ADD hpc-users {email} {fullname}" logger.info("Adding user{} to mail list".format(username)) - msg['success'] = False + msg["success"] = False try: # Create a text/plain message email_msg = EmailMessage() - email_msg['From'] = mail_list_admin - email_msg['To'] = mail_list - email_msg['Subject'] = '' - email_msg['Bcc'] = mail_list_bcc + email_msg["From"] = mail_list_admin + email_msg["To"] = mail_list + email_msg["Subject"] = "" + email_msg["Bcc"] = mail_list_bcc # Create an smtp object and send email s = smtplib.SMTP(server) @@ -54,31 +55,35 @@ def mail_list_subscription(ch, method, properties, body): email_msg.set_content(listserv_cmd) if not args.dry_run: s.send_message(email_msg) - logging.info(f'This email will add user {username} to listserv \n{email_msg}') + logging.info( + f"This email will add user {username} to listserv \n{email_msg}" + ) s.quit() - msg['task'] = task - msg['success'] = True + msg["task"] = task + msg["success"] = True except Exception as exception: - logger.error('', exc_info=True) + logger.error("", exc_info=True) # Acknowledge message ch.basic_ack(delivery_tag=method.delivery_tag) # send confirm message - logger.debug('rc_rmq.publish_msg()') - rc_rmq.publish_msg({ - 'routing_key': 'confirm.' + username, - 'msg': msg - }) - logger.info('confirmation sent') + logger.debug("rc_rmq.publish_msg()") + rc_rmq.publish_msg( + {"routing_key": "confirm." + msg["queuename"], "msg": msg} + ) + logger.info("confirmation sent") + logger.info("Start listening to queue: {}".format(task)) -rc_rmq.start_consume({ - 'queue': task, # Define your Queue name - 'routing_key': "verify.*", # Define your routing key - 'cb': mail_list_subscription # Pass in callback function you just define -}) +rc_rmq.start_consume( + { + "queue": task, # Define your Queue name + "routing_key": "verify.*", # Define your routing key + "cb": mail_list_subscription, # Pass in callback function you just define + } +) logger.info("Disconnected") rc_rmq.disconnect() diff --git a/prod_rmq_agents/task_manager.py b/prod_rmq_agents/task_manager.py index a65db78ac76c190b3bc30465c202548cb4fbd9eb..036540ac65c837881e74601eea4ab4d366be590c 100644 --- a/prod_rmq_agents/task_manager.py +++ b/prod_rmq_agents/task_manager.py @@ -12,57 +12,59 @@ from datetime import datetime import mail_config as mail_cfg import rabbit_config as rcfg -task = 'task_manager' +task = "task_manager" timeout = 30 args = rc_util.get_args() logger = rc_util.get_logger(args) -db = dataset.connect(f'sqlite:///.agent_db/user_reg.db') -table = db['users'] +db = dataset.connect(f"sqlite:///.agent_db/user_reg.db") +table = db["users"] record = { - 'uid': -1, - 'gid': -1, - 'email': '', - 'reason': '', - 'fullname': '', - 'last_update': datetime.now(), - 'errmsg': None, - 'waiting': set(), - 'request': { - 'create_account': None + "uid": -1, + "gid": -1, + "email": "", + "reason": "", + "fullname": "", + "last_update": datetime.now(), + "errmsg": None, + "waiting": set(), + "request": {"create_account": None}, + "verify": { + "git_commit": None, + "dir_verify": None, + "subscribe_mail_list": None, }, - 'verify': { - 'git_commit': None, - 'dir_verify': None, - 'subscribe_mail_list': None - }, - 'notify': { - 'notify_user': None - }, - 'reported': False + "notify": {"notify_user": None}, + "reported": False, } # Currently tracking users tracking = {} # Instantiate rabbitmq object -rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'}) +rc_rmq = RCRMQ({"exchange": "RegUsr", "exchange_type": "topic"}) + def notify_admin(username, user_record): receivers = [rcfg.Admin_email] - result = "SUCCESS" if user_record["request"]["create_account"] and\ - user_record["verify"]["git_commit"] and\ - user_record["verify"]["dir_verify"] and\ - user_record["verify"]["subscribe_mail_list"] and\ - user_record["notify"]["notify_user"]\ - else "FAILED" + result = ( + "SUCCESS" + if user_record["request"]["create_account"] + and user_record["verify"]["git_commit"] + and user_record["verify"]["dir_verify"] + and user_record["verify"]["subscribe_mail_list"] + and user_record["notify"]["notify_user"] + else "FAILED" + ) - message = Template(mail_cfg.UserReportHead).render(username=username, fullname=user_record['fullname'], result=result) - if user_record['reported']: - message += ' (Duplicate)' + message = Template(mail_cfg.UserReportHead).render( + username=username, fullname=user_record["fullname"], result=result + ) + if user_record["reported"]: + message += " (Duplicate)" message += f""" \n User Creation Report for user {username} uid: {user_record["uid"]}, gid: {user_record["gid"]} @@ -73,24 +75,26 @@ def notify_admin(username, user_record): 'subscribe_mail_list': {user_record["verify"]["subscribe_mail_list"]} 'notify_user': {user_record["notify"]["notify_user"]} """ - if user_record['errmsg']: + if user_record["errmsg"]: message += """ Error(s): """ - for msg in user_record['errmsg']: + for msg in user_record["errmsg"]: message += msg + "\n" if args.dry_run: - logger.info(f'smtp = smtplib.SMTP({rcfg.Mail_server})') - logger.info(f'smtp.sendmail({rcfg.Sender}, {rcfg.Admin_email}, message)') + logger.info(f"smtp = smtplib.SMTP({rcfg.Mail_server})") + logger.info( + f"smtp.sendmail({rcfg.Sender}, {rcfg.Admin_email}, message)" + ) logger.info(message) else: smtp = smtplib.SMTP(rcfg.Mail_server) smtp.sendmail(rcfg.Sender, receivers, message) - logger.debug(f'User report sent to: {rcfg.Admin_email}') + logger.debug(f"User report sent to: {rcfg.Admin_email}") def insert_db(username, msg): @@ -99,34 +103,38 @@ def insert_db(username, msg): if not record: # SQL insert - table.insert({ - 'username': username, - 'uid': msg.get('uid', -1), - 'gid': msg.get('gid', -1), - 'email': msg.get('email', ''), - 'reason': msg.get('reason', ''), - 'fullname': msg.get('fullname', ''), - 'create_account': None, - 'git_commit': None, - 'dir_verify': None, - 'subscribe_mail_list': None, - 'notify_user': None, - 'sent': None, - 'reported': False, - 'last_update': datetime.now() - }) + table.insert( + { + "username": username, + "uid": msg.get("uid", -1), + "gid": msg.get("gid", -1), + "email": msg.get("email", ""), + "reason": msg.get("reason", ""), + "fullname": msg.get("fullname", ""), + "create_account": None, + "git_commit": None, + "dir_verify": None, + "subscribe_mail_list": None, + "notify_user": None, + "sent": None, + "reported": False, + "last_update": datetime.now(), + "queuename": msg.get("queuename", ""), + } + ) def update_db(username, data): - obj = { 'username': username, **data } - table.update(obj, ['username']) + obj = {"username": username, **data} + table.update(obj, ["username"]) def task_manager(ch, method, properties, body): msg = json.loads(body) - username = method.routing_key.split('.')[1] - task_name = msg['task'] - success = msg['success'] + queuename = method.routing_key.split(".")[1] + username = msg["username"] + task_name = msg["task"] + success = msg["success"] send = completed = terminated = False routing_key = "" @@ -137,133 +145,144 @@ def task_manager(ch, method, properties, body): user_db = table.find_one(username=username) current = tracking[username] = copy.deepcopy(record) - current['errmsg'] = [] - current['uid'] = user_db['uid'] if user_db else msg['uid'] - current['gid'] = user_db['gid'] if user_db else msg['gid'] - current['email'] = user_db['email'] if user_db else msg['email'] - current['reason'] = user_db['reason'] if user_db else msg['reason'] - current['fullname'] = user_db['fullname'] if user_db else msg['fullname'] + current["errmsg"] = [] + current["queuename"] = ( + user_db["queuename"] if user_db else msg["queuename"] + ) + current["uid"] = user_db["uid"] if user_db else msg["uid"] + current["gid"] = user_db["gid"] if user_db else msg["gid"] + current["email"] = user_db["email"] if user_db else msg["email"] + current["reason"] = user_db["reason"] if user_db else msg["reason"] + current["fullname"] = ( + user_db["fullname"] if user_db else msg["fullname"] + ) if user_db: # Restore task status - current['request']['create_account'] = user_db['create_account'] - current['verify']['git_commit'] = user_db['git_commit'] - current['verify']['dir_verify'] = user_db['dir_verify'] - current['verify']['subscribe_mail_list'] = user_db['subscribe_mail_list'] - current['notify']['notify_user'] = user_db['notify_user'] - - current['reported'] = user_db['reported'] - - for t in ['git_commit', 'dir_verify', 'subscribe_mail_list']: + current["request"]["create_account"] = user_db[ + "create_account" + ] + current["verify"]["git_commit"] = user_db["git_commit"] + current["verify"]["dir_verify"] = user_db["dir_verify"] + current["verify"]["subscribe_mail_list"] = user_db[ + "subscribe_mail_list" + ] + current["notify"]["notify_user"] = user_db["notify_user"] + + current["reported"] = user_db["reported"] + + for t in ["git_commit", "dir_verify", "subscribe_mail_list"]: if user_db[t] is None: - current['waiting'].add(t) + current["waiting"].add(t) - if not current['waiting'] and user_db['notify_user'] is None: - current['waiting'].add('notify_user') + if not current["waiting"] and user_db["notify_user"] is None: + current["waiting"].add("notify_user") - logger.debug(f'Loaded user {username} from DB') + logger.debug(f"Loaded user {username} from DB") else: insert_db(username, msg) - logger.debug(f'Tracking user {username}') + logger.debug(f"Tracking user {username}") - current['last_update'] = datetime.now() + current["last_update"] = datetime.now() # Update Database - update_db(username, { - task_name: success, - 'last_update': current['last_update']} + update_db( + username, + {task_name: success, "last_update": current["last_update"]}, ) # Save error message if the task was failed if not success: - errmsg = msg.get('errmsg', '') + errmsg = msg.get("errmsg", "") if errmsg: - current['errmsg'].append(f"{task_name}: {errmsg}") + current["errmsg"].append(f"{task_name}: {errmsg}") # Define message that's going to be published message = { - 'username': username, - 'uid': current['uid'], - 'gid': current['gid'], - 'email': current['email'], - 'reason': current['reason'], - 'fullname': current['fullname'] + "username": username, + "queuename": queuename, + "uid": current["uid"], + "gid": current["gid"], + "email": current["email"], + "reason": current["reason"], + "fullname": current["fullname"], } try: - if task_name in current['request']: - current['request'][task_name] = success - routing_key = 'verify.' + username + if task_name in current["request"]: + current["request"][task_name] = success + routing_key = "verify." + queuename # Terminate the process if failed if not success: terminated = True - routing_key = 'complete.' + username - message['success'] = False - message['errmsg'] = current['errmsg'] + routing_key = "complete." + queuename + message["success"] = False + message["errmsg"] = current["errmsg"] send = True - current['waiting'] = {'git_commit', 'dir_verify', 'subscribe_mail_list'} - logger.debug(f'Request level {task_name}? {success}') - - elif task_name in current['verify']: - current['verify'][task_name] = success - current['waiting'].discard(task_name) - routing_key = 'notify.' + username - - if not current['waiting']: + current["waiting"] = { + "git_commit", + "dir_verify", + "subscribe_mail_list", + } + logger.debug(f"Request level {task_name}? {success}") + + elif task_name in current["verify"]: + current["verify"][task_name] = success + current["waiting"].discard(task_name) + routing_key = "notify." + queuename + + if not current["waiting"]: send = True - current['waiting'] = {'notify_user'} + current["waiting"] = {"notify_user"} # Terminate if dir_verify failed and all agents has responsed - if send and not current['verify']['dir_verify']: + if send and not current["verify"]["dir_verify"]: terminated = True - routing_key = 'complete.' + username - message['success'] = False - message['errmsg'] = current['errmsg'] + routing_key = "complete." + queuename + message["success"] = False + message["errmsg"] = current["errmsg"] - logger.debug(f'Verify level {task_name}? {success}') + logger.debug(f"Verify level {task_name}? {success}") - elif task_name in current['notify']: - current['notify'][task_name] = success - current['waiting'].discard(task_name) - routing_key = 'complete.' + username - message['success'] = success - message['errmsg'] = current['errmsg'] + elif task_name in current["notify"]: + current["notify"][task_name] = success + current["waiting"].discard(task_name) + routing_key = "complete." + queuename + message["success"] = success + message["errmsg"] = current["errmsg"] send = True # The whole creation process has completed completed = True - logger.debug(f'Notify level {task_name}? {success}') + logger.debug(f"Notify level {task_name}? {success}") except Exception as exception: - logger.error('', exc_info=True) + logger.error("", exc_info=True) if send: # Send trigger message - rc_rmq.publish_msg({ - 'routing_key': routing_key, - 'msg': message - }) + rc_rmq.publish_msg({"routing_key": routing_key, "msg": message}) logger.debug(f"Trigger message '{routing_key}' sent") - logger.debug('Previous level messages acknowledged') + logger.debug("Previous level messages acknowledged") # Send report to admin if completed or terminated: notify_admin(username, current) - update_db(username, {'reported': True}) + update_db(username, {"reported": True}) tracking.pop(username) - logger.debug('Admin report sent') + logger.debug("Admin report sent") # Acknowledge message ch.basic_ack(method.delivery_tag) @@ -272,22 +291,27 @@ def task_manager(ch, method, properties, body): def timeout_handler(signum, frame): current_time = datetime.now() for user in tuple(tracking): - delta = current_time - tracking[user]['last_update'] + delta = current_time - tracking[user]["last_update"] if delta.seconds > timeout: - rc_rmq.publish_msg({ - 'routing_key': 'complete.' + user, - 'msg': { - 'username': user, - 'success': False, - 'errmsg': ["Timeout on " + ', '.join(tracking[user]['waiting'])] + rc_rmq.publish_msg( + { + "routing_key": "complete." + user, + "msg": { + "username": user, + "success": False, + "errmsg": [ + "Timeout on " + + ", ".join(tracking[user]["waiting"]) + ], + }, } - }) + ) notify_admin(user, tracking[user]) - update_db(user, {'reported': True}) + update_db(user, {"reported": True}) tracking.pop(user) @@ -296,12 +320,10 @@ def timeout_handler(signum, frame): signal.signal(signal.SIGALRM, timeout_handler) signal.setitimer(signal.ITIMER_REAL, timeout, timeout) -logger.info(f'Start listening to queue: {task}') -rc_rmq.start_consume({ - 'queue': task, - 'routing_key': "confirm.*", - 'cb': task_manager -}) +logger.info(f"Start listening to queue: {task}") +rc_rmq.start_consume( + {"queue": task, "routing_key": "confirm.*", "cb": task_manager} +) -logger.info('Disconnected') +logger.info("Disconnected") rc_rmq.disconnect() diff --git a/prod_rmq_agents/user_reg_event_logger.py b/prod_rmq_agents/user_reg_event_logger.py index 02c12483ce3540e8f36c2d902f998c6e1ba50f9b..63b6f5bfb0e557a3030631c70d221b2d45aeef9e 100644 --- a/prod_rmq_agents/user_reg_event_logger.py +++ b/prod_rmq_agents/user_reg_event_logger.py @@ -3,10 +3,10 @@ import sys import json from rc_rmq import RCRMQ -task = 'user_reg_event_log' +task = "user_reg_event_log" # Instantiate rabbitmq object -rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'}) +rc_rmq = RCRMQ({"exchange": "RegUsr", "exchange_type": "topic"}) # Define your callback function def log_user_reg_events(ch, method, properties, body): @@ -18,7 +18,9 @@ def log_user_reg_events(ch, method, properties, body): routing_key = method.routing_key action = routing_key.split(".")[0] user = routing_key.split(".")[1] - print(f'Got a {action} message for {user} with routing key: {routing_key}') + print( + f"Got a {action} message for {user} with routing key: {routing_key}" + ) print(msg) # Acknowledge message @@ -26,8 +28,10 @@ def log_user_reg_events(ch, method, properties, body): print("Start listening to queue: {}".format(task)) -rc_rmq.start_consume({ - 'queue': task, # Define your Queue name - 'routing_key': "#", # Define your routing key - 'cb': log_user_reg_events # Pass in callback function you just define -}) +rc_rmq.start_consume( + { + "queue": task, # Define your Queue name + "routing_key": "#", # Define your routing key + "cb": log_user_reg_events, # Pass in callback function you just define + } +) diff --git a/prod_rmq_agents/user_reg_logger.py b/prod_rmq_agents/user_reg_logger.py index 402c9e319584d36560287be9e14c55d16ab509e6..19997096e1ab82650146d09fadacf0c62328e5f3 100755 --- a/prod_rmq_agents/user_reg_logger.py +++ b/prod_rmq_agents/user_reg_logger.py @@ -6,11 +6,11 @@ import rc_util from rc_rmq import RCRMQ from datetime import datetime -# Define queue name -task = 'reg_logger' +# Define queue name +task = "reg_logger" # Instantiate rabbitmq object -rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'}) +rc_rmq = RCRMQ({"exchange": "RegUsr", "exchange_type": "topic"}) # Parse arguments args = rc_util.get_args() @@ -19,25 +19,23 @@ args = rc_util.get_args() logger = rc_util.get_logger() # Open registry table in DB -db = dataset.connect('sqlite:///reg_logger.db') -account_req_table = db['registry'] +db = dataset.connect("sqlite:///.agent_db/reg_logger.db") +account_req_table = db["registry"] # Define registration logger callback def log_registration(ch, method, properties, body): account_req = json.loads(body) - account_req['req_time'] = datetime.now(), + account_req["req_time"] = datetime.now() account_req_table.insert(account_req) - logger.info("logged account request for %s", account_req['username']) + logger.info("logged account request for %s", account_req["username"]) ch.basic_ack(delivery_tag=method.delivery_tag) + logger.info("Start listening to queue: {}".format(task)) # Start consuming messages from queue with callback function -rc_rmq.start_consume({ - 'queue': task, - 'routing_key': "create.*", - 'cb': log_registration -}) - +rc_rmq.start_consume( + {"queue": task, "routing_key": "request.*", "cb": log_registration} +) diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000000000000000000000000000000000000..1d15ad7d9a23fece8d2321b3f09abba072fa4c6e --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,3 @@ +[tool.black] +line-length=79 +target-version=['py36'] diff --git a/rabbit_config.py.example b/rabbit_config.py.example index ca94f8aaa8dcb2ad9fe98bb4665099e8832ce2a6..21386da2c7dfadaec5804114ee27f12c18dc4608 100644 --- a/rabbit_config.py.example +++ b/rabbit_config.py.example @@ -24,6 +24,7 @@ Subject = 'New User Account' Info_url = 'https://www.google.com' Mail_list = 'root@localhost' Mail_list_bcc = 'cmsupport@localhost' +Support_email = 'support@listserv.uab.edu' Head = f"""From: {Sender_alias} <{Sender}> To: <{{{{ to }}}}> diff --git a/rc_rmq.py b/rc_rmq.py index 37afa3f44f872454a3b96fccf2d5a6a538fe01cf..daa61c237e77662b86238b29c18e2afe5ba6c001 100644 --- a/rc_rmq.py +++ b/rc_rmq.py @@ -3,15 +3,16 @@ import pika import socket import rabbit_config as rcfg + class RCRMQ(object): - USER = 'guest' - PASSWORD = 'guest' - HOST = 'localhost' + USER = "guest" + PASSWORD = "guest" + HOST = "localhost" PORT = 5672 - VHOST = '/' - EXCHANGE = '' - EXCHANGE_TYPE = 'direct' + VHOST = "/" + EXCHANGE = "" + EXCHANGE_TYPE = "direct" QUEUE = None DURABLE = True ROUTING_KEY = None @@ -19,10 +20,10 @@ class RCRMQ(object): def __init__(self, config=None, debug=False): if config: - if 'exchange' in config: - self.EXCHANGE = config['exchange'] - if 'exchange_type' in config: - self.EXCHANGE_TYPE = config['exchange_type'] + if "exchange" in config: + self.EXCHANGE = config["exchange"] + if "exchange_type" in config: + self.EXCHANGE_TYPE = config["exchange_type"] hostname = socket.gethostname().split(".", 1)[0] @@ -34,7 +35,8 @@ class RCRMQ(object): self.DEBUG = debug if self.DEBUG: - print(""" + print( + """ Created RabbitMQ instance with: Exchange name: {}, Exchange type: {}, @@ -42,34 +44,52 @@ class RCRMQ(object): User: {}, VHost: {}, Port: {} - """.format(self.EXCHANGE, self.EXCHANGE_TYPE, self.HOST, self.USER, self.VHOST, self.PORT)) + """.format( + self.EXCHANGE, + self.EXCHANGE_TYPE, + self.HOST, + self.USER, + self.VHOST, + self.PORT, + ) + ) self._consumer_tag = None self._connection = None self._consuming = False self._channel = None self._parameters = pika.ConnectionParameters( - self.HOST, - self.PORT, - self.VHOST, - pika.PlainCredentials(self.USER, self.PASSWORD)) + self.HOST, + self.PORT, + self.VHOST, + pika.PlainCredentials(self.USER, self.PASSWORD), + ) def connect(self): if self.DEBUG: - print("Connecting...\n" + "Exchange: " + self.EXCHANGE + " Exchange type: " + self.EXCHANGE_TYPE) + print( + "Connecting...\n" + + "Exchange: " + + self.EXCHANGE + + " Exchange type: " + + self.EXCHANGE_TYPE + ) self._connection = pika.BlockingConnection(self._parameters) self._channel = self._connection.channel() self._channel.exchange_declare( - exchange=self.EXCHANGE, - exchange_type=self.EXCHANGE_TYPE, - durable=True) + exchange=self.EXCHANGE, + exchange_type=self.EXCHANGE_TYPE, + durable=True, + ) def bind_queue(self): self._channel.queue_declare(queue=self.QUEUE, durable=self.DURABLE) - self._channel.queue_bind(exchange=self.EXCHANGE, - queue=self.QUEUE, - routing_key=self.ROUTING_KEY) + self._channel.queue_bind( + exchange=self.EXCHANGE, + queue=self.QUEUE, + routing_key=self.ROUTING_KEY, + ) def disconnect(self): self._channel.close() @@ -80,32 +100,43 @@ class RCRMQ(object): self._channel.queue_delete(self.QUEUE) def publish_msg(self, obj): - if 'routing_key' in obj: - self.ROUTING_KEY = obj['routing_key'] + if "routing_key" in obj: + self.ROUTING_KEY = obj["routing_key"] if self._connection is None: self.connect() - self._channel.basic_publish(exchange=self.EXCHANGE, - routing_key=self.ROUTING_KEY, - body=json.dumps(obj['msg'])) + self._channel.basic_publish( + exchange=self.EXCHANGE, + routing_key=self.ROUTING_KEY, + body=json.dumps(obj["msg"]), + ) def start_consume(self, obj): - if 'queue' in obj: - self.QUEUE = obj['queue'] - self.ROUTING_KEY = obj['routing_key'] if 'routing_key' in obj else self.QUEUE - if 'durable' in obj: - self.DURABLE = obj['durable'] + if "queue" in obj: + self.QUEUE = obj["queue"] + self.ROUTING_KEY = ( + obj["routing_key"] if "routing_key" in obj else self.QUEUE + ) + if "durable" in obj: + self.DURABLE = obj["durable"] if self.DEBUG: - print("Queue: " + self.QUEUE + "\nRouting_key: " + self.ROUTING_KEY) + print( + "Queue: " + + self.QUEUE + + "\nRouting_key: " + + self.ROUTING_KEY + ) if self._connection is None: self.connect() self.bind_queue() - self._consumer_tag = self._channel.basic_consume(self.QUEUE,obj['cb']) + self._consumer_tag = self._channel.basic_consume( + self.QUEUE, obj["cb"] + ) self._consuming = True try: self._channel.start_consuming() diff --git a/rc_util.py b/rc_util.py index e16fe306583750d200fb56de6ff9408ea55a70e7..0741cd18fb4f3cf91321c88fe4af0a11e6532a3d 100644 --- a/rc_util.py +++ b/rc_util.py @@ -2,61 +2,83 @@ import logging import argparse from rc_rmq import RCRMQ import json +from urllib.parse import quote +from time import sleep + +rc_rmq = RCRMQ({"exchange": "RegUsr", "exchange_type": "topic"}) +tasks = { + "create_account": None, + "git_commit": None, + "dir_verify": None, + "subscribe_mail_list": None, + "notify_user": None, +} +logger_fmt = "%(asctime)s [%(module)s] - %(message)s" + + +def add_account(username, queuename, email, full="", reason=""): + rc_rmq.publish_msg( + { + "routing_key": "request." + queuename, + "msg": { + "username": username, + "email": email, + "fullname": full, + "reason": reason, + "queuename": queuename, + }, + } + ) + rc_rmq.disconnect() -rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'}) -tasks = {'create_account': None, 'git_commit': None, 'dir_verify': None, 'subscribe_mail_list': None, 'notify_user': None} -logger_fmt = '%(asctime)s [%(module)s] - %(message)s' - -def add_account(username, email, full='', reason=''): - rc_rmq.publish_msg({ - 'routing_key': 'request.' + username, - 'msg': { - "username": username, - "email": email, - "fullname": full, - "reason": reason - } - }) - rc_rmq.disconnect() def worker(ch, method, properties, body): msg = json.loads(body) - username = msg['username'] + username = msg["username"] - if msg['success']: - print(f'Account for {username} has been created.') + if msg["success"]: + print(f"Account for {username} has been created.") else: print(f"There's some issue while creating account for {username}") - errmsg = msg.get('errmsg', []) + errmsg = msg.get("errmsg", []) for err in errmsg: print(err) rc_rmq.stop_consume() rc_rmq.delete_queue() -def consume(username, routing_key='', callback=worker, debug=False): - if routing_key == '': - routing_key = 'complete.' + username + +def consume(queuename, routing_key="", callback=worker, debug=False): + if routing_key == "": + routing_key = "complete." + queuename if debug: sleep(5) else: - rc_rmq.start_consume({ - 'queue': username, - 'routing_key': routing_key, - 'cb': callback - }) + rc_rmq.start_consume( + { + "queue": queuename, + "routing_key": routing_key, + "cb": callback, + } + ) rc_rmq.disconnect() - return { 'success' : True } + return {"success": True} + def get_args(): # Parse arguments parser = argparse.ArgumentParser() - parser.add_argument('-v', '--verbose', action='store_true', help='verbose output') - parser.add_argument('-n', '--dry-run', action='store_true', help='enable dry run mode') + parser.add_argument( + "-v", "--verbose", action="store_true", help="verbose output" + ) + parser.add_argument( + "-n", "--dry-run", action="store_true", help="enable dry run mode" + ) return parser.parse_args() + def get_logger(args=None): if args is None: args = get_args() @@ -72,3 +94,9 @@ def get_logger(args=None): logging.basicConfig(format=logger_fmt, level=logger_lvl) return logging.getLogger(__name__) + +def encode_name(uname): + uname_quote = quote(uname) + if "." in uname_quote: + uname_quote = uname_quote.replace(".", "%2E") + return uname_quote diff --git a/requirements.txt b/requirements.txt index 562cb606a776a6080f5acab0f53490d766dd2a2b..e62954e164da5deb8aec7508d6a940265f40d2da 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,4 +2,5 @@ pika==1.1.0 pyldap==3.0.0.post1 dataset==1.3.1 Jinja2==2.11.2 -sh==1.12.14 \ No newline at end of file +sh==1.12.14 +pre-commit==2.12.1