Skip to content
Snippets Groups Projects
rc_rmq.py 4 KiB
Newer Older
Bo-Chun Chen's avatar
Bo-Chun Chen committed
import json
import pika
import socket
import rabbit_config as rcfg
Bo-Chun Chen's avatar
Bo-Chun Chen committed

Bo-Chun Chen's avatar
Bo-Chun Chen committed
class RCRMQ(object):
Bo-Chun Chen's avatar
Bo-Chun Chen committed

    USER = "guest"
    PASSWORD = "guest"
    HOST = "localhost"
Bo-Chun Chen's avatar
Bo-Chun Chen committed
    PORT = 5672
    VHOST = "/"
    EXCHANGE = ""
    EXCHANGE_TYPE = "direct"
    DEBUG = False
Bo-Chun Chen's avatar
Bo-Chun Chen committed

    def __init__(self, config=None, debug=False):
Bo-Chun Chen's avatar
Bo-Chun Chen committed
        if config:
            if "exchange" in config:
                self.EXCHANGE = config["exchange"]
            if "exchange_type" in config:
                self.EXCHANGE_TYPE = config["exchange_type"]
Bo-Chun Chen's avatar
Bo-Chun Chen committed

        hostname = socket.gethostname().split(".", 1)[0]

        self.HOST = rcfg.Server if hostname != rcfg.Server else "localhost"
        self.USER = rcfg.User
        self.PASSWORD = rcfg.Password
        self.VHOST = rcfg.VHost
        self.PORT = rcfg.Port
        self.DEBUG = debug

        if self.DEBUG:
            print(
                """
            Created RabbitMQ instance with:
              Exchange name: {},
              Exchange type: {},
              Host: {},
              User: {},
              VHost: {},
              Port: {}
            """.format(
                    self.EXCHANGE,
                    self.EXCHANGE_TYPE,
                    self.HOST,
                    self.USER,
                    self.VHOST,
                    self.PORT,
                )
            )
        self._consumer_tag = None
        self._connection = None
        self._consuming = False
        self._channel = None
Bo-Chun Chen's avatar
Bo-Chun Chen committed
        self._parameters = pika.ConnectionParameters(
            self.HOST,
            self.PORT,
            self.VHOST,
            pika.PlainCredentials(self.USER, self.PASSWORD),
        )
Bo-Chun Chen's avatar
Bo-Chun Chen committed

    def connect(self):
        if self.DEBUG:
            print(
                "Connecting...\n"
                + "Exchange: "
                + self.EXCHANGE
                + " Exchange type: "
                + self.EXCHANGE_TYPE
            )
Bo-Chun Chen's avatar
Bo-Chun Chen committed
        self._connection = pika.BlockingConnection(self._parameters)
        self._channel = self._connection.channel()
        self._channel.exchange_declare(
            exchange=self.EXCHANGE,
            exchange_type=self.EXCHANGE_TYPE,
            durable=True,
        )
Bo-Chun Chen's avatar
Bo-Chun Chen committed

    def bind_queue(
        self, queue="", routing_key=None, durable=True, exclusive=False
    ):

        if self._connection is None:
            self.connect()

        result = self._channel.queue_declare(
            queue=queue, durable=durable, exclusive=exclusive
        )

        self._channel.queue_bind(
            exchange=self.EXCHANGE,
            queue=result.method.queue,
            routing_key=routing_key,
        return result.method.queue

Bo-Chun Chen's avatar
Bo-Chun Chen committed
    def disconnect(self):
        if self._connection:
            self._channel.close()
            self._connection.close()
            self._connection = None
Bo-Chun Chen's avatar
Bo-Chun Chen committed

    def delete_queue(self, queue):
        self._channel.queue_delete(queue)
Bo-Chun Chen's avatar
Bo-Chun Chen committed
    def publish_msg(self, obj):
        routing_key = obj.get("routing_key")
        props = obj.get("props")
Bo-Chun Chen's avatar
Bo-Chun Chen committed
        if self._connection is None:
            self.connect()
        self._channel.basic_publish(
            exchange=self.EXCHANGE,
            routing_key=routing_key,
            properties=props,
            body=json.dumps(obj["msg"]),
        )

    def start_consume(self, obj):
        queue = obj.get("queue", "")
        routing_key = obj.get("routing_key", queue or None)
        durable = obj.get("durable", True)
        exclusive = obj.get("exclusive", False)
        bind = obj.get("bind", True)
Bo-Chun Chen's avatar
Bo-Chun Chen committed
        if self._connection is None:
            self.connect()
        if bind:
            self.bind_queue(queue, routing_key, durable, exclusive)

        if self.DEBUG:
            print("Queue: " + queue + "\nRouting_key: " + routing_key)
        self._consumer_tag = self._channel.basic_consume(queue, obj["cb"])
Bo-Chun Chen's avatar
Bo-Chun Chen committed
        self._consuming = True
        try:
            self._channel.start_consuming()
        except KeyboardInterrupt:
            self._channel.stop_consuming()

    def stop_consume(self):
        self._channel.basic_cancel(self._consumer_tag)