Блог CosySoft
2025-05-14 18:17 Разработка

Знакомство с Nifi для занятых инженеров. Часть 4

Привет, я Антон, backend-разработчик в CosySoft. В этом цикле статей (1, 2, 3) я рассказываю про Apache NiFi — инструмент для работы с потоками данных, который мы используем в проекте.
Это финальная, четвертая часть цикла про NiFi. Здесь расскажу, почему NiFi не всегда подходит: чем опасен перенос бизнес-логики в него, особенно в крупных проектах, и какие ограничения стоит учитывать. Поделюсь практическими кейсами: баги процессоров, проблемы с кластером, подводные камни конфигурации.

Когда не рекомендуется использовать Nifi

Реализация части бизнес логики в enterprise-проекте

В предыдущей части я уже объяснял свою позицию по поводу реализации части бизнес логики в Nifi (при наличии сервисов занимающихся бизнес логикой). Если вы работаете на проектах уровня enterprise, принципы чистой архитектуры должны превалировать над сиюминутной выгодой.

Реализация всей бизнес логики в enterprise-проекте

Теоретически, Nifi можно использовать как полную замену традиционным сервисам с типовой CRUD логикой – у него есть для этого все возможности, есть специальные процессоры, которые создают API endpoint-ы, есть возможности почти любых преобразований текстовых данных, и есть возможность сохранять результат в постоянное хранилище. Кто то даже пробовал так делать [^].
Но меня сложно убедить в том, что такой проект будет таким же доступным для понимания и гибким к требованиям бизнес-логики, как разработка на любом программном фреймворке. Рано или поздно ваш проект выйдет за рамки простого CRUD сервиса и тут у вас появятся проблемы.
Хороший чистый код гораздо более лаконичен нежели аналогичное по функционалу описание потока данных в Nifi на UI, что очень важно, так как разработчики гораздо чаще читают кодовую базу чем что-то в ней меняют. В Nifi вы ограничены коробочным инструментарием, тогда как в программном коде вы имеете полную свободу в реализации решения. Вся конфигурация Nifi статична, тогда как бизнес логика может требовать изменений алгоритма и настроек на лету. Этот список можно продолжать бесконечно.

Проблемы и особенности Nifi, с которыми я сталкивался на практике

Изучение нового – это всегда прогулка по граблям. Таков путь.
Здесь я приведу несколько проблемных ситуаций при использовании Nifi, с которыми я сталкивался в реальной работе. Эти небольшие зарисовки – еще один повод поговорить о некоторых не самых очевидных на первый взгляд особенностях работы Nifi.

Кластер и его особенности

Если вы ведете разработку потоков на локальной машине в standalone-режиме, а выкатываете результат работы на Nifi кластер, могут понадобиться отдельные доработки. Например мне пришлось вносить изменения в поток из-за того, что по-умолчанию процессоры настроены с Execution: All nodes. В результате процессоры-источники данных создавали дубликаты. Частая проблема всех новичков, судя по комментариям в интернете.
Некоторые процессоры действительно могут работать в режиме All nodes и вычитывать только свою порцию данных из-за особенностей модели потребления, например процессоры-потребители данных из Kafka пользуются преимуществом модели партицирования (kafka partitioning). Хотя и там есть свои особенности, читайте про Consumer Partition Assignment.
Если ваш источник лишен таких преимуществ, нужно настраивать чтение из него через Execution: Primary node. После этого вы возможно захотите распределять данные на все ноды в кластере – это нужно включить явно через настройки исходящего соединения-очереди (но не нужно это делать во всех нижележащих очередях).

Баги реализации процессоров

Однажды я хотел настроить процессор-потребитель Kafka так, чтобы он запускался раз в час и вычитывал накопившиеся данные небольшими порциями. Оказалось, что при Run Schedule со Scheduling strategy: Timer driven, если выставить больше 10 секунд, вообще не происходит чтения данных при запусках.
Помогла альтернатива Scheduling strategy: CRON driven.
Мораль: тестируйте ваши потоки прежде чем катить их на продакшн.

Подводные камни реализации процессоров

Лимитирование потребления из Kafka в процессоре не взлетело

Мне не удалось с помощью настроек процессора-потребителя, даже по расписанию, вычитывать лимитированный объем данных из Kafka. В процессоре-потребителе есть настройка Max Poll Records. Можно было бы предположить, что если процессор запускается с перерывами, он делает лишь один poll и останавливается до следующего раза. Но нет, похоже процессор не останавливается, пока не прочитает все что накопилось, или пока не включится back pressure исходящего соединения очереди.
Наверное лимит действительно можно было бы установить на уровне очереди, но я воспользовался процессором ControlRate, у него больше возможностей в плане настройки, к тому же у меня была потребность лимитировать не потребление из Kafka как таковое, а скорость с которой данные передаются ниже по течению.

На процессоры-потребители Kafka действуют те же законы что и на любых клиентов-потребителей

Однажды я создал новый процессор-потребитель Kafka и он после запуска стал возвращать лишь половину данных.
А все потому, что он был скопирован с уже существующего где-то процессора, и после копирования в нем не менялись ни Topic Name(s) (и не должен был меняться) ни Group ID (он же consumer group, следовало бы изменить).
Да, если вы создадите два (или больше) процессора-потребителя с одинаковой группой, топиком и адресом брокера – они действительно будут связаны одной группой потребления и партиции распределятся между ними, как произошло бы с любыми другими Kafka клиентами. В Nifi с его подчеркнутой независимостью процессоров друг от друга об этом легко забыть (и еще легче – скопировать процессоры, забыв поменять конфигурацию).

Ответ с ошибкой от InvokeHTTP пишется не в контент, а в атрибут с ограничением по длине строки

Мне нужно было отправлять данные из Nifi в REST API и получать результат success / error, при этом в случае error надо было вернуть изначальное тело запроса (JSON) с дополнительными данными о возникшей ошибке. Далее ответ передавался как FlowFile ниже по течению.
Конечно же я посмотрел в документацию процессора InvokeHTTP и от моего внимания не ускользнуло, что в случае любого response кода кроме 2хх, ответ пишется не в контент результирующего FlowFile, а в его атрибут invokehttp.response.body. “Не очень удобно, но не смертельно” подумал я, и реализовал ответ в случае ошибки с 500 кодом в сервисе.
Но от моего внимания ускользнуло, что этот атрибут имеет ограничение на размер строки. В рез-те JSON с ошибкой просто не влезал в ограничение и обрезался. Причем настройку размера строки даже нельзя поменять точечно в одном процессоре (настройка Response Body Attribute Size на него не влияет, читайте внимательно документацию). Похоже, что настройку можно изменить только глобально, для всех. Чего конечно я делать не стал, да и не помогло бы это, ведь даже увеличив допустимый размер мы все равно рискуем не влезть в установленное ограничение если в каком-то случае нам попадется достаточно большой JSON. К тому же атрибуты загружаются в оперативную память, так что увеличивая эту настройку глобально, вы рискуете в какой-то момент получить OutOfMemoryError на одной или нескольких нодах кластера.
Пришлось перехватывать все исключения и отправлять в том числе ошибочные ответы от API c 200 кодом, в таком случае JSON в любом случае оказывается в контенте FlowFile.

Быстрое разрастание Content Repository

Как я уже писал, контент FlowFile-ов при редактировании не меняется, создается его новая копия, а старая версия удаляется не сразу. У нас однажды занятый объем Content Repository стремительно (за несколько часов) вырос на несколько сотен гигабайт.
У нас в Nifi было много команд разработки и много разных независимых потоков данных, поэтому я до конца не уверен в причинах и виновных, но мне кажется наиболее вероятным, что где-то в одной точке сошлись два фактора:
  • интенсивный поток FlowFile-ов с объемным контентом проходил через немалое количество этапов редактирования контента
  • из-за медленной обработки на сервере образовался затор в одной из очередей ниже по течению, который не позволял FlowFile-ам дойти до конца и уничтожиться, этот затор в свою очередь вызвал затор выше, тот еще выше, и так до самого верха.
Мораль – следите, чтобы не образовывалось больших заторов в соединениях-очередях. Вы можете управлять ограничением вместительности каждой отдельной очереди – как по количеству FlowFile-ов (Object threshold) так и по объему данных (Size threshold). По умолчанию кстати Size threshold установлен в 1 Гб, что не всегда оптимально. Также, можно настроить время жизни FlowFile-ов в очереди, если для вас допустима потеря данных.
На задержку очистки контента уже обработанных и удаленных FlowFile-ов может влиять особенность хранения контента в файловой системе. Дело в том, что контенты нескольких независимых FlowFile-ов при хранении могут объединяться в единый файл [^] [^] [^], и чтобы этот файл можно было удалить, Nifi должен дождаться завершения обработки всех таких FlowFile-ов [^] (не говоря уже о том, что контент не удаляется сразу при включенной архивации [^]). Советую хорошо осознать эту особенность и её последствия, к сожалению из-за неё в UI вы не видите корректных показателей объема, занятого FlowFile-ми в Content Repository [^].
Принимайте также во внимание, что при перераспределении FlowFile-ов между нодами кластера происходит пересылка всего FlowFile-a вместе с контентом. Большой объем контента может приводить к повышенным нагрузкам на сеть. Есть возможность использовать сжатие контента, оно настраивается в соединении-очереди, но это дорогая операция, используйте её только если контент каждого FlowFile-а действительно объемный.
Если вы в какой-то момент собираете объемные FlowFile-ы для обработки на Primary Node, советую заложить запас на случай увеличения объема Content Repository на всех нодах, т.к. у вас нет контроля за тем, какая из нод будет выбрана в качестве Primary.

Плюсы и минусы Nifi

Подведем итоги в виде плюсов и минусов Nifi.

Плюсы Nifi

  • Уменьшение степени зацепления (coupling) между сервисами, улучшение SRP из SOLID. Происходит это за счет того, что зависимости, направленные от одних компонентов к другим, замещаются на Nifi, все зависимости которого обычно направлены к сервисам. Как следствие – Nifi может помочь реализовать очень гранулированные, самодостаточные микросервисы (или даже FaaS).
  • Быстрая автоматизация рутинных операций (если знать как это делать)
  • Вынос части операций в Nifi может уменьшить нагрузку на ваши сервисы
  • Хорошая документация (хотя могла бы быть еще лучше)
  • Стандартизированные компоненты и, следовательно, предсказуемость
  • Open source, Apache License 2.0, кроссплатформенность (Java), возможность кастомизации
  • Горизонтально масштабируется
  • Встроенное security
  • Независимость от стека, т.к. преимущественно оперируем над текстовыми / битовыми данными и стандартизированным API
  • Контент FlowFile-а может быть любым – JSON строка, CSV данные, мультимедиа файл, файловый архив и т.д. Конечно, возможностей обработки текстового контента у Nifi гораздо больше
  • Визуальное программирование т.к. работа преимущественно через UI. Несложно научиться работать не-программисту, но все же требует технических знаний. 80% усилий занимает конфигурирование готовых компонентов
  • Полезные фичи из коробки, которые мне особенно понравились:
  1. удобная, интуитивно-понятная визуализация потока данных на UI
  2. очереди и back pressure между процессорами (хотя back pressure и может выйти боком, о чем я писал выше)
  3. возможность запуска процессоров по расписанию

Минусы Nifi

  • Направления зависимостей обычно направлены от Nifi к другим компонентам (сервисам, базам, брокерам сообщений и т.д.). Законы архитектуры учат нас, что чем больше компонент имеет исходящих зависимостей, тем более он хрупок (нуждается в постоянной поддержке, изменении конфигурации и логики если происходят значимые изменения в компонентах-зависимостях)
  • Работа преимущественно через UI, недостаток shortcuts, визуальное программирование. Nifi слишком визуален.
  1. Как следствие – очень громоздкие "how to" статьи и форумы, в которых примеры настроек часто отображается скриншотами
  2. Как следствие – повышенный риск проблем из-за человеческого фактора при эксплуатации (например, выключили процессор для изменения конфигурации, а затем забыли его включить обратно)
  3. Нужно очень хорошо знать возможности и ограничения конкретных процессоров и контроллеров чтобы быстро и корректно реализовывать нужную логику обработки.
  • Легко поддаться соблазну размазать свою бизнес логику в Nifi (хотя скорее это проблема пользователя, а не самого Nifi)
  • Для сложных схем потоков сложно автоматизировать все настолько, чтобы это работало без периодического ручного контроля
  • Нужно хорошо разбираться в настройках, есть нюансы. Неправильная конфигурация может привести к серьезным проблемам
  • Не так легко кастомизировать
  • Не очень удобен для динамической логики
  • Обычные проблемы system-in-the-middle:
  1. Вносит дополнительную точку отказа в ваш продакшн
  2. Дополнительный артефакт, который надо готовить
  3. Дополнительный элемент в вашем CI/CD, с которым надо как то жить
  4. Требует операционных ресурсов (людей, мощностей; Nifi кластер – не самая скромная по железу система)
  5. Может увеличивать задержки (latency)
  6. Усложняется онбординг новичков
  7. Сложнее изучать систему в целом

Дополнительные материалы которые я рекомендую

Видео-обучающие материалы на youtube:

Steven Koon
InsightByte
Официальная документация