Как использовать RabbitMQ с PHP

AMQP (Advanced Message Queuing Protocol) — это сетевой протокол, который может доставлять сообщения из одной конечной точки приложения в другую конечную точку приложения. Его не волнует платформа или язык указанных приложений, если они поддерживают AMQP.

По сути, одна из конечных точек приложения отправляет сообщение, являясь, таким образом, источником через брокера AMQP. В свою очередь, брокер доставляет сообщение другой конечной точке приложения, называемой Потребителем.


RabbitMQ — это брокер AMQP, поддерживающий несколько языков программирования, например PHP.


Преимущество наличия брокера сообщений, такого как RabbitMQ, и AMQP, являющегося сетевым протоколом, заключается в том, что производитель, брокер и потребитель могут жить на разных физических/виртуальных серверах в разных географических точках.


Кроме того, поскольку сети ненадежны, и приложения могут не полностью обработать сообщение, AMQP поддерживает подтверждения сообщений либо автоматически, либо когда конечная точка приложения решает их отправить.


RabbitMQ реализует протокол AMQP 0-9-1.

Глоссарий

Еще одним преимуществом AMQP 0-9-1 является то, что приложение определяет логику маршрутизации вместо администратора брокера. Это дает разработчику большую гибкость без необходимости изучения нового языка программирования/сценариев/разметки.

Отправка запроса на асинхронную обработку данных

Предположим, у нас есть пиццерия, и мы получаем онлайн-заказы. Давайте также предположим, что у нас есть автоматизированная система, которая обрабатывает заказы, но эта система не может быть выставлена ​​на всеобщее обозрение…


Мы реализуем самый простой из шаблонов:

Во-первых, у нас есть следующий скрипт для приема запросов с формы:

<?php
chdir(dirname(__DIR__));
require_once('vendor/autoload.php');

use Acme\AmqpWrapper\SimpleSender;

$theName = filter_input(INPUT_POST, 'theName', FILTER_SANITIZE_STRING);
$simpleSender = new SimpleSender();
$simpleSender->execute($theName);
header("Location: orderReceived.html");

Этот код просто проверит именованный параметр POST theName и отправит его объекту, который мы создали для его обработки. Давайте посмотрим на метод SimpleSender::execute():

<?php

// ... SOME CODE HERE ...

    /**
     * Sends a message to the pizzaTime queue.
     * 
     * @param string $message
     */
    public function execute($message)
    {
        /**
         * Create a connection to RabbitAMQP
         */
        $connection = new AMQPConnection(
            'localhost',    #host - host name where the RabbitMQ server is runing
            5672,           #port - port number of the service, 5672 is the default
            'guest',        #user - username to connect to server
            'guest'         #password
            );


        /** @var $channel AMQPChannel */
        $channel = $connection->channel();
        
        $channel->queue_declare(
            'pizzaTime',    #queue name - Queue names may be up to 255 bytes of UTF-8 characters
            false,          #passive - can use this to check whether an exchange exists without modifying the server state
            false,          #durable - make sure that RabbitMQ will never lose our queue if a crash occurs - the queue will survive a broker restart
            false,          #exclusive - used by only one connection and the queue will be deleted when that connection closes
            false           #autodelete - queue is deleted when last consumer unsubscribes
            );
        
        $msg = new AMQPMessage($message);
        
        $channel->basic_publish(
            $msg,           #message 
            '',             #exchange
            'pizzaTime'     #routing key
            );
        
        $channel->close();
        $connection->close();
    }

Построчная разбивка выглядит следующим образом:

<?php
/* ... MORE CODE HERE ... */

        $connection = new AMQPConnection(
            'localhost',    #host - host name where the RabbitMQ server is runing
            5672,           #port - port number of the service, 5672 is the default
            'guest',        #user - username to connect to server
            'guest'         #password
            );
            
/* ... MORE CODE HERE ... */

Сначала мы создаем объект соединения. Имейте в виду, что учетные данные гость: гость используются по умолчанию для RabbitMQ. Однако вам будет разрешено подключаться к серверу, используя их, только если вы подключаетесь с того же хоста (localhost).


Поскольку RabbitMQ прослушивает и обслуживает, используя один порт, нам нужно создать канал (представьте его как виртуальный порт), чтобы $channel = $connection->channel();другие клиенты могли подключаться к серверу.

<?php
/* ... MORE CODE HERE ... */

        $channel->queue_declare(
            'pizzaTime',    #queue name - Queue names may be up to 255 bytes of UTF-8 characters
            false,          #passive - can use this to check whether an exchange exists without modifying the server state
            false,          #durable - make sure that RabbitMQ will never lose our queue if a crash occurs - the queue will survive a broker restart
            false,          #exclusive - used by only one connection and the queue will be deleted when that connection closes
            false           #autodelete - queue is deleted when last consumer unsubscribes
            );
            
/* ... MORE CODE HERE ... */

Как только наш канал будет готов, давайте объявим очередь для отправки запроса. Преимущество RabbitMQ в том, что мы можем создавать очереди непосредственно из клиента, но мы должны быть осторожны при их создании. Давайте кратко объясним параметры, используемые для создания очереди с$channel->queue_declare()

  • Имя очереди : это произвольное имя, которое будет использоваться для идентификации очереди.
  • Passive : если установлено значение true, сервер только проверит, может ли очередь быть создана, false фактически попытается создать очередь.
  • Durable : как правило, если сервер останавливается или дает сбой, все очереди и сообщения теряются… если только мы не объявим очередь устойчивой, и в этом случае очередь будет сохраняться при перезапуске сервера.
  • Exclusive : если true, очередь может использоваться только тем соединением, которое ее создало.
  • Autodelete : если true, очередь будет удалена, как только в ней не будет сообщений и не будет подключенных подписчиков.

В нашем примере очередь не будет сохраняться при перезапуске сервера, может использоваться другими подключениями и не будет удалена, если на нее больше не будет подписчиков.


Затем мы создали объект сообщения с $msg = new AMQPMessage($message);параметром $message POST, theName который мы получили из формы. Сообщение может быть любой строкой.

<?php
/* ... MORE CODE HERE ... */

        $channel->basic_publish(
            $msg,           #message 
            '',             #exchange
            'pizzaTime'     #routing key
            );
            
/* ... MORE CODE HERE ... */

Теперь нам нужно опубликовать сообщение в очереди. Однако мы не можем публиковать сообщения напрямую в очередь, если это не через обмен. Мы никогда не объявляли обмен, так как же это будет возможно? Оказывается, когда мы создаем очередь, не определяя обмен для привязки очереди, будет использоваться обмен по умолчанию. Мы можем опубликовать сообщение в очереди через обмен по умолчанию с $channel->basic_publish() параметрами, которые он использует:

  • Сообщение: сообщение, которое мы хотим отправить
  • Обмен: обратите внимание, что мы используем пустую строку, потому что мы будем использовать обмен по умолчанию
  • Ключ маршрутизации: имя очереди, в которую мы хотим доставить сообщение.
<?php
/* ... MORE CODE HERE ... */

        $channel->close();
        $connection->close();
            
/* ... MORE CODE HERE ... */

После того, как мы закончим, мы должны закрыть соединение с каналом и сервером.

Обратите внимание, что мы не получили ответа от сервера. В лучшем случае мы можем быть уверены, что сообщение поставлено в очередь, но мы полностью игнорируем, если сообщение достигло конечного получателя. Это часть красоты AMQP… мы можем быстро направлять клиентов на наш общедоступный сайт и асинхронно обрабатывать заказы в приложении бэк-офиса.


Итак, заказ на пиццу стоит в очереди, как нам его получить? Прежде всего, мы должны знать, что потребитель должен установить постоянное соединение с сервером очереди (он же подписаться), чтобы он мог получать сообщения с сервера. Сервер не будет сам отправлять эти сообщения в наше приложение. К счастью, создать такое соединение очень просто.

<?php
namespace Acme\AmqpWrapper;

use PhpAmqpLib\Connection\AMQPConnection;


class SimpleReceiver
{
    /* ... SOME CODE HERE ... */

    /**
     * Listens for incoming messages
     */
    public function listen()
    {
        $connection = new AMQPConnection(
            'localhost',    #host 
            5672,           #port
            'guest',        #user
            'guest'         #password
            );
            
        $channel = $connection->channel();
        
        $channel->queue_declare(
            'pizzaTime',    #queue name, the same as the sender
            false,          #passive
            false,          #durable
            false,          #exclusive
            false           #autodelete
            );
        
        $channel->basic_consume(
            'pizzaTime',                    #queue 
            '',                             #consumer tag - Identifier for the consumer, valid within the current channel. just string
            false,                          #no local - TRUE: the server will not send messages to the connection that published them
            true,                           #no ack - send a proper acknowledgment from the worker, once we're done with a task
            false,                          #exclusive - queues may only be accessed by the current connection
            false,                          #no wait - TRUE: the server will not respond to the method. The client should not wait for a reply method
            array($this, 'processOrder')    #callback - method that will receive the message
            );
            
        while(count($channel->callbacks)) {
            $channel->wait();
        }
        
        $channel->close();
        $connection->close();
    }

    /**
     * @param $msg
     */
    public function processOrder($msg)
    {
        /* ... CODE TO PROCESS ORDER HERE ... */
    }
}

Точно так же, как мы подключились, создали канал и объявили очередь в производителе, мы должны сделать то же самое внутри потребителя. Однако в потребителе мы должны подписаться на канал с помощью $channel->basic_consume(), а используемые параметры определяются следующим образом:

  • Очередь: должно быть то же имя очереди, которое мы определили в производителе.
  • Тег потребителя: произвольное имя, данное потребителю. Если это поле пустое, сервер сгенерирует уникальный тег
  • Нет локального: это неясный параметр, если он активирован, сервер не будет доставлять свои собственные сообщения.
  • No Ack(nowledgement): автоматически подтверждает, что потребитель получил сообщение, поэтому нам не нужно делать это вручную.
  • Нет ожидания: если установлено, сервер не будет ждать завершения процесса в потребителе.
  • Обратный вызов: может быть именем функции, массивом, содержащим объект и имя метода, или замыканием, которое получит поставленное в очередь сообщение. Этот обратный вызов должен принимать параметр, содержащий такое сообщение. В нашем примере array($this, 'processOrder')используется для определения processOrder()метода текущего объекта в качестве обратного вызова.
<?php

/* ... SOME CODE HERE ... */

        while(count($channel->callbacks)) {
            $channel->wait();
        }
        
/* ... SOME CODE HERE ... */

«Магия» происходит внутри whileцикла. Если у подписчика есть хотя бы один определенный обратный вызов, мы будем ждать любого события в канале. Каждый раз, когда сообщение получено, processOrder()будет выполняться наш определенный обратный вызов, поэтому мы можем обрабатывать сообщение по мере необходимости.


Как мы зажжем это? Просто создайте скрипт, который будет вызывать SimpleReceiver::listen() метод, например:

<?php
chdir(dirname(__DIR__));

require_once('vendor/autoload.php');

use Acme\AmqpWrapper\SimpleReceiver;

$receiver = new SimpleReceiver();
$receiver->listen();

Теперь запустите процесс в консоли с помощью php <script name> и дайте ему сделать свою работу. Если вы хотите убить потребителя, простой Ctrl + Cпрервет процесс.


Одна из приятных особенностей наличия сервера очередей заключается в том, что если по какой-либо причине ваше задание-потребитель выйдет из строя, это не нарушит работу службы для пользователей вашего общедоступного приложения. Сообщения будут просто складываться в очередь, и как только вы перезапустите потребителя, сообщения будут доставляться ему одно за другим для обработки.

Заключение

В этом посте мы представили теорию AMQP и систем массового обслуживания и продемонстрировали их использование на простом примере php. Спасибо за прочтение.