Как использовать Kafka с Go

Apache Kafka — это отказоустойчивая система обмена сообщениями с открытым исходным кодом, предназначенная для эффективной обработки больших объемов данных. В связи с этим он чаще всего используется в больших данных. Довольно частый случай применения этой сисетмы обмена сообщениями встречается в Go, давайте рассмотрим на примере как это работает.

просмотрщик SVG

Применение

После настройки Kafka язык Go может взаимодействовать с сервером Kafka с помощью библиотеки Confluent Kafka Go.

Для импорта библиотеки в программе должен быть следующий скрипт:

import (
    "github.com/confluentinc/confluent-kafka-go/kafka"
)

Producer

Следующим шагом будет создание продюсера. Производитель в Kafka добавляет данные в поток:

producer, err := kafka.NewProducer(&kafka.ConfigMap{
    "bootstrap.servers": "host1:9092,host2:9092",
    "client.id": socket.gethostname(),
    "acks": "all"})

Этот скрипт устанавливает объект-производитель для сервера Kafka. Свойство bootstrap.servers представляет собой список пар хост-порт, разделенных запятыми. Свойство client.id может представлять собой любую уникальную строку, используемую для идентификации конкретного пользователя. В этом случае имя хоста текущего компьютера устанавливается в качестве идентификатора клиента:

del_chan := make(chan kafka.Event, 10000)
err = producer.Produce(&kafka.Message{
    TopicPartition: kafka.TopicPartition{Topic: "educative", Partition: kafka.PartitionAny},
    Value: []byte(value)},
    del_chan,
)

channel_out := <-del_chan
message_report := channel_out.(*kafka.Message)

if message_report.TopicPartition.Error != nil {
    fmt.Printf(message_report.TopicPartition.Error)
} else {
    fmt.Printf("Message delivered")
}

close(del_chan)

Этот скрипт демонстрирует, как отправлять данные в Kafka. Свойство TopicPartition определяет kafka.Message, в какую тему и раздел отправлять сообщение. Свойство Value содержит данные, которые необходимо отправить. В данном случае это переменная value. После доставки сообщения переменная channel_out получит своего рода журнал завершения сообщения. Код, следующий за этой строкой, отображает ошибку, если получено, или сообщение, указывающее, что сообщение было доставлено.

Consumer

Далее необходимо настроить потребительскую сторону. Consumer (Потребитель) получает данные из потоков Kafka и обрабатывает их по мере необходимости:

consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
     "bootstrap.servers":    "host1:9092,host2:9092",
     "group.id":             "foo",
     "auto.offset.reset":    "smallest"})

Как можно видеть, код установки для потребителя и производителя очень похож. Другим свойством для потребителя является собственность auto.offset.reset. Это свойство определяет начальную отправную точку в каждом разделе, с которой начинается потребление данных. В данном случае ему присвоено значение "smallest", что означает, что данные будут потребляться, начиная с самого раннего смещения.

retrieved := consumer.Poll(0)
switch message := retrieved.(type) {
case *kafka.Message:
    fmt.Printf(string(message.value()))
case kafka.Error:
    fmt.Printf("%% Error: %v\n", message)
default:
    fmt.Printf("Ignored %v\n", message)
}

Этот скрипт можно использовать для получения сообщения от Кафки. Здесь используется оператор переключения типа, чтобы определить, произошла ли ошибка, было ли получено сообщение или потребитель был проигнорирован.