Python queue | Очереди и приоритетные очереди на практике

Очереди являются основой многочисленных алгоритмов, используемых в играх, искусственном интеллекте, спутниковой навигации и планировании задач. Они входят в число самых популярных абстрактных типов данных , которые студенты, изучающие информатику, изучают в начале своего обучения. В то же время разработчики программного обеспечения часто используют очереди сообщений более высокого уровня для достижения лучшей масштабируемости микросервисной архитектуры.

Изучение типов очередей

Очередь — это абстрактный тип данных , который представляет собой последовательность элементов, организованных в соответствии с набором правил. В этом разделе вы узнаете о наиболее распространенных типах очередей и соответствующих им правилах расположения элементов. По крайней мере, каждая очередь предоставляет операции для добавления и удаления элементов за постоянное время или O(1) с использованием нотации Big O. Это означает, что обе операции должны быть мгновенными независимо от размера очереди.


Некоторые очереди могут поддерживать другие, более специфические операции. Пришло время узнать о них больше!

Очередь: первым пришел, первым ушел (FIFO)

Слово очередь может иметь разные значения в зависимости от контекста. Однако, когда люди ссылаются на очередь без использования каких-либо квалификаторов, они обычно имеют в виду очередь FIFO, которая напоминает очередь, которую вы можете найти на кассе в продуктовом магазине или на туристической достопримечательности:

https://static.dailymoscow.ru/uploads/kaliningrad/2015/01/DSC_8168NG1.jpg Туристы выстраиваются в очередь, чтобы попасть в музей.


Обратите внимание, что в отличие от очереди на фото, где люди стоят бок о бок, очередь, строго говоря, будет одиночной, и люди будут допущены по одному.


FIFO — это сокращение от first-in, first-out , которое описывает поток элементов в очереди. Элементы в такой очереди будут обрабатываться в порядке очереди , как работает большинство реальных очередей.

Примечание. Элементы очереди FIFO можно представить себе как автомобили, останавливающиеся на светофоре.

Добавление элемента в очередь FIFO обычно называют операцией постановки в очередь , а извлечение элемента из нее — операцией удаления из очереди . Не путайте операцию удаления из очереди с типом данных deque (двусторонняя очередь) , о котором вы узнаете позже!


Постановка в очередь и удаление из очереди — это две независимые операции, которые могут выполняться с разной скоростью. Этот факт делает очереди FIFO идеальным инструментом для буферизации данных в сценариях потоковой передачи и для планирования задач , которые должны ждать, пока какой-либо общий ресурс станет доступным. Например, веб-сервер, перегруженный HTTP-запросами, может поместить их в очередь вместо того, чтобы немедленно отклонить с ошибкой.

Примечание. В программах, использующих параллелизм , очередь FIFO часто сама становится общим ресурсом, чтобы облегчить двустороннюю связь между асинхронными рабочими процессами. Временно блокируя доступ для чтения или записи к своим элементам, блокирующая очередь может элегантно координировать пул производителей и пул потребителей. Вы найдете больше информации об этом варианте использования в последующих разделах об очередях в многопоточности и многопроцессорности .

Еще один момент, который стоит отметить в отношении очереди, изображенной выше, заключается в том, что она может неограниченно расти по мере поступления новых элементов. Представьте себе очередь на кассе, тянущуюся к задней части магазина во время напряженного сезона покупок! Однако в некоторых ситуациях вы можете предпочесть работать с ограниченной очередью с заранее известной фиксированной емкостью. Ограниченная очередь может помочь контролировать ограниченные ресурсы двумя способами:

  1. Необратимо отвергая элементы, которые не подходят
  2. Путем перезаписи самого старого элемента в очереди

В соответствии с первой стратегией, как только очередь FIFO становится насыщенной, она не будет принимать больше элементов, пока другие не покинут очередь, чтобы освободить место.

Deque: двусторонняя очередь

Двусторонняя очередь или deque (произносится как колода ) — это более общий тип данных, который объединяет и расширяет идеи, лежащие в основе стека и очереди. Это позволяет вам ставить в очередь или удалять элементы из очереди с обоих концов за постоянное время в любой момент. Таким образом, двухсторонняя очередь может работать как очередь FIFO или LIFO, а также что-то среднее между ними и за их пределами.


Используя тот же пример очереди людей, что и раньше, вы можете использовать двухстороннюю очередь для моделирования более сложных угловых случаев. В реальной жизни последний человек в очереди может проявить нетерпение и решить выйти из очереди раньше или присоединиться к другой очереди на новой, только что открывшейся кассе. И наоборот, тому, кто забронировал посещение онлайн на определенную дату и время заранее, может быть разрешено встать в очередь без ожидания.


Добавление элемента в ограниченную двухстороннюю очередь , которая уже достигла своей полной емкости, перезапишет элемент, расположенный в настоящее время на противоположном конце. Эта функция может быть удобна для выделения нескольких первых или нескольких последних элементов последовательности. Вы также можете остановиться где-нибудь в этой последовательности, а затем двигаться влево или вправо небольшими шагами.


Двухстороннюю очередь можно использовать для реализации балансировщика нагрузки или планировщика задач, работающего в циклическом режиме. В приложении с графическим интерфейсом вы можете использовать очередь для хранения недавно открытых файлов , разрешить пользователю отменять и повторять свои действия или позволять пользователю перемещаться вперед и назад по истории просмотра веб-страниц .Как видите, очереди находят множество практических применений, особенно для отслеживания самой последней активности. Однако некоторые проблемы потребуют от вас использования еще одного типа очереди, о котором вы прочтете далее.

Приоритетная очередь: отсортировано от высокого к низкому

Очередь с приоритетом отличается от тех, что вы видели до сих пор, потому что она не может хранить обычные элементы. Вместо этого каждый элемент теперь должен иметь связанный с ним приоритет для сравнения с другими элементами. Очередь будет поддерживать отсортированный порядок , позволяя новым элементам присоединяться там, где это необходимо, и перетасовывать существующие элементы, если это необходимо. Когда два элемента имеют одинаковый приоритет, они будут следовать своему порядку вставки.

Примечание. Убедитесь, что для ваших приоритетов выбран тип данных, значения которого сопоставимы с помощью операторов сравнения , например, меньше ( <). Например, целые числа и метки времени подойдут, а комплексные числа не подойдут для указания приоритета, поскольку они не реализуют никакого соответствующего оператора сравнения.

Этот вид очереди работает аналогично приоритетной посадке в самолет:

Пассажирские трапы подключены к Airbus A380 в аэропорту Франкфурта

Обычные пассажиры присоединятся к очереди в самом конце, если только они не сопровождаются маленькими детьми, не имеют инвалидности или не заработали баллы лояльности, и в этом случае они будут быстро доставлены в начало очереди. Путешественники бизнес-класса обычно наслаждаются роскошью отдельной, гораздо меньшей очереди, но даже им иногда приходится пропускать пассажиров первого класса.


Новый элемент вставляется между элементом с более высоким или равным приоритетом и другим элементом с более низким приоритетом. Это правило напоминает алгоритм сортировки вставками, который оказывается стабильным , поскольку элементы с одинаковым приоритетом никогда не меняются местами. Вы можете использовать приоритетную очередь для сортировки последовательности элементов по заданному ключу или для получения первых нескольких элементов . Однако это было бы излишним, поскольку существуют гораздо более эффективные алгоритмы сортировки . Очередь с приоритетом лучше подходит для ситуаций, когда элементы могут появляться и исчезать динамически. Одной из таких ситуаций может быть поиск кратчайшего пути во взвешенном графе с использованием алгоритма Дейкстры , о котором вы прочтете позже.

Примечание. Несмотря на то, что приоритетная очередь концептуально представляет собой последовательность, ее наиболее эффективная реализация строится на основе структуры данных кучи , которая является своего рода двоичным деревом . Поэтому термины «куча» и «очередь с приоритетом» иногда используются взаимозаменяемо.

Это было длинное введение в теорию и таксономию очередей. Попутно вы узнали об очередях FIFO, стеках (очередях LIFO), деках и очередях с приоритетом. Вы также увидели разницу между ограниченными и неограниченными очередями и получили представление об их потенциальных применениях. Теперь пришло время попробовать реализовать некоторые из этих очередей самостоятельно.

Реализация очередей в Python

Прежде всего, стоит ли вам самостоятельно реализовывать очередь в Python? В большинстве случаев ответом на этот вопрос будет решительное «нет» . Язык поставляется с включенными батареями, и очереди не являются исключением. На самом деле вы обнаружите, что Python имеет множество реализаций очередей, подходящих для решения различных задач.


При этом попытка создать что-то с нуля может стать бесценным опытом. Вас также могут попросить предоставить реализацию очереди во время технического собеседования . Итак, если вам интересна эта тема, то читайте дальше. В противном случае, если вы хотите использовать очереди только на практике , вы можете полностью пропустить этот раздел.

Представление очередей FIFO и LIFO с помощью Deque

Чтобы представить очередь FIFO в памяти компьютера, вам потребуется последовательность , имеющая O(1) или постоянное время, производительность для операции постановки в очередь на одном конце и столь же эффективная операция удаления из очереди на другом конце. Как вы уже знаете, двухсторонняя или двухсторонняя очередь удовлетворяет этим требованиям. Кроме того, он достаточно универсален, чтобы адаптироваться и к очереди LIFO.


Однако, поскольку его кодирование выходит за рамки этого руководства, вы будете использовать deque коллекцию Python из стандартной библиотеки.


Примечание. Deque — это абстрактный тип данных, который можно реализовать несколькими способами. Использование двусвязного списка в качестве базовой реализации гарантирует, что доступ и удаление элементов с обоих концов будут иметь требуемую временную сложность O(1). Если ваша двухсторонняя очередь имеет фиксированный размер, вы можете вместо этого использовать циклический буфер , что позволит вам получить доступ к любому элементу за постоянное время. В отличие от связанного списка, циклический буфер представляет собой структуру данных с произвольным доступом .


Почему бы не использовать Python listвместо collections.dequeстроительного блока для вашей очереди FIFO?

.append() Обе последовательности позволяют довольно дешево ставить элементы в очередь своими методами, с небольшой оговоркой для списков, которые время от времени будут требовать копирования всех элементов в новую ячейку памяти, когда их количество превысит определенный порог.


К сожалению, удаление элемента из начала списка с помощью list.pop(0) или эквивалентная вставка элемента с помощью list.insert(0, element) гораздо менее эффективны. Такие операции всегда сдвигают оставшиеся элементы, что приводит к линейной или O (n) временной сложности. Напротив, deque.popleft() и deque.appendleft() вообще избегайте этого шага.


После этого вы можете приступить к определению своего пользовательского Queue класса на основе коллекции Python deque.

Создание типа данных очереди

Теперь, когда вы выбрали подходящее представление очереди, вы можете запустить свой любимый редактор кода , например Visual Studio Code или PyCharm , и создать новый модуль Python для различных реализаций очередей. Вы можете вызвать файл queues.py(форма множественного числа), чтобы избежать конфликта с queue модулем с таким же названием (форма единственного числа), уже доступным в стандартной библиотеке Python.

Примечание: вы поближе познакомитесь со встроенным queue модулем в следующем разделе, посвященном потокобезопасным очередям в Python.

Поскольку вы хотите, чтобы ваша пользовательская очередь FIFO поддерживала по крайней мере операции постановки в очередь и удаления из очереди, напишите базовый Queueкласс, который делегирует эти две операции методам deque.append() и deque.popleft() соответственно:

# queues.py

from collections import deque

class Queue:
    def __init__(self):
        self._elements = deque()

    def enqueue(self, element):
        self._elements.append(element)

    def dequeue(self):
        return self._elements.popleft()

Этот класс просто обертывает collections.deque экземпляр и вызывает его ._elements. Ведущее подчеркивание в имени атрибута указывает на внутренний бит реализации, к которому должен обращаться и изменять только класс. Такие поля иногда называют приватными, потому что они не должны быть видны за пределами тела класса.


Вы можете протестировать свою очередь FIFO, импортировав ее из локального модуля в интерактивном сеансе интерпретатора Python :

>>> from queues import Queue

>>> fifo = Queue()
>>> fifo.enqueue("1st")
>>> fifo.enqueue("2nd")
>>> fifo.enqueue("3rd")

>>> fifo.dequeue()
'1st'
>>> fifo.dequeue()
'2nd'
>>> fifo.dequeue()
'3rd'

Как и ожидалось, поставленные в очередь элементы возвращаются к вам в исходном порядке. Если вы хотите, вы можете улучшить свой класс, сделав его итерируемым и способным сообщать о своей длине и, возможно, принимать начальные элементы:

# queues.py

from collections import deque

class Queue:
    def __init__(self, *elements):
        self._elements = deque(elements)

    def __len__(self):
        return len(self._elements)

    def __iter__(self):
        while len(self) > 0:
            yield self.dequeue()

    def enqueue(self, element):
        self._elements.append(element)

    def dequeue(self):
        return self._elements.popleft()

A dequeпринимает необязательный итерируемый объект, который вы можете предоставить через различное количество позиционных аргументов *elementsв вашем методе инициализатора. Реализуя специальный .__iter__() метод, вы сделаете экземпляры класса пригодными для использования в for цикле , а реализация .__len__() сделает их совместимыми с len() функцией. Приведенный выше метод .__iter__() является примером итератора-генератора , который лениво выдает элементы .

Примечание. Реализация .__iter__() заставляет вашу настраиваемую очередь уменьшать свой размер, удаляя элементы из самой себя, когда вы выполняете итерацию по ней.

Перезапустите интерпретатор Python и снова импортируйте свой класс, чтобы увидеть обновленный код в действии:

>>> from queues import Queue

>>> fifo = Queue("1st", "2nd", "3rd")
>>> len(fifo)
3

>>> for element in fifo:
...     print(element)
...
1st
2nd
3rd

>>> len(fifo)
0

Первоначально очередь состоит из трех элементов, но ее длина уменьшается до нуля после использования всех элементов в цикле.

Построение типа данных приоритетной очереди

Представьте, что вы разрабатываете программное обеспечение для автомобильной компании. Современные автомобили — это практически компьютеры на колесах, которые используют шину локальной сети контроллеров (CAN) для передачи сообщений о различных событиях, происходящих в вашем автомобиле, таких как отпирание дверей или срабатывание подушки безопасности. Ясно, что некоторые из этих событий более важны, чем другие, и должны быть соответствующим образом расставлены по приоритетам.

Забавный факт: вы можете загрузить мобильное приложение для своего смартфона, например Torque , которое позволит вам подключиться к шине CAN вашего автомобиля через Bluetooth или специальную сеть Wi-Fi через небольшое сканирующее устройство , подключенное к бортовой сети вашего автомобиля. порт диагностики платы (OBD).

Эта настройка позволит вам контролировать параметры вашего автомобиля в режиме реального времени, даже если они не отображаются на приборной панели! Это включает в себя такие вещи, как температура охлаждающей жидкости, напряжение батареи, количество миль на галлон и выбросы. Кроме того, вы сможете проверить, сообщают ли ЭБУ вашего автомобиля какие-либо коды неисправностей.


Можно пропустить сообщение о неисправной фаре или немного подождать, пока уровень громкости звука не снизится. Однако, когда вы нажимаете на педаль тормоза, вы ожидаете немедленного эффекта, потому что это критическая для безопасности подсистема. Каждое сообщение имеет приоритет в протоколе шины CAN, который сообщает промежуточным устройствам, следует ли им передавать сообщение дальше или полностью игнорировать его.


Несмотря на то, что это упрощение проблемы, вы можете думать о шине CAN как об очереди приоритетов, которая сортирует сообщения в соответствии с их важностью. Теперь вернитесь в редактор кода и определите следующий класс в модуле Python, который вы создали ранее:

# queues.py

from collections import deque
from heapq import heappop, heappush

# ...

class PriorityQueue:
    def __init__(self):
        self._elements = []

    def enqueue_with_priority(self, priority, value):
        heappush(self._elements, (priority, value))

    def dequeue(self):
        return heappop(self._elements)

Это базовая реализация очереди с приоритетом, которая определяет кучу элементов, используя список Python и два метода, которые манипулируют им. Метод .enqueue_with_priority()принимает два аргумента, приоритет и соответствующее значение, которое затем оборачивает в кортеж и помещает в кучу с помощью модуля heapq. Обратите внимание, что приоритет стоит перед значением, чтобы воспользоваться тем, как Python сравнивает кортежи.


К сожалению, есть несколько проблем с приведенной выше реализацией, которые становятся очевидными, когда вы пытаетесь ее использовать:

>>> from queues import PriorityQueue

>>> CRITICAL = 3
>>> IMPORTANT = 2
>>> NEUTRAL = 1

>>> messages = PriorityQueue()
>>> messages.enqueue_with_priority(IMPORTANT, "Windshield wipers turned on")
>>> messages.enqueue_with_priority(NEUTRAL, "Radio station tuned in")
>>> messages.enqueue_with_priority(CRITICAL, "Brake pedal depressed")
>>> messages.enqueue_with_priority(IMPORTANT, "Hazard lights turned on")

>>> messages.dequeue()
(1, 'Radio station tuned in')

Вы определили три уровня приоритета: критический, важный и нейтральный. Затем вы создали очередь с приоритетами и использовали ее для постановки в очередь нескольких сообщений с этими приоритетами. Однако вместо удаления из очереди сообщения с наивысшим приоритетом вы получили кортеж, соответствующий сообщению с самым низким приоритетом.

Примечание. В конечном счете, вам решать, как вы хотите определить порядок своих приоритетов. В этом руководстве более низкий приоритет соответствует более низкому числовому значению, а более высокий приоритет имеет большее значение.

Тем не менее, в некоторых случаях может быть удобнее изменить этот порядок. Например, в алгоритме кратчайшего пути Дейкстры вы захотите отдать приоритет путям с меньшей общей стоимостью по сравнению с путями с высокой стоимостью. Чтобы справиться с такой ситуацией, вы позже реализуете другой класс.


Поскольку куча Python является минимальной кучей, ее первый элемент всегда имеет наименьшее значение. Чтобы исправить это, вы можете перевернуть знак приоритета при добавлении кортежа в кучу, сделав приоритет отрицательным числом , чтобы самый высокий стал самым низким:

# queues.py

# ...

class PriorityQueue:
    def __init__(self):
        self._elements = []

    def enqueue_with_priority(self, priority, value):
        heappush(self._elements, (-priority, value))

    def dequeue(self):
        return heappop(self._elements)[1]

С этим небольшим изменением вы будете размещать критические сообщения перед важными и нейтральными. Кроме того, при выполнении операции удаления из очереди вы распаковываете значение из кортежа, обращаясь к его второму компоненту, расположенному в индексе один, используя синтаксис квадратных скобок ( []).


Теперь, если вы вернетесь к интерактивному интерпретатору Python, импортируете обновленный код и снова поставите в очередь те же сообщения, они вернутся к вам в более разумном порядке:

>>> messages.dequeue()
'Brake pedal depressed'

>>> messages.dequeue()
'Hazard lights turned on'

>>> messages.dequeue()
'Windshield wipers turned on'

>>> messages.dequeue()
'Radio station tuned in'

Сначала вы получаете критическое сообщение, затем два важных, а затем нейтральное сообщение. Пока все хорошо, правда? Однако есть две проблемы с вашей реализацией. Один из них вы уже можете увидеть на выходе, а другой проявится только при определенных обстоятельствах. Можете ли вы обнаружить эти проблемы?

Использование очередей на практике

Как упоминалось во введении к этому руководству, очереди являются основой многих важных алгоритмов. Одной из особенно интересных областей применения является посещение узлов графа , который может, например, представлять карту дорог между городами. Очереди могут быть полезны для поиска кратчайшего или наиболее оптимального пути между двумя местами.


В этом разделе вы собираетесь использовать только что построенные очереди для реализации классических алгоритмов обхода графа . Существует множество способов представления графиков в коде и столько же библиотек Python, которые уже хорошо с этим справляются. Для простоты вы воспользуетесь преимуществами библиотек networkx и pygraphviz , а также широко используемого языка описания графов DOT .


Вы можете установить эти библиотеки в свою виртуальную среду , используя pip:

(venv) $ python -m pip install networkx pygraphviz

Кроме того, вы можете установить все зависимости, необходимые для остальной части этого руководства, за один шаг, следуя инструкциям в файле, READMEкоторый вы найдете в дополнительных материалах. Обратите внимание, что установка pygraphviz может быть немного сложной, поскольку для этого требуется набор инструментов компилятора C.

Образец данных: дорожная карта Соединенного Королевства

После установки необходимых библиотек вы прочтете взвешенный и неориентированный граф городов Соединенного Королевства из файла DOT, который вы можете найти в сопроводительных материалах:


Этот граф имеет 70 узлов, представляющих города Великобритании, и 137 ребер, взвешенных по расчетному расстоянию в милях между соединенными городами:

Города в Соединенном Королевстве

Обратите внимание, что приведенный выше график представляет собой упрощенную модель дорожной сети в Великобритании, поскольку в нем не учитываются типы дорог, их пропускная способность, ограничения скорости, трафик или объездные дороги. Он также игнорирует тот факт, что обычно два города соединяет более одной дороги. Таким образом, кратчайший путь, определенный спутниковой навигацией или Google Maps , скорее всего, будет отличаться от того, который вы найдете с очередями в этом руководстве.


Тем не менее, приведенный выше график представляет собой фактическое дорожное сообщение между городами, а не прямые линии по прямой. Хотя в визуализации края могут выглядеть как прямые линии, в реальной жизни они точно не такие. Графически один и тот же график можно представить множеством способов.


Далее вы будете использовать библиотеку networkx для чтения этого графика в Python.

Объектное представление городов и дорог

Хотя networkx не может читать файлы DOT самостоятельно, библиотека предоставляет несколько вспомогательных функций, которые делегируют эту задачу другим сторонним библиотекам. В этом руководстве вы будете использовать pygraphviz для чтения примера файла DOT:

>>> import networkx as nx
>>> print(nx.nx_agraph.read_dot("roadmap.dot"))
MultiGraph named 'Cities in the United Kingdom' with 70 nodes and 137 edges

Несмотря на то, что установка pygraphviz в некоторых операционных системах может быть немного сложной, на сегодняшний день она является самой быстрой и наиболее совместимой с расширенными функциями формата DOT. По умолчанию networkx представляет узлы графа с помощью текстовых идентификаторов, которые могут дополнительно иметь связанный словарь атрибутов:

>>> import networkx as nx
>>> graph = nx.nx_agraph.read_dot("roadmap.dot")
>>> graph.nodes["london"]
{'country': 'England',
 'latitude': '51.507222',
 'longitude': '-0.1275',
 'pos': '80,21!',
 'xlabel': 'City of London',
 'year': 0}

Например, "london" строка сопоставляется с соответствующим словарем пар ключ-значение. Атрибут pos, который содержит нормализованные координаты после применения проекции Меркатора к широте и долготе, учитывается Graphviz для размещения узлов в графической визуализации. Атрибут year указывает, когда город получил свой статус. Когда он равен нулю, это означает незапамятные времена.


Поскольку это не самый удобный способ думать о графиках, вы определите пользовательский тип данных, представляющий город на вашей дорожной карте. Идем дальше, создаем новый файл с именем graph.py и реализуем в нем следующий класс:

# graph.py

from typing import NamedTuple

class City(NamedTuple):
    name: str
    country: str
    year: int | None
    latitude: float
    longitude: float

    @classmethod
    def from_dict(cls, attrs):
        return cls(
            name=attrs["xlabel"],
            country=attrs["country"],
            year=int(attrs["year"]) or None,
            latitude=float(attrs["latitude"]),
            longitude=float(attrs["longitude"]),
        )

Вы расширяете именованный кортеж , чтобы гарантировать возможность хеширования объектов вашего узла , что требуется для networkx. Вместо этого вы можете использовать правильно сконфигурированный класс данных , но именованный кортеж можно хешировать из коробки. Это также сопоставимо, что вам может понадобиться позже, чтобы определить порядок обхода графа. Метод .from_dict() класса берет словарь атрибутов, извлеченных из файла DOT, и возвращает новый экземпляр вашего City класса.


Чтобы воспользоваться преимуществом вашего нового класса, вам потребуется создать новый экземпляр графа и принять к сведению сопоставление идентификаторов узлов с экземплярами города. Добавьте в модуль следующую вспомогательную функцию graph:

# graph.py

import networkx as nx

# ...

def load_graph(filename, node_factory):
    graph = nx.nx_agraph.read_dot(filename)
    nodes = {
        name: node_factory(attributes)
        for name, attributes in graph.nodes(data=True)
    }
    return nodes, nx.Graph(
        (nodes[name1], nodes[name2], weights)
        for name1, name2, weights in graph.edges(data=True)
    )

Функция принимает имя файла и вызываемую фабрику для объектов узла, таких как City.from_dict()метод вашего класса. Он начинается с чтения файла DOT и построения сопоставления идентификаторов узлов с объектно-ориентированным представлением узлов графа. Наконец, он возвращает это отображение и новый граф, содержащий узлы и взвешенные ребра.


Теперь вы можете снова начать играть с дорожной картой Великобритании в интерактивном сеансе интерпретатора Python:

>>> from graph import City, load_graph

>>> nodes, graph = load_graph("roadmap.dot", City.from_dict)

>>> nodes["london"]
City(
    name='City of London',
    country='England',
    year=None,
    latitude=51.507222,
    longitude=-0.1275
)

>>> print(graph)
Graph with 70 nodes and 137 edges

После импорта вспомогательной функции и Cityкласса из вашего модуля вы загружаете график из образца файла DOT и сохраняете результат в двух переменных. Переменная nodes позволяет получить ссылку на экземпляр класса Cityпо указанному имени, тогда как graph переменная содержит весь Graph объект networkx.


При поиске кратчайшего пути между двумя городами вам нужно определить непосредственных соседей данного города, чтобы найти доступные маршруты для следования. Вы можете сделать это несколькими способами с помощью networkx graph. В простейшем случае вы вызовете .neighbors() метод на графе с указанным узлом в качестве аргумента:

>>> for neighbor in graph.neighbors(nodes["london"]):
...     print(neighbor.name)
...
Bath
Brighton & Hove
Bristol
Cambridge
Canterbury
Chelmsford
Coventry
Oxford
Peterborough
Portsmouth
Southampton
Southend-on-Sea
St Albans
Westminster
Winchester

Это показывает только соседние узлы без возможных весов соединяющих ребер, таких как расстояния или расчетное время в пути, которые вам могут понадобиться для выбора наилучшего пути. Если вы хотите включить веса, то получите доступ к узлу, используя синтаксис квадратных скобок:

>>> for neighbor, weights in graph[nodes["london"]].items():
...     print(weights["distance"], neighbor.name)
...
115 Bath
53 Brighton & Hove
118 Bristol
61 Cambridge
62 Canterbury
40 Chelmsford
100 Coventry
58 Oxford
85 Peterborough
75 Portsmouth
79 Southampton
42 Southend-on-Sea
25 St Albans
1 Westminster
68 Winchester

Соседи всегда перечислены в том же порядке, в котором вы определили их в файле DOT. Чтобы отсортировать их по одному или нескольким весам, вы можете использовать следующий фрагмент кода:

>>> def sort_by(neighbors, strategy):
...     return sorted(neighbors.items(), key=lambda item: strategy(item[1]))
...
>>> def by_distance(weights):
...     return float(weights["distance"])
...
>>> for neighbor, weights in sort_by(graph[nodes["london"]], by_distance):
...     print(f"{weights['distance']:>3} miles, {neighbor.name}")
...
  1 miles, Westminster
 25 miles, St Albans
 40 miles, Chelmsford
 42 miles, Southend-on-Sea
 53 miles, Brighton & Hove
 58 miles, Oxford
 61 miles, Cambridge
 62 miles, Canterbury
 68 miles, Winchester
 75 miles, Portsmouth
 79 miles, Southampton
 85 miles, Peterborough
100 miles, Coventry
115 miles, Bath
118 miles, Bristol

Во-первых, вы определяете вспомогательную функцию, которая возвращает список соседей и их веса, отсортированные по указанной стратегии. Стратегия берет словарь всех весов, связанных с ребром, и возвращает ключ сортировки. Затем вы определяете конкретную стратегию, которая создает расстояние с плавающей запятой на основе входного словаря. Наконец, вы перебираете соседей Лондона, отсортированных по расстоянию в порядке возрастания.


Обладая этими элементарными знаниями о библиотеке networkx, вы теперь можете перейти к реализации алгоритмов обхода графа на основе типов данных пользовательских очередей, которые вы создали ранее.

Поиск в ширину с использованием очереди FIFO

В алгоритме поиска в ширину вы ищете узел, который удовлетворяет определенному условию, исследуя граф в концентрических слоях или уровнях. Вы начинаете обход графа с произвольно выбранного исходного узла , если граф не является древовидной структурой данных, и в этом случае вы обычно начинаете с корневого узла этого дерева. На каждом шаге вы посещаете всех непосредственных соседей текущего узла, прежде чем углубляться.

Примечание. Чтобы избежать зацикливания, когда граф содержит циклы, отслеживайте соседей, которые вы посетили, и пропускайте их при следующей встрече с ними. Например, вы можете добавить посещенные узлы в набор Python, а затем использовать оператор, inчтобы проверить, содержит ли набор заданный узел.

Например, предположим, что вы хотели найти какое-либо место в Соединенном Королевстве, получившее статус города в двадцатом веке, начав поиск с Эдинбурга. В библиотеке networkx уже реализовано множество алгоритмов, в том числе поиск в ширину, который может помочь перепроверить вашу будущую реализацию. Вызовите nx.bfs_tree()функцию на вашем графике, чтобы показать порядок обхода в ширину:

>>> import networkx as nx
>>> from graph import City, load_graph

>>> def is_twentieth_century(year):
...     return year and 1901 <= year <= 2000
...
>>> nodes, graph = load_graph("roadmap.dot", City.from_dict)
>>> for node in nx.bfs_tree(graph, nodes["edinburgh"]):
...     print("📍", node.name)
...     if is_twentieth_century(node.year):
...         print("Found:", node.name, node.year)
...         break
... else:
...     print("Not found")
...
📍 Edinburgh
📍 Dundee
📍 Glasgow
📍 Perth
📍 Stirling
📍 Carlisle
📍 Newcastle upon Tyne
📍 Aberdeen
📍 Inverness
📍 Lancaster
Found: Lancaster 1937

Выделенные строки указывают на всех шести непосредственных соседей Эдинбурга, который является вашим исходным узлом. Обратите внимание, что они посещаются последовательно без перерыва перед переходом к следующему слою графа. Последующий слой состоит из соседей второго уровня, начиная с исходного узла.


Вы исследуете непосещенных соседей выделенных городов. Первый — Данди, соседями которого являются Абердин и Перт, но вы уже были в Перте, поэтому пропускаете его и посещаете только Абердин. У Глазго нет непосещенных соседей, а у Перта есть только Инвернесс. Точно так же вы посетили соседей Стерлинга, но не посетили Карлайла, который связан с Ланкастером. Вы прекращаете поиски, потому что Ланкастер — ваш ответ.


Результат вашего поиска может иногда различаться в зависимости от вашего выбора начальной точки, а также от порядка соседей, если есть более одного узла, удовлетворяющего условию. Чтобы обеспечить согласованные результаты, вы можете отсортировать соседей по некоторым критериям. Например, вы можете сначала посетить города с более высокой широтой:

>>> def order(neighbors):
...     def by_latitude(city):
...         return city.latitude
...     return iter(sorted(neighbors, key=by_latitude, reverse=True))

>>> for node in nx.bfs_tree(graph, nodes["edinburgh"], sort_neighbors=order):
...     print("📍", node.name)
...     if is_twentieth_century(node.year):
...         print("Found:", node.name, node.year)
...         break
... else:
...     print("Not found")
...
📍 Edinburgh
📍 Dundee
📍 Perth
📍 Stirling
📍 Glasgow
📍 Newcastle upon Tyne
📍 Carlisle
📍 Aberdeen
📍 Inverness
📍 Sunderland
Found: Sunderland 1992

Теперь ответ другой, потому что Ньюкасл посещается раньше, чем Карлайл, из-за того, что он находится на несколько большей широте. В свою очередь, это заставляет алгоритм поиска в ширину находить Сандерленд раньше Ланкастера, который является альтернативным узлом, соответствующим вашему условию.

Примечание. Если вам интересно, почему order() оборачивает список отсортированных соседей в вызов iter(), это потому, что он nx.bfs_tree()ожидает объект итератора в качестве входных данных для своего sort_neighbors аргумента.

Теперь, когда вы получили общее представление об алгоритме поиска в ширину, пришло время реализовать его самостоятельно. Поскольку обход в ширину является основой для других интересных алгоритмов, вы извлечете его логику в отдельную функцию, которой сможете делегировать:

# graph.py

from queues import Queue

# ...

def breadth_first_traverse(graph, source):
    queue = Queue(source)
    visited = {source}
    while queue:
        yield (node := queue.dequeue())
        for neighbor in graph.neighbors(node):
            if neighbor not in visited:
                visited.add(neighbor)
                queue.enqueue(neighbor)

def breadth_first_search(graph, source, predicate):
    for node in breadth_first_traverse(graph, source):
        if predicate(node):
            return node

Первая функция принимает сетевой граф и исходный узел в качестве аргументов, возвращая узлы, посещенные при обходе в ширину. Обратите внимание, что он использует вашу очередь FIFO из queuesмодуля, чтобы отслеживать соседние узлы, гарантируя, что вы будете исследовать их последовательно на каждом уровне. Функция также помечает посещенные узлы, добавляя их в набор Python , так что каждый сосед посещается не более одного раза.


Примечание. Вместо использования whileцикла вместе с оператором walrus ( :=) для получения удаленного из очереди узла в одном выражении вы можете воспользоваться тем фактом, что ваша пользовательская очередь является итерируемой, удаляя элементы из очереди с помощью цикла for:

def breadth_first_traverse(graph, source):
    queue = Queue(source)
    visited = {source}
    for node in queue:
        yield node
        for neighbor in graph.neighbors(node):
            if neighbor not in visited:
                visited.add(neighbor)
                queue.enqueue(neighbor)

Однако это зависит от неочевидной детали реализации в вашем классе, поэтому в оставшейся части этого руководства Queue вы будете придерживаться более традиционного цикла.while


Вторая функция строится поверх первой, перебирая полученные узлы, и останавливается, когда текущий узел соответствует ожидаемым критериям. Если ни один из узлов не делает предикат истинным, функция неявно возвращает None.


Чтобы проверить реализацию поиска в ширину и обхода в действии, вы можете заменить вспомогательную функцию, встроенную в networkx, на свою собственную:

>>> from graph import (
...     City,
...     load_graph,
...     breadth_first_traverse,
...     breadth_first_search as bfs,
... )

>>> def is_twentieth_century(city):
...     return city.year and 1901 <= city.year <= 2000

>>> nodes, graph = load_graph("roadmap.dot", City.from_dict)
>>> city = bfs(graph, nodes["edinburgh"], is_twentieth_century)
>>> city.name
'Lancaster'

>>> for city in breadth_first_traverse(graph, nodes["edinburgh"]):
...     print(city.name)
...
Edinburgh
Dundee
Glasgow
Perth
Stirling
Carlisle
Newcastle upon Tyne
Aberdeen
Inverness
Lancaster
⋮

Как видите, порядок обхода идентичен вашей первой попытке с networkx, подтверждая, что ваш алгоритм работает правильно для этого набора данных. Однако ваши функции не позволяют сортировать соседей в определенном порядке. Попробуйте изменить код, чтобы он принимал необязательную стратегию сортировки. Вы можете щелкнуть сворачиваемый раздел ниже, чтобы увидеть одно из возможных решений:


Алгоритм поиска в ширину гарантирует, что вы в конечном итоге изучите все связанные узлы в графе, пока будете искать тот, который удовлетворяет желаемому условию. Вы можете использовать его, например, для решения лабиринта. Обход в ширину также является основой для поиска кратчайшего пути между двумя узлами в неориентированном и невзвешенном графе. В следующем разделе вы адаптируете свой код именно для этого.

Кратчайший путь с использованием обхода в ширину

Во многих случаях чем меньше узлов на пути от источника к месту назначения, тем короче расстояние. Вы могли бы воспользоваться этим наблюдением, чтобы оценить кратчайшее расстояние, если бы соединения между вашими городами не имели веса. Это было бы эквивалентно одинаковому весу на каждом ребре.


Обход графа с использованием подхода в ширину даст гарантированно путь с наименьшим количеством узлов . Иногда может быть более одного кратчайшего пути между двумя узлами. Например, есть два таких кратчайших пути между Абердином и Пертом, если не принимать во внимание расстояния по дорогам. Как и прежде, фактический результат в таком случае будет зависеть от того, как вы упорядочите соседние узлы.


Вы можете использовать networkx, чтобы выявить все кратчайшие пути между двумя городами, которые будут иметь одинаковую минимальную длину:

>>> import networkx as nx
>>> from graph import City, load_graph

>>> nodes, graph = load_graph("roadmap.dot", City.from_dict)

>>> city1 = nodes["aberdeen"]
>>> city2 = nodes["perth"]

>>> for i, path in enumerate(nx.all_shortest_paths(graph, city1, city2), 1):
...     print(f"{i}.", " → ".join(city.name for city in path))
...
1. Aberdeen → Dundee → Perth
2. Aberdeen → Inverness → Perth

После загрузки графа вы перечисляете кратчайшие пути между двумя городами и печатаете их на экране. Как видите, между Абердином и Пертом есть только два кратчайших пути. Напротив, Лондон и Эдинбург имеют четыре различных кратчайших пути с девятью узлами в каждом, но между ними существует много более длинных путей.


Как обход в ширину помогает точно найти кратчайший путь?

Всякий раз, когда вы посещаете узел, вы должны отслеживать предыдущий узел, который привел вас к нему, сохраняя эту информацию в виде пары ключ-значение в словаре. Позже вы сможете проследить свой путь от пункта назначения до источника, следуя предыдущим узлам. Вернитесь в редактор кода и создайте другую функцию, скопировав и адаптировав код из предыдущей breadth_first_traverse()функции:

# graph.py

# ...

def shortest_path(graph, source, destination, order_by=None):
    queue = Queue(source)
    visited = {source}
    previous = {}
    while queue:
        node = queue.dequeue()
        neighbors = list(graph.neighbors(node))
        if order_by:
            neighbors.sort(key=order_by)
        for neighbor in neighbors:
            if neighbor not in visited:
                visited.add(neighbor)
                queue.enqueue(neighbor)
                previous[neighbor] = node
                if neighbor == destination:
                    return retrace(previous, source, destination)

Эта новая функция принимает другой узел в качестве аргумента и дополнительно позволяет упорядочивать соседей, используя пользовательскую стратегию. Он также определяет пустой словарь, который вы заполняете при посещении соседа, связывая его с предыдущим узлом на вашем пути. Все пары ключ-значение в этом словаре являются непосредственными соседями без каких-либо узлов между ними.


Если между вашим источником и местом назначения существует путь, то функция возвращает список узлов, созданный с помощью другой вспомогательной функции, вместо того, чтобы выдавать отдельные узлы, такие как breadth_first_traverse().

Примечание. Вы можете попробовать реорганизовать этот код, объединив shortest_path()и breadth_first_traverse()в одну функцию, если хотите. Тем не менее, опытные программисты в целом согласны с тем, что небольшое повторение иногда может быть нормальным, если это делает ваш код более понятным и сосредоточенным на одной задаче.

Чтобы воссоздать кратчайший путь между вашим источником и пунктом назначения, вы можете итеративно искать словарь, созданный ранее, когда вы проходили граф с подходом в ширину:

# graph.py

from collections import deque

# ...

def retrace(previous, source, destination):
    path = deque()

    current = destination
    while current != source:
        path.appendleft(current)
        current = previous.get(current)
        if current is None:
            return None

    path.appendleft(source)
    return list(path)

Поскольку вы начинаете с места назначения и возвращаетесь обратно, использование dequeколлекции Python с операцией быстрого добавления слева может быть полезным. На каждой итерации вы добавляете текущий узел к пути и переходите к предыдущему узлу. Вы повторяете эти шаги, пока не дойдете до исходного узла или до тех пор, пока предыдущий узел не исчезнет.


Когда вы вызываете реализацию кратчайшего пути на основе очереди, вы получаете те же результаты, что и с networkx:

>>> from graph import shortest_path

>>> " → ".join(city.name for city in shortest_path(graph, city1, city2))
'Aberdeen → Dundee → Perth'

>>> def by_latitude(city):
...     return -city.latitude
...
>>> " → ".join(
...     city.name
...     for city in shortest_path(graph, city1, city2, by_latitude)
... )
'Aberdeen → Inverness → Perth'

Первый путь следует естественному порядку соседей из файла DOT, тогда как второй предпочитает соседей с более высокой широтой, которую вы указываете с помощью пользовательской стратегии сортировки. Чтобы применить убывающий порядок, вы добавляете знак минус ( -) перед .latitude атрибутом.


Обратите внимание, что для некоторых узлов путь может вообще не существовать. Например, Белфаст и Глазго не имеют наземного сообщения, потому что расположены на двух отдельных островах. Вам нужно сесть на паром, чтобы добраться из одного города в другой. Обход в ширину может сказать вам, остаются ли два узла соединенными или нет. Вот как реализовать такую ​​проверку:

# graph.py

# ...

def connected(graph, source, destination):
    return shortest_path(graph, source, destination) is not None

После запуска с исходного узла и прохождения всего подграфа подключенных узлов, таких как Северная Ирландия, словарь предыдущих узлов не будет включать ваш целевой узел. Таким образом, повторная трассировка немедленно остановится и вернет None, сообщив вам об отсутствии пути между источником и пунктом назначения.


Вы можете убедиться в этом в интерактивном сеансе интерпретатора Python:

>>> from graph import connected
>>> connected(graph, nodes["belfast"], nodes["glasgow"])
False
>>> connected(graph, nodes["belfast"], nodes["derry"])
True

Потрясающе! С помощью пользовательской очереди FIFO вы можете просматривать граф, находить кратчайший путь между двумя узлами и даже определять, связаны ли они. Добавив небольшую поправку в свой код, вы сможете изменить порядок обхода с обхода в ширину на обход в глубину, что вы и сделаете сейчас.

Поиск в глубину с использованием очереди LIFO

Как следует из названия, обход в глубину следует по пути от исходного узла, погружаясь в граф как можно глубже, прежде чем вернуться к последнему пересечению ребра и попробовать другую ветвь. Обратите внимание на разницу в порядке обхода, когда вы изменяете более ранний пример, заменяя его nx.bfs_tree() на nx.dfs_tree():

>>> import networkx as nx
>>> from graph import City, load_graph

>>> def is_twentieth_century(year):
...     return year and 1901 <= year <= 2000
...
>>> nodes, graph = load_graph("roadmap.dot", City.from_dict)
>>> for node in nx.dfs_tree(graph, nodes["edinburgh"]):
...     print("📍", node.name)
...     if is_twentieth_century(node.year):
...         print("Found:", node.name, node.year)
...         break
... else:
...     print("Not found")
...
📍 Edinburgh
📍 Dundee
📍 Aberdeen
📍 Inverness
📍 Perth
📍 Stirling
📍 Glasgow
📍 Carlisle
📍 Lancaster
Found: Lancaster 1937

Теперь выделенные соседи исходного узла больше не исследуются последовательно. Достигнув Данди, алгоритм продолжает двигаться по тому же пути вместо посещения следующего соседа Эдинбурга на первом слое графа.


Чтобы облегчить поиск с возвратом, вы можете заменить очередь FIFO на очередь LIFO в своей функции обхода в ширину, и вы очень близко подойдете к обходу в глубину. Однако он будет вести себя правильно только при обходе древовидных структур данных. В графиках с циклами есть небольшая разница, которая требует дополнительных изменений в вашем коде. В противном случае вы реализуете обход графа на основе стека , который работает совсем по-другому.

Примечание. При обходе бинарного дерева алгоритм поиска в глубину определяет несколько хорошо известных порядков посещения дочерними узлами, например, предварительный порядок, порядок и порядок следования.

В классическом обходе в глубину, помимо замены очереди стеком, вы изначально не будете отмечать исходный узел как посещенный:

# graph.py

from queues import Queue, Stack

# ...

def depth_first_traverse(graph, source, order_by=None):
    stack = Stack(source)
    visited = set()
    while stack:
        if (node := stack.dequeue()) not in visited:
            yield node
            visited.add(node)
            neighbors = list(graph.neighbors(node))
            if order_by:
                neighbors.sort(key=order_by)
            for neighbor in reversed(neighbors):
                stack.enqueue(neighbor)

Обратите внимание, что ваши посещенные узлы инициализируются пустым набором, прежде чем вы начнете извлекать элементы из стека. Вы также проверяете, был ли узел уже посещен намного раньше, чем при обходе в ширину. При повторении соседей вы меняете их порядок, чтобы учесть обращение очереди LIFO. Наконец, вы не отмечаете соседей как посещенных сразу после помещения их в стек.


Поскольку обход в глубину опирается на структуру данных стека, вы можете воспользоваться преимуществами встроенного стека вызовов , чтобы сохранить текущий путь поиска для последующего возврата и рекурсивно переписать свою функцию :

# graph.py

# ...

def recursive_depth_first_traverse(graph, source, order_by=None):
    visited = set()

    def visit(node):
        yield node
        visited.add(node)
        neighbors = list(graph.neighbors(node))
        if order_by:
            neighbors.sort(key=order_by)
        for neighbor in neighbors:
            if neighbor not in visited:
                yield from visit(neighbor)

    return visit(source)

Поступая таким образом, вы избегаете поддержки собственного стека, поскольку Python незаметно для вас помещает вызов каждой функции в стек. Он выталкивает один, когда соответствующая функция возвращает значение. Вам нужно только отслеживать посещенные узлы. Еще одним преимуществом рекурсивной реализации является тот факт, что вам не нужно переворачивать соседей при их переборе, и вы не заталкиваете уже посещенных соседей в стек.


Имея функцию обхода, теперь вы можете реализовать алгоритм поиска в глубину. Поскольку и алгоритмы поиска в ширину, и алгоритмы поиска в глубину выглядят почти одинаково и отличаются только порядком обхода, вы можете реорганизовать свой код, делегировав общие части обоих алгоритмов шаблонной функции:

# graph.py

# ...

def breadth_first_search(graph, source, predicate, order_by=None):
    return search(breadth_first_traverse, graph, source, predicate, order_by)

# ...

def depth_first_search(graph, source, predicate, order_by=None):
    return search(depth_first_traverse, graph, source, predicate, order_by)

def search(traverse, graph, source, predicate, order_by=None):
    for node in traverse(graph, source, order_by):
        if predicate(node):
            return node

Теперь ваши функции breadth_first_search()и depth_first_search()вызываются search()с соответствующей стратегией обхода. Протестируйте их в интерактивном сеансе интерпретатора Python:

>>> from graph import (
...     City,
...     load_graph,
...     depth_first_traverse,
...     depth_first_search as dfs,
... )

>>> def is_twentieth_century(city):
...     return city.year and 1901 <= city.year <= 2000
...
>>> nodes, graph = load_graph("roadmap.dot", City.from_dict)
>>> city = dfs(graph, nodes["edinburgh"], is_twentieth_century)
>>> city.name
'Lancaster'

>>> for city in depth_first_traverse(graph, nodes["edinburgh"]):
...     print(city.name)
...
Edinburgh
Dundee
Aberdeen
Inverness
Perth
Stirling
Glasgow
Carlisle
Lancaster
⋮

Несмотря на то, что результат поиска оказывается таким же, как и при использовании вашего алгоритма поиска в ширину, вы можете ясно видеть, что порядок обхода теперь другой и следует линейному пути.


Вы видели, как выбор между очередью FIFO и LIFO может повлиять на базовый алгоритм обхода графа. До сих пор вы рассматривали только количество промежуточных узлов при поиске кратчайшего пути между двумя городами. В следующем разделе вы сделаете еще один шаг вперед, используя приоритетную очередь для поиска наиболее оптимального маршрута, который иногда может содержать больше узлов.

Использование потокобезопасных очередей

Теперь предположим, что вы написали программу с более чем одним потоком выполнения. Помимо того, что очереди являются ценным алгоритмическим инструментом, они могут помочь абстрагироваться от параллельного доступа к общему ресурсу в многопоточной среде без необходимости явной блокировки. Python предоставляет несколько типов синхронизированных очередей , которые можно безопасно использовать в нескольких потоках для облегчения связи между ними.


В этом разделе вы собираетесь реализовать классическую задачу с несколькими производителями и несколькими потребителями, используя потокобезопасные очереди Python . В частности, вы создадите сценарий командной строки, который позволит вам выбрать количество производителей и потребителей, их относительную скорость и тип очереди:

$ python thread_safe_queues.py --producers 3 \
                               --consumers 2 \
                               --producer-speed 1 \
                               --consumer-speed 1 \
                               --queue fifo

Все параметры являются необязательными и имеют разумные значения по умолчанию. Когда вы запустите этот скрипт, вы увидите анимированную симуляцию потоков производителя и потребителя, взаимодействующих через синхронизированную очередь:

Анимированная визуализация производителей, потребителей и потокобезопасной очереди Визуализация производителей, потребителей и потокобезопасной очереди

Сценарий использует библиотеку Rich , которую вам необходимо сначала установить в виртуальную среду:

(venv) $ python -m pip install rich

Это позволит вам добавлять цвета, смайлики и визуальные компоненты в ваш терминал. Обратите внимание, что некоторые терминалы могут не поддерживать такое форматирование текста.


Прежде чем вы начнете использовать очереди, вам придется немного поработать. Создайте новый файл с именем thread_safe_queues.pyи определите точку входа в ваш скрипт, который будет анализировать аргументы модуля argparse:

# thread_safe_queues.py

import argparse
from queue import LifoQueue, PriorityQueue, Queue

QUEUE_TYPES = {
    "fifo": Queue,
    "lifo": LifoQueue,
    "heap": PriorityQueue
}

def main(args):
    buffer = QUEUE_TYPES[args.queue]()

def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("-q", "--queue", choices=QUEUE_TYPES, default="fifo")
    parser.add_argument("-p", "--producers", type=int, default=3)
    parser.add_argument("-c", "--consumers", type=int, default=2)
    parser.add_argument("-ps", "--producer-speed", type=int, default=1)
    parser.add_argument("-cs", "--consumer-speed", type=int, default=1)
    return parser.parse_args()

if __name__ == "__main__":
    try:
        main(parse_args())
    except KeyboardInterrupt:
        pass

Сначала вы импортируете необходимые модули и классы очередей в глобальное пространство имен. Функция main()— это ваша точка входа, которая получает проанализированные аргументы, предоставленные parse_args(), что определено ниже. Словарь QUEUE_TYPESсопоставляет имена очередей с соответствующими классами, которые вы вызываете для создания нового экземпляра очереди на основе значения аргумента командной строки.


Затем вы определяете продукты, которые ваши производители будут выбирать наугад и делать вид, что работают над ними:

# thread_safe_queues.py

# ...

PRODUCTS = (
    ":balloon:",
    ":cookie:",
    ":crystal_ball:",
    ":diving_mask:",
    ":flashlight:",
    ":gem:",
    ":gift:",
    ":kite:",
    ":party_popper:",
    ":postal_horn:",
    ":ribbon:",
    ":rocket:",
    ":teddy_bear:",
    ":thread:",
    ":yo-yo:",
)

# ...

Это текстовые коды, которые Рич в конечном итоге заменит соответствующими глифами эмодзи . Например, :balloon: будет отображаться как 🎈. Вы можете найти все коды смайликов, доступные в Rich, запустив их python -m rich.emoji в своем терминале.


Ваши потоки производителя и потребителя будут иметь множество общих атрибутов и поведений, которые вы можете инкапсулировать в общий базовый класс:

# thread_safe_queues.py

import threading

# ...

class Worker(threading.Thread):
    def __init__(self, speed, buffer):
        super().__init__(daemon=True)
        self.speed = speed
        self.buffer = buffer
        self.product = None
        self.working = False
        self.progress = 0

Рабочий класс расширяет threading.Threadкласс и настраивает себя как поток демона , чтобы его экземпляры не препятствовали выходу вашей программы после завершения основного потока, например, из-за сигнала прерывания клавиатуры . Рабочий объект ожидает, что скорость будет работать, а буферная очередь будет помещать готовые продукты или получать их из них.


В дополнение к этому вы сможете проверить состояние рабочего потока и запросить, чтобы он имитировал некоторую работу или время простоя:

# thread_safe_queues.py

from random import randint
from time import sleep

# ...

class Worker(threading.Thread):
    # ...

    @property
    def state(self):
        if self.working:
            return f"{self.product} ({self.progress}%)"
        return ":zzz: Idle"

    def simulate_idle(self):
        self.product = None
        self.working = False
        self.progress = 0
        sleep(randint(1, 3))

    def simulate_work(self):
        self.working = True
        self.progress = 0
        delay = randint(1, 1 + 15 // self.speed)
        for _ in range(100):
            sleep(delay / 100)
            self.progress += 1

Свойство возвращает строку либо с названием продукта и ходом работы, либо с общим сообщением, указывающим, что рабочий процесс в данный момент бездействует .state . Метод .simulate_idle()сбрасывает состояние рабочего потока и переходит в спящий режим на несколько случайно выбранных секунд. Точно так же .simulate_work()выбирает случайную задержку в секундах, адаптированную к скорости рабочего, и продвигается по работе.


Далее вы определите классы производителя и потребителя и соедините части вместе.

queue.Queue

Первая синхронизированная очередь, которую вы попробуете, — это неограниченная очередь FIFO или, проще говоря, очередь. Вам нужно будет передать его как вашим производителям, так и потребителям, так что вперед и определите их сейчас. Поток-производитель расширит рабочий класс и возьмет на выбор дополнительную коллекцию продуктов:

# thread_safe_queues.py

from random import choice, randint

# ...

class Producer(Worker):
    def __init__(self, speed, buffer, products):
        super().__init__(speed, buffer)
        self.products = products

    def run(self):
        while True:
            self.product = choice(self.products)
            self.simulate_work()
            self.buffer.put(self.product)
            self.simulate_idle()

# ...

Именно в .run() методе происходит все волшебство. Производитель работает в бесконечном цикле, выбирая случайный продукт и моделируя некоторую работу, прежде чем поместить этот продукт в очередь, называемую buffer. Затем он засыпает на случайный период времени, а когда снова просыпается, процесс повторяется.


Потребитель очень похож, но даже более прямолинеен, чем производитель:

# thread_safe_queues.py

# ...

class Consumer(Worker):
    def run(self):
        while True:
            self.product = self.buffer.get()
            self.simulate_work()
            self.buffer.task_done()
            self.simulate_idle()

# ...

Он также работает в бесконечном цикле, ожидая появления продукта в очереди. По умолчанию этот .get()метод блокируется , поэтому поток потребителя будет остановлен и будет ждать, пока в очереди не окажется хотя бы один продукт. Таким образом, ожидающий потребитель не будет тратить циклы ЦП впустую, в то время как операционная система выделяет ценные ресурсы другим потокам, выполняющим полезную работу.


Примечание. Чтобы избежать тупиковой ситуации , вы можете дополнительно установить тайм-аут для .get()метода, передав timeoutаргумент ключевого слова с количеством секунд ожидания, прежде чем сдаться.


Каждый раз, когда вы получаете что-то из синхронизированной очереди, ее внутренний счетчик увеличивается, чтобы другие потоки знали, что очередь еще не опустошена. Поэтому важно пометить задачу, удаленную из очереди, как выполненную, когда вы закончите ее обработку, если только у вас нет ни одного потока, присоединившегося к очереди. Это уменьшает внутренний счетчик очереди.


Теперь вернитесь к своей main()функции, создайте потоки производителя и потребителя и запустите их:

# thread_safe_queues.py

# ...

def main(args):
    buffer = QUEUE_TYPES[args.queue]()
    producers = [
        Producer(args.producer_speed, buffer, PRODUCTS)
        for _ in range(args.producers)
    ]
    consumers = [
        Consumer(args.consumer_speed, buffer) for _ in range(args.consumers)
    ]

    for producer in producers:
        producer.start()

    for consumer in consumers:
        consumer.start()

    view = View(buffer, producers, consumers)
    view.animate()

Количество производителей и потребителей определяется аргументами командной строки, переданными в вашу функцию. Они начнут работать и использовать очередь для межпотокового взаимодействия, как только вы их запустите. Экземпляр Viewвнизу постоянно перерисовывает экран, чтобы отразить текущее состояние вашего приложения:

Поточно-безопасная очередь FIFO

Производители всегда будут продвигать свою готовую продукцию через эту очередь к потребителям. Хотя иногда может показаться, что потребитель берет элемент непосредственно у производителя, это происходит только потому, что все происходит слишком быстро, чтобы заметить операции постановки в очередь и удаления из очереди.

Примечание. Стоит отметить, что всякий раз, когда производитель помещает что-то в синхронизированную очередь, максимум один потребитель удаляет этот элемент из очереди и обрабатывает его без ведома других потребителей. Если вы хотите уведомить более одного потребителя об определенном событии в вашей программе, взгляните на другие примитивы координации потоков в модуле threading.

Вы можете увеличить количество производителей, их скорость или и то, и другое, чтобы увидеть, как эти изменения повлияют на общую производительность вашей системы. Поскольку очередь не ограничена, она никогда не замедлит работу производителей. Однако, если вы укажете параметр очереди maxsize, то он начнет блокировать их до тех пор, пока в очереди снова не будет достаточно места.


Использование очереди FIFO заставляет производителей помещать элементы в левый конец очереди в визуализации выше. В то же время потребители конкурируют друг с другом за самый правый товар в очереди. В следующем разделе вы увидите, как это поведение изменится, когда вы вызовете скрипт с опцией --queue lifo.

queue.LifoQueue

С точки зрения ваших работников, нет абсолютно никакой необходимости вносить какие-либо изменения в ваш код, чтобы изменить то, как они общаются. Просто внедрив в них другой тип синхронизированной очереди, вы можете модифицировать правила общения воркеров. Запустите свой скрипт с очередью LIFO сейчас:

$ python thread_safe_queues.py --queue lifo

Когда вы используете очередь LIFO или стек, каждый новый продукт, который только что был создан, будет иметь приоритет над более старыми в очереди:

Поточно-безопасная очередь LIFO

В редких случаях, когда новые продукты создаются быстрее, чем ваши потребители могут с ними справиться, старые продукты могут страдать от голода , потому что они застревают в нижней части стека и никогда не потребляются. С другой стороны, это может не быть проблемой, если у вас достаточно большой пул потребителей или когда вы не получаете столько входящих продуктов. Голодание также может включать элементы в очереди с приоритетом, о чем вы прочтете далее.

queue.PriorityQueue

Чтобы использовать синхронизированную приоритетную очередь или кучу, вам потребуется внести в свой код несколько изменений. Прежде всего, вам понадобится новый тип продукта, который имеет соответствующий приоритет, поэтому определите два новых типа данных:

# thread_safe_queues.py

from dataclasses import dataclass, field
from enum import IntEnum

# ...

@dataclass(order=True)
class Product:
    priority: int
    label: str = field(compare=False)

    def __str__(self):
        return self.label

class Priority(IntEnum):
    HIGH = 1
    MEDIUM = 2
    LOW = 3

PRIORITIZED_PRODUCTS = (
    Product(Priority.HIGH, ":1st_place_medal:"),
    Product(Priority.MEDIUM, ":2nd_place_medal:"),
    Product(Priority.LOW, ":3rd_place_medal:"),
)

Для представления продуктов вы используете класс данных с настраиваемым строковым представлением и включенным упорядочением, но будьте осторожны, чтобы не сравнивать продукты по их меткам. В этом случае вы ожидаете, что метка будет строкой, но, как правило, это может быть любой объект, который вообще может быть несопоставим. Вы также определяете класс enum с известными значениями приоритета и три продукта с убывающими приоритетами от высшего к низшему.

Примечание. В отличие от вашей предыдущей реализации очереди с приоритетом, потокобезопасная очередь Python сначала упорядочивает элементы с наименьшим числовым значением приоритета.

Кроме того, когда пользователь указывает --queue heap параметр в командной строке, вы должны указать правильный набор продуктов для ваших потоков-производителей:

# thread_safe_queues.py

# ...

def main(args):
    buffer = QUEUE_TYPES[args.queue]()
    products = PRIORITIZED_PRODUCTS if args.queue == "heap" else PRODUCTS
    producers = [
        Producer(args.producer_speed, buffer, products)
        for _ in range(args.producers)
    ]
    # ...

Вы предоставляете простые или приоритетные продукты в зависимости от аргумента командной строки, используя условное выражение.


Остальная часть вашего кода может оставаться независимой от этого изменения, пока производители и потребители знают, как работать с новым типом продукта. Поскольку это всего лишь симуляция, рабочие потоки на самом деле не делают ничего полезного с продуктами, поэтому вы можете запустить свой скрипт с флагом --queue heap и увидеть результат:

Потокобезопасная приоритетная очередь

Помните, что структура данных в виде кучи представляет собой двоичное дерево, в котором сохраняются определенные отношения между его элементами. Таким образом, хотя продукты в приоритетной очереди кажутся не совсем правильными, на самом деле они потребляются в правильном порядке. Кроме того, из-за недетерминированного характера многопоточного программирования очереди Python не всегда сообщают свой самый актуальный размер.


Итак, вы познакомились с традиционным способом координации рабочих потоков с использованием трех типов синхронизированных очередей. Потоки Python хорошо подходят для задач , связанных с вводом-выводом , которые проводят большую часть своего времени в ожидании данных в сети, файловой системе или базе данных. Однако недавно появилась однопоточная альтернатива синхронизированным очередям, использующая преимущества асинхронных функций Python . Это то, на что вы сейчас посмотрите.

Использование асинхронных очередей

Если вы хотите использовать очереди в асинхронном контексте, вам поможет Python. Модуль asyncio предоставляет асинхронные аналоги очередям из threadingмодуля, которые вы можете использовать в функциях сопрограммы в одном потоке. Поскольку оба семейства очередей имеют схожий интерфейс, переключение с одного на другое должно быть относительно безболезненным. Кстати, если вам интересно как работает асинхронный Python, то можно посмотреть здесь.


В этом разделе вы напишете простейший веб-краулер , который рекурсивно переходит по ссылкам на указанном веб-сайте до заданного уровня глубины и подсчитывает количество посещений каждой ссылки. Для асинхронного извлечения данных вы будете использовать популярную aiohttp библиотеку, а для анализа гиперссылок HTML — beautifulsoup4. Прежде чем продолжить, обязательно установите обе библиотеки в свою виртуальную среду:

(venv) $ python -m pip install aiohttp beautifulsoup4

Теперь вы можете выполнять HTTP-запросы асинхронно и выбирать HTML-элементы из так называемого супа тегов, полученного с сервера.

Примечание. Вы можете использовать Beautiful Soup и Python для создания парсера , который собирает ценные данные при посещении веб-страниц.

Чтобы заложить основу для вашего поискового робота, вы сначала создадите несколько строительных блоков. Создайте новый файл с именем async_queues.pyи определите в нем следующую структуру:

# async_queues.py

import argparse
import asyncio
from collections import Counter

import aiohttp

async def main(args):
    session = aiohttp.ClientSession()
    try:
        links = Counter()
        display(links)
    finally:
        await session.close()

def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("url")
    parser.add_argument("-d", "--max-depth", type=int, default=2)
    parser.add_argument("-w", "--num-workers", type=int, default=3)
    return parser.parse_args()

def display(links):
    for url, count in links.most_common():
        print(f"{count:>3} {url}")

if __name__ == "__main__":
    asyncio.run(main(parse_args()))

Как и в большинстве асинхронных программ, вы передаете свою main() сопрограмму asyncio.run(), чтобы она могла выполнить ее в цикле событий по умолчанию . Сопрограмма принимает несколько аргументов командной строки, анализируемых вспомогательной функцией, определенной ниже, запускает новый aiohttp.ClientSession и определяет счетчик посещенных ссылок. Позже сопрограмма выводит список ссылок, отсортированных по количеству посещений в порядке убывания.


Чтобы запустить скрипт, вы укажете корневой URL-адрес и, при желании, максимальную глубину и количество рабочих процессов. Вот пример:

$ python async_queues.py https://www.python.org/ --max-depth 2 \
                                                 --num-workers 3

Есть еще несколько отсутствующих частей, таких как получение контента и анализ ссылок HTML, поэтому добавьте их в свой файл:

# async_queues.py

from urllib.parse import urljoin
from bs4 import BeautifulSoup

# ...

async def fetch_html(session, url):
    async with session.get(url) as response:
        if response.ok and response.content_type == "text/html":
            return await response.text()

def parse_links(url, html):
    soup = BeautifulSoup(html, features="html.parser")
    for anchor in soup.select("a[href]"):
        href = anchor.get("href").lower()
        if not href.startswith("javascript:"):
            yield urljoin(url, href)

Вы вернете полученный контент только в том случае, если он представляет собой HTML, о чем вы можете узнать, посмотрев на Content-Type HTTP-заголовок . При извлечении ссылок из HTML-содержимого вы пропустите встроенный JavaScript в hrefатрибуте и при желании присоедините относительный путь к текущему URL-адресу.


Далее вы собираетесь определить новый тип данных, представляющий задание, которое вы поместите в очередь, а также асинхронный рабочий процесс, выполняющий задание:

# async_queues.py

import sys
from typing import NamedTuple

# ...

class Job(NamedTuple):
    url: str
    depth: int = 1

# ...

async def worker(worker_id, session, queue, links, max_depth):
    print(f"[{worker_id} starting]", file=sys.stderr)
    while True:
        url, depth = await queue.get()
        links[url] += 1
        try:
            if depth <= max_depth:
                print(f"[{worker_id} {depth=} {url=}]", file=sys.stderr)
                if html := await fetch_html(session, url):
                    for link_url in parse_links(url, html):
                        await queue.put(Job(link_url, depth + 1))
        except aiohttp.ClientError:
            print(f"[{worker_id} failed at {url=}]", file=sys.stderr)
        finally:
            queue.task_done()

Задание состоит из URL-адреса для посещения и текущей глубины, которую работник будет использовать, чтобы остановить рекурсивное сканирование. Благодаря указанию задания в виде именованного кортежа вы распаковываете его отдельные компоненты в выделенной строке после удаления из очереди. Если вы не указываете глубину для задания, по умолчанию она равна единице.


Рабочий сидит в бесконечном цикле, ожидая поступления задания в очередь. После использования одного задания рабочий может поместить одно или несколько новых заданий с увеличенной глубиной в очередь для использования им самим или другими рабочими.


Поскольку ваш worker одновременно является производителем и потребителем , очень важно безоговорочно пометить работу как выполненную в предложении tryfinally, чтобы избежать тупиковой ситуации. Вы также должны обрабатывать ошибки в своем воркере, потому что в противном случае необработанные исключения заставят ваш воркер перестать принимать новые задания.

Примечание. Вы можете использовать эту print()функцию в асинхронном коде, например, для регистрации диагностических сообщений , поскольку все выполняется в одном потоке. С другой стороны, вам придется заменить его модулем loggingв многопоточном коде, потому что print()функция не является потокобезопасной.

Кроме того, обратите внимание, что вы печатаете диагностические сообщения в стандартную ошибку (stderr) , в то время как вывод вашей программы выводится в стандартный вывод (stdout) , которые представляют собой два совершенно разных потока. Это позволяет вам, например, перенаправить один или оба файла.


Ваш работник увеличивает количество обращений при посещении URL-адреса. Кроме того, если глубина текущего URL-адреса не превышает максимально допустимую глубину, рабочий процесс извлекает содержимое HTML, на которое указывает URL-адрес, и перебирает его ссылки. Оператор walrus ( :=) позволяет дождаться ответа HTTP, проверить, было ли возвращено содержимое, и присвоить результат переменной htmlв одном выражении.


Последний оставшийся шаг — создать экземпляр асинхронной очереди и передать его рабочим процессам.

asyncio.Queue

В этом разделе вы обновите свою main()сопрограмму, создав очередь и асинхронные задачи, которые запускают ваши рабочие процессы. Каждый воркер получит уникальный идентификатор, чтобы различать его в логе сообщения, aiohttpсессию, экземпляр очереди, счетчик посещений конкретной ссылки и максимальную глубину. Поскольку вы используете один поток, вам не нужно обеспечивать взаимоисключающий доступ к общим ресурсам:

# async_queues.py

# ...

async def main(args):
    session = aiohttp.ClientSession()
    try:
        links = Counter()
        queue = asyncio.Queue()
        tasks = [
            asyncio.create_task(
                worker(
                    f"Worker-{i + 1}",
                    session,
                    queue,
                    links,
                    args.max_depth,
                )
            )
            for i in range(args.num_workers)
        ]

        await queue.put(Job(args.url))
        await queue.join()

        for task in tasks:
            task.cancel()

        await asyncio.gather(*tasks, return_exceptions=True)

        display(links)
    finally:
        await session.close()

# ...

Вот построчная разбивка обновленного кода:

  • Строка 9 создает экземпляр асинхронной очереди FIFO.
  • Строки с 10 по 21 создают ряд рабочих сопрограмм, заключенных в асинхронные задачи , которые начинают выполняться как можно скорее в фоновом режиме в цикле событий.
  • Строка 23 помещает первое задание в очередь, что запускает сканирование.
  • Строка 24 заставляет основную сопрограмму ждать, пока очередь не будет опустошена и больше не останется заданий для выполнения.
  • Строки с 26 по 29 выполняют изящную очистку, когда фоновые задачи больше не нужны.

Пожалуйста, не запускайте поисковый робот на реальном веб-сайте, размещенном в Интернете. Это может вызвать нежелательный всплеск сетевого трафика и доставить вам неприятности. Чтобы протестировать поисковый робот, вам лучше запустить HTTP-сервер, встроенный в Python, который превращает локальную папку в вашей файловой системе в веб-сайт, по которому можно перемещаться. Например, следующая команда запустит сервер в локальной папке с виртуальной средой Python:

$ cd venv/
$ python -m http.server
Serving HTTP on 0.0.0.0 port 8000 (http://0.0.0.0:8000/) ...

Однако это не идеальная аналогия с реальным веб-сайтом, потому что файлы и папки составляют древовидную иерархию, тогда как веб-сайты часто представлены плотными мультиграфами с обратными ссылками. В любом случае, когда вы запустите поисковый робот по выбранному URL-адресу в другом окне терминала, вы заметите, что сканер переходит по ссылкам в их естественном порядке:

$ python async_queues.py http://localhost:8000 --max-depth=4
[Worker-1 starting]
[Worker-1 depth=1 url='http://localhost:8000']
[Worker-2 starting]
[Worker-3 starting]
[Worker-1 depth=2 url='http://localhost:8000/bin/']
[Worker-2 depth=2 url='http://localhost:8000/include/']
[Worker-3 depth=2 url='http://localhost:8000/lib/']
[Worker-2 depth=2 url='http://localhost:8000/lib64/']
[Worker-1 depth=2 url='http://localhost:8000/pyvenv.cfg']
[Worker-3 depth=3 url='http://localhost:8000/bin/activate']
[Worker-2 depth=3 url='http://localhost:8000/bin/activate.csh']
[Worker-1 depth=3 url='http://localhost:8000/bin/activate.fish']
[Worker-3 depth=3 url='http://localhost:8000/bin/activate.ps1']
[Worker-2 depth=3 url='http://localhost:8000/bin/pip']
[Worker-3 depth=3 url='http://localhost:8000/bin/pip3']
[Worker-1 depth=3 url='http://localhost:8000/bin/pip3.10']
[Worker-2 depth=3 url='http://localhost:8000/bin/python']
[Worker-3 depth=3 url='http://localhost:8000/bin/python3']
[Worker-1 depth=3 url='http://localhost:8000/bin/python3.10']
[Worker-2 depth=3 url='http://localhost:8000/lib/python3.10/']
[Worker-3 depth=3 url='http://localhost:8000/lib64/python3.10/']
[Worker-2 depth=4 url='http://localhost:8000/lib/python3.10/site-packages/']
[Worker-3 depth=4 url='http://localhost:8000/lib64/python3.10/site-packages/']
⋮

Он посещает единственный URL на первом уровне с глубиной, равной единице. Затем, пройдя по всем ссылкам на втором уровне, краулер переходит на третий уровень и так далее, пока не достигнет запрошенного максимального уровня глубины. После того, как все ссылки на заданном уровне исследованы, сканер никогда не возвращается на более ранний уровень. Это прямое следствие использования очереди FIFO, которая отличается от использования стека или очереди LIFO.

asyncio.LifoQueue

Как и в случае с синхронизированными очередями, их асинхронные компаньоны позволяют вам изменять поведение ваших воркеров без изменения их кода. Вернитесь к своему async_queuesмодулю и замените существующую очередь FIFO на очередь LIFO:

# async_queues.py

# ...

async def main(args):
    session = aiohttp.ClientSession()
    try:
        links = Counter()
        queue = asyncio.LifoQueue()
        tasks = [
            asyncio.create_task(
                worker(
                    f"Worker-{i + 1}",
                    session,
                    queue,
                    links,
                    args.max_depth,
                )
            )
            for i in range(args.num_workers)
        ]

        await queue.put(Job(args.url))
        await queue.join()

        for task in tasks:
            task.cancel()

        await asyncio.gather(*tasks, return_exceptions=True)

        display(links)
    finally:
        await session.close()

# ...

Не останавливая HTTP-сервер, снова запустите поисковый робот, используя те же параметры:

$ python async_queues.py http://localhost:8000 --max-depth=4
[Worker-1 starting]
[Worker-1 depth=1 url='http://localhost:8000']
[Worker-2 starting]
[Worker-3 starting]
[Worker-1 depth=2 url='http://localhost:8000/pyvenv.cfg']
[Worker-2 depth=2 url='http://localhost:8000/lib64/']
[Worker-3 depth=2 url='http://localhost:8000/lib/']
[Worker-1 depth=2 url='http://localhost:8000/include/']
[Worker-2 depth=3 url='http://localhost:8000/lib64/python3.10/']
[Worker-3 depth=3 url='http://localhost:8000/lib/python3.10/']
[Worker-1 depth=2 url='http://localhost:8000/bin/']
[Worker-2 depth=4 url='http://localhost:8000/lib64/python3.10/site-packages/']
[Worker-1 depth=3 url='http://localhost:8000/bin/python3.10']
[Worker-2 depth=3 url='http://localhost:8000/bin/python3']
[Worker-3 depth=4 url='http://localhost:8000/lib/python3.10/site-packages/']
[Worker-1 depth=3 url='http://localhost:8000/bin/python']
[Worker-2 depth=3 url='http://localhost:8000/bin/pip3.10']
[Worker-1 depth=3 url='http://localhost:8000/bin/pip3']
[Worker-3 depth=3 url='http://localhost:8000/bin/pip']
[Worker-2 depth=3 url='http://localhost:8000/bin/activate.ps1']
[Worker-1 depth=3 url='http://localhost:8000/bin/activate.fish']
[Worker-3 depth=3 url='http://localhost:8000/bin/activate.csh']
[Worker-2 depth=3 url='http://localhost:8000/bin/activate']
⋮

Предполагая, что содержимое не изменилось с момента последнего запуска, сканер посещает идентичные ссылки, но в другом порядке. Выделенные линии указывают на посещение ссылки на ранее исследованном уровне глубины.

Примечание. Если вы отслеживали уже посещенные ссылки и пропускали их при последующих встречах, это может привести к различным результатам в зависимости от используемого типа очереди. Это связано с тем, что многие альтернативные пути могут начинаться на разных уровнях глубины, но вести к одному и тому же месту назначения.

Далее вы увидите асинхронную очередь приоритетов в действии.

asyncio.PriorityQueue

Чтобы использовать ваши задания в очереди с приоритетом, вы должны указать, как их сравнивать при выборе их приоритетов. Например, вы можете сначала посетить более короткие URL-адреса.


Идем дальше и добавляем .__lt__()в свой класс специальный метод , которому делегирует оператор Jobless than ( ) при сравнении двух экземпляров задания:<

# async_queues.py

# ...

class Job(NamedTuple):
    url: str
    depth: int = 1

    def __lt__(self, other):
        if isinstance(other, Job):
            return len(self.url) < len(other.url)

Если вы сравниваете задание с совершенно другим типом данных, вы не можете сказать, какой из них меньше, поэтому вы неявно возвращаете None. С другой стороны, при сравнении двух экземпляров класса Jobвы решаете их приоритеты, изучая длины их соответствующих .urlполей:

>>> from async_queues import Job
>>> job1 = Job("http://localhost/")
>>> job2 = Job("https://localhost:8080/")
>>> job1 < job2
True

Чем короче URL-адрес, тем выше приоритет, поскольку меньшие значения имеют приоритет в минимальной куче.


Последнее изменение, которое нужно внести в ваш сценарий, — это использование очереди с асинхронным приоритетом вместо двух других:

# async_queues.py

# ...

async def main(args):
    session = aiohttp.ClientSession()
    try:
        links = Counter()
        queue = asyncio.PriorityQueue()
        tasks = [
            asyncio.create_task(
                worker(
                    f"Worker-{i + 1}",
                    session,
                    queue,
                    links,
                    args.max_depth,
                )
            )
            for i in range(args.num_workers)
        ]

        await queue.put(Job(args.url))
        await queue.join()

        for task in tasks:
            task.cancel()

        await asyncio.gather(*tasks, return_exceptions=True)

        display(links)
    finally:
        await session.close()

# ...

Попробуйте запустить поисковый робот с еще большим значением максимальной глубины, скажем, пятью:

$ python async_queues.py http://localhost:8000 --max-depth 5
[Worker-1 starting]
[Worker-1 depth=1 url='http://localhost:8000']
[Worker-2 starting]
[Worker-3 starting]
[Worker-1 depth=2 url='http://localhost:8000/bin/']
[Worker-2 depth=2 url='http://localhost:8000/lib/']
[Worker-3 depth=2 url='http://localhost:8000/lib64/']
[Worker-3 depth=2 url='http://localhost:8000/include/']
[Worker-2 depth=2 url='http://localhost:8000/pyvenv.cfg']
[Worker-1 depth=3 url='http://localhost:8000/bin/pip']
[Worker-3 depth=3 url='http://localhost:8000/bin/pip3']
[Worker-2 depth=3 url='http://localhost:8000/bin/python']
[Worker-1 depth=3 url='http://localhost:8000/bin/python3']
[Worker-3 depth=3 url='http://localhost:8000/bin/pip3.10']
[Worker-2 depth=3 url='http://localhost:8000/bin/activate']
[Worker-1 depth=3 url='http://localhost:8000/bin/python3.10']
[Worker-3 depth=3 url='http://localhost:8000/lib/python3.10/']
[Worker-2 depth=3 url='http://localhost:8000/bin/activate.ps1']
[Worker-3 depth=3 url='http://localhost:8000/bin/activate.csh']
[Worker-1 depth=3 url='http://localhost:8000/lib64/python3.10/']
[Worker-2 depth=3 url='http://localhost:8000/bin/activate.fish']
[Worker-3 depth=4 url='http://localhost:8000/lib/python3.10/site-packages/']
[Worker-1 depth=4 url='http://localhost:8000/lib64/python3.10/site-packages/']
⋮

Вы сразу заметите, что ссылки обычно просматриваются в порядке, определяемом длиной URL. Естественно, точный порядок будет немного меняться при каждом запуске из-за недетерминированного характера времени, необходимого серверу для ответа.


Асинхронные очереди — довольно новое дополнение к стандартной библиотеке Python. Они преднамеренно имитируют интерфейс соответствующих потокобезопасных очередей, что должно позволить любому опытному питонисту чувствовать себя как дома. Вы можете использовать асинхронные очереди для обмена данными между сопрограммами.

Интеграция Python с распределенными очередями сообщений

В распределенных системах с большим количеством движущихся частей часто желательно отделить компоненты приложения с помощью промежуточного брокера сообщений , который берет на себя бремя отказоустойчивой доставки сообщений между службами производителя и потребителя. Обычно для этого требуется собственная инфраструктура, что является как преимуществом, так и недостатком.


С одной стороны, это еще один уровень абстракции, который усложняет работу и требует обслуживания, но при правильной настройке он может обеспечить следующие преимущества:

  • Слабая связь: вы можете изменить или заменить один компонент другим, не затрагивая остальную часть вашей системы.
  • Гибкость: вы можете изменить бизнес-правила вашей системы, изменив конфигурацию брокера и правила доставки сообщений без написания кода.
  • Масштабируемость: вы можете динамически добавлять дополнительные компоненты определенного типа, чтобы справляться с возросшей рабочей нагрузкой в ​​определенной функциональной области.
  • Надежность: потребителям может потребоваться подтвердить сообщение, прежде чем брокер удалит его из очереди, чтобы обеспечить безопасную доставку. Запуск брокера в кластере может обеспечить дополнительную отказоустойчивость.
  • Постоянство: брокер может держать некоторые сообщения в очереди, пока потребители отключены из-за сбоя.
  • Производительность. Использование выделенной инфраструктуры для брокера сообщений разгружает службы приложений.

Существует множество различных типов брокеров сообщений и сценариев их использования. В этом разделе вы познакомитесь с некоторыми из них.

RabbitMQ:pika

RabbitMQ, вероятно, является одним из самых популярных брокеров сообщений с открытым исходным кодом, который позволяет маршрутизировать сообщения от производителей к потребителям несколькими способами. Вы можете легко запустить новый брокер RabbitMQ, не устанавливая его на свой компьютер, запустив временный контейнер Docker :

$ docker run -it --rm --name rabbitmq -p 5672:5672 rabbitmq

После запуска вы можете подключиться к нему на своем локальном хосте и порту по умолчанию 5672. Официальная документация рекомендует использовать библиотеку Pika для подключения к экземпляру RabbitMQ в Python. Вот как может выглядеть рудиментарный производитель:

# producer.py

import pika

QUEUE_NAME = "mailbox"

with pika.BlockingConnection() as connection:
    channel = connection.channel()
    channel.queue_declare(queue=QUEUE_NAME)
    while True:
        message = input("Message: ")
        channel.basic_publish(
            exchange="",
            routing_key=QUEUE_NAME,
            body=message.encode("utf-8")
        )

Вы открываете соединение, используя параметры по умолчанию, которые предполагают, что RabbitMQ уже запущен на вашем локальном компьютере. Затем вы создаете новый канал, который представляет собой облегченную абстракцию поверх TCP-соединения. Вы можете иметь несколько независимых каналов для отдельных передач. Перед входом в цикл убедитесь, что указанная очередь mailboxсуществует в брокере. Наконец, вы продолжаете публиковать сообщения, прочитанные пользователем.


Потребитель лишь немного длиннее, так как требует определения функции обратного вызова для обработки сообщений:

# consumer.py

import pika

QUEUE_NAME = "mailbox"

def callback(channel, method, properties, body):
    message = body.decode("utf-8")
    print(f"Got message: {message}")

with pika.BlockingConnection() as connection:
    channel = connection.channel()
    channel.queue_declare(queue=QUEUE_NAME)
    channel.basic_consume(
        queue=QUEUE_NAME,
        auto_ack=True,
        on_message_callback=callback
    )
    channel.start_consuming()

Большая часть шаблонного кода похожа на вашего производителя. Однако вам не нужно писать явный цикл, потому что потребитель будет бесконечно прослушивать сообщения.


Идите вперед и запустите несколько производителей и потребителей в отдельных вкладках терминала. Обратите внимание, что происходит, когда первый потребитель подключается к RabbitMQ после того, как в очереди уже есть несколько неиспользованных сообщений, или если к брокеру подключено более одного потребителя.

Redis:redis

Redis — это сокращение от Remote Dictionary Server, но на самом деле в нем скрыто много вещей. Это хранилище данных типа «ключ-значение» в памяти, которое обычно работает как сверхбыстрый кэш между традиционной базой данных SQL и сервером. В то же время он может служить постоянной базой данных NoSQL , а также брокером сообщений в модели публикации-подписки . Вы можете запустить локальный сервер Redis с помощью Docker:

$ docker run -it --rm --name redis -p 6379:6379 redis

Когда вы это сделаете, вы сможете подключиться к работающему контейнеру с помощью интерфейса командной строки Redis:

$ docker exec -it redis redis-cli
127.0.0.1:6379>

Взгляните на список команд в официальной документации и попробуйте их, пока вы подключены к серверу Redis. Кроме того, вы можете сразу перейти к Python. Первой библиотекой, указанной на официальной странице Redis, является redis, но стоит отметить, что вы можете выбирать из множества альтернатив, включая асинхронные.


Написание простого издателя занимает всего пару строк кода на Python:

# publisher.py

import redis

with redis.Redis() as client:
    while True:
        message = input("Message: ")
        client.publish("chatroom", message)

Вы подключаетесь к локальному экземпляру сервера Redis и сразу же начинаете публиковать сообщения на chatroomканале. Вам не нужно создавать каналы, потому что Redis сделает это за вас. Для подписки на канал требуется один дополнительный шаг — создание PubSubобъекта для вызова .subscribe()метода:

# subscriber.py

import redis

with redis.Redis() as client:
    pubsub = client.pubsub()
    pubsub.subscribe("chatroom")
    for message in pubsub.listen():
        if message["type"] == "message":
            body = message["data"].decode("utf-8")
            print(f"Got message: {body}")

Сообщения, полученные подписчиком, представляют собой словари Python с некоторыми метаданными, что позволяет вам решать, как с ними обращаться. Если у вас есть несколько активных подписчиков, которые слушают канал, все они получат одно и то же сообщение. С другой стороны, сообщения по умолчанию не сохраняются.

Kafka:kafka-python3

Kafka — безусловно, самый продвинутый и сложный из трех брокеров сообщений, с которыми вы познакомитесь в этом руководстве. Это распределенная платформа потоковой передачи, используемая в приложениях, управляемых событиями в реальном времени. Его основным преимуществом является способность обрабатывать большие объемы данных практически без задержки производительности.


Чтобы запустить Kafka, вам нужно настроить распределенный кластер. Вы можете использовать Docker Compose для запуска многоконтейнерного приложения Docker за один раз. Например, вы можете взять Apache Kafka, упакованный Bitnami :

# docker-compose.yml

version: "3"
services:
  zookeeper:
    image: 'bitnami/zookeeper:latest'
    ports:
      - '2181:2181'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: 'bitnami/kafka:latest'
    ports:
      - '9092:9092'
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper

Когда вы сохраните эту конфигурацию в файле с именем docker-compose.yml, вы сможете запустить две службы, выполнив следующую команду:

$ docker-compose up

Иногда вы можете столкнуться с проблемами, когда версия Kafka не соответствует версии вашей клиентской библиотеки. Библиотека Python, которая, кажется, поддерживает довольно недавнюю версию Kafka kafka-python3, создана по образцу клиента Java.


Ваш продюсер может отправлять сообщения на заданную тему следующим образом:

# producer.py

from kafka3 import KafkaProducer

producer = KafkaProducer(bootstrap_servers="localhost:9092")
while True:
    message = input("Message: ")
    producer.send(
        topic="datascience",
        value=message.encode("utf-8"),
    )

Этот .send()метод является асинхронным, поскольку он возвращает будущий объект , который вы можете ожидать, вызвав его блокирующий .get()метод. На стороне потребителя вы сможете читать отправленные сообщения, перебирая потребителя:

# consumer.py

from kafka3 import KafkaConsumer

consumer = KafkaConsumer("datascience")
for record in consumer:
    message = record.value.decode("utf-8")
    print(f"Got message: {message}")

Конструктор потребителя берет одну или несколько тем, которые могут его заинтересовать.


Естественно, вы едва коснулись возможностей этих мощных брокеров сообщений. Ваша цель в этом разделе состояла в том, чтобы получить краткий обзор и отправную точку на случай, если вы захотите изучить их самостоятельно.

Заключение

Теперь вы хорошо разбираетесь в теории очередей в информатике и знаете их практическое применение , начиная от поиска кратчайшего пути в графе и заканчивая синхронизацией параллельных рабочих процессов и развязкой распределенных систем. Вы способны распознавать проблемы, которые элегантно решают очереди.


Вы можете реализовать очереди FIFO, LIFO и очереди с приоритетом с нуля, используя различные структуры данных в Python, понимая их компромиссы. В то же время вы знаете каждую очередь, встроенную в стандартную библиотеку, включая потокобезопасные очереди, асинхронные очереди и очередь для параллелизма на основе процессов . Вы также знаете о библиотеках, позволяющих интегрировать Python с популярными очередями брокера сообщений в облаке.