Skip to content
Snippets Groups Projects

Feature task manager

Merged Bo-Chun Chen requested to merge github/fork/diedpigs/feat-task-manager into feat-cod-rmq
1 file
+ 133
0
Compare changes
  • Side-by-side
  • Inline
task_manager.py 0 → 100644
+ 133
0
 
#!/usr/bin/env python
 
import sys
 
import json
 
import rc_util
 
from rc_rmq import RCRMQ
 
from datetime import datetime
 
 
task = 'task_manager'
 
 
args = rc_util.get_args()
 
logger = rc_util.get_logger(args)
 
 
record = {
 
'uid': -1,
 
'gid': -1,
 
'email': '',
 
'fullname': '',
 
'last_update': datetime.now(),
 
'request': {
 
'get_next_uid_gid': None
 
},
 
'create': {
 
'subscribe_mail_list': None,
 
'bright_account': None
 
},
 
'verify': {
 
'git_commit': None,
 
'dir_verify': None
 
},
 
'notify': {
 
'notify_user': None
 
},
 
'delivery_tags': None
 
}
 
 
# Currently tracking users
 
tracking = {}
 
 
# Instantiate rabbitmq object
 
rc_rmq = RCRMQ({'exchange': 'RegUsr', 'exchange_type': 'topic'})
 
 
def task_manager(ch, method, properties, body):
 
msg = json.loads(body)
 
username = method.routing_key.split('.')[1]
 
task_name = msg['task']
 
done = success = msg['success']
 
routing_key = ""
 
 
if username not in tracking:
 
current = tracking[username] = record.copy()
 
current['delivery_tags'] = []
 
current['uid'] = msg.get('uid', -1)
 
current['gid'] = msg.get('gid', -1)
 
current['email'] = msg.get('email', '')
 
current['fullname'] = msg.get('fullname', '')
 
 
logger.debug(f'Tracking user {username}')
 
else:
 
current = tracking[username]
 
 
# Save the delivery tags for future use
 
current['delivery_tags'].append(method.delivery_tag)
 
 
try:
 
if task_name in current['request']:
 
current['request'][task_name] = success
 
routing_key = 'create.' + username
 
done = success
 
 
logger.debug(f'Request level task(s) done?{done}')
 
 
elif task_name in current['create']:
 
current['create'][task_name] = success
 
routing_key = 'verify.' + username
 
done = True
 
for status in current['create'].values():
 
if status is not True:
 
done = False
 
 
logger.debug(f'Create level task(s) done?{done}')
 
 
elif task_name in current['verify']:
 
current['verify'][task_name] = success
 
routing_key = 'notify.' + username
 
done = True
 
for status in current['verify'].values():
 
if status is not True:
 
done = False
 
 
logger.debug(f'Verify level task(s) done?{done}')
 
 
elif task_name in current['notify']:
 
current['verify'][task_name] = success
 
routing_key = 'complete.' + username
 
done = success
 
 
logger.debug(f'Notify level task(s) done?{done}')
 
 
except Exception as exception:
    • Created by: rtripath89

      done=False variable needs to be defined outside if-else statements, else it gives the following error:

      2020-04-24 17:07:50,009 [blocking_connection] - Created channel=1
      Traceback (most recent call last):
        File "task_manager.py", line 126, in <module>
          'cb': task_manager
        File "/opt/rabbitmq_agents/rc_rmq.py", line 111, in start_consume
          self._channel.start_consuming()
        File "/usr/local/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 1866, in start_consuming
          self._process_data_events(time_limit=None)
        File "/usr/local/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 2027, in _process_data_events
          self.connection.process_data_events(time_limit=time_limit)
        File "/usr/local/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 834, in process_data_events
          self._dispatch_channel_events()
        File "/usr/local/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 566, in _dispatch_channel_events
          impl_channel._get_cookie()._dispatch_events()
        File "/usr/local/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 1494, in _dispatch_events
          evt.properties, evt.body)
        File "task_manager.py", line 100, in task_manager
          if done:
      UnboundLocalError: local variable 'done' referenced before assignment
Please register or sign in to reply
 
logger.error('', exc_info=True)
 
 
if done:
 
# Send trigger message
 
rc_rmq.publish_msg({
 
'routing_key': routing_key,
 
'msg': {
 
'username': username,
 
'fullname': current['fullname'],
 
'email': current['email'],
 
'uid': current['uid'],
 
'gid': current['gid']
 
}
 
})
 
 
logger.debug(f"Trigger message '{routing_key}' sent")
 
 
# Acknowledge all message from last level
 
for tag in current['delivery_tags']:
 
ch.basic_ack(tag)
 
current['delivery_tags'] = []
 
 
logger.debug('Previous level messages acknowledged')
 
 
 
logger.info(f'Start listening to queue: {task}')
 
rc_rmq.start_consume({
 
'queue': task,
 
'routing_key': "confirm.*",
 
'cb': task_manager
 
})
 
 
logger.info('Disconnected')
 
rc_rmq.disconnect()
Loading