Шаблон проектирования рабочего пула с помощью PHP

В этой статье я покажу вам, как использовать шаблон проектирования рабочего пула с помощью PHP.В первой части я буду использовать пул и многопоточные классы для демонстрации параллельной обработки. Оба они являются частью объектно-ориентированного API pthreads и требуют, чтобы в вашей версии PHP была включена опция ZTS.

Рабочий пул - это шаблон проектирования, который включает в себя создание пула рабочих потоков или процессов и назначение им задач для параллельного выполнения. Этот шаблон предоставляет решение для управления параллельным выполнением и контроля количества одновременно обрабатываемых задач.

Преимущества использования шаблона рабочего пула

Преимущества использования шаблона рабочего пула включают:

Эффективное использование ресурсов

ограничивая количество одновременных процессов, использующих пул, код может контролировать и оптимизировать использование ресурсов, предотвращая чрезмерное количество процессов, выполняемых в пуле. я подавляю систему.

Повышенная производительность

Параллельное выполнение задач с использованием нескольких рабочих процессов может значительно повысить производительность всей системы за счет эффективного использования доступных ресурсов.

Упрощенное назначение задач

Пул работников позволяет абстрагироваться от сложностей управления отдельными рабочими процессами. Основной процесс может просто назначать задачи пулу, а пул заботится о распределении задач и управлении процессами.

Масштабируемость

Шаблон рабочего пула обеспечивает легкую масштабируемость. Код может быть адаптирован для обработки больших рабочих нагрузок путем увеличения размера рабочего пула без существенных изменений в общей структуре.

Используя шаблон рабочего пула, код обеспечивает лучший контроль над управлением процессами, использованием ресурсов и выполнением задач, что приводит к повышению эффективности и быстродействия. Чтобы продемонстрировать преимущества этого шаблона проектирования, давайте предположим, что у нас есть очередь (для простоты - каталог), содержащая те xt сообщений, ожидающих отправки различным получателям.Наша работа заключается в отправке каждого сообщения через HTTP-шлюз.

Очередь может содержать тысячи сообщений, поэтому мы не сможем доставить их все сразу, так как это приведет к перегрузке шлюза (если его ограничитель скорости не заблокирует нас раньше). Если мы успешно доставим сообщение, то удалим его из очереди, если нет, то сохраним его (и попытаемся доставить позже).

Теперь давайте взглянем на пример кода

class MyPool extends Pool {

    public $data = [];
    private $numTasks = 0;
    
    public function submit(Threaded $task): int {
        $this->numTasks++;
        return parent::submit($task);
    }

    public function process() {
        // Run this loop as long as we have jobs in the pool
        while (count($this->data) < $this->numTasks) {
            $this->collect(function (SmsWorker $task) {
                // If a task was marked as done, collect its results
                if ($task->isDone()) {
                    $this->data[] = (object) [
                        'complete' => $task->complete,
                        'result' => $task->result,
                        'recipient' => $task->recipient
                    ];
                }
                return $task->isDone();
            });
        }
    
        // All jobs are done, we can shutdown the pool
        $this->shutdown();
    
        return $this->data;
    }

}

Класс MyPool расширяет встроенный класс Pool, который управляет пулом потоков. Нам нужно расширить исходный класс, поскольку он не поддерживает сбор данных, создаваемых потоками.

Метод process собирает результаты выполнения задач и завершает работу пула: он использует цикл для проверки того, имеет ли массив $data тот же размер, что и свойство $numTasks, что означает, что все задачи были выполнены.Внутри цикла он вызывает метод collect родительского класса, который принимает функцию обратного вызова в качестве параметра и возвращает целое число, представляющее количество оставшихся задач в пуле.Обратный вызов function принимает объект SmsWorker в качестве параметра и возвращает логическое значение, которое указывает, выполнена задача или нет.

Если задача выполнена, она добавляет объект с тремя свойствами: complete, который указывает, была ли задача выполнена успешно или нет; result, который содержит сообщение о результате выполнения задачи; и recipient, который является номером телефона получателя SMS. Вся эта информация затем сохраняется в свойстве $data в class.

class SmsWorker extends Threaded {

    private $url;
    private $recipient;
    private $file_name;

    private $result;
    private $complete = false;

    public function __construct($url, $recipient, $file_name) {
        $this->url = $url;
        $this->recipient = $recipient;
        $this->file_name = $file_name;
    }

    public function run() {

        try {
            if (Helpers::sendMessage($this->url, '200')) {
                $this->result = 'SMS successfully sent to '.$this->recipient.' phone number';
                unlink(DIR.'/'.$this->file_name);
            } else {
                $this->result = 'Failed to send SMS to '.$this->recipient.' phone number';
            }
        } catch (Exception $e) {
            $this->result = 'Error: ' . $e->getMessage();
        }

        $this->complete = true;

    }

    public function isDone() {
        return $this->complete;
    }

}

Я создал класс Helpers, который содержит все служебные функции, чтобы вы могли сосредоточиться на важных частях. Класс Threaded - это встроенный класс, который позволяет создавать потоки и манипулировать ими.Класс SmsWorker расширяет класс Threaded и представляет задачу, которая отправляет SMS получателю, используя URL-адрес. Он сохраняет результат своего выполнения и сообщает о его завершении обратно в pool.

class SmsClient {

    private $maxProcess;
    private $results = [];

    public function __construct($maxProcess) {
        $this->maxProcess = $maxProcess;
    }

    public function run() {

        try {
            $files = Helpers::scanDirectory(DIR);
            $pool = new MyPool($this->maxProcess);

            $widSuffix = 1;

            foreach ($files as $fileName) {
                $smsData = Helpers::processSms($fileName);
                $wid = $pool->submit(new SmsWorker($smsData['url'], $smsData['recipient'], $fileName));
                $wid = (string) "${wid}." . $widSuffix++;
                $this->results[$smsData['recipient']][] = $wid;
                Helpers::printMessage("SMS client (${wid}) started");
            }

            array_map(function ($task) {
                Helpers::printMessage('SMS client ('.$this->results[$task->recipient][0].') finished: '.$task->result);
                $this->results[$task->recipient][] = $task->result;
            }, $pool->process());
        } catch (Exception $e) {
            Helpers::printMessage('Error: ' . $e->getMessage());
        }

    }

}

“Центральной частью” кода является наш SmsClient: Он считывает все сообщения в нашей очереди.Создает пул работников.Вводите работников в пул с необходимой информацией (например, рецепт nt, URL-адрес шлюза, имя файла).Отслеживает работников и их ответы по мере завершения ими своей операции, используя массив, в который вводится номер получателя.Печатает окончательные результаты.

Применяя шаблон рабочего пула, код обеспечивает улучшенный параллелизм. Он эффективно управляет выполнением задач отправки SMS с использованием пула рабочих потоков, что приводит к лучшему использованию ресурсов и производительности.Если вы запустите код, он выведет результат, аналогичный приведенному ниже: Пример вывода нашего процессора очередей.

Чтобы продемонстрировать, как pthreads API отправляет рабочие задачи и собирает результаты, я установил максимальное количество потоков равным 2.In в нашем примере у нас есть очередь, содержащая 15 сообщений, и наш SmsClient отправляет в общей сложности 15 работников для их обработки.Рабочие идентификаторы равны либо 0, либо 1, что соответствует максимальному количеству потоков, которые могут выполняться одновременно. Второе число после "." представляет собой порядковый номер, позволяющий легко различать распределение арендной платы между рабочими.Как вы можете заметить, результаты собираются не в порядке их отправки. В многопоточной системе работники сообщают о своих результатах, как только они завершают свои задачи, которые могут отличаться от порядка создания.