Go Pipelines

Мы узнали, как использовать горутины и каналы , теперь давайте посмотрим, как собрать их в параллельные piplines (конвейеры)!

Утечка горутины

Вот функция, которая отправляет числа в указанном диапазоне в канал:

func rangeGen(start, stop int) <-chan int {
    out := make(chan int)
    go func() {
        for i := start; i < stop; i++ {
            out <- i
        }
        close(out)
    }()
    return out
}

Кажется, все работает нормально:

func main() {
    generated := rangeGen(41, 46)
    for val := range generated {
        fmt.Println(val)
    }
}

Давайте посмотрим, что произойдет, если мы выйдем из цикла раньше времени:

func main() {
    generated := rangeGen(41, 46)
    for val := range generated {
        fmt.Println(val)
        if val == 42 {
            break
        }
    }
}

На первый взгляд, все еще работает правильно. Но не совсем — rangeGen()горутина застряла:

func rangeGen(start, stop int) <-chan int {
    out := make(chan int)
    go func() {
        for i := start; i < stop; i++ {    // (1)
            out <- i                       // (2)
        }
        close(out)
    }()
    return out
}

Так как main()прерывает свой цикл на номере 42 и прекращает чтение из generatedканала, цикл внутри rangeGen()➊ не завершился. Он был навсегда заблокирован при попытке отправить номер 43 в outканал ➋. Горутина застряла. outКанал не закрылся, поэтому если бы другие горутины зависели от него, они бы тоже застряли.

В этом случае это не проблема: при main()выходе среда выполнения завершит все остальные горутины. Но если main()продолжить работу и вызывать их rangeGen()повторно, то утекшие горутины будут накапливаться. Это проблематично: горутины легковесны, но не полностью «бесплатны». В конце концов, у вас может закончиться память (сборщик мусора не собирает горутины).

Похоже, нам нужен способ завершить горутину раньше времени.

Отменить канал

Сначала создадим отдельный канал отмены , по которому main()будет передаваться сигнал rangeGen()о выходе:

func main() {
    cancel := make(chan struct{})    // (1)defer close(cancel)              // (2)

    generated := rangeGen(cancel, 41, 46)    // (3)for val := range generated {
        fmt.Println(val)
        if val == 42 {
            break
        }
    }
}

Мы создаем cancel канал ➊ и сразу же настраиваем отложенный close(cancel)➋. Это обычная практика, чтобы избежать отслеживания каждого места в коде, где канал должен быть закрыт. deferгарантирует, что канал будет закрыт при выходе из функции, поэтому вам не придется об этом беспокоиться.

Далее мы передаем cancelканал в goroutine ➌. Теперь, когда канал закрывается, goroutine должна это обнаружить и выйти. В идеале вы бы добавили такую ​​проверку:

func rangeGen(cancel <-chan struct{}, start, stop int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for i := start; i < stop; i++ {
            out <- i
            if <-cancel == struct{}{} {    // (1)return
            }
        }
    }()
    return out
}

Если cancelзакрыто, проверка ➊ пройдет (закрытый канал всегда возвращает нулевое значение, помните?), и горутина выйдет. Однако если cancelне закрыто, горутина заблокируется и не перейдет к следующей итерации цикла.

Нам нужен другой, неблокирующий подход:

  • Если cancelзакрыто, выйти из горутины;
  • В противном случае отправьте следующее значение в out.

В Go для этого есть оператор select :

func rangeGen(cancel <-chan struct{}, start, stop int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for i := start; i < stop; i++ {
            select {
            case out <- i:    // (1)case <-cancel:    // (2)return
            }
        }
    }()
    return out
}

selectчем-то похож на switch, но специально разработан для каналов. Вот что он делает:

  • Проверяет, какие дела не заблокированы.
  • Если готовы несколько дел, случайным образом выбирается одно из них для выполнения.
  • Если все дела заблокированы, ждет, пока одно из них не будет готово.

В нашем случае, пока cancelоткрыт, его case ➋ заблокирован (вы не можете читать из канала, если никто не пишет в него). Однако case out <- i➊ разблокирован, поскольку main()читает из out. Таким образом, selectбудет выполняться out <- iв каждой итерации цикла.

Затем main()достигнет числа 42 и прекратит чтение из out. После этого оба selectслучая будут заблокированы, а горутина (временно) зависнет.

Наконец, main()выполнит deferred close(cancel), который разблокирует второй selectслучай ➋, и goroutine завершит работу. outКанал также закроется, благодаря defer.

Если main()решить не останавливаться на 42 и продолжить считывать все значения, подход отмены канала все равно будет работать правильно:

func main() {
    cancel := make(chan struct{})
    defer close(cancel)

    generated := rangeGen(cancel, 41, 46)
    for val := range generated {
        fmt.Println(val)
    }
}

Здесь, rangeGen()закончится до main() вызовоа close(cancel). Что совершенно нормально.

Таким образом, благодаря каналу отмены и оператору select rangeGen() горутина завершит работу правильно, независимо от того, что произойдет в main(). Больше никаких утечек горутин!

Отмена или готово

Канал отмены похож на канал «Готово», который мы рассмотрели в предыдущей главе.

Готовый канал:

// Goroutine B receives a channel to signal
// when it has finished its work.
func b(done chan<- struct{}) {
    // do work...
    done <- struct{}{}
}

func a() {
    done := make(chan struct{})
    go b(done)
    // Goroutine A waits for B to finish its work.
    <-done
}

Отменить канал:

// Goroutine B receives a channel
// to get a cancel signal.
func b(cancel <-chan struct{}) {
    // do work...select {
    case <-cancel:
        return
    }
}

func a() {
    cancel := make(chan struct{})
    go b(cancel)
    // Goroutine A signals to B// that it is time to stop.close(cancel)
}

На практике и отмена, и выполненный каналы часто называются "done", так что не удивляйтесь. В книге я буду использовать "cancel" для отмены и "done" для завершения, чтобы избежать путаницы.

Объединение каналов (последовательно)

Иногда несколько независимых функций отправляют результаты в свои каналы. Но удобнее работать с одним каналом результата. Поэтому нужно объединить выходные каналы этих функций в один канал.

Функция rangeGen()отправляет числа в указанном диапазоне в канал:

func rangeGen(start, stop int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for i := start; i < stop; i++ {
            time.Sleep(50 * time.Millisecond)
            out <- i
        }
    }()
    return out
}
Для простоты мы будем работать с неотменяемыми горутинами в этом и следующих шагах. Вы уже знаете, как превратить любую неотменяемую горутину в отменяемую (добавив канал отмены).

Давайте выполним команду rangeGen()дважды, объединим выходные каналы и выведем результаты:

func main() {
    in1 := rangeGen(11, 15)
    in2 := rangeGen(21, 25)

    start := time.Now()
    merged := merge(in1, in2)
    for val := range merged {
        fmt.Print(val, " ")
    }
    fmt.Println()
    fmt.Println("Took", time.Since(start))
}

Теперь нам осталось только реализовать merge()функцию.

Вот первая идея, которая приходит в голову. Пройтись по первому каналу, затем по второму и отправить результаты в объединенный канал:

func merge(in1, in2 <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for val := range in1 {
            out <- val
        }
        for val := range in2 {
            out <- val
        }
    }()
    return out
}

Однако эта реализация не поддерживает параллелизм. Пока merge()читает результаты из первой rangeGen()горутины, вторая rangeGen()горутина блокируется — никто не готов читать из ее выходного канала. Вот почему потребовалось 350 мс вместо ожидаемых 200 мс (8 значений * 50 мс / 2 горутины = 200 мс).

Нам нужен другой подход.

Объединение каналов (одновременно)

Чтобы считывать входные каналы независимо, давайте запустим две горутины:

func merge(in1, in2 <-chan int) <-chan int {
    var wg sync.WaitGroup
    wg.Add(2)

    out := make(chan int)

    // The first goroutine reads from in1 to out.go func() {
        defer wg.Done()
        for val := range in1 {
            out <- val
        }
    }()

    // The second goroutine reads from in2 to out.go func() {
        defer wg.Done()
        for val := range in2 {
            out <- val
        }
    }()

    // Wait until both input channels are exhausted,// then close the output channel.go func() {
        wg.Wait()
        close(out)
    }()

    return out
}

Благодаря merge()обработке выходных каналов двух rangeGen()одновременно, main()занимает 200 мс, как мы и ожидали. Здорово!

Объединение каналов (select)

Вы можете обойтись одной горутиной, сохранив производительность — используя selectоператор. Обратите внимание на его полезное свойство:

Если разблокировано несколько ветвей, он случайным образом выбирает и выполняет одну из них.

Таким образом, выбор одной ветви из , in1а другой из , in2должен быть почти таким же быстрым, как использование двух независимых горутин:

func merge(in1, in2 <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for {
            select {
            case out <- <-in1:
            case out <- <-in2:
            }
        }
    }()
    return out
}

Однако на практике это выглядит примерно так:

21 11 12 22 13 23 24 14 0 0 0 0 0 0 0 0 0 0... and so on

Эта реализация продолжает выбирать значения из входных каналов даже после их закрытия — бесконечно.

Вот идея, как это исправить:

  • Выбирать значения in1только из того, если он открыт.
  • Выбирать значения in2только из того, если он открыт.
  • Выйти из цикла, если оба in1и in2замкнуты.

Select может справиться с этим благодаря свойству каналов nil, которое обсуждалось в предыдущей главе:

Чтение из нулевого канала блокирует горутину навсегда.

Если вы установите in1значение nil после его закрытия, select прекратит чтение из него (так как он игнорирует заблокированные ветви). То же самое касается in2:

func merge(in1, in2 <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for in1 != nil || in2 != nil {
            select {
            case val1, ok := <-in1:
                if ok {
                    out <- val1
                } else {
                    in1 = nil
                }

            case val2, ok := <-in2:
                if ok {
                    out <- val2
                } else {
                    in2 = nil
                }
            }
        }
    }()
    return out
}

Теперь каждая ветвь select отключается после закрытия соответствующего ей канала. Когда оба канала закрыты, цикл for останавливается.

Работает как часы!

Помните, мы говорили, что нулевые каналы полезны в некоторых особых случаях? Это один из них.

Pipeline

Pipeline — это последовательность операций, где каждый шаг принимает входные данные, обрабатывает их определенным образом и выводит их. Вход и выход каждой операции — это канал.

На самом деле, мы строили конвейеры в течение последних трех глав. Давайте закрепим эту концепцию.

Типичный конвейер выглядит так:

  • Читатель : считывает входные данные из файла, базы данных или сети.
  • N процессоров : преобразуют, фильтруют, агрегируют или обогащают данные с использованием внешних источников.
  • Writer : записывает обработанные данные в файл, базу данных или сеть.

Может быть любое количество стадий обработки: например, сначала фильтр, затем преобразование, затем агрегация. На каждой стадии может быть несколько параллельных процессоров, но часто есть только один читатель и один писатель.

Рассмотрим 5-ступенчатый конвейер:

  • rangeGen()генерирует числа в заданном диапазоне (считыватель).
  • takeLucky()выбирает «счастливые» числа (процессор).
  • merge()объединяет независимые каналы (процессор).
  • sum()суммирует числа (процессор).
  • printTotal()печатает результат (писатель).
┌─────────────┐
│   rangeGen  │
└─────────────┘
       │
  readerChan─┬────────┬──────────────┬──────────────┐
┌─────────────┐┌─────────────┐┌─────────────┐┌─────────────┐
│  takeLucky  ││  takeLucky  ││  takeLucky  ││  takeLucky  │
└─────────────┘└─────────────┘└─────────────┘└─────────────┘
       │               │              │              │
 luckyChans[0]   luckyChans[1]  luckyChans[2]  luckyChans[3]
       │               │              │              │
┌──────────────────────────────────────────────────────────┐
│                           merge                          │
└──────────────────────────────────────────────────────────┘
       │
   mergedChan
       │
┌─────────────┐
│     sum     │
└─────────────┘
       │
   totalChan
       │
┌─────────────┐
│ printTotal  │
└─────────────┘

В коде:

// Total represents the count
// and the sum of the lucky numbers.
type Total struct {
    count  int
    amount int
}

// rangeGen generates numbers within a given range.
func rangeGen(start, stop int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for i := start; i < stop; i++ {
            out <- i
        }
    }()
    return out
}

// takeLucky selects lucky numbers.
func takeLucky(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for num := range in {
            if num%7 == 0 && num%13 != 0 {
                out <- num
            }
        }
    }()
    return out
}

// sum sums the numbers.
func sum(in <-chan int) <-chan Total {
    out := make(chan Total)
    go func() {
        defer close(out)
        total := Total{}
        for num := range in {
            total.amount += num
            total.count++
        }
        out <- total
    }()
    return out
}

// printTotal prints the result.
func printTotal(in <-chan Total) {
    total := <-in
    fmt.Printf("Total of %d lucky numbers = %d\n", total.count, total.amount)
}

func main() {
    readerChan := rangeGen(1, 1000)
    luckyChans := make([]<-chan int, 4)
    for i := range 4 {
        luckyChans[i] = takeLucky(readerChan)
    }
    mergedChan := merge(luckyChans)
    totalChan := sum(mergedChan)
    printTotal(totalChan)
}

Даже для такой игрушечной задачи конвейер удобнее, чем одна большая функция:

  • Каждый этап выполняет одну задачу, что упрощает понимание кода.
  • Вы можете добавлять или удалять этапы, не влияя на остальную логику.
  • Вы можете регулировать уровень параллелизма на каждом этапе независимо.
  • Этапы можно повторно использовать в других конвейерах.

Задачи «чтение-обработка-запись» встречаются на практике часто, и конвейеры для них подходят довольно хорошо.

Предотвращение утечек горутин

Утечка горутин — вторая по распространенности проблема в параллельных программах после взаимоблокировок. Go не жалуется на них, поэтому они часто остаются незамеченными.

В этой книге зависшие горутины вызывают ошибку в упражнениях:

ERROR: there are leaked goroutines

Распространенные причины зависания горутин:

  • Вы забыли создать канал отмены и проверить его через select.
  • Канал отмены есть, но горутина застревает внутри выбора (да, действительно!)

Первую причину мы рассмотрели ранее в этой главе, так что давайте рассмотрим вторую. Она часто застает людей врасплох.

Предположим, что есть функция, которая отправляет числа в канал:

func generate(cancel <-chan struct{}) chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for i := 0; ; i++ {
            select {
            case out <- i:
            case <-cancel:
                return
            }
        }
    }()
    return out
}

И функция, изменяющая числа:

func modify(cancel <-chan struct{}, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer fmt.Println("modify done")    // (1)defer close(out)
        for {
            select {
            case num := <-in:
                out <- num * 2case <-cancel:
                return
            }
        }
    }()
    return out
}

Благодаря ➊ горутина выводит сообщение о завершении.

Вместе эти две функции будут работать бесконечно, поэтому давайте добавим третью, которая считывает первые 10 чисел и останавливается:

func print(in <-chan int) {
    for range 10 {
        <-in
        fmt.Printf(".")
    }
    fmt.Println()
}

Давайте запустим его:

func main() {
    cancel := make(chan struct{})
    c1 := generate(cancel)
    c2 := modify(cancel, c1)
    print(c2)

    close(cancel)
    // Wait some time for the goroutines to finish// after the cancel channel is closed.
    time.Sleep(50 * time.Millisecond)
}
..........
modify done

Кажется, работает. В конце концов, мы везде использовали select + cancel, так что ошибки быть не должно.

Но есть. Чтобы увидеть это, просто добавьте задержку в 10 мс ➊:

func modify(cancel <-chan struct{}, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer fmt.Println("modify done")
        defer close(out)
        for {
            select {
            case num := <-in:
                time.Sleep(10 * time.Millisecond)    // (1)
                out <- num * 2                       // (2)case <-cancel:
                return
            }
        }
    }()
    return out
}

Текст "modify done" никогда не печатается. Это потому, что modify()goroutine застряла в точке ➋. Когда cancelканал закрылся, он ждал записи в outвыбранную ветку, поэтому select больше не мог помочь.

Есть два способа справиться с этим. Первый — оставить выбранные ветви пустыми:

func modify(cancel <-chan struct{}, in <-chan int) <-chan int {
    out := make(chan int)

    multiply := func(num int) int {
        time.Sleep(10 * time.Millisecond)
        return num * 2
    }

    go func() {
        defer fmt.Println("modify done")
        defer close(out)
        for num := range in {
            select {
            case out <- multiply(num):
            case <-cancel:
                return
            }
        }
    }()
    return out
}

Какой бы медленной ни multiply()была функция, мы не попадем в ветвь select, поэтому закрытие cancelканала гарантированно вызовет возврат.

Второй способ — использовать вложенные выборки везде, где вы читаете или пишете в канал:

func modify(cancel <-chan struct{}, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer fmt.Println("modify done")
        defer close(out)
        for {
            select {
            case num, ok := <-in:
                if !ok {
                    return
                }
                time.Sleep(10 * time.Millisecond)
                select {
                case out <- num * 2:
                case <-cancel:
                    return
                }
            case <-cancel:
                return
            }
        }
    }()
    return out
}

Здесь вложенный select защищает нас при записи в out, поэтому ничего не может зависнуть.

Второй пример показывает еще один важный принцип: если вы используете обычный цикл for вместо for-range, всегда проверяйте, закрыт ли входной канал. А еще лучше используйте for-range, когда это возможно, чтобы он проверял за вас.

Параллельное программирование — это сложно. Go предоставляет полезные примитивы, такие как channels и select, но они не панацея. Всегда проверяйте свой код: как отдельные горутины, так и их состав.

Чтобы проверить наличие утечек горутин в тестах, используйте runtime.NumGoroutine()более сложное решение, например goleaks .

Обработка ошибок

Предположим, у нас есть простой конвейер с тремя этапами:

  1. Генерировать числа от 1 до N.
  2. Вычислите «ответы» на эти числа.
  3. Распечатайте числа и ответы.

Стадия «генерации»:

// generate produces numbers from 1 to stop inclusive.
func generate(stop int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := range stop {
			out <- i + 1
		}
	}()
	return out
}

Этап «вычисления»:

// Answer represents the result of a calculation.
type Answer struct {
	x, y int
}

// calculate produces answers for the given numbers.
func calculate(in <-chan int) <-chan Answer {
	out := make(chan Answer)
	go func() {
		defer close(out)
		for n := range in {
			out <- fetchAnswer(n)
		}
	}()
	return out
}

Стадия «печати» (в основной программе для простоты):

func main() {
	inputs := generate(5)
	answers := calculate(inputs)
	for ans := range answers {
		fmt.Printf("%d -> %d\n", ans.x, ans.y)
	}
}

Функция fetchAnswerотвечает за получение ответа для заданного числа из удаленного API:

func fetchAnswer(n int) Answer {
	// ...
}

Надеяться, что удаленный сервис всегда будет работать правильно, немного нереалистично. Мы должны учитывать ошибки:

func fetchAnswer(n int) (Answer, error) {
	// ...
}

Но что делать с этими ошибками (если они есть)? Им нет места в calculate.

Как оказалось, у нас есть три варианта.

Возврат при первой ошибке

Если мы не терпим ошибок, то проще всего вернуться из calculateкак только fetchAnswerстолкнемся с ошибкой. Поскольку outканал принимает только Answers, давайте добавим отдельный errcканал с местом для одной ошибки:

// calculate produces answers for the given numbers.
func calculate(in <-chan int) (<-chan Answer, <-chan error) {
	out := make(chan Answer)
	errc := make(chan error, 1)  // (1)
	go func() {
		defer close(out)
		for n := range in {
			ans, err := fetchAnswer(n)
			if err != nil {
				errc <- err      // (2)
				return
			}
			out <- ans
		}
		errc <- nil              // (3)
	}()
	return out, errc
}

Канал ошибок буферизуется с емкостью один ➊. В результате calculateвыполнения он будет содержать либо фактическую ошибку ➋, либо ноль ➌, в зависимости от результатов удаленного вызова в fetchAnswer.

Поскольку errcгарантированно содержит значение (ошибку или ноль), мы можем прочитать его на следующем шаге конвейера без использования select:

func main() {
	inputs := generate(5)
	answers, errs := calculate(inputs)

	for ans := range answers {
		fmt.Printf("%d -> %d\n", ans.x, ans.y)
	}
	if err := <-errs; err != nil {
		fmt.Println("error:", err)
	}
}

Но что, если мы не хотим останавливать весь конвейер из-за одной ошибки? Введите следующий вариант.

Тип составного результата

Давайте избавимся от канала ошибок и вернем ошибку вместе с ответом. Для этого введем отдельный тип результата :

// Result contains an answer or an error.
type Result struct {
	answer Answer
	err    error
}

Теперь calculateможно отправлять значения результатов в выходной канал:

// calculate produces answers for the given numbers.
func calculate(in <-chan int) <-chan Result {
	out := make(chan Result)
	go func() {
		defer close(out)
		for n := range in {
			ans, err := fetchAnswer(n)
			out <- Result{ans, err}
		}
	}()
	return out
}

И следующий шаг конвейера может обрабатывать эти результаты так, как ему нужно:

func main() {
	inputs := generate(5)
	results := calculate(inputs)
	for res := range results {
		if res.err == nil {
			fmt.Printf("%d -> %d\n", res.answer.x, res.answer.y)
		} else {
			fmt.Printf("%d -> error: %s\n", res.answer.x, res.err)
		}
	}
}

Нам не нужно вводить отдельный тип результата для каждого возможного шага конвейера в нашей программе. ResultДостаточно одного обобщенного типа:

// Result contains a value or an error.
type Result[T any] struct {
	val T
	err error
}

func (r Result[T]) OK() bool {
	return r.err == nil
}
func (r Result[T]) Val() T {
	return r.val
}
func (r Result[T]) Err() error {
	return r.err
}
// calculate produces answers for the given numbers.
func calculate(in <-chan int) <-chan Result[Answer] {
	// ...
}

func main() {
	inputs := generate(5)
	results := calculate(inputs)
	for res := range results {
		if res.OK() {
			fmt.Printf("✓ %v\n", res.Val())
		} else {
			fmt.Printf("✗ %v\n", res.Err())
		}
	}
}

Собирайте ошибки отдельно

Допустим, мы не хотим беспокоиться об обработке ошибок на отдельных этапах конвейера. Нам нужен единый сборщик ошибок для всего конвейера. Для простоты он будет просто регистрировать все ошибки:

// collectErrors prints all incoming errors.
func collectErrors(in <-chan error) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		for err := range in {
			fmt.Printf("error: %s\n", err)
		}
	}()
	return done
}

Поскольку для всех этапов конвейера будет один канал ошибок, мы создадим его mainи передадим на каждый из этапов конвейера:

func main() {
	errc := make(chan error)
	done := collectErrors(errc)

	inputs := generate(5, errc)
	answers := calculate(inputs, errc)

	for ans := range answers {
		fmt.Printf("%d -> %d\n", ans.x, ans.y)
	}

	close(errc)
	<-done
}

На каждом этапе конвейера мы будем отправлять любые обнаруженные ошибки в канал ошибок:

// calculate produces answers for the given numbers.
func calculate(in <-chan int, errc chan<- error) <-chan Answer {
	out := make(chan Answer)
	go func() {
		defer close(out)
		for n := range in {
			ans, err := fetchAnswer(n)
			if err == nil {
				out <- ans
			} else {
				errc <- err
			}
		}
	}()
	return out
}

Работает как по маслу. Но есть одно предостережение: поскольку ошибки больше не привязаны к ответам, мы не знаем, какие числа привели к сбою удаленного API. Конечно, мы можем добавить необходимую информацию в текст ошибки или даже создать более богатый тип ошибки, так что это, вероятно, не такая уж большая проблема.

Кроме того, сборщик ошибок должен быть достаточно быстрым, чтобы не замедлять (или даже не блокировать) нормальный поток конвейера в случае случайных ошибок. Мы можем добавить буфер к каналу ошибок и использовать select, просто чтобы быть уверенными:

func main() {
    // A buffered channel to queue up to 100 errors.
	errc := make(chan error, 100)
	// ...
}

// calculate produces answers for the given numbers.
func calculate(in <-chan int, errc chan<- error) <-chan Answer {
	out := make(chan Answer)
	go func() {
		defer close(out)
		for n := range in {
			ans, err := fetchAnswer(n)
			if err == nil {
				out <- ans
			} else {
				select {
				case errc <- err:
				default:
					// If errc is full, drop the error.
				}
			}
		}
	}()
	return out
}

Такой подход встречается довольно редко. На практике чаще используется тип результата.

Заключение

Pipelines — одно из самых распространенных применений параллелизма в реальных программах. Теперь вы знаете, как:

  • Объединить Pipelines из самостоятельных блоков.
  • Разделение и объединение потоков данных.
  • Отменить этапы конвейера.
  • Предотвращайте утечки горутин.
  • Обрабатывать ошибки на этапах конвейера.