Unlock a World of Free Content: Books, Music, Videos & More Await!

Рецепты Kafka

Description
Практика приготовления Apache Kafka

@aleksey_kc
Advertising
We recommend to visit

Last updated 11 months, 4 weeks ago

Last updated 1 month, 2 weeks ago

Dm: @chin4ganja @egor_rocketrage

Feedback:

t.me/feedbackshiva

Womens channel:

https://t.me/vtjclo

Last updated 2 weeks, 3 days ago

10 months ago
**GUI для Apache Kafka**

GUI для Apache Kafka

Во время разработки, отладки и тестирования часто возникает необходимость быстро посмотреть, а что же там вообще происходит в топиках, какие, откуда и куда сообщения отправляются.

Как бы ни было богато api библиотеки kafka-clients, делать это через код неудобно и немасштабируемо в рамках команды.

Существует немало продуктов, решающих эту проблему. Например, консольный kcat, web kafka-ui или десктопный conduktor.

Разработчики могут использовать и kcat, но для аналитиков и тестировщиков это не очень удобно.

При выборе GUI для использования в своей команде, мы отталкивались в первую очередь от удобства.

По этой причине мы отказались не только от консольных утилит, но и от разных web ui, запускаемых в docker.

Долгое время мы использовали conduktor и нас всё устраивало. После известных ограничений, мы попытались перейти на offset explorer, но, объективно, он уже устарел.

Поэтому мы решили сделать свой GUI для Apache Kafka, который мог бы подойти всем участникам команды.

Atom KTool, как мы его назвали, представляет собой desktop приложение, распространяемое для Linux (deb, rpm, AppImage, tar.gz), Windows (exe, msi, portable) и macOS (dmg, pkg, tar.gz).

Позволяет:

- настраивать подключение к Apache Kafka, Schema Registry и Kafka Connect
- фильтровать сообщения по времени, офсетам и партициям
- искать сообщения по ключу и телу с помощью JavaScript функций
- публиковать сообщения
- использовать Avro, Protobuf и JSON схемы
- создавать топики и редактировать конфиги
- отслеживать потребителей
- устанавливать acl и квоты
- управлять схемами и коннекторами
- и многое другое

Ознакомиться подробнее и загрузить можно здесь https://atomone.tech/ru/ktool

По всем вопросам и проблемам можно писать @aleksey_kc

#ktool

@kafka_cooker

10 months ago

Где брокер хранит сообщения и как их удалить?При создании топика партиции распределяются между брокерам (здесь мы рассматривали это подробнее). Брокер разбивает каждую партицию на сегменты. Каждый сегмент хранится в отдельном файле с расширением .log.

Параметр log.dirs задаёт список директорий для хранения этих файлов.

Для каждой партиции брокер держит открытые дескрипторы в режиме чтения для всех сегментов и только один дескриптор в режиме записи. Этот файл, открытый в режиме записи, является активным сегментом, то есть сегментом, в который производится запись сообщений. В один момент времени может быть только один активный сегмент. Он не может быть удалён.

По умолчанию каждый сегмент содержит или 1ГБ данных log.segment.bytes, или данные за одну неделю log.roll.ms, в зависимости от того, что наступит раньше. При достижении этого лимита текущий сегмент перестаёт быть активным, его файл переоткрывается в режиме чтения, создаётся новый файл, который соответствует новому активному сегменту.

Формат данных в log файлах соответствует формату сообщений, передаваемых между брокерами и клиентами. Поэтому брокер может применять оптимизацию zero-copy (здесь мы рассматривали это подробнее).

Потребители могут вычитывать сообщения с любого офсета и с любого времени. Чтобы брокер мог быстро находить эти сообщения во множестве сегментов, существует два типа индексов.
Первый задает соответствие офсета файлу сегмента и месту в этом файле. Хранится в файле с расширением .index.
Второй - сопоставляет временные метки с офсетами. Хранится в файле с расширением .timeindex.

Существует три политики очистки, задаваемых параметром log.cleanup.policy.

  1. delete. Является политикой по умолчанию. Удаляет неактивные сегменты или по времени log.retention.ms, или по размеру log.retention.bytes, в зависимости от того, что наступит раньше.

log.retention.ms (7 дней) удаляет неактивные сегменты, когда разница между текущим временем и временем последнего изменения сегмента (обычно соответствует времени закрытия сегмента) достигло указанного предела.

log.retention.bytes (-1) удаляет неактивные сегменты, когда общий размер сообщений в партиции достиг указанного предела.

При настройке этих параметров нужно обращать внимание на настройки сегмента log.segment.bytes, log.roll.ms и т.п. Иначе может получиться ситуация, когда сегмент всё ещё активный, в нём присутствуют сообщения за 5 дней, а мы настроили удаление за 1 день. В этом случае никакого удаления не произойдёт.

  1. compact. Удаляет сообщения в неактивных сегментах с одинаковыми ключами за исключением самого последнего. Отсюда и название - сжатие. Нельзя использовать null ключи.

Выполняется в фоне на log.cleaner.threads по достижении min.cleanable.dirty.ratio (отношение несжатых сообщений к общему размеру всех сообщений).

Позволяет полностью удалить все сообщения по определённому ключу, если в качестве value передать null. Это особое сообщение, называемое tombstone, будет храниться в течении log.cleaner.delete.retention.ms (24 часа).

Политика compact бывает полезна, например, когда приложение хранит своё состояние в kafka и при перезапуске его восстанавливает. В этом случае приложению не нужна вся история состояний, а нужно только последнее актуальное состояние.

  1. compact,delete. Объединяет в себе две политики. Сегменты будут сжиматься по политике compact, а после удаляться по политике delete. В какой-то мере страхует нас, если мы ошиблись и стали отправлять все уникальные ключи.

#broker #log

@kafka_cooker

10 months, 1 week ago

Асинхронный ли send на самом деле?

Когда мы смотрим на сигнатуру метода и видим, что он возвращает Future, то считаем его асинхронным и, возможно, даже неблокирующим. Но с методом Producer.send(...) всё обстоит немного сложнее.

Прежде чем будет создан объект Future и произойдёт отправка данных брокеру, будут выполнены следующие шаги:

  1. Получение metadata кластера
  2. Сериализация key и value
  3. Определение партиции (здесь мы подробнее рассматривали этот шаг)
  4. Выделение памяти под новый batch, если существующий уже полностью заполнен
  5. Добавление сообщения в batch

Все эти шаги выполняются не только синхронно, но и, в некоторых случаях, могут блокировать thread, на котором был вызван метод send.

Metadata кластера берётся из кэша, но может выполниться запрос на её обновление.

При использовании schema registry для сериализации, схемы тоже берутся из кэша, но могут выполняться запросы на их получение. Библиотека от confluent использует блокирующий http-клиент.

Если пользователь реализовал свою стратегию определения партиций, то, в зависимости от его фантазии, не исключены запросы во внешние системы.

Если доступной памяти под новый batch недостаточно, то будет ожидание её освобождения.

Существует несколько параметров, влияющих на работу send.

max.block.ms (60сек) - общее время ожидания получения metadata кластера и выделения памяти под новый batch. Если получение metadata заняло X, то выделение памяти должно занять не более max.block.ms - X. Время затраченное на сериализацию и определение партиции не учитывается.

buffer.memory (32МБ) - размер памяти, используемой для хранения сообщений, прежде чем они будут отправлены брокеру. Если памяти недостаточно, то будет ожидание max.block.ms её освобождения.

delivery.timeout.ms (120сек) - максимальный интервал времени между возвратом из метода send и вызовом callback-функции (переданной в этот метод) с сообщением об успехе или ошибке. Включает время, затраченное на повторные попытки отправки сообщения. Не должен превышать сумму параметров request.timeout.ms и linger.ms.

request.timeout.ms (30сек) - время ожидания ответа от брокера.

retry.backoff.ms (100мс) - время между повторными попытками отправки сообщения.

В большинстве случаев, синхронные шаги, выполняемые до старта асинхронной логики, пройдут без блокировок. Но мы от них не застрахованы, поэтому лучше не вызывать метод send на epoll.

#producer

@kafka_cooker

10 months, 2 weeks ago
**Зачем брокер использует zero-copy?** Часть 2

Зачем брокер использует zero-copy? Часть 2

Часть 1Подход zero-copy стремится решить эту проблему и повысить производительность за счет устранения избыточных копий данных.

Вместо двух системных вызовов read(...) и write(...) мы сделаем один системный вызов sendfile(...).

sendfile(...) не будет копировать данные в user space, а ограничится только kernel space.

Ядро Linux < 2.4:

  1. DMA копирует данные с диска в page cache.
  2. Данные из page cache копируются в socket buffer.
  3. DMA копирует данные из socket buffer в network interface controller.

Ядро Linux >= 2.4 и network interface controller поддерживает gather operations:

  1. DMA копирует данные с диска в page cache.
  2. В socket buffer записываются не данные, а только дескрипторы с информацией о расположении и размере данных.
  3. DMA на основе полученных дескрипторов копирует данные из page cache в network interface controller.

В итоге мы имеем 2 переключения контекста и 2 копии данных.

Если рассмотреть вариант когда данные уже были в page cache (например, несколько групп потребителей вычитывают одни и те же сообщения), то такой подход позволяет потребителям вычитывать сообщения со скоростью, приближающейся к пределу скорости сетевого подключения.

В java мы можем применять такой подход путём использования nio и, в частности, метода FileChannel.transferTo(...).

Kafka использует его в классе org.apache.kafka.common.network.PlaintextTransportLayer.

Возможность применения zero-copy достигается тем, что Kafka использует единый формат сообщений для брокеров и клиентов. Иначе преобразование сообщений приходилось бы делать в user space, выходя за рамки kernel space.

Поэтому если мы настроим SSL, то будет использоваться org.apache.kafka.common.network.SslTransportLayer, который будет шифровать данные в user space. Тем самым мы лишимся преимуществ zero-copy. На самом деле существует системный вызов SSL_sendfile(...), но текущая версия Kafka его не поддерживает.

#broker #internal

@kafka_cooker

10 months, 2 weeks ago
**Зачем брокер использует zero-copy?** Часть 1

Зачем брокер использует zero-copy? Часть 1

Apache Kafka хранит сообщения в файлах на жёстком диске.

Если потребитель хочет получить сообщения с определённого офсета, то брокер должен сначала прочитать эти сообщения из файла, а затем отправить их потребителю.

Как бы мы это реализовали? Получили InputStream для файла и OutputStream для сокета, создали временный массив байт, прочитали в него данные из файла InputStream.read(...) и записали из него данные в сокет OutputStream.write(...).

С точки зрения операционной системы такой подход выглядит немного сложнее:

  1. Системный вызов read(...) переключает контекст с user mode в kernel mode. DMA копирует данные с диска в kernel space (page cache).

  2. Данные из kernel space (page cache) копируются в user space, завершается вызов read(...) и контекст переключается обратно с kernel mode в user mode.

  3. Системный вызов write(...) вновь переключает контекст с user mode в kernel mode. Данные копируются из user space в kernel space (socket buffer).

  4. Завершается вызов write(...) и контекст переключается обратно с kernel mode в user mode. Асинхронно DMA копирует данные из kernel space (socket buffer) в network interface controller.

В итоге мы имеем 4 переключения контекста и 4 копии данных.

kernel space выглядит лишним звеном, но это не так. При чтении он значительно повышает производительность, если запрашиваемый объем данных меньше его размера. А при записи - позволяет завершить её асинхронно.

Но если размер запрашиваемых данных значительно превышает размер kernel space, то это может стать узким местом в производительности. Данные копируются несколько раз между диском, kernel space и user space.

#broker #internal

@kafka_cooker

10 months, 2 weeks ago
**Как определить количество реплик для топика?**

Как определить количество реплик для топика?

Репликация предназначена для обеспечения отказоустойчивости и доступности данных не только в случае аварийных ситуаций, но и в случае проведения технического обслуживания серверов.

Партиции могут реплицироваться на брокеры в разных серверных стойках, в разных дата-центрах и даже в разных географических регионах.

Очевидно, если мы будем использовать коэффициент репликации равный 1, то при сбое не сможем ни читать, ни писать данные.

С другой стороны, если укажем его равным 10, то никто не поблагодарит нас за увеличивающиеся затраты на хранение.

Чем больше коэффициент репликации, тем:

  1. Надёжнее наша система. Неважно, откажет диск у конкретного брокера или вся стойка, мы продолжим чтение и запись.

  2. Дольше происходит запись. При acks=all мы должны получить подтверждение от всех реплик, что влияет на задержку продюсеров.

  3. Больше трафика между брокерами.

  4. Дороже хранение.

Общепринятым для production среды является коэффициент репликации равный 3.

Он обеспечивает наилучшее соотношение между надёжностью системы и затратами на её поддержание.

Допустим мы запускаем кластер с коэффициентом репликации 3 в облачной среде, в которой есть 3 зоны доступности. Зоны доступности в облаке принято рассматривать как отдельные стойки. Значит для брокеров в конкретной зоне мы должны указать одинаковый broker.rack.

У любого облачного провайдера есть окна обслуживания и соглашение об уровне обслуживания (SLA). Например, у одного отечественного провайдера SLA на запись в кластер Apache Kafka составляет 99,95%.

Это значит, что в определённый момент какая-либо зона может быть недоступна из-за обслуживания. Но у нас остаётся ещё две зоны. И даже если одна из них выйдет из строя уже из-за какой-либо аварии, то мы всё равно сможем продолжить работу.

Конечно, никто не гарантирует, что и третья зона не выйдет из строя. Но как, и в любых распределённых системах, в Apache Kafka мы работаем с вероятностями и компромиссами.

#topic #partition #replica

@kafka_cooker

10 months, 3 weeks ago
**Как партиции распределяются между брокерами?**

Как партиции распределяются между брокерами?

При создании топика с несколькими партициями и несколькими репликами Apache Kafka старается достичь нескольких целей.

Две самые важные из них по приоритету:
1. распределить реплики как можно более равномерно по серверным стойкам (если кластер располагается на одной стойке, то данная цель не имеет смысла)
2. распределить реплики как можно более равномерно по брокерам

Так же, существует важное ограничение, что на одном брокере должна быть только одна реплика конкретной партиции.
Поэтому, невозможно создать топик с фактором репликации 4 в кластере из 3 брокеров.

Основная идея распределения реплик состоит в том, чтобы распределять их циклически (round-robin) по серверным стойкам и брокерам.

Например, у нас есть стойки A, B, C и D. В каждой стойке есть по 4 брокера, обозначим их A1, A2, A3, A4 и т.д.
Мы хотим создать топик с 5 партициями и фактором репликации 3.

Для того чтобы постоянно не начинать распределение со стойки A и с первого брокера в стойке, введём случайное начальное смещение как для стойки, так и для брокера в стойке.

Для простоты восприятия, предположим, что наши смещения равны 0.

Тогда распределение будет выглядеть следующим образом:
- партиция 1: A1 B1 C1
- партиция 2: B2 C2 D1
- партиция 3: C3 D2 A2
- партиция 4: D3 A3 B3
- партиция 5: A4 B4 C4

Как видно, даже если какая-либо стойка выйдет из строя, это никак не скажется на доступности какой-либо партиции.

Однако, следует учитывать, что количество брокеров в каждой стойке должно быть как можно более одинаковым.
Иначе, из-за природы циклического распределения, на брокеры в стойке с наименьшим количеством брокеров будет приходиться наибольшее количество реплик.

Более подробно ознакомиться с алгоритмом распределения можно в классе org.apache.kafka.metadata.placement.StripedReplicaPlacer

#broker #partition

@kafka_cooker

10 months, 3 weeks ago
**Как брокер обрабатывает запросы?**

Как брокер обрабатывает запросы?

Сетевой уровень брокера Apache Kafka представляет собой NIO сервер, который поддерживает два пайплайна обработки запросов:
1. data-plane
2. control-plane

data-plane обрабатывает запросы от клиентов и других брокеров в кластере.

Для каждого слушателя в параметре listeners создаётся один acceptor thread, который обрабатывает новые соединения.

Acceptor thread содержит несколько network threads. Их количество задаётся параметром num.network.threads (3).

Каждый network thread имеет свой селектор и вычитывает запросы из сокета.

Получив запрос, network thread помещает его в очередь. Размер этой очереди задаётся параметром queued.max.requests (500). Если очередь полностью заполнится, то network threads будут заблокированы.

Запросы из этой очереди разбирают и обрабатывают несколько io threads. Их количество задаётся параметром num.io.threads (8).

Как только io thread завершил обрабатывать запрос, он кладёт результат в ответную очередь. Из этой очереди его забирает network thread и отправляет клиенту.

control-plane обрабатывает запросы от контроллера кластера. Работает только, если задан параметр control.plane.listener.name. Если не задан, то запросы от контроллера будет обрабатывать data-plane.

Работает аналогично data-plane, за исключением того, что содержит только один network thread и один io thread.

#broker

@kafka_cooker

10 months, 4 weeks ago
**Как потребителю узнать о ребалансировке?**

Как потребителю узнать о ребалансировке?

Если приложение является stateful, то есть хранит состояние, например, кэш, а этот кэш зависит от конкретных партиций, то, в случае ребалансировки и переназначения партиций, было бы полезно этот кэш или очищать или заново инициализировать.

Чтобы узнать, какие партиции участвовали в ребалансировке, нужно создать реализацию интерфейса org.apache.kafka.clients.consumer.ConsumerRebalanceListener и передать её потребителю в методе subscribe.

ConsumerRebalanceListener предоставляет три метода.

  1. onPartitionsRevoked вызывается, когда потребитель должен отказаться от партиций, которые ему были назначены ранее.
    Принимает в качестве аргумента список отозванных партиций.
    При eager ребалансировке вызывается всегда (даже с пустым списком партиций) в начале ребалансировки и после того, как потребитель перестал вычитывать сообщения.
    При cooperative ребалансировке вызывается в её конце и только с непустым списком партиций.
    Так же, может вызываться при остановке потребителя (close, unsubscribe).

  2. onPartitionsAssigned вызывается при любой успешной ребалансировке.
    Принимает в качестве аргумента список новых партиций для потребителя или пустой список, если новых партиций для потребителя нет.
    Вызывается в конце ребалансировки и до того, как потребитель начнёт вычитывать сообщения.

Если конкретная партиция была переназначена от одного потребителя к другому, то в нормальных условиях гарантируется, что вызов метода onPartitionsRevoked первым потребителем, будет осуществлён до вызова метода onPartitionsAssigned вторым потребителем.

  1. onPartitionsLost вызывается в исключительных ситуациях, например, когда истекла сессия потребителя или потребитель потерял членство в группе в результате непредвиденной ошибки.
    Принимает в качестве аргумента список отозванных партиций. Вызывается только с непустым списком партиций.
    Вызов этого метода может быть осуществлён, когда указанные партиции уже назначены другому потребителю.

#consumer #partition #rebalance

@kafka_cooker

11 months ago
**Зачем нужен статический потребитель?**

Зачем нужен статический потребитель?

Apache Kafka использует два вида потребителей:
1. динамические (по-умолчанию)
2. статические

Когда динамический потребитель впервые присоединяется к группе потребителей, то происходит ребалансировка группы и ему назначаются определённые партиции.

Когда, в следующий раз, данный потребитель решит снова присоединиться к группе (например, из-за перезагрузки приложения), то он ничего не будет знать о ранее назначенных ему партициях, и, в результате ребалансировки группы, получит новые партиции.

Статический потребитель ведёт себя по другому.

Когда статический потребитель впервые присоединяется к группе потребителей, то ребалансировка группы происходит по тому же принципу, что и для динамического потребителя.

Но, когда данный потребитель выключится, то ребалансировки группы не произойдёт и назначенные ему партиции не будут перераспределены.

Координатор группы будет ждать до session.timeout.ms (45 сек), прежде, чем решит перераспределить партиции, назначенные на статического потребителя.

Если данный потребитель успеет переподключиться к группе до истечения session.timeout.ms, то ему будут назначены прежние партиции.

Важно понимать, что пока статический потребитель неактивен, то ни один другой потребитель не будет вычитывать сообщения из данных партиций.

Чтобы превратить динамический потребитель в статический, достаточно указать уникальный параметр group.instance.id.
Если два потребителя присоединятся к одной группе с одинаковым group.instance.id, то один из них получит ошибку.

Кроме уменьшения количества ребалансировок, статические потребители полезны в случае, если приложение является stateful, то есть хранит состояние, а инициализация этого состояния из конкретных партиций занимает продолжительное время.

#consumer #partition #rebalance

@kafka_cooker

We recommend to visit

Last updated 11 months, 4 weeks ago

Last updated 1 month, 2 weeks ago

Dm: @chin4ganja @egor_rocketrage

Feedback:

t.me/feedbackshiva

Womens channel:

https://t.me/vtjclo

Last updated 2 weeks, 3 days ago