Использование пользовательских заголовков в Kafka

Apache Kafka — это распределенное хранилище событий с открытым исходным кодом и отказоустойчивая система обработки потоков. Kafka — это, по сути, платформа потоковой передачи событий, где клиенты могут публиковать поток событий и подписываться на него. Как правило, приложения-производители публикуют события в Kafka, в то время как потребители подписываются на эти события, реализуя тем самым модель издатель-подписчик. В этом уроке мы узнаем, как добавлять собственные заголовки в сообщение Kafka с помощью производителя Kafka.

Настройка

Kafka предоставляет простую в использовании библиотеку Java, которую мы можем использовать для создания клиентов-производителей Kafka (производителей) и клиентов-потребителей (потребителей).

Зависимости

Для начала давайте добавим зависимость Maven библиотеки Kafka Clients Java в файл pom.xml нашего проекта :

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.4.0</version>
</dependency>

Инициализация соединения

В руководстве предполагается, что в нашей локальной системе работает кластер Kafka. Дополнительно нам необходимо создать тему и установить соединение с кластером Kafka.


Во-первых, давайте начнем с создания темы Kafka в нашем кластере. Мы можем создать тему « baeldung ».

Во-вторых, давайте создадим новый экземпляр Properties с минимальной конфигурацией, необходимой для подключения производителя к нашему локальному брокеру:

Properties producerProperties = new Properties();
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

Наконец, давайте создадим экземпляр KafkaProducer , который мы будем использовать для публикации сообщений:

KafkaProducer <String, String> producer = new KafkaProducer<>(producerProperties);

Конструктор класса KafkaProducer принимает объект Properties (или Map) со свойством bootstrap.servers и возвращает экземпляр KafkaProducer.


Аналогичным образом давайте создадим экземпляр KafkaConsumer , который будем использовать для получения сообщений:

Properties consumerProperties = new Properties();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "ConsumerGroup1");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties);

Мы будем использовать эти экземпляры производителя и потребителя для демонстрации всех наших примеров кодирования.


Теперь, когда у нас настроены все необходимые зависимости и соединения, мы можем написать простое приложение для добавления пользовательских заголовков в сообщение Kafka.

Публикация сообщений с настраиваемыми заголовками

Поддержка пользовательских заголовков в сообщениях Kafka была добавлена ​​в Kafka версии 0.11.0.0 . Чтобы создать сообщение Kafka (Record), мы создаем экземпляр ProducerRecord<K,V> . ProducerRecord в основном определяет значение сообщения и тему , в которой сообщение должно быть опубликовано, а также другие метаданные.


Класс ProducerRecord предоставляет различные конструкторы для добавления пользовательских заголовков в сообщение Kafka. Давайте посмотрим на пару конструкторов, которые мы можем использовать:

  • ProducerRecord (строковая тема, целочисленный раздел, ключ K, значение V, заголовки Iterable<Header>)
  • ProducerRecord (строковая тема, целочисленный раздел, длинная временная метка, ключ K, значение V, заголовки Iterable<Header>)

Оба конструктора класса ProducerRecord принимают пользовательские заголовки в форме типа Iterable <Header> .


Чтобы понять это, давайте создадим ProducerRecord , который публикует сообщение в теме «baeldung» вместе с некоторыми настраиваемыми заголовками:

List <Header> headers = new ArrayList<>();
headers.add(new RecordHeader("website", "baeldung.com".getBytes()));
ProducerRecord <String, String> record = new ProducerRecord <>("baeldung", null, "message", "Hello World", headers);

producer.send(record);

Здесь мы создаем список типов заголовков для передачи в качестве заголовков конструктору. Каждый заголовок представляет собой экземпляр RecordHeader(ключ String, значение byte[])  , который принимает ключ заголовка как строку , а значение заголовка — как массив байтов.


Аналогичным образом мы можем использовать второй конструктор, который дополнительно принимает временную метку публикуемой записи:

List <Header> headers = new ArrayList<>();
headers.add(new RecordHeader("website", "baeldung.com".getBytes()));
ProducerRecord <String, String> record = new ProducerRecord <>("baeldung", null, System.currentTimeMillis(), "message", "Hello World", headers);

producer.send(record);

На данный момент мы создали сообщение с настраиваемыми заголовками и опубликовали его в Kafka.


Далее давайте реализуем потребительский код для приема сообщения и проверки его настраиваемых заголовков.

Использование сообщений с пользовательскими заголовками

Во-первых, мы подписываем наш потребительский экземпляр на тему Kafka «baeldung» , чтобы получать сообщения от:

consumer.subscribe(Arrays.asList("baeldung"));

Во-вторых, мы используем механизм опроса для поиска новых сообщений от Kafka:

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMinutes(1));

Методы KafkaConsumer.poll(Duration period) опрашивают новые сообщения в теме Kafka до времени, указанного параметром Duration . Метод возвращает экземпляр ConsumerRecords , содержащий полученные сообщения. ConsumerRecords — это, по сути, Iterable экземпляр типа ConsumerRecord .


Наконец, мы просматриваем полученные записи и получаем пользовательские заголовки вместе с каждым сообщением:

for (ConsumerRecord<String, String> record : records) {
    System.out.println(record.key());
    System.out.println(record.value());

    Headers consumedHeaders = record.headers();
    for (Header header : consumedHeaders) {
        System.out.println(header.key());
        System.out.println(new String(header.value()));
    }
}

Здесь мы используем различные методы получения из класса ConsumerRecord для получения ключей, значений и пользовательских заголовков сообщений. Метод ConsumerRecord.headers() возвращает экземпляр заголовков , содержащий пользовательские заголовки . Заголовки — это, по сути, Iterable экземпляр типа Header . Затем мы просматриваем каждый экземпляр заголовка и извлекаем ключ и значение заголовка, используя методы Header.key() и Header.value() соответственно.

Вывод

В этой статье мы узнали, как добавлять собственные заголовки в сообщение Kafka. Мы рассмотрели различные доступные конструкторы, которые принимают пользовательские заголовки, с соответствующими реализациями. Затем мы увидели, как можно использовать сообщение с настраиваемыми заголовками и проверять их.