Kafka Console Producer

Рассматривали ли вы самый простой способ написания и чтения сообщений из Кафки? Если нет, то в этой статье вы узнаете, как использовать Kafka Console Producer для записи записей в Kafka Topic прямо из командной строки. Когда вы не генерируете данные для топиков, создание сообщений из командной строки является отличным способом быстрого тестирования новых пользовательских приложений.

Простые шаги для начала работы с платформой Kafka Console Producer

Производитель консоли Kafka — шаги

В этом разделе вы узнаете, как отправлять и получать сообщения из командной строки. Выполните следующие действия, чтобы работать с Kafka Console Producer и создавать сообщения.

Шаг 1. Настройте свой проект

Сначала создайте новый каталог в нужном вам месте, используя следующую команду:

mkdir console-consumer-producer-basic && cd console-consumer-producer-basic

Теперь вам нужно установить файл docker-compose.yml, чтобы получить платформу Confluent. Вы можете просто скопировать приведенный ниже скрипт в файл docker-compose.yml:

---
version: '2'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.2.1
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:6.2.1
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

Затем запустите платформу Confluent, используя следующую команду:

docker-compose up -d

Шаг 2. Создайте топик Kafka

После запуска сервисов Kafka и Zookeeper на платформе Confluent давайте создадим топик Kafka. Введите следующую команду:

docker-compose exec broker kafka-topics --create --topic orders --bootstrap-server broker:9092

Шаг 3. Запустите Kafka Console Consumer

Теперь вам нужно настроить Kafka Console consumer для чтения полученных записей, отправленных в топик, созданную выше. Итак, продолжайте работать в том же терминале и введите следующую команду, чтобы открыть терминал в контейнере брокера:

docker-compose exec broker bash

Теперь введите следующую команду в этом новом терминале, чтобы запустить Kafka Console Consumer:

kafka-console-consumer 
  --topic orders 
  --bootstrap-server broker:9092

Шаг 4. Создайте свои записи с помощью Kafka Console Producer

Теперь, когда ваш Kafka Console Consumer запущен, давайте опубликуем несколько записей с помощью Kafka Console Producer. Итак, откройте новый терминал и введите следующую команду, чтобы открыть еще одну оболочку в контейнере брокера:

docker-compose exec broker bash

В открывшемся новом терминале введите следующую команду, чтобы запустить Kafka Console Producer:

kafka-console-producer 
  --topic orders 
  --bootstrap-server broker:9092

Подождите несколько секунд, ваш Kafka Console Producer будет работать без сбоев. Затем введите несколько строк, которые считаются записями, как показано ниже:

hevo
is
a
no code
data pipeline

Отправьте все записи и проверьте окно consumer. Вы увидите тот же результат. Получив все записи от Kafka Console Producer, вы можете нажать клавиши Ctrl+C , чтобы остановить consumer.

Шаг 5. Отправьте новые записи из Kafka Console Producer

Вы можете заметить, что, поскольку Kafka Consumer уже запущен, вы легко получаете входящие записи. Однако некоторые записи были опубликованы до запуска Kafka Consumer. Чтобы опубликовать все эти записи, вы также можете использовать команду –from-beginning.

Итак, вернитесь в Kafka Console Producer и отправьте несколько записей, как показано ниже:

you are learning
to stream
all records
using Kafka Console Producer

Шаг 6: Запустите новый consumer

После отправки этих записей снова запустите Kafka Consumer и введите следующую команду:

kafka-console-consumer 
  --topic orders 
  --bootstrap-server broker:9092 
  --from-beginning

Подождите несколько секунд, пока запустится Kafka Consumer. Будет отображен следующий вывод:

hevo
is
a
no code
data pipeline
you are learning
to stream
all records
using Kafka Console Producer

После того, как вы получили все записи, вы можете закрыть этот consumer терминал с помощью клавиш Ctrl+C.

Шаг 7. Создание записей с парами ключ-значение

Если вы работали с Kafka, возможно, вы знаете, что Kafka работает с парами «ключ-значение». На предыдущих шагах вы только что отправили записи, имеющие значения. Следовательно, для всех этих записей ключи будут иметь значение null . Давайте посмотрим, как можно ввести несколько действительных ключей. Прежде чем начать, обязательно закройте предыдущий запущенный Kafka Console Producer с помощью клавиш Ctrl+C.

Теперь запустите новый producer консоли Kafka, используя следующую команду:

kafka-console-producer 
  --topic orders 
  --bootstrap-server broker:9092 
  --property parse.key=true 
  --property key.separator=":"

После запуска Kafka Console Producer введите следующие пары «ключ-значение»:

key1:programming languages
k1:python
k2:java

Шаг 8. Запустите consumer для отображения пар ключ-значение.

После отправки вышеуказанных записей запустите нового consumer консоли Kafka, используя следующую команду:

kafka-console-consumer 
  --topic orders 
  --bootstrap-server broker:9092 
  --from-beginning 
  --property print.key=true 
  --property key.separator=":"

Подождите несколько секунд, ваш Consumer запустится и отобразит следующий вывод:

null:hevo
null:is
null:a
null:no code
null:data pipeline
null:you are learning
null:to stream
null:all records
null:using Kafka Console Producer
key1:programming languages
k1:python
k2:java

Вы можете заметить, что для записей, которые были введены без ключей, ключи имеют значение null. Вы также заметили, что отображаются все записи с самого начала. Это было достигнуто с помощью команды –from-beginning.

Отличная работа! Вы получили базовое представление о том, как использовать Kafka Console Producer и Consumer в своих целях. Чтобы закрыть докер, вы можете использовать команду docker-compose down .

Как работает Kafka Console Producer?

Производитель консоли Kafka

В системе Kafka производитель отправляет данные в кластер Kafka , ориентированные на конкретную тему. С другой стороны, consumer подписываются на интересующую их тему и обрабатывают сообщения независимо от других consumer. Хотя производителями обычно являются приложения, утилита Kafka Console Producer позволяет пользователям вручную отправлять данные в тему через интерфейс командной строки.

Создание Kafka Console Consumer

Используя консольный интерфейс Kafka, в этом разделе блога мы подробно узнаем, как создать Kafka Consumer с помощью консольного интерфейса. Чтобы создать producer и consumer Kafka, необходимо использовать « bin/kafka-console-producer.sh » и « bin/kafka-console-consumer.sh », присутствующие в каталоге Kafka. Следуйте инструкциям, чтобы узнать, как:

Шаг 1. Начнем с запуска Zookeeper и Kafka Cluster.

Сначала перейдите в корень каталога Kafka и выполните следующую команду, каждую из них в отдельных терминалах, чтобы запустить Zookeeper и Kafka Cluster соответственно.

$ bin/zookeeper-server-start.sh config/zookeeper.properties
$ bin/kafka-server-start.sh config/server.properties

Альтернативно вы также можете запустить Apache Kafka с помощью KRaft. Ниже приведены шаги по запуску Kafka с использованием KRaft:

Начните с создания UUID кластера.

$ KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"

Затем отформатируйте каталоги журналов.

$ bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties

И запустите сервер Kafka.

$ bin/kafka-server-start.sh config/kraft/server.properties

Теперь вы будете готовы использовать среду Kafka после успешного запуска сервера.

Шаг 2: Теперь создайте тему Kafka.

Запустите приведенную ниже команду, чтобы создать тему с именем « sampleTopic ».

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic sampleTopic

Шаг 3. Теперь пришло время создать producer консоли Kafka.

Запустите приведенную ниже команду. Команда подаст сигнал о запуске Kafka Producer, написав в sampleTopic.

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic sampleTopic

Шаг 4. Создайте consumer консоли Kafka.

Запустите приведенную ниже команду. Команда подаст сигнал о запуске Kafka Producer, подписанного на sampleTopic.

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sampleTopic --from-beginning

Шаг 5: Наконец, отправьте сообщения.

Теперь вы можете начать отправлять сообщения от producer. Как только вы начнете рассылать сообщения, consumer начнет получать сообщения через Kafka Topic.

Производитель консоли Kafka: шаг 5

Некоторые ошибки, которых следует избегать при использовании kafka-console-producer.sh команды:

  • По умолчанию сообщения отправляются с null ключом (возможны альтернативные варианты).
  • Если целевой топик еще не существует, Kafka может автоматически создать ее при определенных условиях.
  • В частности, если вы укажете несуществующее имя топика, новый топик с этим именем будет создан с использованием количества разделов и коэффициента репликации по умолчанию, настроенных в кластере Kafka.

Как можно генерировать сообщения из файла с помощью интерфейса командной строки Kafka Console Producer?

Давайте возьмем пример файла kafka-console.txt (убедитесь, что каждое сообщение находится на новой строке).

This is a test
This is a test

Формировать сообщения в тему из файла (см. конец команды).

kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first_topic < kafka-console.txt

Варианты использования Kafka Console Producer

1. Тестирование

В этом примере предположим, что вы развернули кластер Apache и хотите использовать Avro для сериализации данных. Как убедиться, что ваш Avro настроен успешно?

Начнем с отправки сообщения с помощью Kafka Console Avro Producer.

$ kafka-avro-console-producer \
  --bootstrap-server localhost:9092 \
  --topic example-topic \
  --property value.schema='{"type":"record","name":"random_record","fields":[{"name":"hello","type":"string"}]}'
>{"hello": "world"}
>^C

Теперь, используя consumer Avro консоли Kafka, мы можем проверить, дошло ли сообщение.

$ kafka-avro-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic example-topic \
  --property value.schema='{"type":"record","name":"random_record","fields":[{"name":"hello","type":"string"}]}'
  --from-beginning
{"hello": "world"}
Processed a total of 1 messages

2. Данные для заполнения

В этом примере давайте предположим, что у вас есть портфель заказов на определенный продукт, который должен был быть отправлен в тему под названием «Заказы». Однако до consumer топика эта партия заказов не дошла. Теперь у вас есть CSV-файл с именем backorder.csv, который выглядит следующим образом:

order_id,product_id,product_name,client_id,client_name
201,1242,54” TV,540,Michael Cassio
202,3245,Vacuum,354,Jan Zizka
203,3245,Vacuum,203,Carlos Castillo Armas
204,9812,Remote control,540,Michael Cassio
......

Этот файл состоит из тысяч строк. Внести эти резервы в консольный producer невозможно вручную. Producer консоли Kafka решает эту проблему, легко загружая данные из файла. Сначала начните с подготовки данных и удаления заголовка:

$ tail -n +2 backorder.csv > prepared_backorder.csv

Загрузите данные в producer консоли, используя следующую команду:

$ kafka-console-producer \
    --topic example-topic \
    --bootstrap-server localhost:9092 \
    < prepared_backorder.csv

Ваша запись в журнале теперь успешна.

Основные стратегии, которые должны знать разработчики Kafka при обработке данных на платформе Kafka Console Producer

Многие компании из списка Fortune 500 используют Apache Kafka в качестве платформы потоковой передачи событий. Kafka имеет множество функций, которые делают его фактическим стандартом для платформ потоковой передачи событий. В этой части вы узнаете о некоторых наиболее важных стратегиях, которые следует помнить при работе с Kafka Console Producer.

  • Понимание доставки, подтверждения и долговечности сообщений
  • Изучите Sticky Partitioner в API Producer.
  • Освойте инструменты командной строки

1) Понимание доставки, подтверждения и долговечности сообщений.

Kafka Console Producer — Благодарности

Kafka Producer имеет параметр конфигурации acks для обеспечения надежности данных. Параметр acks определяет, сколько подтверждений должен получить producer, прежде чем запись будет считаться доставленной брокеру. Предлагаются следующие возможности:

  • none: когда producer передает записи брокеру, он считает, что они доставлены эффективно. Это можно представить как стратегию «выстрелил и забыл».
  • one: Producer ожидает, пока ведущий брокер подтвердит, что запись была записана в его журнал.
  • all: Producer ожидает подтверждения от ведущего брокера и последующих брокеров об успешной записи записи в их журналы.

Ведущий брокер не будет пытаться добавить запись в свой журнал, если количество синхронизируемых реплик меньше заранее определенного количества. Producer вынужден повторить попытку записи, поскольку лидер вызывает исключение NotEnoughReplicasException или NotEnoughReplicasAfterAppendException . Поскольку рассинхронизация реплик с лидером — это нехорошо, Producer будет продолжать повторять попытки и отправлять записи, пока не истечет время ожидания доставки. Вы можете повысить надежность своих данных, настроив min.insync.replicas и подтверждения Producer для совместной работы таким образом.

2) Изучите Sticky Partitioner в API Producer.

Kafka Console Producer — липкий разделитель

За последние годы API-интерфейсы Kafka Producer и Consumer представили несколько новых функций, о которых должен знать каждый разработчик Kafka. Вместо использования метода циклического перебора для каждой записи Sticky Partitioner размещает записи в одном и том же разделе до тех пор, пока пакет не будет отправлен. После доставки пакета Sticky Partitioner увеличивает размер раздела, который будет использоваться для следующего пакета.

Вы будете отправлять меньше запросов на производство, если будете использовать один и тот же раздел до тех пор, пока партия не заполнится или не будет завершена иным образом. Это помогает снизить нагрузку на очередь запросов и снижает задержку системы . Стоит отметить, что Sticky Partitioner по-прежнему обеспечивает равномерное распределение записей. Поскольку Partitioner распределяет пакет по каждому разделу, равномерное распределение происходит с течением времени. Это похоже на циклический алгоритм « по пакету » или стратегию « в конечном счете даже ».

3) Освойте инструменты командной строки

Каталог bin в двоичной установке Apache Kafka содержит различные утилиты. Помимо Kafka Console Producer, описанного в этой статье, вы должны быть знакомы с командами console-consumer, dump-log и другими командами в этом каталоге.

  • Consumer консоли Kafka: вы можете использовать записи из топика Kafka прямо из командной строки с помощью consumer консоли Kafka. При разработке или отладке немедленный запуск consumer может оказаться весьма полезным. Просто выполните следующую команду, чтобы убедиться, что ваше приложение-Producer доставляет сообщения consumer консоли Kafka:
kafka-console-consumer --topic  
                          --bootstrap-server <broker-host:port>
  • Журнал дампа: при работе с Kafka вам может потребоваться время от времени вручную анализировать основные журналы топика. Команда kafka-dump-log — ваш помощник независимо от того, интересуетесь ли вы просто внутренними компонентами Kafka или вам нужно устранить проблему и проверить содержимое. Вот команда для просмотра журнала примера топика:
 kafka-dump-log 
  --print-data-log  
  --files  ./var/lib/kafka/data/example-0/00000000000000000000.log 

Kafka предоставляет различные другие функции и возможности для своего producer и consumer консоли Kafka.

Заключение

Эта статья помогла вам разобраться в Kafka Console Producer. В этой статье вы узнали о некоторых хороших стратегиях и возможностях, которые Kafka предлагает своим разработчикам для Kafka Console Producer и Consumer.