Reactive Programming

What is Reactive Programming?

Reactive programming is a programming paradigm focused on asynchronous data streams and the propagation of change. In Java, reactive programming enables building highly scalable, non-blocking applications that efficiently handle thousands of concurrent operations.

Key characteristics:

  • Non-blocking: Operations don’t block threads while waiting for I/O
  • Asynchronous: Results arrive via callbacks or reactive streams
  • Event-driven: Components react to events (data arrival, errors, completion)
  • Backpressure-aware: Consumers can signal producers to slow down

When to use reactive programming:

Good fit:

  • High-concurrency web services (handling thousands of simultaneous connections)
  • Real-time data processing (streaming analytics, IoT data pipelines)
  • Event-driven systems (notification services, chat applications)
  • I/O-intensive operations (multiple database/API calls per request)

Poor fit:

  • CPU-intensive computations (image processing, scientific calculations)
  • Simple CRUD applications with low concurrency
  • Batch processing with sequential operations
  • Teams unfamiliar with reactive paradigm (steep learning curve)

Pedagogical Approach: Standard Library First

This content follows the progression from fundamentals to production frameworks:

  1. CompletableFuture - Standard library asynchronous programming
  2. Reactive Streams - Understand the specification and contracts
  3. Project Reactor - Production-ready reactive library (Spring ecosystem)
  4. RxJava - Alternative reactive library with rich operators

Why this approach?

  • Understanding CompletableFuture reveals async/non-blocking fundamentals
  • Reactive Streams specification clarifies the contracts frameworks implement
  • Project Reactor builds on these foundations with powerful abstractions
  • Comparing implementations reveals trade-offs and design decisions

Foundation: CompletableFuture

Before reactive libraries, Java 8 introduced CompletableFuture for asynchronous programming.

Basic Async Operations

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletableFutureBasics {
    public static void main(String[] args) {
        // Simple async computation
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {  // => Creates async task (type: CompletableFuture<String>)
                                                                                   // => Lambda runs in background thread
            // Runs in ForkJoinPool.commonPool()
            return "Hello from " + Thread.currentThread().getName();  // => Returns thread name (type: String)
                                                                      // => Thread name like "ForkJoinPool.commonPool-worker-1"
        });  // => Returns immediately (non-blocking)
            // => Main thread continues without waiting

        // Non-blocking callback
        future.thenAccept(result ->  // => Registers callback for when future completes
                                    // => result is computed value (type: String)
            System.out.println("Result: " + result)  // => Prints result when available
                                                    // => Callback runs on completing thread
        );

        // Block to see output (don't do this in production)
        future.join();  // => Blocks main thread until future completes
                       // => Only for demo - defeats purpose of async!
                       // => In production: use callbacks or reactive streams
    }
}

Why this matters: CompletableFuture demonstrates async execution without blocking the calling thread. However, it’s designed for single values, not streams of data.

Composing Async Operations

public class UserService {
    public CompletableFuture<User> getUser(long userId) {  // => userId is user ID (type: long)
        return CompletableFuture.supplyAsync(() -> {  // => Returns CompletableFuture<User> immediately
                                                      // => Actual work runs asynchronously
            // Simulate database call
            return fetchUserFromDb(userId);  // => Expensive I/O operation (type: User)
                                            // => Runs on ForkJoinPool thread
        });
    }

    public CompletableFuture<List<Order>> getOrders(long userId) {  // => Returns orders future (type: CompletableFuture<List<Order>>)
        return CompletableFuture.supplyAsync(() -> {
            // Simulate API call
            return fetchOrdersFromApi(userId);  // => External API call (type: List<Order>)
                                               // => Runs in parallel with getUser()
        });
    }

    public CompletableFuture<UserProfile> getUserProfile(long userId) {
        // Compose two async operations
        CompletableFuture<User> userFuture = getUser(userId);  // => Start async user fetch (type: CompletableFuture<User>)
        CompletableFuture<List<Order>> ordersFuture = getOrders(userId);  // => Start async orders fetch (type: CompletableFuture<List<Order>>)
                                                                          // => Both execute in parallel

        return userFuture.thenCombine(ordersFuture, (user, orders) -> {  // => Combine when BOTH complete (type: CompletableFuture<UserProfile>)
                                                                         // => user is User, orders is List<Order>
            // Combine results when both complete
            return new UserProfile(user, orders);  // => Create profile with both results (type: UserProfile)
                                                  // => Only executes when both futures complete
        });  // => Returns future immediately (non-blocking)
    }
}

Problem with CompletableFuture: Works well for single async values, but becomes unwieldy for:

  • Streams of data (e.g., consuming Kafka topics, WebSocket messages)
  • Backpressure (what if producer is faster than consumer?)
  • Cancellation and resource cleanup
  • Complex error handling across streams

Solution: Reactive Streams specification and libraries like Project Reactor.

Reactive Streams Specification

Reactive Streams is a JVM specification (adopted in Java 9 as java.util.concurrent.Flow) defining standard interfaces for asynchronous stream processing with non-blocking backpressure.

The Four Interfaces

import java.util.concurrent.Flow.*;

// 1. Publisher - produces items
public interface Publisher<T> {
    void subscribe(Subscriber<? super T> subscriber);
}

// 2. Subscriber - consumes items
public interface Subscriber<T> {
    void onSubscribe(Subscription subscription);
    void onNext(T item);
    void onError(Throwable throwable);
    void onComplete();
}

// 3. Subscription - link between Publisher and Subscriber
public interface Subscription {
    void request(long n);  // Request n items (backpressure)
    void cancel();         // Cancel subscription
}

// 4. Processor - both Publisher and Subscriber
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}

The Reactive Streams Contract

Critical rules:

  1. Backpressure: Subscriber calls request(n) to signal how many items it can handle
  2. No concurrent calls: Publisher must not call onNext() concurrently
  3. Sequential signals: onSubscribe()onNext()* → (onError() | onComplete())
  4. Cancel safety: Subscription can be cancelled at any time

Example flow:

  %%{init: {'theme':'base', 'themeVariables': { 'primaryColor':'#0173B2','primaryTextColor':'#fff','primaryBorderColor':'#0173B2','lineColor':'#029E73','secondaryColor':'#DE8F05','tertiaryColor':'#CC78BC','fontSize':'16px'}}}%%
sequenceDiagram
    participant Subscriber
    participant Publisher

    Subscriber->>Publisher: subscribe(subscriber)
    Publisher->>Subscriber: onSubscribe(subscription)
    Subscriber->>Publisher: request(10)
    Publisher->>Subscriber: onNext(item1)
    Publisher->>Subscriber: onNext(item2)
    Publisher->>Subscriber: ...
    Publisher->>Subscriber: onNext(item10)

    Note over Subscriber: Processed 10 items,<br/>request more

    Subscriber->>Publisher: request(10)
    Publisher->>Subscriber: onNext(item11)
    Publisher->>Subscriber: ...
    Publisher->>Subscriber: onComplete()

Why this matters: Understanding these contracts is essential for using reactive libraries correctly and debugging reactive code.

Project Reactor: Production-Ready Reactive

Project Reactor is the reactive library used by Spring Framework 5+ (Spring WebFlux). It implements Reactive Streams with powerful operators and Spring integration.

Core dependencies:

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.6.0</version>
</dependency>

Mono: Zero or One Element

Mono<T> represents an asynchronous computation that completes with zero or one element.

import reactor.core.publisher.Mono;

public class MonoExamples {
    // Simple value
    Mono<String> mono = Mono.just("Hello");  // => Creates Mono with immediate value (type: Mono<String>)
                                             // => Emits "Hello" when subscribed
                                             // => Completes after emitting value

    // Empty Mono
    Mono<String> empty = Mono.empty();  // => Creates Mono that emits nothing (type: Mono<String>)
                                       // => Completes immediately without emitting value
                                       // => Represents "no result" case

    // Mono from Callable
    Mono<User> user = Mono.fromCallable(() -> {  // => Deferred execution (type: Mono<User>)
                                                 // => Callable NOT invoked until subscription
        // Deferred execution - runs when subscribed
        return userRepository.findById(123);  // => Expensive operation deferred (type: User)
                                             // => Executes on subscribing thread
    });

    // Mono with error
    Mono<String> error = Mono.error(new RuntimeException("Failed"));  // => Creates Mono that emits error (type: Mono<String>)
                                                                      // => Error signal sent to subscriber
                                                                      // => No value emitted, only error

    public Mono<User> getUserById(long id) {  // => id is user ID (type: long)
        return Mono.fromCallable(() -> userRepository.findById(id))  // => Wrap blocking call (type: Mono<User>)
                                                                     // => Returns Mono immediately
                   .subscribeOn(Schedulers.boundedElastic());  // => Execute on elastic I/O thread pool
                                                               // => Prevents blocking main thread
                                                               // => boundedElastic for blocking operations
    }
}

Key insight: Mono is lazy - nothing executes until you subscribe. This allows building complex pipelines before execution.

Flux: Zero to Many Elements

Flux<T> represents an asynchronous sequence of zero to many elements.

import reactor.core.publisher.Flux;
import java.time.Duration;

public class FluxExamples {
    // Fixed sequence
    Flux<String> flux = Flux.just("A", "B", "C");  // => Creates Flux with 3 elements (type: Flux<String>)
                                                   // => Emits "A", "B", "C" when subscribed
                                                   // => Completes after emitting all values

    // From collection
    Flux<Integer> numbers = Flux.fromIterable(List.of(1, 2, 3, 4, 5));  // => Creates Flux from collection (type: Flux<Integer>)
                                                                        // => Emits each element sequentially
                                                                        // => Useful for converting existing collections

    // Infinite sequence
    Flux<Long> interval = Flux.interval(Duration.ofSeconds(1));  // => Creates infinite Flux (type: Flux<Long>)
                                                                 // => Emits 0, 1, 2, 3... every second
                                                                 // => Never completes (infinite stream)
                                                                 // => Use with take() to limit

    // Range
    Flux<Integer> range = Flux.range(1, 100); // 1 to 100  // => Creates Flux of 100 integers (type: Flux<Integer>)
                                                           // => Emits 1, 2, 3... 100 sequentially
                                                           // => Completes after emitting 100

    public Flux<Product> getAllProducts() {  // => Returns all products (type: Flux<Product>)
        return Flux.fromIterable(productRepository.findAll())  // => Convert List<Product> to Flux (type: Flux<Product>)
                                                               // => findAll() is blocking operation
                   .subscribeOn(Schedulers.boundedElastic());  // => Execute on elastic thread pool
                                                              // => Prevents blocking main thread
    }
}

Transforming Streams with Operators

Reactor provides operators for transforming reactive streams.

Mapping operators:

Flux<String> names = Flux.just("alice", "bob", "charlie");  // => Creates Flux with 3 names (type: Flux<String>)
                                                            // => Source stream for transformations

// map - transform each element
Flux<String> uppercase = names.map(String::toUpperCase);  // => Transform each element (type: Flux<String>)
                                                          // => Synchronous 1-to-1 transformation
                                                          // => name -> name.toUpperCase()
// Emits: "ALICE", "BOB", "CHARLIE"

// filter - keep elements matching predicate
Flux<String> longNames = names.filter(name -> name.length() > 3);  // => Keep only matching elements (type: Flux<String>)
                                                                   // => name is element being tested (type: String)
                                                                   // => Predicate returns boolean
// Emits: "alice", "charlie"  // => "bob" filtered out (length = 3)

// flatMap - async transformation returning Publisher
Flux<Order> orders = Flux.just(1L, 2L, 3L)  // => Creates Flux of user IDs (type: Flux<Long>)
    .flatMap(userId -> orderService.getOrders(userId));  // => userId is each ID (type: Long)
                                                         // => getOrders() returns Flux<Order> or Mono<List<Order>>
                                                         // => Flattens nested Publishers into single Flux
                                                         // => Executes async calls in parallel
// Flattens Mono<List<Order>> results into single Flux<Order>

**Combining streams:**

```java
Flux<String> flux1 = Flux.just("A", "B");  // => First flux (type: Flux<String>)
                                           // => Emits "A", "B"
Flux<String> flux2 = Flux.just("C", "D");  // => Second flux (type: Flux<String>)
                                           // => Emits "C", "D"

// concat - sequential (flux1 completes, then flux2)
Flux<String> sequential = Flux.concat(flux1, flux2);  // => Concatenate sequentially (type: Flux<String>)
                                                      // => Waits for flux1 to complete
                                                      // => Then subscribes to flux2
// Emits: "A", "B", "C", "D"  // => Guaranteed order

// merge - interleaved (subscribe to both, emit as items arrive)
Flux<String> merged = Flux.merge(flux1, flux2);  // => Merge concurrently (type: Flux<String>)
                                                 // => Subscribes to both immediately
                                                 // => Emits items as they arrive
// Emits: "A", "C", "B", "D" (order depends on timing)  // => Non-deterministic order

// zip - pair corresponding elements
Flux<String> zipped = Flux.zip(flux1, flux2, (a, b) -> a + b);  // => Combine paired elements (type: Flux<String>)
                                                                // => a is from flux1, b is from flux2 (both type: String)
                                                                // => Waits for both elements before emitting
                                                                // => Completes when shortest flux completes
// Emits: "AC", "BD"  // => ("A" + "C"), ("B" + "D")`

**Error handling:**

```java
Flux<String> flux = Flux.just("A", "B", "C")  // => Creates source flux (type: Flux<String>)
    .map(s -> {  // => s is each element (type: String)
        if (s.equals("B")) throw new RuntimeException("Error on B");  // => Throws on "B"
        return s;  // => Returns unchanged for "A", "C"
    })
    .onErrorReturn("DEFAULT")      // Fallback value  // => Replaces error with fallback (type: String)
                                                      // => Terminates stream with this value
                                                      // => Stream completes after error
    .onErrorResume(e -> Flux.empty()) // Fallback publisher  // => e is the error (type: Throwable)
                                                              // => Returns fallback Flux on error
                                                              // => Flux.empty() completes without values
    .onErrorContinue((err, item) -> {  // => err is error, item is failing element
                                      // => Does NOT terminate stream
        // Log and skip failing items
        log.error("Failed on item: " + item, err);  // => item is "B" (type: String)
                                                    // => Stream continues with "C"
    });
```

### Backpressure Strategies

When producer is faster than consumer, **backpressure** controls flow.

```java
Flux<Integer> fast = Flux.range(1, 1000);  // => Creates fast producer (type: Flux<Integer>)
                                           // => Emits 1000 items rapidly
                                           // => Faster than consumer can process

// Strategy 1: Buffer (store in memory)
fast.onBackpressureBuffer(100) // Buffer up to 100 items  // => Creates buffer of size 100
                                                          // => Stores excess items in memory
                                                          // => Throws error if buffer overflows
    .subscribe(item -> slowConsumer(item));  // => item is buffered integer (type: Integer)
                                            // => Consumer processes at own pace

// Strategy 2: Drop (discard excess items)
fast.onBackpressureDrop()  // => Drops items when consumer can't keep up
                          // => No buffering, immediate discard
                          // => Data loss acceptable (metrics, monitoring)
    .subscribe(item -> slowConsumer(item));  // => item is Integer, some values skipped

// Strategy 3: Latest (keep only latest item)
fast.onBackpressureLatest()  // => Keeps only most recent item
                            // => Drops intermediate values
                            // => Always processes latest state
    .subscribe(item -> slowConsumer(item));  // => item is latest Integer (e.g., jumps 1→500→1000)

// Strategy 4: Error (fail if consumer too slow)
fast.onBackpressureError()  // => Signals error immediately when overflow detected
                           // => No buffering or dropping
                           // => Fails fast strategy
    .subscribe(item -> slowConsumer(item));  // => Throws MissingBackpressureException if too slow
```

**Production choice:** Use `onBackpressureBuffer` with size limit for bursty loads, `onBackpressureDrop` for metrics/monitoring where data loss is acceptable.

### Schedulers: Threading in Reactive Code

Reactive operations are **non-blocking** but still need threads. **Schedulers** control where operations execute.

```java
import reactor.core.scheduler.Schedulers;

// Schedulers.immediate() - current thread (default)  // => No thread switching
                                                      // => Executes on calling thread
// Schedulers.single() - single reusable thread  // => Single shared thread for all subscribers
                                                 // => Useful for sequential tasks
// Schedulers.parallel() - fixed pool (CPU-bound, size = CPU cores)  // => Thread pool sized to CPU count
                                                                     // => Optimized for CPU-intensive work
// Schedulers.boundedElastic() - elastic pool (I/O-bound, grows/shrinks)  // => Thread pool grows dynamically (max ~10x CPU cores)
                                                                          // => For blocking I/O operations
                                                                          // => Threads released when idle

Mono<User> user = Mono.fromCallable(() -> {  // => Lambda NOT executed until subscription
    // This blocks! Use boundedElastic for blocking I/O
    return jdbcTemplate.queryForObject(sql, User.class);  // => BLOCKING JDBC call (type: User)
                                                          // => Must run on boundedElastic
}).subscribeOn(Schedulers.boundedElastic());  // => Subscribes on I/O thread pool
                                             // => Prevents blocking main thread

Flux<String> flux = Flux.just("A", "B", "C")  // => Creates source flux (type: Flux<String>)
    .publishOn(Schedulers.parallel())  // Downstream runs on parallel scheduler  // => Switches thread for downstream operators
                                                                                 // => map() executes on parallel thread
    .map(s -> heavyCpuWork(s));  // => s is element (type: String)
                                // => CPU-intensive transformation
                                // => Runs on parallel scheduler thread
```

**Critical rules:**

- **subscribeOn**: Controls where source/subscription executes
- **publishOn**: Controls where downstream operators execute
- **boundedElastic** for blocking I/O (JDBC, file I/O, legacy APIs)
- **parallel** for CPU-intensive operations
- Never block in reactive pipeline without appropriate scheduler

## Spring WebFlux: Reactive Web Services

**Spring WebFlux** is Spring's reactive web framework (alternative to Spring MVC).

**Dependencies:**

```xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
```

### Reactive REST Controller

```java
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@RestController  // => Spring REST controller
@RequestMapping("/api/users")  // => Base path for all endpoints
public class UserController {
    private final UserService userService;  // => Reactive service (returns Mono/Flux)

    public UserController(UserService userService) {  // => Constructor injection
        this.userService = userService;
    }

    @GetMapping("/{id}")  // => GET /api/users/123
    public Mono<User> getUser(@PathVariable Long id) {  // => id from URL path (type: Long)
        return userService.findById(id);  // => Returns Mono<User> (type: Mono<User>)
                                         // => WebFlux subscribes automatically
                                         // => Response sent when Mono completes
    }

    @GetMapping  // => GET /api/users
    public Flux<User> getAllUsers() {  // => Returns stream of users (type: Flux<User>)
        return userService.findAll();  // => Returns Flux<User> (type: Flux<User>)
                                      // => WebFlux collects to JSON array
    }

    @PostMapping  // => POST /api/users
    public Mono<User> createUser(@RequestBody User user) {  // => user from request body (type: User)
        return userService.save(user);  // => Returns saved user (type: Mono<User>)
                                       // => Returns 200 OK with created user
    }

    @GetMapping(value = "/stream", produces = "text/event-stream")  // => Server-Sent Events endpoint
    public Flux<User> streamUsers() {  // => Continuous stream (type: Flux<User>)
        // Server-Sent Events - pushes updates to clients
        return userService.findAll()  // => Get all users (type: Flux<User>)
            .delayElements(Duration.ofSeconds(1));  // => Emit one user per second
                                                   // => Simulates real-time updates
                                                   // => Connection stays open
    }
}
```

**Why WebFlux?** Handles thousands of concurrent connections with small thread pool (non-blocking I/O).

### Reactive Repository with R2DBC

**R2DBC** (Reactive Relational Database Connectivity) provides non-blocking database access.

```xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
<dependency>
    <groupId>io.r2dbc</groupId>
    <artifactId>r2dbc-postgresql</artifactId>
</dependency>
```

```java
import org.springframework.data.r2dbc.repository.R2dbcRepository;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public interface UserRepository extends R2dbcRepository<User, Long> {  // => R2dbcRepository for reactive database access
                                                                       // => User is entity, Long is ID type
    Flux<User> findByLastName(String lastName);  // => lastName is search parameter (type: String)
                                                // => Returns stream of matching users (type: Flux<User>)
                                                // => Query derived from method name
    Mono<User> findByEmail(String email);  // => email is search parameter (type: String)
                                          // => Returns single user or empty (type: Mono<User>)
                                          // => Email typically unique
}

@Service  // => Spring service component
public class UserService {
    private final UserRepository repository;  // => Reactive repository (returns Mono/Flux)

    public UserService(UserRepository repository) {  // => Constructor injection
        this.repository = repository;
    }

    public Mono<User> findById(Long id) {  // => id is user ID (type: Long)
        return repository.findById(id)  // => Returns Mono<User> (type: Mono<User>)
                                       // => Non-blocking database query
            .switchIfEmpty(Mono.error(new NotFoundException("User not found")));  // => Replace empty with error
                                                                                  // => Signals 404 Not Found
    }

    public Flux<User> findAll() {  // => Returns all users (type: Flux<User>)
        return repository.findAll();  // => Returns Flux<User> (type: Flux<User>)
                                     // => Non-blocking database query
                                     // => Streams results as they arrive
    }
}
```

**Critical difference from JDBC:**

- JDBC blocks thread while waiting for database  wastes threads under high concurrency
- R2DBC returns Mono/Flux immediately  thread free to handle other requests

## RxJava: Alternative Reactive Library

**RxJava** is another popular reactive library (predates Reactor, used in Android development).

**Dependencies:**

```xml
<dependency>
    <groupId>io.reactivex.rxjava3</groupId>
    <artifactId>rxjava</artifactId>
    <version>3.1.8</version>
</dependency>
```

### RxJava Core Types

```java
import io.reactivex.rxjava3.core.*;

// Single - exactly one item or error
Single<String> single = Single.just("Hello");  // => Creates Single with value (type: Single<String>)
                                               // => Must emit exactly one value
                                               // => No onComplete signal (value IS completion)

// Maybe - zero or one item
Maybe<String> maybe = Maybe.empty();  // => Creates empty Maybe (type: Maybe<String>)
                                     // => Emits 0 or 1 item
                                     // => Similar to Mono in Project Reactor

// Observable - zero to many items (no backpressure)
Observable<String> observable = Observable.just("A", "B", "C");  // => Creates Observable (type: Observable<String>)
                                                                 // => Emits 0 to N items
                                                                 // => NO backpressure support
                                                                 // => Can overwhelm slow consumers

// Flowable - zero to many items (with backpressure)
Flowable<String> flowable = Flowable.just("A", "B", "C");  // => Creates Flowable (type: Flowable<String>)
                                                           // => Emits 0 to N items
                                                           // => WITH backpressure support
                                                           // => Similar to Flux in Project Reactor

// Completable - no items, just completion or error signal
Completable completable = Completable.complete();  // => Creates completed signal (type: Completable)
                                                  // => No values emitted
                                                  // => Only signals completion or error
                                                  // => Useful for side effects (save, delete)
```

**Project Reactor vs RxJava:**

| Feature                | Project Reactor      | RxJava                         |
| ---------------------- | -------------------- | ------------------------------ |
| **0-1 items**          | Mono                 | Single, Maybe                  |
| **0-N items**          | Flux                 | Observable, Flowable           |
| **Completion signal**  | Built into Mono/Flux | Completable                    |
| **Backpressure**       | Always               | Flowable only (not Observable) |
| **Spring integration** | Native (WebFlux)     | Manual                         |
| **Learning curve**     | Moderate             | Steeper (more types)           |

**When to use RxJava:** Android development, existing RxJava codebases, preference for explicit type distinctions (Single vs Maybe vs Observable).

**When to use Reactor:** Spring ecosystem, new projects, simpler API with Mono/Flux.

## Testing Reactive Code

Testing asynchronous code requires specialized approaches.

### Testing with StepVerifier

**StepVerifier** (from reactor-test) validates reactive sequences.

```xml
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <scope>test</scope>
</dependency>
```

```java
import reactor.test.StepVerifier;

@Test
void testMonoSuccess() {
    Mono<String> mono = Mono.just("Hello");  // => Creates test Mono (type: Mono<String>)

    StepVerifier.create(mono)  // => Creates verifier for mono
        .expectNext("Hello")  // => Expects single emission with value "Hello"
                             // => Fails test if different value or no emission
        .verifyComplete();  // => Expects onComplete signal
                           // => Subscribes and blocks until completion
}

@Test
void testFluxSequence() {
    Flux<Integer> flux = Flux.just(1, 2, 3);  // => Creates test Flux (type: Flux<Integer>)

    StepVerifier.create(flux)  // => Creates verifier for flux
        .expectNext(1)  // => Expects first emission = 1
        .expectNext(2)  // => Expects second emission = 2
        .expectNext(3)  // => Expects third emission = 3
        .verifyComplete();  // => Expects onComplete signal
                           // => Verifies exact sequence and completion
}

@Test
void testFluxError() {
    Flux<String> flux = Flux.error(new RuntimeException("Error"));  // => Creates error-emitting Flux

    StepVerifier.create(flux)  // => Creates verifier for error flux
        .expectError(RuntimeException.class)  // => Expects RuntimeException signal
                                             // => Fails if different error or no error
        .verify();  // => Subscribes and verifies error
}

@Test
void testBackpressure() {
    Flux<Integer> flux = Flux.range(1, 100);  // => Creates Flux of 100 integers

    StepVerifier.create(flux, 10) // Request only 10 items  // => Initial request = 10 (backpressure)
                                                            // => Doesn't request all items upfront
        .expectNextCount(10)  // => Expects exactly 10 emissions
                             // => Verifies backpressure respected
        .thenRequest(10)           // Request 10 more  // => Request 10 additional items
        .expectNextCount(10)  // => Expects another 10 emissions
        .thenCancel()              // Cancel subscription  // => Cancels before completion
                                                          // => Remaining 80 items not requested
        .verify();  // => Executes verification
}
```

### Testing Delays and Virtual Time

```java
@Test
void testDelayWithVirtualTime() {
    // Without virtual time, this test would take 10 seconds
    Flux<Long> flux = Flux.interval(Duration.ofSeconds(1)).take(10);  // => Creates flux emitting 0-9 every second
                                                                      // => Would take 10 seconds in real time

    StepVerifier.withVirtualTime(() -> flux)  // => Uses virtual time scheduler (type: StepVerifier.FirstStep<Long>)
                                             // => Lambda defers flux creation
                                             // => Allows scheduler replacement before subscription
        .expectSubscription()  // => Expects onSubscribe signal
                              // => Verifies subscription happened
        .thenAwait(Duration.ofSeconds(10))  // => Advances virtual time by 10 seconds
                                           // => Happens instantly (not real time)
                                           // => Triggers 10 interval emissions
        .expectNextCount(10)  // => Expects 10 emissions (0, 1, 2... 9)
                             // => Verifies all interval ticks happened
        .verifyComplete();  // => Expects onComplete signal
                           // => Test completes instantly
}
```

### Testing WebFlux Controllers

```java
import org.springframework.boot.test.autoconfigure.web.reactive.WebFluxTest;
import org.springframework.test.web.reactive.server.WebTestClient;

@WebFluxTest(UserController.class)  // => Loads only UserController (slice test)
                                    // => Autoconfigures WebTestClient
class UserControllerTest {
    @Autowired  // => Injected test client
    private WebTestClient webClient;  // => Non-blocking HTTP client for tests (type: WebTestClient)

    @MockBean  // => Mock service in Spring context
    private UserService userService;  // => Mocked dependency (returns Mono/Flux)

    @Test
    void testGetUser() {
        User user = new User(1L, "Alice");  // => Test user (type: User)
        when(userService.findById(1L)).thenReturn(Mono.just(user));  // => Mock returns Mono with user
                                                                     // => Simulates successful database lookup

        webClient.get()  // => Start GET request builder
            .uri("/api/users/1")  // => Request URI (ID = 1)
            .exchange()  // => Execute request (type: WebTestClient.ResponseSpec)
                        // => Non-blocking execution
            .expectStatus().isOk()  // => Expects HTTP 200 OK
            .expectBody(User.class)  // => Expects User in response body
                                    // => Deserializes JSON to User
            .isEqualTo(user);  // => Verifies response equals expected user
    }

    @Test
    void testGetAllUsers() {
        Flux<User> users = Flux.just(  // => Test flux with 2 users (type: Flux<User>)
            new User(1L, "Alice"),
            new User(2L, "Bob")
        );
        when(userService.findAll()).thenReturn(users);  // => Mock returns Flux with users
                                                       // => Simulates database query

        webClient.get()  // => Start GET request
            .uri("/api/users")  // => Request all users endpoint
            .exchange()  // => Execute request
            .expectStatus().isOk()  // => Expects HTTP 200 OK
            .expectBodyList(User.class)  // => Expects List<User> in response body
                                        // => Collects Flux to List
            .hasSize(2);  // => Verifies exactly 2 users
                         // => Validates collection size
    }
}
```

## Performance Considerations

### When Reactive Improves Performance

**High I/O concurrency:**

```java
// Imperative (Spring MVC + JDBC) - blocks thread per request
@GetMapping("/users/{id}")  // => Traditional Spring MVC endpoint
public User getUser(@PathVariable Long id) {  // => id from URL path (type: Long)
                                              // => Returns User immediately (type: User)
    // Thread blocked waiting for database
    return userRepository.findById(id).orElseThrow();  // => BLOCKING JDBC call
                                                       // => Thread waits for database response
                                                       // => Thread cannot handle other requests
}
// 1000 concurrent requests = 1000 blocked threads = high memory, context switching  // => 1000 threads × ~1MB stack = ~1GB memory
                                                                                     // => High context switching overhead

// Reactive (WebFlux + R2DBC) - non-blocking
@GetMapping("/users/{id}")  // => Reactive WebFlux endpoint
public Mono<User> getUser(@PathVariable Long id) {  // => id from URL path (type: Long)
                                                    // => Returns Mono immediately (type: Mono<User>)
    // Thread released immediately, callback when data arrives
    return userRepository.findById(id)  // => Non-blocking R2DBC query
                                       // => Thread released while waiting for database
                                       // => Can handle other requests
        .switchIfEmpty(Mono.error(new NotFoundException()));  // => Error if not found
}
// 1000 concurrent requests = small fixed thread pool = low memory, minimal context switching  // => Maybe 8-16 threads total
                                                                                               // => Low memory footprint
                                                                                               // => Minimal context switching
```

**Performance gain:** 10-100x higher concurrency with same resources.

### When Reactive Doesn't Help

**CPU-bound operations:**

```java
// Reactive doesn't help here - CPU is the bottleneck, not I/O
Flux<BufferedImage> images = Flux.fromIterable(imageFiles)  // => Creates flux from image files (type: Flux<File>)
    .map(file -> expensiveImageProcessing(file)); // Still blocks CPU  // => file is File (type: File)
                                                                       // => expensiveImageProcessing() uses CPU
                                                                       // => Non-blocking I/O doesn't help CPU work
                                                                       // => Should use parallel() or Schedulers.parallel()
```

**Simple CRUD with low concurrency:** Reactive complexity not justified if you're handling <100 concurrent requests.

### Memory Considerations

Reactive streams are memory-efficient for **bounded streams** but can leak with **infinite streams**:

```java
// Memory leak - buffers infinite stream
Flux.interval(Duration.ofMillis(1))  // => Creates infinite flux (type: Flux<Long>)
                                    // => Emits every 1ms (1000 items/second)
    .buffer(Duration.ofSeconds(10)) // Buffers 10000 items every 10 seconds  // => Collects 10 seconds of items
                                                                             // => 10000 items buffered
                                                                             // => Memory grows unbounded
    .subscribe();  // => Subscribes but doesn't process fast enough
                  // => Buffer grows continuously

// Fixed - use backpressure strategies
Flux.interval(Duration.ofMillis(1))  // => Same infinite flux
    .onBackpressureDrop() // Drop excess items  // => Drops items when consumer slow
                                               // => No buffering, constant memory
    .subscribe();  // => Subscribes safely
                  // => Memory usage bounded
```

## Common Pitfalls

### Pitfall 1: Forgetting to Subscribe

```java
// ❌ WRONG - Nothing executes!
Mono<User> user = userService.findById(1L);  // => Creates Mono but DOESN'T execute (type: Mono<User>)
                                            // => Only builds reactive pipeline
// Code never runs because Mono is lazy  // => Database query NEVER happens
                                         // => Common beginner mistake

// ✅ CORRECT - Subscribe to trigger execution
Mono<User> user = userService.findById(1L);  // => Creates Mono (type: Mono<User>)
user.subscribe(u -> System.out.println(u));  // => Subscribes and triggers execution
                                            // => u is emitted User (type: User)
                                            // => NOW database query happens

// ✅ In WebFlux controllers, framework subscribes automatically
@GetMapping("/{id}")
public Mono<User> getUser(@PathVariable Long id) {  // => id from path (type: Long)
    return userService.findById(id); // Framework subscribes  // => WebFlux subscribes automatically
                                                              // => No manual subscribe() needed
                                                              // => Framework handles subscription lifecycle
}
```

### Pitfall 2: Blocking in Reactive Pipeline

```java
// ❌ WRONG - Defeats purpose of reactive (blocks thread)
Mono<User> user = Mono.fromCallable(() -> {  // => Creates Mono (type: Mono<User>)
    return jdbcTemplate.queryForObject(sql, User.class); // BLOCKS!  // => BLOCKING JDBC call
                                                                     // => Runs on subscribing thread
                                                                     // => Blocks event loop thread
                                                                     // => Kills reactive performance
});

// ✅ CORRECT - Use boundedElastic scheduler for blocking code
Mono<User> user = Mono.fromCallable(() -> {  // => Creates Mono (type: Mono<User>)
    return jdbcTemplate.queryForObject(sql, User.class);  // => Still BLOCKING JDBC
                                                          // => But runs on separate thread
}).subscribeOn(Schedulers.boundedElastic());  // => Executes on I/O thread pool
                                             // => Event loop thread not blocked
                                             // => Bridge pattern for legacy code

// ✅ BETTER - Use reactive database driver (R2DBC)
Mono<User> user = r2dbcRepository.findById(id); // Non-blocking  // => id is user ID (type: Long)
                                                               // => Returns Mono immediately (type: Mono<User>)
                                                               // => TRUE non-blocking I/O
                                                               // => No thread blocking at all
```

### Pitfall 3: Swallowing Errors

```java
// ❌ WRONG - Error disappears silently
flux.subscribe(  // => flux is some Flux<T>
    item -> process(item),  // => item is each element (type: T)
                           // => onNext handler processes items
    error -> {} // Empty error handler swallows errors  // => error is Throwable (type: Throwable)
                                                        // => SILENTLY ignores errors
                                                        // => Impossible to debug
);

// ✅ CORRECT - Log errors
flux.subscribe(  // => Same flux
    item -> process(item),  // => Process each item
    error -> log.error("Processing failed", error)  // => error is Throwable
                                                    // => Logs error for debugging
                                                    // => At least visible in logs
);

// ✅ BETTER - Handle errors in pipeline
flux.onErrorResume(error -> {  // => error is Throwable (type: Throwable)
                              // => Catches errors in pipeline
    log.error("Processing failed", error);  // => Log error
    return Flux.empty(); // Fallback  // => Returns fallback Flux (type: Flux<T>)
                                      // => Stream continues with empty result
}).subscribe();  // => Simple subscribe() without error handler
                // => Errors already handled in pipeline
```

### Pitfall 4: Creating Mono/Flux Eagerly

```java
// ❌ WRONG - Executes immediately, not lazily
public Mono<User> getUser(Long id) {  // => id is user ID (type: Long)
    User user = repository.findById(id).block(); // Executes here!  // => BLOCKS immediately
                                                                    // => Executes BEFORE method returns
                                                                    // => Not deferred
    return Mono.just(user); // Too late  // => Returns already-fetched user (type: Mono<User>)
                                         // => Defeats lazy evaluation
                                         // => Already blocked calling thread
}

// ✅ CORRECT - Defer execution
public Mono<User> getUser(Long id) {  // => id is user ID (type: Long)
    return Mono.fromCallable(() -> repository.findById(id).block());  // => Lambda defers execution (type: Mono<User>)
                                                                      // => Executes only when subscribed
                                                                      // => Lazy evaluation preserved
    // Or better: return repository.findById(id); if repository is reactive  // => If repository already returns Mono<User>
                                                                             // => No block() needed at all
}
```

## Migrating from Imperative to Reactive

### Step 1: Identify I/O Boundaries

Replace blocking I/O with reactive equivalents:

- **JDBC**  R2DBC
- **RestTemplate**  WebClient
- **Blocking file I/O**  AsynchronousFileChannel or reactive libraries

### Step 2: Start with Controllers

```java
// Before (Spring MVC)
@RestController  // => Traditional Spring MVC controller
public class UserController {
    @GetMapping("/users/{id}")
    public User getUser(@PathVariable Long id) {  // => id from path (type: Long)
                                                  // => Returns User immediately (type: User)
        return userService.findById(id);  // => Blocking call (type: User)
                                         // => Thread waits for service
    }
}

// After (Spring WebFlux)
@RestController  // => Reactive WebFlux controller
public class UserController {
    @GetMapping("/users/{id}")
    public Mono<User> getUser(@PathVariable Long id) {  // => id from path (type: Long)
                                                        // => Returns Mono immediately (type: Mono<User>)
        return userService.findById(id);  // => Non-blocking (type: Mono<User>)
                                         // => WebFlux subscribes automatically
                                         // => Thread released while waiting
    }
}
```

### Step 3: Convert Services Layer

```java
// Before
@Service  // => Traditional service
public class UserService {
    public User findById(Long id) {  // => id is user ID (type: Long)
                                     // => Returns User immediately (type: User)
        return repository.findById(id).orElseThrow();  // => Blocking repository call (type: Optional<User>)
                                                       // => orElseThrow() on Optional
                                                       // => Throws if user not found
    }
}

// After
@Service  // => Reactive service
public class UserService {
    public Mono<User> findById(Long id) {  // => id is user ID (type: Long)
                                           // => Returns Mono immediately (type: Mono<User>)
        return repository.findById(id)  // => Reactive repository (type: Mono<User>)
                                       // => Returns Mono, not Optional
            .switchIfEmpty(Mono.error(new NotFoundException()));  // => Replace empty with error
                                                                  // => Reactive equivalent of orElseThrow()
    }
}
```

### Step 4: Wrap Legacy Blocking Code

If you can't replace blocking libraries immediately:

```java
@Service  // => Service wrapping legacy code
public class LegacyService {
    private final Scheduler scheduler = Schedulers.boundedElastic();  // => Reusable I/O scheduler (type: Scheduler)
                                                                      // => For blocking operations

    public Mono<Data> getLegacyData() {  // => Returns reactive type (type: Mono<Data>)
        return Mono.fromCallable(() -> {  // => Defers execution (type: Mono<Data>)
                                         // => Lambda wraps blocking call
            // Legacy blocking JDBC call
            return jdbcTemplate.queryForObject(sql, Data.class);  // => BLOCKING JDBC (type: Data)
                                                                  // => But runs on separate thread
        }).subscribeOn(scheduler);  // => Executes on boundedElastic pool
                                   // => Event loop thread not blocked
                                   // => Bridge to legacy code
    }
}
```

**Warning:** This is a **bridge pattern**, not ideal. Migrate to R2DBC for true non-blocking benefits.

## When NOT to Use Reactive

Reactive programming adds complexity. Avoid it when:

1. **Simple applications**: Basic CRUD with <100 concurrent users
2. **Team unfamiliar**: Steep learning curve, harder to debug
3. **Primarily CPU-bound**: Reactive optimizes I/O, not computation
4. **Legacy integration heavy**: Wrapping blocking code defeats benefits
5. **Transactional consistency critical**: Reactive transactions are harder

**Rule of thumb:** Start with Spring MVC + JDBC. Migrate to WebFlux + R2DBC when you **measure** I/O-bound concurrency bottlenecks.

## Best Practices

### 1. Use Appropriate Return Types

```java
// Use Mono for single results
Mono<User> getUser(Long id)  // => id is user ID (type: Long)
                             // => Returns 0 or 1 user (type: Mono<User>)
                             // => Single entity lookup

// Use Flux for collections
Flux<User> getAllUsers()  // => Returns 0 to N users (type: Flux<User>)
                         // => Collection of entities
                         // => Can stream millions of records

// Use Mono<Void> for operations with no return
Mono<Void> deleteUser(Long id)  // => id is user ID (type: Long)
                               // => Returns completion signal (type: Mono<Void>)
                               // => No value, only success/error signal
```

### 2. Handle Errors Close to Source

```java
Mono<User> user = repository.findById(id)  // => id is user ID (type: Long)
                                          // => Returns Mono<User> (type: Mono<User>)
    .onErrorMap(SQLException.class, e -> new DatabaseException(e))  // => e is SQLException (type: SQLException)
                                                                    // => Converts to domain exception
                                                                    // => Hides infrastructure details
    .switchIfEmpty(Mono.error(new NotFoundException()));  // => Replace empty with error
                                                          // => Not found = business error
```

### 3. Use Operators, Not Imperative Logic

```java
// ❌ WRONG - Breaking reactive chain
List<User> users = userFlux.collectList().block();  // => BLOCKS thread (type: List<User>)
                                                    // => Defeats reactive purpose
                                                    // => Collects entire stream to memory
for (User u : users) {  // => u is each user (type: User)
                       // => Imperative loop
    if (u.isActive()) {  // => Imperative condition
        processUser(u);  // => Blocking processing
    }
}

// ✅ CORRECT - Stay in reactive pipeline
userFlux  // => userFlux is Flux<User>
    .filter(User::isActive)  // => Keeps only active users
                            // => Reactive operator
    .flatMap(u -> processUser(u))  // => u is active user (type: User)
                                   // => processUser() returns Mono/Flux
                                   // => Async processing
    .subscribe();  // => Subscribes to trigger execution
                  // => Never blocks
```

### 4. Limit Concurrency with flatMap

```java
// Can overwhelm downstream services
userFlux.flatMap(user -> externalApiCall(user));  // => user is each User (type: User)
                                                  // => externalApiCall() returns Mono/Flux
                                                  // => Unbounded concurrency
                                                  // => Can make 1000s of parallel API calls
                                                  // => Overwhelms external service

// Limit to 10 concurrent calls
userFlux.flatMap(user -> externalApiCall(user), 10);  // => user is each User (type: User)
                                                      // => 10 is concurrency limit (type: int)
                                                      // => Maximum 10 concurrent API calls
                                                      // => Protects downstream service
```

### 5. Use publishOn Sparingly

```java
// Too many publishOn calls hurt performance
flux.publishOn(Schedulers.parallel())  // => Switch to parallel thread
                                       // => Thread context switch overhead
    .map(this::step1)  // => Runs on parallel thread
    .publishOn(Schedulers.boundedElastic())  // => Switch to elastic thread
                                            // => Another context switch
    .map(this::step2)  // => Runs on elastic thread
    .publishOn(Schedulers.parallel())  // => Switch back to parallel
                                       // => Third context switch
    .map(this::step3);  // => Runs on parallel thread
                       // => Too much thread switching

// Better - one publishOn for entire chain
flux.publishOn(Schedulers.boundedElastic())  // => Single thread switch
                                            // => All downstream on elastic
    .map(this::step1)  // => Runs on elastic thread
    .map(this::step2)  // => Same elastic thread
    .map(this::step3);  // => Same elastic thread
                       // => Minimal context switching
```

## Related Topics

- [Concurrency and Parallelism](/en/learn/software-engineering/programming-languages/java/in-the-field/concurrency-and-parallelism) - Thread-based concurrency vs reactive
- [Web Services and REST APIs](/en/learn/software-engineering/programming-languages/java/in-the-field/web-services) - Spring MVC vs Spring WebFlux
- [Working with SQL Databases](/en/learn/software-engineering/programming-languages/java/in-the-field/sql-database) - JDBC vs R2DBC
- [Performance Optimization](/en/learn/software-engineering/programming-languages/java/in-the-field/performance) - Measuring reactive vs imperative performance
- [Testing](/en/learn/software-engineering/programming-languages/java/by-example/beginner/testing) - Testing reactive code patterns

## References

**Official Documentation:**

- [Project Reactor Documentation](https://projectreactor.io/docs)
- [Reactive Streams Specification](https://www.reactive-streams.org/)
- [Spring WebFlux Documentation](https://docs.spring.io/spring-framework/reference/web/webflux.html)
- [R2DBC Documentation](https://r2dbc.io/)
- [RxJava Documentation](https://github.com/ReactiveX/RxJava)

**Books:**

- "Hands-On Reactive Programming in Spring 5" by Oleh Dokuka, Igor Lozynskyi
- "Reactive Programming with RxJava" by Tomasz Nurkiewicz, Ben Christensen
Last updated