Рецепты Kafka

Description
Практика приготовления Apache Kafka

@aleksey_kc
Advertising
We recommend to visit

Last updated 7 months, 3 weeks ago

Last updated 1 year, 6 months ago

Единственные админы этого канала::
@ultr4mxrine / @egor_krytoi_pots

По всем вопросам:
@ultr4mxrine

Отзывы:
t.me/feedbackshiva

Last updated 1 month, 1 week ago

1 year, 4 months ago

Где брокер хранит сообщения и как их удалить?При создании топика партиции распределяются между брокерам (здесь мы рассматривали это подробнее). Брокер разбивает каждую партицию на сегменты. Каждый сегмент хранится в отдельном файле с расширением .log.

Параметр log.dirs задаёт список директорий для хранения этих файлов.

Для каждой партиции брокер держит открытые дескрипторы в режиме чтения для всех сегментов и только один дескриптор в режиме записи. Этот файл, открытый в режиме записи, является активным сегментом, то есть сегментом, в который производится запись сообщений. В один момент времени может быть только один активный сегмент. Он не может быть удалён.

По умолчанию каждый сегмент содержит или 1ГБ данных log.segment.bytes, или данные за одну неделю log.roll.ms, в зависимости от того, что наступит раньше. При достижении этого лимита текущий сегмент перестаёт быть активным, его файл переоткрывается в режиме чтения, создаётся новый файл, который соответствует новому активному сегменту.

Формат данных в log файлах соответствует формату сообщений, передаваемых между брокерами и клиентами. Поэтому брокер может применять оптимизацию zero-copy (здесь мы рассматривали это подробнее).

Потребители могут вычитывать сообщения с любого офсета и с любого времени. Чтобы брокер мог быстро находить эти сообщения во множестве сегментов, существует два типа индексов.
Первый задает соответствие офсета файлу сегмента и месту в этом файле. Хранится в файле с расширением .index.
Второй - сопоставляет временные метки с офсетами. Хранится в файле с расширением .timeindex.

Существует три политики очистки, задаваемых параметром log.cleanup.policy.

  1. delete. Является политикой по умолчанию. Удаляет неактивные сегменты или по времени log.retention.ms, или по размеру log.retention.bytes, в зависимости от того, что наступит раньше.

log.retention.ms (7 дней) удаляет неактивные сегменты, когда разница между текущим временем и временем последнего изменения сегмента (обычно соответствует времени закрытия сегмента) достигло указанного предела.

log.retention.bytes (-1) удаляет неактивные сегменты, когда общий размер сообщений в партиции достиг указанного предела.

При настройке этих параметров нужно обращать внимание на настройки сегмента log.segment.bytes, log.roll.ms и т.п. Иначе может получиться ситуация, когда сегмент всё ещё активный, в нём присутствуют сообщения за 5 дней, а мы настроили удаление за 1 день. В этом случае никакого удаления не произойдёт.

  1. compact. Удаляет сообщения в неактивных сегментах с одинаковыми ключами за исключением самого последнего. Отсюда и название - сжатие. Нельзя использовать null ключи.

Выполняется в фоне на log.cleaner.threads по достижении min.cleanable.dirty.ratio (отношение несжатых сообщений к общему размеру всех сообщений).

Позволяет полностью удалить все сообщения по определённому ключу, если в качестве value передать null. Это особое сообщение, называемое tombstone, будет храниться в течении log.cleaner.delete.retention.ms (24 часа).

Политика compact бывает полезна, например, когда приложение хранит своё состояние в kafka и при перезапуске его восстанавливает. В этом случае приложению не нужна вся история состояний, а нужно только последнее актуальное состояние.

  1. compact,delete. Объединяет в себе две политики. Сегменты будут сжиматься по политике compact, а после удаляться по политике delete. В какой-то мере страхует нас, если мы ошиблись и стали отправлять все уникальные ключи.

#broker #log

@kafka_cooker

1 year, 4 months ago

Асинхронный ли send на самом деле?

Когда мы смотрим на сигнатуру метода и видим, что он возвращает Future, то считаем его асинхронным и, возможно, даже неблокирующим. Но с методом Producer.send(...) всё обстоит немного сложнее.

Прежде чем будет создан объект Future и произойдёт отправка данных брокеру, будут выполнены следующие шаги:

  1. Получение metadata кластера
  2. Сериализация key и value
  3. Определение партиции (здесь мы подробнее рассматривали этот шаг)
  4. Выделение памяти под новый batch, если существующий уже полностью заполнен
  5. Добавление сообщения в batch

Все эти шаги выполняются не только синхронно, но и, в некоторых случаях, могут блокировать thread, на котором был вызван метод send.

Metadata кластера берётся из кэша, но может выполниться запрос на её обновление.

При использовании schema registry для сериализации, схемы тоже берутся из кэша, но могут выполняться запросы на их получение. Библиотека от confluent использует блокирующий http-клиент.

Если пользователь реализовал свою стратегию определения партиций, то, в зависимости от его фантазии, не исключены запросы во внешние системы.

Если доступной памяти под новый batch недостаточно, то будет ожидание её освобождения.

Существует несколько параметров, влияющих на работу send.

max.block.ms (60сек) - общее время ожидания получения metadata кластера и выделения памяти под новый batch. Если получение metadata заняло X, то выделение памяти должно занять не более max.block.ms - X. Время затраченное на сериализацию и определение партиции не учитывается.

buffer.memory (32МБ) - размер памяти, используемой для хранения сообщений, прежде чем они будут отправлены брокеру. Если памяти недостаточно, то будет ожидание max.block.ms её освобождения.

delivery.timeout.ms (120сек) - максимальный интервал времени между возвратом из метода send и вызовом callback-функции (переданной в этот метод) с сообщением об успехе или ошибке. Включает время, затраченное на повторные попытки отправки сообщения. Не должен превышать сумму параметров request.timeout.ms и linger.ms.

request.timeout.ms (30сек) - время ожидания ответа от брокера.

retry.backoff.ms (100мс) - время между повторными попытками отправки сообщения.

В большинстве случаев, синхронные шаги, выполняемые до старта асинхронной логики, пройдут без блокировок. Но мы от них не застрахованы, поэтому лучше не вызывать метод send на epoll.

#producer

@kafka_cooker

We recommend to visit

Last updated 7 months, 3 weeks ago

Last updated 1 year, 6 months ago

Единственные админы этого канала::
@ultr4mxrine / @egor_krytoi_pots

По всем вопросам:
@ultr4mxrine

Отзывы:
t.me/feedbackshiva

Last updated 1 month, 1 week ago