Блог CosySoft
Разработка Технологии

Spring WebFlux на реальных проектах: что нужно знать перед внедрением

Когда производительность и масштабируемость системы становится критически важной традиционные блокирующие подходы могут оказаться недостаточными. Именно для таких задач был создан Spring WebFlux — асинхронный неблокирующий фреймворк, основанный на реактивной парадигме программирования. Наш backend-разработчик Миша столкнулся с WebFlux на одном из проектов и решил глубже изучить его возможности. В статье он поделится своим опытом, раскроет ключевые особенности фреймворка и расскажет возможных проблемах интеграции.

Введение

Дисклеймер
В этой статье речь пойдет главным образом о WebFlux, работающем на базе Netty в рамках Spring, а не о чистом использовании Project Reactor.
Прежде чем углубляться в WebFlux, важно разобраться в ключевых концепциях, которые лежат в его основе: функциональном программировании, асинхронности и реактивности.

Функциональное программирование

Функциональное программирование — это стиль, в котором функции играют центральную роль, и весь код строится через них. Главные принципы функционального программирования:
  • Чистота функций: функция при одинаковых входных данных всегда возвращает один и тот же результат. Она не изменяет внешнее состояние и не зависит от него.
  • Функции высшего порядка: это функции, которые могут принимать другие функции в качестве аргументов или возвращать их.
Этот подход позволяет писать предсказуемый и тестируемый код. Но в реальном мире веб-приложений, чистое функциональное программирование встречается редко.
Функции высшего порядка — это функции, которые могут принимать в качестве аргументов другие функции или возвращать их как результат. Такой подход позволяет писать более гибкий и читаемый код, особенно при работе с обработкой данных или асинхронными операциями. В Java и Kotlin элементы функционального программирования уже присутствуют, однако строгий функциональный подход редко используется на практике. Тем не менее, даже небольшое использование таких возможностей делает код более выразительным и компактным.

Асинхронность

Асинхронность — это способность программы выполнять задачи, не останавливая исходный поток выполнения, т.е. делегировать выполнение другому исполнителю или условно воспользоваться свободными тиками процессора. Например, если система ожидает ответа от базы данных или другой долгой операции, она может выполнять другие задачи вместо того, чтобы висеть в ожидании. Это делается с помощью прерывания текущего метода и переключения потока на другую задачу. В результате ресурсы процессора используются более эффективно.
Таким образом, асинхронность позволяет эффективнее использовать потоки, переключая задачи между ними. Как это реализовано? Это сделано через прерываемость: в хорошем асинхронном коде любой метод может быть прерван в момент ожидания, а поток переключится на другую задачу, пока не получит ответ от первой.
Например: в обычном блокирующем коде каждый запрос к базе данных блокирует поток до получения ответа. В асинхронном коде поток может переключиться на другие задачи, пока ждет результат запроса.

Реактивное программирование

Реактивное программирование базируется на идее обработки потоков данных и событий в реальном времени. Реактивный манифест объясняет принципы этого подхода.
  • Отзывчивость: система должна быстро реагировать на события и возвращать ответы с минимальной задержкой. Например, даже если она упадет с ошибкой, это должно произойти быстро, чтобы не замедлять работу приложения.
  • Устойчивость: если компонент системы выходит из строя, это не должно нарушать работу всей системы. Например, компонент должен быть заменен репликой, восстановлен быстро, и снаружи это не должно быть очевидно.
  • Гибкость: система должна уметь адаптироваться к нагрузкам и эффективно распределять ресурсы. Например, создавать новые реплики или подключать дополнительные машины для обработки запросов, избегая узких мест вроде ожидания IO.
  • Основанная на сообщениях: компоненты системы общаются между собой через асинхронные сообщения. Это может быть передача порционных данных по http или RPC, сообщение по WebSocket или те же сообщения из Kafka.

Backpressure и порционная обработка

Одним из важнейших аспектов реактивного программирования является управление нагрузкой, так называемый backpressure. Когда система получает больше запросов, чем может обработать, она не должна пытаться сразу выполнить их все, иначе это приведет к перегрузке и сбоям. Вместо этого, реактивные системы могут запрашивать данные порциями и обрабатывают их поэтапно, что предотвращает зависания и позволяет лучше контролировать производительность.
Пример: предположим, сервер получает миллион запросов одновременно. Вместо того, чтобы пытаться обработать все сразу и перегрузить систему, он начнет сам запрашивать порции сообщений в том объеме, в котором сообщения могут быть обработаны без критических перегрузок, держа нагрузку под контролем.

Как работает WebFlux

WebFlux — это веб-фреймворк на базе Spring, который реализует все вышеперечисленные концепции. Сам по себе WebFlux является просто интерфейсом, под капотом которого уже может находиться реактивный фреймворк типа Reactor. Он позволяет создавать асинхронные и неблокирующие веб-приложения, которые легко масштабируются и эффективно работают под высокой нагрузкой. Весь код в таких приложениях будет построен в виде функциональных пайплайнов, описывающих обработку асинхронных сообщений.
В отличие от традиционного подхода, где каждая операция блокирует поток до завершения, WebFlux позволяет переключаться между задачами, обрабатывая только то, что действительно готово к выполнению. Это особенно важно для высоконагруженных систем, где множество клиентов могут отправлять запросы одновременно.
Если традиционное приложение может обслуживать ограниченное количество одновременных пользователей (в зависимости от количества потоков), WebFlux способен обрабатывать тысячи соединений, эффективно управляя потоками и избегая блокировок.
WebFlux создает архитектуру, основанную на асинхронности и реактивности, что позволяет разрабатывать высокопроизводительные приложения, способные эффективно обрабатывать большие объемы данных и запросов.

Как использовать WebFlux

По умолчанию Spring WebFlux запускается на Netty, реактивном серверном фреймворке. Он позволяет эффективно управлять потоками, нагружая их при блокировках IO, в отличие от того же «голого» Reactor, который просто реализует асинхронность через многопоточность или дефолтного в Spring MVC контейнера Jetty. Если при работе со Spring MVC у нас один поток на запрос, то с Netty можно эффективно управлять ограниченным количеством потоков, скажем, 12 потоков для 12 ядер процессора.
В основе работы с WebFlux лежат две основные сущности: Mono и Flux. Mono — это поток из одного элемента, а Flux — это поток из множества элементов. Рассмотрим простейший пример на Kotlin, где создается эндпоинт, возвращающий Mono, содержащий строку:
@GetMapping("/test")
fun test(): Mono<String> {
    return Mono.just("Hello, WebFlux")
        .map { value -> value.uppercase() }
}
Mono — это контейнер, который мы наполняем данными и можем изменять их с помощью таких операторов, как map. Важно, что выполнение кода не происходит сразу, а только при подписке на этот Mono.

Асинхронная работа с WebFlux

Одно из главных преимуществ WebFlux — это неблокирующий подход к работе с IO-операциями. Например, если в приложении необходимо выполнить несколько запросов к внешним сервисам, то WebFlux позволяет не блокировать поток при ожидании ответа от них, а переключить ожидающий поток на выполнение другой операции.
Я использую асинхронный HTTP-клиент из WebFlux для обращения к другим сервисам. В примере ниже мы делаем запросы на три заранее заготовленных эндпоинта, запущенных на другом приложении. Каждый из эндпоинтов просто делает задержку на 5, 10 и 0 секунд соответственно. Если бы мы делали эти запросы в блокирующем коде, они суммарно заняли бы 15 секунд (запрос на первый занял бы 10, на второй 5 и на третий 0 секунд). Но благодаря WebFlux все выполняется в неблокирующей парадигме, и итоговое время выполнения — всего 10 секунд, т.к. ожидающий ответа поток просто перешел бы сразу ко второму запросу.
fun getResults(): Flux<String> {
    val urls = listOf("/endpoint1", "/endpoint2", "/endpoint3")
    return Flux.fromIterable(urls)
        .flatMap { url -> 
            webClient.get()
                .uri(url)
                .retrieve()
                .bodyToMono(String::class.java)
        }
}
WebClient отправляет запросы, не блокируя основную нить выполнения, а поток просто переключается на другие задачи, пока ожидает ответа от сервера.

Event Loop: эффективность потоков

В основе реактивного программирования в WebFlux лежит концепция event loop. Это позволяет значительно эффективнее управлять потоками, особенно при высоких нагрузках.
Event loop — это поток, который постоянно сидит в ожидании новых задач. Когда он сталкивается с блокирующей операцией, он делегирует ее в пул ожидания и продолжает работать с другими запросами. Это не увеличивает количество потоков, но позволяет использовать их более рационально.
WebFlux и Netty работают с неблокирующими IO через Java NIO, что и обеспечивает неблокируемость при IO-операциях, что в свою очередь делает этот подход особенно полезным для высоконагруженных в контексте интеграций систем.

Управление блокирующими операциями в WebFlux

Хотя WebFlux предлагает множество преимуществ для асинхронного программирования, его эффективность напрямую зависит от правильного управления блокирующими операциями. Основная проблема заключается в том, что если event loop встречает блокирующую операцию, он просто «умрет» и остановит обработку других запросов.

Проблема блокировки

Если мы помещаем любую блокирующую операцию в event loop, он просто останавливается. Например, если вызвать обычный RestTemplate с синхронным вызовом, он будет ждать ответа 15 секунд. В итоге, все выполняется в одном потоке и блокирует event loop, из-за чего теряется вся производительность, которую предоставляет асинхронный подход.
Вот пример кода, который показывает эту проблему:
val restTemplate = RestTemplate()

fun getResults(): Flux<String> {
    val urls = listOf("/endpoint1", "/endpoint2", "/endpoint3")
    return Flux.fromIterable(urls)
        .map { url -> 
            client.getForObject(
                "http://localhost:8084$url",
                String::class.java
            )
        }
}
В этом случае, если endpoint задерживает ответ, весь event loop будет заблокирован, пока ожидается результат. Это приводит к снижению производительности сервера, поскольку другие запросы не могут быть обработаны.

Рекомендации по работе с блокирующими операциями

Чтобы избежать блокировки event loop, важно правильно управлять блокирующими вызовами. Одним из способов является использование специального пула потоков для выполнения блокирующих операций, размер которого по умолчанию соответствует количеству потоков event loop. Например, можно использовать subscribeOn и publishOn, чтобы перенаправить выполнение на другой поток:
val restTemplate = RestTemplate()

fun getResults(): Flux<String> {
    val urls = listOf("/endpoint1", "/endpoint2", "/endpoint3")
    return Flux.fromIterable(urls)
        .flatMap { url -> 
            Mono.fromCallable {
                client.getForObject(
                    "http://localhost:8084$url",
                    String::class.java
                )
            }.publishOn(Schedulers.boundedElastic())
        }
}
При использовании boundedElastic для блокирующих операций мы можем избежать блокировки event loop. Это позволяет эффективно использовать потоки для выполнения таких операций, не теряя преимуществ реактивной модели.
Если нам нужно работать с блокирующей библиотекой, как например клиент Kafka (да, на момент написания статьи в WebFlux все еще нет реактивного клиента для Kafka), стоит выделить отдельный поток или использовать подход с Schedulers, чтобы не останавливать весь event loop.
Однако важно помнить, что пул подобных дополнительных потоков ограничен, и если количество блокирующих операций станет слишком большим, это может привести к снижению производительности.

На что обратить внимание при работе с WebFlux

  1. Работа с WebFlux требует особого подхода, особенно при обработке транзакций и исключений. Декларативные аннотации, такие как @Transactional, могут не работать должным образом из-за особенностей AOP в реактивной среде, что приводит к проблемам с неявной демаркацией транзакций. Многие библиотеки не оптимизированы для WebFlux и их неправильное использование, например, чрезмерное использование flatMap или ручное управление потоками может привести к ухудшению производительности приложения.
  2. Использование WebFlux требует внимательного подхода и знаний о его внутренних механизмах. Например, оператор flatMap, хотя и широко используется для объединения потоков, может стать причиной высоких затрат ресурсов. Альтернативные операторы, такие как concatMap, могут оказаться более подходящими в определенных сценариях. Кроме того, разработчики должны быть внимательны к работе с контейнерами, такими как стримы и Optional, чтобы избежать неожиданных проблем.

Утечки памяти в WebFlux: диагностика и решение

Одной из самых распространенных и трудно диагностируемых проблем при работе с WebFlux является утечка памяти. Этот вопрос требует глубокого понимания особенностей реактивного программирования и его тонкостей. В своей практике я сталкивался с утечками памяти несколько раз, и каждый раз причины были связаны с неправильной реализацией стримов.

Причины утечек памяти

  1. Возникновение утечек памяти часто связано с неверным созданием реактивных стримов. Например, в одном из моих проектов мне нужно было имитировать бесконечный стрим данных. Я планировал периодически опрашивать базу данных для получения новых записей и добавлять их в стрим. Однако, создав стрим неправильно, я столкнулся с OutOfMemoryError. После тщательного мониторинга памяти с помощью Grafana и написания интеграционных тестов мне удалось воспроизвести проблему и, в конечном итоге, понять, что я неправильно создал стрим.
  2. Также утечка может произойти при применении лямбды для потоков. Например, вы создали лямбды для потоков, которые захватывали окружение. Это значит, что при создании лямбды она будет сохранять ссылки на локальные переменные, даже если метод, в котором они были объявлены, уже завершил свою работу. То есть, если в лямбде находилась final переменная с большими данными, то при хранении лямбды в памяти эта переменная продолжала существовать. В результате Garbage Collector не может удалить ее из памяти, поскольку она все еще доступна через лямбду, что приводит к накоплению памяти.
public Flux<Data> createDataStream() {
    final HeavyObject heavyObject = new HeavyObject();

    return Flux.create(sink -> {
        // Лямбда сохраняет ссылку на heavyObject
        sink.next(heavyObject);
        sink.complete();
    });
}
В данном случае heavyObject будет храниться в памяти, пока существует стрим, что может привести к утечке.
Чтобы избежать подобных проблем, старайтесь избегать захвата локальных переменных в лямбдах, особенно если они содержат тяжелые объекты. Вместо этого используйте более простые и легкие структуры данных, которые не будут сохраняться в памяти без необходимости.

Альтернативы WebFlux

Механизм ThreadPool

Наиболее простой и низкоуровневый способ — это использование ThreadPool. Он позволяет управлять потоками и эффективно распределять задачи, но требует значительных усилий для управления состоянием потоков и синхронизации.

Класс CompletableFuture

Другим вариантом является CompletableFuture, который используется в современных проектах. Однако этот подход имеет свои недостатки. Во-первых, CompletableFuture предлагает ограниченный функционал и не всегда удобно чейнится. Например, операции, которые в Stream API называются map или flatMap, здесь имеют менее очевидные названия, такие как thenApply и thenCompose. Это может вызвать путаницу у разработчиков, привыкших к более привычной семантике.

Корутины

Корутины — это легковесные потоки, которые позволяют писать асинхронный код в императивном стиле, делая его проще для понимания и поддержки. Их основное преимущество — легковесность, что делает корутины менее ресурсоемкими по сравнению с традиционными потоками Java, особенно в высоконагруженных приложениях.
Код с корутинами проще читать и поддерживать, поскольку он не требует дополнительных операторов, таких как map и flatMap (опять же, код может быть написан в обычном императивном стиле). В Kotlin корутины также поддерживают работу с реактивными стримами через Flow, что упрощает работу с потоками данных.

Выводы

WebFlux — это мощный инструмент, но он требует тщательной оценки перед внедрением и понимания того, когда и где его использование оправдано. Он значительно упрощает работу с асинхронностью и многопоточностью благодаря своему удобному API и реактивной модели. WebFlux хорошо подходит для задач, связанных с I/O, например, для интеграций с внешними сервисами или базами данных, так как позволяет эффективно обрабатывать потоки данных.
Однако WebFlux не является универсальным решением. Например, если вам необходимо выполнить сложные вычисления или процессорные задачи, такие как шифрование или хеширование, лучше использовать отдельные потоки через Scheduler или даже специализированные решения на уровне оборудования, такие как CUDA для GPU.