Last updated 8 months, 3 weeks ago
Единственные админы этого канала::
@ultr4mxrine / @egor_krytoi_pots
По всем вопросам:
@ultr4mxrine
Отзывы:
t.me/feedbackshiva
Last updated 2 months, 1 week ago
Где брокер хранит сообщения и как их удалить?При создании топика партиции распределяются между брокерам (здесь мы рассматривали это подробнее). Брокер разбивает каждую партицию на сегменты. Каждый сегмент хранится в отдельном файле с расширением .log
.
Параметр log.dirs
задаёт список директорий для хранения этих файлов.
Для каждой партиции брокер держит открытые дескрипторы в режиме чтения для всех сегментов и только один дескриптор в режиме записи. Этот файл, открытый в режиме записи, является активным сегментом, то есть сегментом, в который производится запись сообщений. В один момент времени может быть только один активный сегмент. Он не может быть удалён.
По умолчанию каждый сегмент содержит или 1ГБ данных log.segment.bytes
, или данные за одну неделю log.roll.ms
, в зависимости от того, что наступит раньше. При достижении этого лимита текущий сегмент перестаёт быть активным, его файл переоткрывается в режиме чтения, создаётся новый файл, который соответствует новому активному сегменту.
Формат данных в log файлах соответствует формату сообщений, передаваемых между брокерами и клиентами. Поэтому брокер может применять оптимизацию zero-copy (здесь мы рассматривали это подробнее).
Потребители могут вычитывать сообщения с любого офсета и с любого времени. Чтобы брокер мог быстро находить эти сообщения во множестве сегментов, существует два типа индексов.
Первый задает соответствие офсета файлу сегмента и месту в этом файле. Хранится в файле с расширением .index
.
Второй - сопоставляет временные метки с офсетами. Хранится в файле с расширением .timeindex
.
Существует три политики очистки, задаваемых параметром log.cleanup.policy
.
log.retention.ms
, или по размеру log.retention.bytes
, в зависимости от того, что наступит раньше. log.retention.ms
(7 дней) удаляет неактивные сегменты, когда разница между текущим временем и временем последнего изменения сегмента (обычно соответствует времени закрытия сегмента) достигло указанного предела.
log.retention.bytes
(-1) удаляет неактивные сегменты, когда общий размер сообщений в партиции достиг указанного предела.
При настройке этих параметров нужно обращать внимание на настройки сегмента log.segment.bytes
, log.roll.ms
и т.п. Иначе может получиться ситуация, когда сегмент всё ещё активный, в нём присутствуют сообщения за 5 дней, а мы настроили удаление за 1 день. В этом случае никакого удаления не произойдёт.
Выполняется в фоне на log.cleaner.threads
по достижении min.cleanable.dirty.ratio
(отношение несжатых сообщений к общему размеру всех сообщений).
Позволяет полностью удалить все сообщения по определённому ключу, если в качестве value передать null. Это особое сообщение, называемое tombstone, будет храниться в течении log.cleaner.delete.retention.ms
(24 часа).
Политика compact бывает полезна, например, когда приложение хранит своё состояние в kafka и при перезапуске его восстанавливает. В этом случае приложению не нужна вся история состояний, а нужно только последнее актуальное состояние.
Асинхронный ли send на самом деле?
Когда мы смотрим на сигнатуру метода и видим, что он возвращает Future
, то считаем его асинхронным и, возможно, даже неблокирующим. Но с методом Producer.send(...)
всё обстоит немного сложнее.
Прежде чем будет создан объект Future
и произойдёт отправка данных брокеру, будут выполнены следующие шаги:
Все эти шаги выполняются не только синхронно, но и, в некоторых случаях, могут блокировать thread, на котором был вызван метод send
.
Metadata кластера берётся из кэша, но может выполниться запрос на её обновление.
При использовании schema registry для сериализации, схемы тоже берутся из кэша, но могут выполняться запросы на их получение. Библиотека от confluent использует блокирующий http-клиент.
Если пользователь реализовал свою стратегию определения партиций, то, в зависимости от его фантазии, не исключены запросы во внешние системы.
Если доступной памяти под новый batch недостаточно, то будет ожидание её освобождения.
Существует несколько параметров, влияющих на работу send
.
max.block.ms
(60сек) - общее время ожидания получения metadata кластера и выделения памяти под новый batch. Если получение metadata заняло X, то выделение памяти должно занять не более max.block.ms - X. Время затраченное на сериализацию и определение партиции не учитывается.
buffer.memory
(32МБ) - размер памяти, используемой для хранения сообщений, прежде чем они будут отправлены брокеру. Если памяти недостаточно, то будет ожидание max.block.ms
её освобождения.
delivery.timeout.ms
(120сек) - максимальный интервал времени между возвратом из метода send
и вызовом callback-функции (переданной в этот метод) с сообщением об успехе или ошибке. Включает время, затраченное на повторные попытки отправки сообщения. Не должен превышать сумму параметров request.timeout.ms
и linger.ms
.
request.timeout.ms
(30сек) - время ожидания ответа от брокера.
retry.backoff.ms
(100мс) - время между повторными попытками отправки сообщения.
В большинстве случаев, синхронные шаги, выполняемые до старта асинхронной логики, пройдут без блокировок. Но мы от них не застрахованы, поэтому лучше не вызывать метод send
на epoll.
Last updated 8 months, 3 weeks ago
Единственные админы этого канала::
@ultr4mxrine / @egor_krytoi_pots
По всем вопросам:
@ultr4mxrine
Отзывы:
t.me/feedbackshiva
Last updated 2 months, 1 week ago