Использование пользовательских заголовков в Kafka
Apache Kafka — это распределенное хранилище событий с открытым исходным кодом и отказоустойчивая система обработки потоков. Kafka — это, по сути, платформа потоковой передачи событий, где клиенты могут публиковать поток событий и подписываться на него. Как правило, приложения-производители публикуют события в Kafka, в то время как потребители подписываются на эти события, реализуя тем самым модель издатель-подписчик. В этом уроке мы узнаем, как добавлять собственные заголовки в сообщение Kafka с помощью производителя Kafka.
Настройка
Kafka предоставляет простую в использовании библиотеку Java, которую мы можем использовать для создания клиентов-производителей Kafka (производителей) и клиентов-потребителей (потребителей).
Зависимости
Для начала давайте добавим зависимость Maven библиотеки Kafka Clients Java в файл pom.xml нашего проекта :
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.4.0</version> </dependency>
Инициализация соединения
В руководстве предполагается, что в нашей локальной системе работает кластер Kafka. Дополнительно нам необходимо создать тему и установить соединение с кластером Kafka.
Во-первых, давайте начнем с создания темы Kafka в нашем кластере. Мы можем создать тему « baeldung ».
Во-вторых, давайте создадим новый экземпляр Properties с минимальной конфигурацией, необходимой для подключения производителя к нашему локальному брокеру:
Properties producerProperties = new Properties(); producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
Наконец, давайте создадим экземпляр KafkaProducer , который мы будем использовать для публикации сообщений:
KafkaProducer <String, String> producer = new KafkaProducer<>(producerProperties);
Конструктор класса KafkaProducer принимает объект Properties (или Map) со свойством bootstrap.servers и возвращает экземпляр KafkaProducer.
Аналогичным образом давайте создадим экземпляр KafkaConsumer , который будем использовать для получения сообщений:
Properties consumerProperties = new Properties(); consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "ConsumerGroup1"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties);
Мы будем использовать эти экземпляры производителя и потребителя для демонстрации всех наших примеров кодирования.
Теперь, когда у нас настроены все необходимые зависимости и соединения, мы можем написать простое приложение для добавления пользовательских заголовков в сообщение Kafka.
Публикация сообщений с настраиваемыми заголовками
Поддержка пользовательских заголовков в сообщениях Kafka была добавлена в Kafka версии 0.11.0.0 . Чтобы создать сообщение Kafka (Record), мы создаем экземпляр ProducerRecord<K,V> . ProducerRecord в основном определяет значение сообщения и тему , в которой сообщение должно быть опубликовано, а также другие метаданные.
Класс ProducerRecord предоставляет различные конструкторы для добавления пользовательских заголовков в сообщение Kafka. Давайте посмотрим на пару конструкторов, которые мы можем использовать:
- ProducerRecord (строковая тема, целочисленный раздел, ключ K, значение V, заголовки Iterable<Header>)
- ProducerRecord (строковая тема, целочисленный раздел, длинная временная метка, ключ K, значение V, заголовки Iterable<Header>)
Оба конструктора класса ProducerRecord принимают пользовательские заголовки в форме типа Iterable <Header> .
Чтобы понять это, давайте создадим ProducerRecord , который публикует сообщение в теме «baeldung» вместе с некоторыми настраиваемыми заголовками:
List <Header> headers = new ArrayList<>(); headers.add(new RecordHeader("website", "baeldung.com".getBytes())); ProducerRecord <String, String> record = new ProducerRecord <>("baeldung", null, "message", "Hello World", headers); producer.send(record);
Здесь мы создаем список типов заголовков для передачи в качестве заголовков конструктору. Каждый заголовок представляет собой экземпляр RecordHeader(ключ String, значение byte[]) , который принимает ключ заголовка как строку , а значение заголовка — как массив байтов.
Аналогичным образом мы можем использовать второй конструктор, который дополнительно принимает временную метку публикуемой записи:
List <Header> headers = new ArrayList<>(); headers.add(new RecordHeader("website", "baeldung.com".getBytes())); ProducerRecord <String, String> record = new ProducerRecord <>("baeldung", null, System.currentTimeMillis(), "message", "Hello World", headers); producer.send(record);
На данный момент мы создали сообщение с настраиваемыми заголовками и опубликовали его в Kafka.
Далее давайте реализуем потребительский код для приема сообщения и проверки его настраиваемых заголовков.
Использование сообщений с пользовательскими заголовками
Во-первых, мы подписываем наш потребительский экземпляр на тему Kafka «baeldung» , чтобы получать сообщения от:
consumer.subscribe(Arrays.asList("baeldung"));
Во-вторых, мы используем механизм опроса для поиска новых сообщений от Kafka:
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMinutes(1));
Методы KafkaConsumer.poll(Duration period) опрашивают новые сообщения в теме Kafka до времени, указанного параметром Duration . Метод возвращает экземпляр ConsumerRecords , содержащий полученные сообщения. ConsumerRecords — это, по сути, Iterable экземпляр типа ConsumerRecord .
Наконец, мы просматриваем полученные записи и получаем пользовательские заголовки вместе с каждым сообщением:
for (ConsumerRecord<String, String> record : records) { System.out.println(record.key()); System.out.println(record.value()); Headers consumedHeaders = record.headers(); for (Header header : consumedHeaders) { System.out.println(header.key()); System.out.println(new String(header.value())); } }
Здесь мы используем различные методы получения из класса ConsumerRecord для получения ключей, значений и пользовательских заголовков сообщений. Метод ConsumerRecord.headers() возвращает экземпляр заголовков , содержащий пользовательские заголовки . Заголовки — это, по сути, Iterable экземпляр типа Header . Затем мы просматриваем каждый экземпляр заголовка и извлекаем ключ и значение заголовка, используя методы Header.key() и Header.value() соответственно.
Вывод
В этой статье мы узнали, как добавлять собственные заголовки в сообщение Kafka. Мы рассмотрели различные доступные конструкторы, которые принимают пользовательские заголовки, с соответствующими реализациями. Затем мы увидели, как можно использовать сообщение с настраиваемыми заголовками и проверять их.