В сети очень много информации о kafka, особенно теоритических разборов тех или иных ее возможностей. Одной из возможностей kafka является exactly once delivery, которая появилась в ней начиная с версии 0.11. По умолчанию exactly once delivery является выключенной. Чтобы ее использовать, необходимо конфигурировать как Producer (приложение, которое отправляет сообщения), так и Consumer (приложение, которое читает сообщения). Далее я хочу разобрать, как именно это сделать.

Без приведенных ниже настроек kafka поддерживает at least once semantics, при которой сообщения могут дублироваться в случае возникновения ошибок сети или при попытке переотправки Producer-ом. В некоторых приложениях появление дубликатов сообщений приведет к некорректному состоянию системы, что может быть очень критично. От дубликатов можно защититься поставив базу данных для дедупликации для каждого Consumer-а, но это не выглядит как элегантное решение. Давайте попробуем решить эту проблему настроив kafka для работы в режиме exactly once.

Для начала изменим на Consumer значение параметра isolation.level. Эта настройка имеет по умолчанию значение read_uncommitted. Это значит, что Consumer прочитает сообщение из топика как только оно в нем появится. При этом если Producer, положивший это сообщение, откатит транзакцию и попытается повторить процесс отправки, то Consumer получит дубликат ранее обработанного сообщения. Чтобы предотвратить чтение uncommitted сообщений необходимо выставить isolation.level в read_committed. Однако, Consumer с isolation.level=read_committed сможет читать сообщения из топика даже от не транзакционных Producer-ов, следовательно только этой настройки недостаточно.

Изменим параметры Producer-а. Наша цель - включить поддержку транзакций. Для этого должны быть соблюдены следующие условия:

  1. max.in.flight.requests.per.connection должно быть <= 5. Значение по умолчанию удовлетворяет этому условию (по умолчанию 5).
  2. acks должен быть равен all (значение по умолчанию 1). Этот параметр отвечает за количество acknowledgments сообщений, полученных лидером kafka кластера, перед отправкой подтверждения об успешной записи на Producer. Значение all говорит о том, что все узлы kafka кластера должны подтвердить получение сообщения.
  3. retries должно иметь значение > 0 (по умолчанию 0). Значение этого параметра показывает, сколько раз Producer должен повторить отправку сообщения в случае возникновения проблем на kafka кластере.
  4. enable.idempotence необходимо выставить в true (по умолчанию false). Идемпотентность гарантирует, что сообщение, которое записывается в топик, не будет дублироваться в рамках стрима. Такое может произойти, например, во время переотправок на Producer, когда сообщение было записано в кластер, а подтверждение об этом потерялось. Однако, идемпотентность не гарантирует отсутствие дубликатов при записи в несколько партиций одного топика. Для этих гарантий нужны транзакции.
  5. Теперь мы можем включить поддержку транзакций путем присвоения значения для параметра transactional.id (по умолчанию null). Необходимо гарантировать уникальность значения transactional.id даже для разных экземпляров одного и того же приложения. В противном случае kafka может посчитать первый запущенный экземпляр приложения зомби процессом и его работа будет нарушена. Для этого, например, можно добавить в transactional.id идентификатор хоста, на котором запущено приложение.

После того, как транзакционный Producer настроен, осталось только добавить транзакционную отправку сообщений в код вашего приложения. Например, в spring для этого можно воспользоваться аннотацией @Transactional с указанием имени бина типа KafkaTransactionManager над методом или использовать TransactionTemplate, если вы не любите AOP.