Kafka Console Producer
Рассматривали ли вы самый простой способ написания и чтения сообщений из Кафки? Если нет, то в этой статье вы узнаете, как использовать Kafka Console Producer для записи записей в Kafka Topic прямо из командной строки. Когда вы не генерируете данные для топиков, создание сообщений из командной строки является отличным способом быстрого тестирования новых пользовательских приложений.
Простые шаги для начала работы с платформой Kafka Console Producer
В этом разделе вы узнаете, как отправлять и получать сообщения из командной строки. Выполните следующие действия, чтобы работать с Kafka Console Producer и создавать сообщения.
Шаг 1. Настройте свой проект
Сначала создайте новый каталог в нужном вам месте, используя следующую команду:
mkdir console-consumer-producer-basic && cd console-consumer-producer-basic
Теперь вам нужно установить файл docker-compose.yml, чтобы получить платформу Confluent. Вы можете просто скопировать приведенный ниже скрипт в файл docker-compose.yml:
---
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:6.2.1
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
broker:
image: confluentinc/cp-kafka:6.2.1
hostname: broker
container_name: broker
depends_on:
- zookeeper
ports:
- "29092:29092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
Затем запустите платформу Confluent, используя следующую команду:
docker-compose up -d
Шаг 2. Создайте топик Kafka
После запуска сервисов Kafka и Zookeeper на платформе Confluent давайте создадим топик Kafka. Введите следующую команду:
docker-compose exec broker kafka-topics --create --topic orders --bootstrap-server broker:9092
Шаг 3. Запустите Kafka Console Consumer
Теперь вам нужно настроить Kafka Console consumer для чтения полученных записей, отправленных в топик, созданную выше. Итак, продолжайте работать в том же терминале и введите следующую команду, чтобы открыть терминал в контейнере брокера:
docker-compose exec broker bash
Теперь введите следующую команду в этом новом терминале, чтобы запустить Kafka Console Consumer:
kafka-console-consumer
--topic orders
--bootstrap-server broker:9092
Шаг 4. Создайте свои записи с помощью Kafka Console Producer
Теперь, когда ваш Kafka Console Consumer запущен, давайте опубликуем несколько записей с помощью Kafka Console Producer. Итак, откройте новый терминал и введите следующую команду, чтобы открыть еще одну оболочку в контейнере брокера:
docker-compose exec broker bash
В открывшемся новом терминале введите следующую команду, чтобы запустить Kafka Console Producer:
kafka-console-producer
--topic orders
--bootstrap-server broker:9092
Подождите несколько секунд, ваш Kafka Console Producer будет работать без сбоев. Затем введите несколько строк, которые считаются записями, как показано ниже:
hevo is a no code data pipeline
Отправьте все записи и проверьте окно consumer. Вы увидите тот же результат. Получив все записи от Kafka Console Producer, вы можете нажать клавиши Ctrl+C , чтобы остановить consumer.
Шаг 5. Отправьте новые записи из Kafka Console Producer
Вы можете заметить, что, поскольку Kafka Consumer уже запущен, вы легко получаете входящие записи. Однако некоторые записи были опубликованы до запуска Kafka Consumer. Чтобы опубликовать все эти записи, вы также можете использовать команду –from-beginning.
Итак, вернитесь в Kafka Console Producer и отправьте несколько записей, как показано ниже:
you are learning
to stream
all records
using Kafka Console Producer
Шаг 6: Запустите новый consumer
После отправки этих записей снова запустите Kafka Consumer и введите следующую команду:
kafka-console-consumer
--topic orders
--bootstrap-server broker:9092
--from-beginning
Подождите несколько секунд, пока запустится Kafka Consumer. Будет отображен следующий вывод:
hevo
is
a
no code
data pipeline
you are learning
to stream
all records
using Kafka Console Producer
После того, как вы получили все записи, вы можете закрыть этот consumer терминал с помощью клавиш Ctrl+C.
Шаг 7. Создание записей с парами ключ-значение
Если вы работали с Kafka, возможно, вы знаете, что Kafka работает с парами «ключ-значение». На предыдущих шагах вы только что отправили записи, имеющие значения. Следовательно, для всех этих записей ключи будут иметь значение null . Давайте посмотрим, как можно ввести несколько действительных ключей. Прежде чем начать, обязательно закройте предыдущий запущенный Kafka Console Producer с помощью клавиш Ctrl+C.
Теперь запустите новый producer консоли Kafka, используя следующую команду:
kafka-console-producer
--topic orders
--bootstrap-server broker:9092
--property parse.key=true
--property key.separator=":"
После запуска Kafka Console Producer введите следующие пары «ключ-значение»:
key1:programming languages
k1:python
k2:java
Шаг 8. Запустите consumer для отображения пар ключ-значение.
После отправки вышеуказанных записей запустите нового consumer консоли Kafka, используя следующую команду:
kafka-console-consumer
--topic orders
--bootstrap-server broker:9092
--from-beginning
--property print.key=true
--property key.separator=":"
Подождите несколько секунд, ваш Consumer запустится и отобразит следующий вывод:
null:hevo
null:is
null:a
null:no code
null:data pipeline
null:you are learning
null:to stream
null:all records
null:using Kafka Console Producer
key1:programming languages
k1:python
k2:java
Вы можете заметить, что для записей, которые были введены без ключей, ключи имеют значение null. Вы также заметили, что отображаются все записи с самого начала. Это было достигнуто с помощью команды –from-beginning.
Отличная работа! Вы получили базовое представление о том, как использовать Kafka Console Producer и Consumer в своих целях. Чтобы закрыть докер, вы можете использовать команду docker-compose down .
Как работает Kafka Console Producer?
В системе Kafka производитель отправляет данные в кластер Kafka , ориентированные на конкретную тему. С другой стороны, consumer подписываются на интересующую их тему и обрабатывают сообщения независимо от других consumer. Хотя производителями обычно являются приложения, утилита Kafka Console Producer позволяет пользователям вручную отправлять данные в тему через интерфейс командной строки.
Создание Kafka Console Consumer
Используя консольный интерфейс Kafka, в этом разделе блога мы подробно узнаем, как создать Kafka Consumer с помощью консольного интерфейса. Чтобы создать producer и consumer Kafka, необходимо использовать « bin/kafka-console-producer.sh » и « bin/kafka-console-consumer.sh », присутствующие в каталоге Kafka. Следуйте инструкциям, чтобы узнать, как:
Шаг 1. Начнем с запуска Zookeeper и Kafka Cluster.
Сначала перейдите в корень каталога Kafka и выполните следующую команду, каждую из них в отдельных терминалах, чтобы запустить Zookeeper и Kafka Cluster соответственно.
$ bin/zookeeper-server-start.sh config/zookeeper.properties
$ bin/kafka-server-start.sh config/server.properties
Альтернативно вы также можете запустить Apache Kafka с помощью KRaft. Ниже приведены шаги по запуску Kafka с использованием KRaft:
Начните с создания UUID кластера.
$ KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
Затем отформатируйте каталоги журналов.
$ bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
И запустите сервер Kafka.
$ bin/kafka-server-start.sh config/kraft/server.properties
Теперь вы будете готовы использовать среду Kafka после успешного запуска сервера.
Шаг 2: Теперь создайте тему Kafka.
Запустите приведенную ниже команду, чтобы создать тему с именем « sampleTopic ».
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic sampleTopic
Шаг 3. Теперь пришло время создать producer консоли Kafka.
Запустите приведенную ниже команду. Команда подаст сигнал о запуске Kafka Producer, написав в sampleTopic.
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic sampleTopic
Шаг 4. Создайте consumer консоли Kafka.
Запустите приведенную ниже команду. Команда подаст сигнал о запуске Kafka Producer, подписанного на sampleTopic.
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sampleTopic --from-beginning
Шаг 5: Наконец, отправьте сообщения.
Теперь вы можете начать отправлять сообщения от producer. Как только вы начнете рассылать сообщения, consumer начнет получать сообщения через Kafka Topic.
Некоторые ошибки, которых следует избегать при использовании kafka-console-producer.sh
команды:
- По умолчанию сообщения отправляются с
null
ключом (возможны альтернативные варианты). - Если целевой топик еще не существует, Kafka может автоматически создать ее при определенных условиях.
- В частности, если вы укажете несуществующее имя топика, новый топик с этим именем будет создан с использованием количества разделов и коэффициента репликации по умолчанию, настроенных в кластере Kafka.
Как можно генерировать сообщения из файла с помощью интерфейса командной строки Kafka Console Producer?
Давайте возьмем пример файла kafka-console.txt
(убедитесь, что каждое сообщение находится на новой строке).
This is a test
This is a test
Формировать сообщения в тему из файла (см. конец команды).
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first_topic < kafka-console.txt
Варианты использования Kafka Console Producer
1. Тестирование
В этом примере предположим, что вы развернули кластер Apache и хотите использовать Avro для сериализации данных. Как убедиться, что ваш Avro настроен успешно?
Начнем с отправки сообщения с помощью Kafka Console Avro Producer.
$ kafka-avro-console-producer \
--bootstrap-server localhost:9092 \
--topic example-topic \
--property value.schema='{"type":"record","name":"random_record","fields":[{"name":"hello","type":"string"}]}'
>{"hello": "world"}
>^C
Теперь, используя consumer Avro консоли Kafka, мы можем проверить, дошло ли сообщение.
$ kafka-avro-console-consumer \
--bootstrap-server localhost:9092 \
--topic example-topic \
--property value.schema='{"type":"record","name":"random_record","fields":[{"name":"hello","type":"string"}]}'
--from-beginning
{"hello": "world"}
Processed a total of 1 messages
2. Данные для заполнения
В этом примере давайте предположим, что у вас есть портфель заказов на определенный продукт, который должен был быть отправлен в тему под названием «Заказы». Однако до consumer топика эта партия заказов не дошла. Теперь у вас есть CSV-файл с именем backorder.csv, который выглядит следующим образом:
order_id,product_id,product_name,client_id,client_name 201,1242,54” TV,540,Michael Cassio 202,3245,Vacuum,354,Jan Zizka 203,3245,Vacuum,203,Carlos Castillo Armas 204,9812,Remote control,540,Michael Cassio ......
Этот файл состоит из тысяч строк. Внести эти резервы в консольный producer невозможно вручную. Producer консоли Kafka решает эту проблему, легко загружая данные из файла. Сначала начните с подготовки данных и удаления заголовка:
$ tail -n +2 backorder.csv > prepared_backorder.csv
Загрузите данные в producer консоли, используя следующую команду:
$ kafka-console-producer \
--topic example-topic \
--bootstrap-server localhost:9092 \
< prepared_backorder.csv
Ваша запись в журнале теперь успешна.
Основные стратегии, которые должны знать разработчики Kafka при обработке данных на платформе Kafka Console Producer
Многие компании из списка Fortune 500 используют Apache Kafka в качестве платформы потоковой передачи событий. Kafka имеет множество функций, которые делают его фактическим стандартом для платформ потоковой передачи событий. В этой части вы узнаете о некоторых наиболее важных стратегиях, которые следует помнить при работе с Kafka Console Producer.
- Понимание доставки, подтверждения и долговечности сообщений
- Изучите Sticky Partitioner в API Producer.
- Освойте инструменты командной строки
1) Понимание доставки, подтверждения и долговечности сообщений.
Kafka Producer имеет параметр конфигурации acks для обеспечения надежности данных. Параметр acks определяет, сколько подтверждений должен получить producer, прежде чем запись будет считаться доставленной брокеру. Предлагаются следующие возможности:
- none: когда producer передает записи брокеру, он считает, что они доставлены эффективно. Это можно представить как стратегию «выстрелил и забыл».
- one: Producer ожидает, пока ведущий брокер подтвердит, что запись была записана в его журнал.
- all: Producer ожидает подтверждения от ведущего брокера и последующих брокеров об успешной записи записи в их журналы.
Ведущий брокер не будет пытаться добавить запись в свой журнал, если количество синхронизируемых реплик меньше заранее определенного количества. Producer вынужден повторить попытку записи, поскольку лидер вызывает исключение NotEnoughReplicasException или NotEnoughReplicasAfterAppendException . Поскольку рассинхронизация реплик с лидером — это нехорошо, Producer будет продолжать повторять попытки и отправлять записи, пока не истечет время ожидания доставки. Вы можете повысить надежность своих данных, настроив min.insync.replicas и подтверждения Producer для совместной работы таким образом.
2) Изучите Sticky Partitioner в API Producer.
За последние годы API-интерфейсы Kafka Producer и Consumer представили несколько новых функций, о которых должен знать каждый разработчик Kafka. Вместо использования метода циклического перебора для каждой записи Sticky Partitioner размещает записи в одном и том же разделе до тех пор, пока пакет не будет отправлен. После доставки пакета Sticky Partitioner увеличивает размер раздела, который будет использоваться для следующего пакета.
Вы будете отправлять меньше запросов на производство, если будете использовать один и тот же раздел до тех пор, пока партия не заполнится или не будет завершена иным образом. Это помогает снизить нагрузку на очередь запросов и снижает задержку системы . Стоит отметить, что Sticky Partitioner по-прежнему обеспечивает равномерное распределение записей. Поскольку Partitioner распределяет пакет по каждому разделу, равномерное распределение происходит с течением времени. Это похоже на циклический алгоритм « по пакету » или стратегию « в конечном счете даже ».
3) Освойте инструменты командной строки
Каталог bin в двоичной установке Apache Kafka содержит различные утилиты. Помимо Kafka Console Producer, описанного в этой статье, вы должны быть знакомы с командами console-consumer, dump-log и другими командами в этом каталоге.
- Consumer консоли Kafka: вы можете использовать записи из топика Kafka прямо из командной строки с помощью consumer консоли Kafka. При разработке или отладке немедленный запуск consumer может оказаться весьма полезным. Просто выполните следующую команду, чтобы убедиться, что ваше приложение-Producer доставляет сообщения consumer консоли Kafka:
kafka-console-consumer --topic
--bootstrap-server <broker-host:port>
- Журнал дампа: при работе с Kafka вам может потребоваться время от времени вручную анализировать основные журналы топика. Команда kafka-dump-log — ваш помощник независимо от того, интересуетесь ли вы просто внутренними компонентами Kafka или вам нужно устранить проблему и проверить содержимое. Вот команда для просмотра журнала примера топика:
kafka-dump-log
--print-data-log
--files ./var/lib/kafka/data/example-0/00000000000000000000.log
Kafka предоставляет различные другие функции и возможности для своего producer и consumer консоли Kafka.
Заключение
Эта статья помогла вам разобраться в Kafka Console Producer. В этой статье вы узнали о некоторых хороших стратегиях и возможностях, которые Kafka предлагает своим разработчикам для Kafka Console Producer и Consumer.