Использование параллельных вычислений в Node.js
Node.js уже давно является популярным инструментом для разработки веб-приложений с интенсивными операциями, связанными с вводом-выводом, благодаря своему интерфейсу цикла обработки событий и асинхронному характеру, который позволяет делегировать операции ввода-вывода, не блокируя основной поток.
Хотя такая конструкция упрощает разработку и позволяет избежать распространенных проблем с потоковой передачей, таких как условия гонки и взаимоблокировки, ее однопоточный характер накладывает существенные ограничения, особенно в эпоху, когда многоядерные процессоры и приложения с высокой вычислительной нагрузкой являются стандартными.
В этой статье, weâÂÂЛл обсуждать преимущества параллельных вычислений и как использовать его возможности в вашей Node.js приложения.
Однопоточный характер Node.js
Node.js работает на движке Google JavaScript версии 8, который выполняет код в одном потоке. По умолчанию все вычисления выполняются последовательно в одном процессе с использованием только одного ядра процессора.
Эта модель хорошо подходит для операций ввода-вывода, но не так эффективна для задач, требующих больших ресурсов процессора. Рассмотрим этот пример:
function calculatePrimes(max) { const primes = []; for (let i = 2; i < max; i++) { let isPrime = true; for (let j = 2; j < i; j++) { if (i % j === 0) { isPrime = false; break; } } if (isPrime) primes.push(i); } return primes; } calculatePrimes(1000000);
Этот код заблокирует цикл обработки событий при его запуске. Это связано с тем, что цикл обработки событий в JavaScript основан на асинхронных операциях, чтобы избежать блокировки основного потока.
Это достигается путем передачи задач, связанных с вводом-выводом (например, сетевых запросов, операций с файловой системой), в операционную систему. В то же время цикл обработки событий продолжает обрабатывать другие задачи, ожидая завершения операций ввода-вывода.
Однако функция calculatePrimes в этом примере является синхронной и выполняется построчно, что означает, что она будет занимать поток до завершения, тем самым предотвращая обработку циклом обработки событий других событий и блокируя основной поток, в результате чего приложение не будет отвечать на запросы в течение этого времени.
Чтобы узнать больше о цикле обработки событий, ознакомьтесь с нашей подробной статьей на эту тему.
Потребность в параллельных вычислениях
Современное оборудование обычно оснащено несколькими процессорными ядрами. Использование этих ядер имеет решающее значение для повышения производительности и быстродействия приложений. Однако однопоточный дизайн Node.JS ограничивает его только одним из этих ядер, что оставляет неиспользованными значительные вычислительные мощности.
В то время как этого единственного потока достаточно для Node.js чтобы эффективно выполнять операции ввода-вывода, он борется с такими задачами, как:
- Обработка больших наборов данных
- Выполнение сложных вычислений
- Обработка операций, требующих больших затрат процессора
- Обслуживание множества одновременных пользователей
- Соответствие строгим требованиям к производительности
Параллельные вычисления позволяют разработчикам использовать многоядерное программное обеспечение для параллельного создания нескольких потоков и управления ими в рамках одного и того же процесса Node.js.
Существует несколько способов добиться параллельных вычислений, но "Рабочие потоки" считаются провозвестниками настоящих параллельных вычислений в Node.js благодаря их детализированному и упрощенному подходу к управлению потоками без сложностей низкоуровневого управления потоками.
Node.js рабочие потоки
Рабочие потоки были введены в Node.js версия 10.5 с единственной целью - разгрузить процессороемкие операции от цикла обработки событий. Они позволяют разработчикам создавать изолированные среды выполнения, которые могут работать одновременно на разных ядрах процессора.
По сути, каждый рабочий поток представляет собой отдельную среду выполнения JavaScript версии 8, которая имеет свое собственное пространство памяти, цикл обработки событий и контекст выполнения и полностью независима от основного потока.
Таким образом, рабочие потоки могут выполнять задачи, требующие больших затрат ресурсов процессора, в своей среде и взаимодействовать только с родительским потоком, используя канал обмена сообщениями, не влияя на обычную функцию родительского потока и не блокируя ее.
Ключевые компоненты рабочих потоков
Node.js предоставляет встроенную библиотеку worker_threads, которая предлагает высокоуровневые методы и функции для эффективной реализации рабочих потоков и управления ими.
Рабочие
Этот метод используется в основном потоке для создания новых рабочих элементов и управления ими. Он принимает путь к рабочему скрипту и необязательный объект для передачи данных рабочему элементу:
// index.js : Main Thread const { Worker } = require('worker_threads'); const worker = new Worker('./worker.js', { workerData: { task: 'exampleTask' } // Pass data to the worker });
Метод также прослушивает сообщения от работника, используя событие message, обрабатывает ошибки и завершает работу с помощью событий error и exit:
worker.on('message', (result) => { console.log('Worker result:', result); }); worker.on('error', (err) => { console.error('Worker error:', err); }); worker.on('exit', (code) => { console.log(`Worker exited with code ${code}`); });
Рабочие данные
Метод workerData обычно используется для передачи конфигурации или начальных параметров, зависящих от конкретной задачи, из основного потока в рабочий поток при создании worker. Эти данные доступны только при инициализации worker:
// index.js : Main Thread const { Worker } = require('worker_threads'); const worker = new Worker('./worker.js', { workerData: { name: 'Node.js', version: 18 } });
Доступ к данным можно получить в рабочем потоке через require('worker_threads').workerData в рабочем скрипте:
// worker.js const { workerData } = require('worker_threads'); console.log('Received data:', workerData); // Output: { name: 'Node.js', version: 18 }
Родительский порт
Этот метод используется в рабочем потоке для связи с основным потоком. Он устанавливает двусторонний канал обмена сообщениями для отправки и получения сообщений. Рабочий поток использует свой метод postMessage для отправки данных обратно в основной поток:
// worker.js const { parentPort } = require('worker_threads'); parentPort.on('message', (message) => { console.log('Received from main thread:', message); // Process the task and send a result const result = message * 2; parentPort.postMessage(result); });
Затем рабочий поток прослушивает сообщения из основного потока, используя событие message:
// main.js const { Worker } = require('worker_threads'); const worker = new Worker('./worker.js'); worker.on('message', (result) => { console.log('Result from worker:', result); // Output: 20 (for input 10) }); worker.postMessage(10); // Send a task to the worker
Обратите внимание, что вместо метода workerData используется метод worker.postMessage. Он используется, когда второй аргумент не передается классу Worker во время инициализации.
Реализация рабочих потоков: пример обработки изображений
Чтобы проиллюстрировать, как вы можете интегрировать рабочие потоки в свои приложения, давайте рассмотрим практический сценарий из реального мира.
Представьте, что вам нужно выполнить различные преобразования изображений, такие как изменение размера, оттенки серого и поворот, для большой коллекции файлов изображений. Без рабочих потоков это привело бы к перегрузке цикла обработки событий и блокировке основного потока.
Используя рабочие потоки, вы можете распределить эти задачи между несколькими ядрами процессора, чтобы гарантировать, что производительность вашего приложения остается стабильной и отзывчивой при эффективной обработке всех изображений.
Предпосылка
Я предполагаю, что у вас уже есть Node.js создание проекта. Если вы этого не сделаете, вы можете ознакомиться с нашим руководством о том, как его настроить.
Как только ваш проект будет готов, скопируйте и запустите следующую команду в своем терминале, чтобы установить sharp, высокопроизводительную библиотеку обработки изображений для Node.js:
npm install sharp
Вот структура нашего проекта:
image-processor-project/ â âÂÂâÂÂâ src/ â âÂÂâÂÂâ imageProcessor.js â âÂÂâÂÂâ imageWorker.js â âÂÂâÂÂâ images/ â âÂÂâÂÂâ image1.jpg â âÂÂâÂÂâ image2.png â âÂÂâÂÂâ image3.jpeg â âÂÂâÂÂâ package.json âÂÂâÂÂâ index.js
Такая структура проекта позволяет отделить проблемы в приложении, особенно коды рабочих потоков, от кода основного потока. Таким образом, мы избегаем двусмысленности и устраняем необходимость в подобных проверках:
if (isMainThread){ // Run main thread code }else{ // Run worker thread code }
После настройки структуры проекта откройте файл imageProcessor.js и укажите код, необходимый для создания рабочих элементов:
const { Worker } = require("worker_threads"); const path = require("path"); class ImageProcessor { constructor(maxConcurrency = require("os").cpus().length) { this.maxConcurrency = maxConcurrency; } async processImages(imagePaths, processingOptions) { return new Promise((resolve, reject) => { const results = []; let activeWorkers = 0; let completedWorkers = 0; const queue = [...imagePaths]; const processNextImage = () => { if (queue.length === 0 || activeWorkers >= this.maxConcurrency) { return; } const imagePath = queue.shift(); activeWorkers++; const worker = new Worker(path.resolve(__dirname, "imageWorker.js"), { workerData: { imagePath: imagePath, options: processingOptions, }, }); worker.on("message", (result) => { results.push(result); activeWorkers--; completedWorkers++; processNextImage(); if (completedWorkers === imagePaths.length) { resolve(results); } }); worker.on("error", (error) => { reject(error); }); }; while (activeWorkers < this.maxConcurrency && queue.length > 0) { processNextImage(); } }); } async processBatch(imagePaths, batchSize) { const results = []; for (let i = 0; i < imagePaths.length; i += batchSize) { const batchPaths = imagePaths.slice(i, i + batchSize); const batchResults = await this.processImages(batchPaths); results.push(...batchResults); } return results; } } module.exports = ImageProcessor;
Этот код определяет класс ImageProcessor, который принимает необязательный аргумент maxConcurrency, который определяет максимальное количество рабочих элементов, которые могут выполняться одновременно. Внутри этого класса есть два метода: processImages и processBatch.
processImages помещает изображения в массив в очередь и создает новый рабочий файл для каждого изображения с помощью скрипта imageWorker.js и передает ImagePath и processingOptions в качестве рабочих данных потоку.
Метод ProcessBatch выполняет итерацию путей к изображениям по частям. Для каждого пакета он вызывает processImages для обработки изображений и ожидает завершения каждого пакета, прежде чем переходить к следующему.
Затем перейдите к файлу imageWorker.js и добавьте следующий код:
const { parentPort, workerData } = require("worker_threads"); const fs = require("fs"); const path = require("path"); const sharp = require("sharp"); async function processImage() { const { imagePath, options } = workerData; try { if (!fs.existsSync(imagePath)) { throw new Error(`Image file not found: ${imagePath}`); } // Read image const inputBuffer = fs.readFileSync(imagePath); let sharpInstance = sharp(inputBuffer); if (options.width || options.height) { sharpInstance = sharpInstance.resize({ width: options.width, height: options.height, fit: options.fit || "cover", }); } if (options.rotation) { sharpInstance = sharpInstance.rotate(options.rotation); } if (options.grayscale) { sharpInstance = sharpInstance.grayscale(); } const processedImage = await sharpInstance.toBuffer(); const outputFilename = `processed_${path.basename(imagePath)}`; const outputPath = path.join(path.dirname(imagePath), outputFilename); // Save processed image await sharp(processedImage).toFile(outputPath); // Send processed image details back to main thread parentPort.postMessage({ originalPath: imagePath, outputPath: outputPath, processedSize: processedImage.length, success: true, }); } catch (error) { parentPort.postMessage({ originalPath: imagePath, error: error.message, success: false, }); } } processImage();
Этот код выполняется в рабочем потоке. После инициализации рабочий процесс немедленно выполняет основную функцию в скрипте: processImage. Эта функция использует библиотеку sharp и данные, отправленные из основного потока через функцию workerData, для преобразования изображения, расположенного по указанному пути. Затем генерируется выходной файл с исходным именем файла с префиксом "processed_".
Когда операция выполнена, результат передается обратно в основной поток с помощью метода parentPort.postMessage, включая:
- Путь к исходному изображению
- Выходной путь
- Размер обрабатываемого изображения (в байтах)
- Статус успеха
// Send processed image details back to main thread parentPort.postMessage({ originalPath: imagePath, outputPath: outputPath, processedSize: processedImage.length, success: true, });
Если возникает ошибка, функция отправляет сообщение об ошибке с подробной информацией и статусом сбоя через postMessage.
Для последнего шага перейдите к файлу index.js и добавьте код основного потока:
const path = require("path"); const ImageProcessor = require("./src/imageProcessor"); const processor = new ImageProcessor(2); // Limit to 2 concurrent workers // Get all image files in the images directory const imagePaths = [ path.resolve(__dirname, "images/image1.png"), path.resolve(__dirname, "images/image2.jpg"), path.resolve(__dirname, "images/image3.jpg"), ]; const processingOptions = { width: 800, height: 600, rotation: 90, grayscale: true, fit: "contain", }; async function processImageFile() { try { const results = await processor.processBatch( imagePaths, imagePaths.length, processingOptions ); console.log("Processing Results:", results); } catch (error) { console.error("Image processing failed:", error); } } processImageFile().catch(console.error);
Здесь создается экземпляр класса ImageProcessor с ограничением параллелизма, равным 2, что означает, что одновременно будут выполняться только два рабочих потока. Основная логика заключена в функции processImageFile, которая вызывает метод processBatch экземпляра ImageProcessor. Этот метод запускает рабочие потоки для одновременной обработки изображений, при этом одновременно выполняется не более двух потоков.
Соображения, касающиеся производительности
Как и любой другой инструмент, параллелизм в Node.js не лишен недостатков, и его реализация требует тщательного рассмотрения.
Управление потоками
Создание рабочих потоков сопряжено с определенными затратами. Поскольку каждый поток создает выделенный экземпляр движка V8, для этого требуется выделение памяти и инициализация, что может повлиять на производительность, если потоки часто создаются и уничтожаются. Чтобы свести к минимуму эти накладные расходы, лучше всего использовать пул потоков или повторно использовать рабочие потоки, когда это возможно.
Сведение к минимуму передачи данных
Передача больших объемов данных между рабочим и основным потоками может привести к значительным накладным расходам. Это связано с тем, что обмен данными осуществляется посредством передачи сообщений, что требует сериализации и десериализации. Эти операции могут быть дорогостоящими с точки зрения вычислений, особенно для больших или сложных объектов.
Чтобы оптимизировать производительность, рассмотрите возможность использования переносимых объектов (таких как ArrayBuffer), минимизации частоты и размера передаваемых данных и эффективного использования структурированного клонирования.
Ремонтопригодность
Как видно из примера с обработкой изображений, код рабочего потока может стать сложным и неоднозначным, что часто приводит к тому, что разработчики называют спагетти-кодом. Это может существенно повлиять на читаемость и удобство сопровождения.
Чтобы избежать этого, рекомендуется сохранять код рабочего потока чистым, хорошо структурированным и тщательно документированным. Это улучшает понимание и облегчает внесение изменений в будущем.
Обработка ошибок
При работе с рабочими потоками важно уделять приоритетное внимание обработке ошибок. Это происходит не только из-за сложности кода, но и из-за сложности распространения ошибок.
В однопоточном приложении необработанные ошибки естественным образом накапливаются в стеке вызовов и приводят к аварийному завершению работы приложения. Однако с рабочими потоками процесс меняется. Ошибки в рабочих потоках напрямую не влияют на основной поток, что может привести к их молчаливому игнорированию, что затрудняет их обнаружение.
Чтобы перехватить рабочие ошибки, вы должны прослушать событие error в рабочем объекте в главном потоке:
// Main thread const worker = new Worker('worker.js'); worker.on('error', (error) => { console.error('Worker encountered an error:', error); });
Другим ограничением обработки ошибок в рабочих потоках является ограниченная трассировка стека. Поскольку трассировка стека ограничена рабочим контекстом, становится трудно отследить основную причину и полностью понять контекст ошибки.
Например, предположим, что вы удалили параметр processingOptions из метода processBatch и не смогли передать его в качестве аргумента методу processImage, как показано ниже:
async processBatch(imagePaths, batchSize) { const results = []; for (let i = 0; i < imagePaths.length; i += batchSize) { const batchPaths = imagePaths.slice(i, i + batchSize); const batchResults = await this.processImages(batchPaths); results.push(...batchResults); } return results; }
В консоли появится следующая ошибка:
Эта ошибка практически не содержит никакой информации, например, о файле или строке кода, в которых возникает ошибка.
Для эффективной отладки рабочих потоков вам необходимо:
- Создание явных механизмов обработки ошибок
- Используйте блоки try-catch в workers
- Внедрить подробные отчеты об ошибках
- Отправляйте подробную информацию об ошибках в основной поток
Вывод
Параллельные вычисления в Node.js позволяют эффективно выполнять задачи, требующие больших затрат ресурсов процессора. Благодаря рабочим потокам разработчики больше не ограничены одним потоком и могут создавать масштабируемые высокопроизводительные приложения, в полной мере использующие возможности современного оборудования.