Skip to content
Snippets Groups Projects

Feat rc_util add function

Closed Bo-Chun Chen requested to merge github/fork/diedpigs/feat-util-add-function into develop
1 file
+ 38
4
Compare changes
  • Side-by-side
  • Inline
+ 38
4
@@ -12,14 +12,16 @@ class RCRMQ(object):
@@ -12,14 +12,16 @@ class RCRMQ(object):
VHOST = '/'
VHOST = '/'
EXCHANGE = ''
EXCHANGE = ''
EXCHANGE_TYPE = 'direct'
EXCHANGE_TYPE = 'direct'
QUEUE = ''
QUEUE = None
 
DURABLE = False
 
ROUTING_KEY = None
def __init__(self, config=None):
def __init__(self, config=None):
if config:
if config:
if 'exchange' in config:
if 'exchange' in config:
self.EXCHANGE = config['exchange']
self.EXCHANGE = config['exchange']
if 'queue' in config:
if 'exchange_type' in config:
self.QUEUE = config['queue']
self.EXCHANGE_TYPE = config['exchange_type']
hostname = socket.gethostname().split(".", 1)[0]
hostname = socket.gethostname().split(".", 1)[0]
@@ -42,11 +44,43 @@ class RCRMQ(object):
@@ -42,11 +44,43 @@ class RCRMQ(object):
exchange_type=self.EXCHANGE_TYPE,
exchange_type=self.EXCHANGE_TYPE,
durable=True)
durable=True)
 
if self.QUEUE is not None:
 
self._channel.queue_declare(queue=self.QUEUE, durable=self.DURABLE)
 
self._channel.queue_bind(exchange=self.EXCHANGE,
 
queue=self.QUEUE,
 
routing_key=self.ROUTING_KEY)
 
def disconnect(self):
def disconnect(self):
 
self._channel.close()
self._connection.close()
self._connection.close()
def publish_msg(self, obj):
def publish_msg(self, obj):
 
if 'routing_key' in obj:
 
self.ROUTING_KEY = obj['routing_key']
 
 
self.connect()
 
 
self._channel.basic_publish(exchange=self.EXCHANGE,
 
routing_key=self.ROUTING_KEY,
 
body=json.dumps(obj['msg']))
 
 
self.disconnect()
 
 
def start_consume(self, obj):
 
if 'queue' in obj:
 
self.QUEUE = obj['queue']
 
self.ROUTING_KEY = obj['routing_key'] if 'routing_key' in obj else self.QUEUE
 
if 'durable' in obj:
 
self.DURABLE = obj['durable']
 
self.connect()
self.connect()
self._channel.basic_publish(exchange=self.EXCHANGE, routing_key=obj['routing_key'], body=json.dumps(obj['msg']))
 
self._consumer_tag = self._channel.basic_consume(self.QUEUE,obj['cb'])
 
self._channel.start_consuming()
 
self.disconnect()
self.disconnect()
 
def stop_consume(self):
 
self._channel.basic_cancel(self._consumer_tag)
 
if not self.DURABLE:
 
self._channel.queue_delete(self.QUEUE)
Loading