# rc_rmq.py - RabbitMQ connection helper built on pika.
import json
import socket

import pika

import rabbit_config as rcfg
# rabbit_config (rcfg) supplies the broker settings used by RCRMQ:
# Server, User, Password, VHost and Port.

class RCRMQ:
    """
    Main RC rabbitmq class that handles connection

    Wraps a single pika ``BlockingConnection`` and one channel on it.
    The connection is opened lazily: methods that need the channel call
    ``connect()`` themselves when no connection exists yet.
    """

    # Exchange name/type used for declare, bind and publish. Both can be
    # overridden per instance through the ``config`` mapping of __init__.
    # NOTE(review): the default name is ""; RabbitMQ is believed to reject
    # declare/bind on the default exchange - confirm callers always
    # configure an exchange name.
    EXCHANGE = ""
    EXCHANGE_TYPE = "direct"
    # When true, progress information is printed to stdout.
    DEBUG = False

    def __init__(self, config=None, debug=False):
        """
        Prepare connection parameters; no network I/O happens here.

        :param config: optional mapping; the keys ``"exchange"`` and
            ``"exchange_type"`` override the class-level defaults.
        :param debug: when true, print diagnostic information.
        """
        if config:
            if "exchange" in config:
                self.EXCHANGE = config["exchange"]
            if "exchange_type" in config:
                self.EXCHANGE_TYPE = config["exchange_type"]

        # Short host name (everything before the first dot).
        hostname = socket.gethostname().split(".", 1)[0]

        # When this machine is the configured server, talk to localhost.
        host = rcfg.Server if hostname != rcfg.Server else "localhost"
        user = rcfg.User
        password = rcfg.Password
        vhost = rcfg.VHost
        port = rcfg.Port
        self.DEBUG = debug

        if self.DEBUG:
            # The password is deliberately left out of the debug output.
            print(
                f"""
            Created RabbitMQ instance with:
              Exchange name: {self.EXCHANGE},
              Exchange type: {self.EXCHANGE_TYPE},
              Host: {host},
              User: {user},
              VHost: {vhost},
              Port: {port}
            """
            )

        self._consumer_tag = None
        self._connection = None
        self._consuming = False
        self._channel = None
        self._parameters = pika.ConnectionParameters(
            host=host,
            port=port,
            virtual_host=vhost,
            credentials=pika.PlainCredentials(user, password),
        )

    def connect(self):
        """Open the connection and channel, then declare the exchange."""
        if self.DEBUG:
            print(
                "Connecting...\n"
                f"Exchange: {self.EXCHANGE}"
                f" Exchange type: {self.EXCHANGE_TYPE}"
            )

        self._connection = pika.BlockingConnection(self._parameters)
        self._channel = self._connection.channel()
        self._channel.exchange_declare(
            exchange=self.EXCHANGE,
            exchange_type=self.EXCHANGE_TYPE,
            durable=True,
        )

    def bind_queue(
        self, queue="", routing_key=None, durable=True, exclusive=False
    ):
        """
        Declare a queue and bind it to the configured exchange.

        Connects first when no connection is open.

        :param queue: queue name passed to ``queue_declare``.
        :param routing_key: routing key used for the binding.
        :param durable: forwarded to ``queue_declare``.
        :param exclusive: forwarded to ``queue_declare``.
        :return: the queue name reported back by ``queue_declare``.
        """
        if self._connection is None:
            self.connect()

        result = self._channel.queue_declare(
            queue=queue, durable=durable, exclusive=exclusive
        )
        declared_queue = result.method.queue

        self._channel.queue_bind(
            exchange=self.EXCHANGE,
            queue=declared_queue,
            routing_key=routing_key,
        )
        return declared_queue

    def disconnect(self):
        """
        Close the channel and the connection, if a connection is open.

        The stored handles are always cleared, even when closing raises,
        so that a later call reconnects instead of reusing dead objects.
        """
        if self._connection is None:
            return

        try:
            if self._channel is not None:
                self._channel.close()
        finally:
            try:
                self._connection.close()
            finally:
                self._connection = None
                self._channel = None

    def delete_queue(self, queue):
        """
        Delete ``queue`` on the broker.

        Connects first when no connection is open, like the other
        channel-using methods of this class.
        """
        if self._connection is None:
            self.connect()
        self._channel.queue_delete(queue)

    def publish_msg(self, obj):
        """
        Publish one JSON-encoded message to the configured exchange.

        :param obj: mapping with the keys

            - ``"msg"`` (required): payload, serialized with ``json.dumps``
            - ``"routing_key"`` (optional): routing key for the message
            - ``"props"`` (optional): passed as ``properties``
        :raises KeyError: when ``obj`` has no ``"msg"`` entry.
        """
        routing_key = obj.get("routing_key")
        props = obj.get("props")

        if self._connection is None:
            self.connect()

        self._channel.basic_publish(
            exchange=self.EXCHANGE,
            routing_key=routing_key,
            properties=props,
            body=json.dumps(obj["msg"]),
        )

    def start_consume(self, obj):
        """
        Start consuming from a queue; blocks until consuming stops.

        :param obj: mapping with the keys

            - ``"cb"`` (required): callback handed to ``basic_consume``
            - ``"queue"`` (optional, default ``""``): queue name
            - ``"routing_key"`` (optional): defaults to the queue name,
              or ``None`` when the queue name is empty
            - ``"durable"`` (optional, default ``True``)
            - ``"exclusive"`` (optional, default ``False``)
            - ``"bind"`` (optional, default ``True``): declare and bind
              the queue before consuming
        :raises KeyError: when ``obj`` has no ``"cb"`` entry.

        A ``KeyboardInterrupt`` during consumption stops consuming
        instead of propagating.
        """
        queue = obj.get("queue", "")
        routing_key = obj.get("routing_key", queue or None)
        durable = obj.get("durable", True)
        exclusive = obj.get("exclusive", False)
        bind = obj.get("bind", True)

        if self._connection is None:
            self.connect()

        if bind:
            # Keep the name returned by the declaration so that a queue
            # declared with an empty name is consumed by its real name.
            queue = self.bind_queue(queue, routing_key, durable, exclusive)

        if self.DEBUG:
            # f-string: routing_key may be None, which "+" cannot join.
            print(f"Queue: {queue}\nRouting_key: {routing_key}")

        self._consumer_tag = self._channel.basic_consume(queue, obj["cb"])

        self._consuming = True
        try:
            self._channel.start_consuming()
        except KeyboardInterrupt:
            self._channel.stop_consuming()
        finally:
            self._consuming = False

    def stop_consume(self):
        """
        Cancel the consumer registered by ``start_consume``.

        Does nothing when there is no channel or no active consumer tag.
        """
        if self._channel is not None and self._consumer_tag is not None:
            self._channel.basic_cancel(self._consumer_tag)
        self._consumer_tag = None
        self._consuming = False