Python LanguageВведение в RabbitMQ с использованием AMQPStorm

замечания

Последняя версия AMQPStorm доступна на pypi или вы можете установить ее с помощью pip

pip install amqpstorm

Как потреблять сообщения от RabbitMQ

Начните с импорта библиотеки.

from amqpstorm import Connection

При использовании сообщений мы сначала должны определить функцию обработки входящих сообщений. Это может быть любая вызываемая функция и должна принимать объект сообщения или кортеж сообщения (в зависимости от параметра to_tuple определенного в start_consuming ).

Помимо обработки данных из входящего сообщения, нам также необходимо подтвердить или отклонить сообщение. Это важно, так как нам нужно сообщить RabbitMQ, что мы правильно получили и обработали сообщение.

def on_message(message):
    """This function is called on message received.

    :param message: Delivered message.
    :return:
    """
    print("Message:", message.body)

    # Acknowledge that we handled the message without any issues.
    message.ack()

    # Reject the message.
    # message.reject()

    # Reject the message, and put it back in the queue.
    # message.reject(requeue=True)

Затем нам нужно настроить соединение с сервером RabbitMQ.

connection = Connection('127.0.0.1', 'guest', 'guest')

После этого нам нужно настроить канал. Каждое соединение может иметь несколько каналов, и, как правило, при выполнении многопоточных задач рекомендуется (но не обязательно) иметь по одному на поток.

channel = connection.channel()

Как только мы настроим наш канал, нам нужно сообщить RabbitMQ, что мы хотим начать использовать сообщения. В этом случае мы будем использовать нашу ранее определенную функцию on_message для обработки всех наших потребляемых сообщений.

Очередь, которую мы будем слушать на сервере RabbitMQ, будет simple_queue , и мы также сообщаем RabbitMQ, что мы будем признавать все входящие сообщения, как только мы с ними закончим.

channel.basic.consume(callback=on_message, queue='simple_queue', no_ack=False)

Наконец, нам нужно запустить цикл ввода-вывода, чтобы начать обработку сообщений, предоставляемых сервером RabbitMQ.

channel.start_consuming(to_tuple=False)

Как публиковать сообщения в RabbitMQ

Начните с импорта библиотеки.

from amqpstorm import Connection
from amqpstorm import Message

Затем нам нужно открыть соединение с сервером RabbitMQ.

connection = Connection('127.0.0.1', 'guest', 'guest')

После этого нам нужно настроить канал. Каждое соединение может иметь несколько каналов, и, как правило, при выполнении многопоточных задач рекомендуется (но не обязательно) иметь по одному на поток.

channel = connection.channel()

Как только мы настроим наш канал, мы можем начать готовить наше сообщение.

# Message Properties.
properties = {
    'content_type': 'text/plain',
    'headers': {'key': 'value'}
}

# Create the message.
message = Message.create(channel=channel, body='Hello World!', properties=properties)

Теперь мы можем опубликовать сообщение, просто позвонив в publish и предоставив routing_key . В этом случае мы отправим сообщение в очередь с именем simple_queue .

message.publish(routing_key='simple_queue')

Как создать задержанную очередь в RabbitMQ

Сначала нам нужно настроить два основных канала: один для основной очереди и один для очереди задержки. В моем примере в конце я включаю пару дополнительных флагов, которые не требуются, но делает код более надежным; таких как confirm delivery , delivery_mode и durable . Вы можете найти более подробную информацию о них в RabbitMQ руководстве .

После того, как мы установили каналы, мы добавим привязку к основному каналу, которую мы можем использовать для отправки сообщений с канала задержки в нашу основную очередь.

channel.queue.bind(exchange='amq.direct', routing_key='hello', queue='hello')

Затем нам нужно настроить наш канал задержки для пересылки сообщений в основную очередь по истечении срока их действия.

delay_channel.queue.declare(queue='hello_delay', durable=True, arguments={
    'x-message-ttl': 5000,
    'x-dead-letter-exchange': 'amq.direct',
    'x-dead-letter-routing-key': 'hello'
})
  • x-message-ttl (Message - Time To Live)

    Обычно это используется для автоматического удаления старых сообщений в очереди по истечении определенной продолжительности, но, добавив два необязательных аргумента, мы можем изменить это поведение, и вместо этого этот параметр определяет в миллисекундах, сколько сообщений будет оставаться в очереди ожидания.

  • х анкерного письмо маршрутизации ключ

    Эта переменная позволяет нам передавать сообщение в другую очередь, как только они истекли, вместо того, чтобы по умолчанию полностью удалять его.

  • х-буквальный обмен

    Эта переменная определяет, какой Exchange используется для передачи сообщения с hello_delay в очередь hello.

Публикация в очередь ожидания

Когда мы закончим настройку всех основных параметров Pika, вы просто отправляете сообщение в очередь ожидания, используя базовую публикацию.

delay_channel.basic.publish(exchange='',
                            routing_key='hello_delay',
                            body='test',
                            properties={'delivery_mod': 2})

После выполнения сценария вы должны увидеть следующие очереди, созданные в вашем модуле управления RabbitMQ. введите описание изображения здесь

Пример.

from amqpstorm import Connection

connection = Connection('127.0.0.1', 'guest', 'guest')

# Create normal 'Hello World' type channel.
channel = connection.channel()
channel.confirm_deliveries()
channel.queue.declare(queue='hello', durable=True)

# We need to bind this channel to an exchange, that will be used to transfer
# messages from our delay queue.
channel.queue.bind(exchange='amq.direct', routing_key='hello', queue='hello')

# Create our delay channel.
delay_channel = connection.channel()
delay_channel.confirm_deliveries()

# This is where we declare the delay, and routing for our delay channel.
delay_channel.queue.declare(queue='hello_delay', durable=True, arguments={
    'x-message-ttl': 5000, # Delay until the message is transferred in milliseconds.
    'x-dead-letter-exchange': 'amq.direct', # Exchange used to transfer the message from A to B.
    'x-dead-letter-routing-key': 'hello' # Name of the queue we want the message transferred to.
})

delay_channel.basic.publish(exchange='',
                            routing_key='hello_delay',
                            body='test',
                            properties={'delivery_mode': 2})

print("[x] Sent")