个人技术分享

如果你想要使用一个轻量级的消息中间件,不需要分布式支持,那么可以选择RxJava或者Reactor,本文将讲述如何入门使用该框架,以及常用的一些功能

生产者

广播多消费者模式

  1. Sinks.Many: 创建一个允许我们将数据推送到一个Flux的sink。我们使用Sinks.many().multicast().onBackpressureBuffer()来创建一个支持背压的多播Sink(广播模式)。
  2. Flux flux = sink.asFlux(): 将sink转换为一个Flux,从而可以进行订阅。
  3. onBackpressureBuffer:缓存消息,当有多个消费者消费不同步时,可以保障消费滞后的消费者阻塞恢复时继续消费。例如:consumer1 -》消费到了offset=10,consumer2-〉消费到了offset=3,在consumer2完成offset=3的处理时,可以继续消费offset=4
  4. tryEmitNext:在buffer队列满时会按照实际调用的策略返回错误码,或者抛出异常
示例代码
// 创建一个Sinks.Many对象,允许我们将数据推送到一个Flux
Sinks.Many<Event> sink = Sinks.many().multicast().onBackpressureBuffer(1024);
// 启动一个生产者来发布数据
sink.tryEmitNext(message);

广播单消费者模式

Help building Sinks. Many that will broadcast signals to a single Subscriber

示例代码
Sinks.many().unicast().onBackpressureBuffer();

重放消息模式

示例代码
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;

import java.time.Duration;

public class ReplayLimitExample {
    public static void main(String[] args) throws InterruptedException {
        // 创建一个Sinks.Many对象,使用replay()和limit()来限制重放的消息数量
        Sinks.Many<String> sink = Sinks.many().replay().limit(3);

        // 创建一个Flux,将从sink接收数据
        Flux<String> flux = sink.asFlux().publishOn(Schedulers.parallel());

        // 订阅者1
        flux.subscribe(data -> System.out.println("Subscriber 1 received: " + data));

        // 发布一些数据
        sink.tryEmitNext("Message 1");
        sink.tryEmitNext("Message 2");
        sink.tryEmitNext("Message 3");
        sink.tryEmitNext("Message 4");
        sink.tryEmitNext("Message 5");

        // 让主线程等待一段时间,以确保上述数据都已发布
        Thread.sleep(1000);

        // 订阅者2在稍后订阅,将会接收到最近的3条消息
        flux.subscribe(data -> System.out.println("Subscriber 2 received: " + data));

        // 发布更多数据
        sink.tryEmitNext("Message 6");
        sink.tryEmitNext("Message 7");

        // 让主线程等待,以确保所有消息都已处理完毕
        Thread.sleep(1000);
    }
}

代码解释
  1. 创建Sink: 使用 Sinks.many().replay().limit(3) 创建一个多播的重放 Sink,并限制其重放缓存为最近的 3 条消息。
  2. 创建Flux: 将 Sink 转换为一个 Flux,并使用 publishOn(Schedulers.parallel()) 来在并行调度器上发布消息。
  3. 订阅者1: 首先订阅 Sink,并接收所有发布的消息。
  4. 发布消息: 发布 5 条消息到 Sink。
  5. 延时: 让主线程等待一段时间,以确保所有消息都已发布并被第一个订阅者接收到。
  6. 订阅者2: 在稍后订阅 Sink,并接收最近的 3 条消息(“Message 3”, “Message 4”, “Message 5”)。
  7. 发布更多消息: 继续发布更多的消息到 Sink。
  8. 等待: 再次让主线程等待,以确保所有消息都已处理完毕。
输出结果
Subscriber 1 received: Message 1
Subscriber 1 received: Message 2
Subscriber 1 received: Message 3
Subscriber 1 received: Message 4
Subscriber 1 received: Message 5
Subscriber 2 received: Message 3
Subscriber 2 received: Message 4
Subscriber 2 received: Message 5
Subscriber 1 received: Message 6
Subscriber 2 received: Message 6
Subscriber 1 received: Message 7
Subscriber 2 received: Message 7

消费者

示例代码

// 创建一个Flux,将从sink接收数据
Flux<Event> flux = sink.asFlux();
// 启动一个消费者来订阅和处理数据
flux.subscribe((Consumer<T>) myConsumer);

消息合并(类似Flink)

Java Reactor框架支持消息合并功能。Reactor是一个响应式编程库,允许处理异步数据流,提供强大的操作符来对数据流进行转换和合并。类似于Flink,Reactor可以实现消息的合并与处理。
下面是一些Reactor框架中用于消息合并的关键操作符和方法:

  1. buffer: 将一定数量的元素收集到一个列表中,然后发射该列表。
  2. window: 类似于buffer,但它返回的是Flux<Flux>,每个子Flux包含一个窗口的数据。
  3. groupBy: 将元素按指定的键进行分组,并返回一个Flux<GroupedFlux<K, V>>,每个GroupedFlux包含相同键的元素。
  4. merge: 将多个Flux合并到一个Flux中,按时间顺序发射元素。
  5. zip: 按顺序组合多个Flux的元素,直到其中一个Flux耗尽。
  6. combineLatest: 组合多个 Publisher(比如 Flux 或 Mono),每当其中任意一个 Publisher 产生新的数据时,它会把所有 Publisher 的最新数据组合成一个新的数据项。

下面是一个使用Reactor的例子,展示如何进行消息的合并:

示例代码

buffer
import reactor.core.publisher.Flux;

import java.time.Duration;
import java.util.List;

public class BufferExample {
    public static void main(String[] args) throws InterruptedException {
        // 使用 Flux.interval 创建两个生产者,分别每 100 毫秒和每 150 毫秒生成一条消息,并生成 10 条消息后停止。
        // 创建一个Flux,产生一些消息
        Flux<String> producer1 = Flux.interval(Duration.ofMillis(100))
                                     .map(i -> "Producer 1 - Message " + i)
                                     .take(10); // 生成10条消息

        Flux<String> producer2 = Flux.interval(Duration.ofMillis(150))
                                     .map(i -> "Producer 2 - Message " + i)
                                     .take(10); // 生成10条消息

        // 合并两个生产者的消息流
        Flux<String> mergedFlux = Flux.merge(producer1, producer2);

        // 使用buffer操作符按每5条消息进行分批
        Flux<List<String>> bufferedFlux = mergedFlux.buffer(Duration.ofSeconds(1));

        // 订阅并消费批量消息
        bufferedFlux.subscribe(batch -> {
            System.out.println("Received a batch of messages:");
            batch.forEach(System.out::println);
        });

        // 等待足够的时间以便查看输出结果
        Thread.sleep(5000);
    }
}

运行结果

Received a batch of messages:
Producer 1 - Message 0
Producer 2 - Message 0
Producer 1 - Message 1
Producer 1 - Message 2
Producer 2 - Message 1
Producer 1 - Message 3
Producer 2 - Message 2
Producer 1 - Message 4
Producer 1 - Message 5
Producer 2 - Message 3
Producer 1 - Message 6
Producer 2 - Message 4
Producer 1 - Message 7
Producer 1 - Message 8
Producer 2 - Message 5
Received a batch of messages:
Producer 1 - Message 9
Producer 2 - Message 6
Producer 2 - Message 7
Producer 2 - Message 8
Producer 2 - Message 9
zip

8个数据流以内都可以直接使用zip聚合为tuple,超过8个需要改用迭代器

import reactor.core.publisher.Flux;
import reactor.util.function.Tuple2;

import java.time.Duration;

public class ReactorExample {
    public static void main(String[] args) throws InterruptedException {
        // 创建两个Flux,用于合并示例
        Flux<String> flux1 = Flux.just("A", "B", "C")
                .delayElements(Duration.ofMillis(100));
        Flux<String> flux2 = Flux.just("1", "2", "3")
                .delayElements(Duration.ofMillis(150));

        // 使用zip合并两个Flux
        Flux<Tuple2<String, String>> combinedFlux = Flux.zip(flux1, flux2);

        combinedFlux.subscribe(tuple -> 
            System.out.println("Merged: " + tuple.getT1() + " - " + tuple.getT2())
        );

        // 让主线程等待,直到Flux处理完成
        Thread.sleep(1000);
    }
}

在这个例子中,flux1和flux2分别发射字符串和数字,zip操作符将它们的元素按顺序合并,并打印出组合后的结果。
Reactor提供了丰富的操作符来处理和合并消息流,满足类似于Flink的实时数据处理需求。如果需要更复杂的合并逻辑,可以结合使用不同的操作符,甚至自定义操作符来实现特定的处理逻辑。
超过8个数据流聚合,原Tuple就不再支持了需要改用迭代器+聚合Function方式

import reactor.core.publisher.Flux;

import java.util.Arrays;

public class FluxZipExample {
    public static void main(String[] args) {
        // 创建三个不同的Flux发布者
        Flux<String> flux1 = Flux.just("A", "B", "C");
        Flux<Integer> flux2 = Flux.just(1, 2, 3);
        Flux<Double> flux3 = Flux.just(0.1, 0.2, 0.3);

        // 使用Flux.zip合并发布者,并通过自定义函数处理
        Flux<String> zippedFlux = Flux.zip(
            Arrays.asList(flux1, flux2, flux3), 
            objects -> {
                String str = (String) objects[0];
                Integer num = (Integer) objects[1];
                Double dbl = (Double) objects[2];
                return str + "-" + num + "-" + dbl;
            }
        );

        // 订阅并打印合并后的Flux
        zippedFlux.subscribe(System.out::println);
    }
}

运行结果

A-1-0.1
B-2-0.2
C-3-0.3
window

在Reactor中,window操作符可以用来将流分割成多个较小的流,这些流称为窗口。下面是一个使用Reactor框架的window操作符的示例。
我们将创建一个模拟数据流,然后使用window操作符将其分割成多个窗口,并处理每个窗口的数据。

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.time.Duration;
import java.util.concurrent.CountDownLatch;

public class ReactorWindowExample {
    public static void main(String[] args) throws InterruptedException {
        // 用于等待所有处理完成
        CountDownLatch latch = new CountDownLatch(1);

        // 模拟一个每秒钟发出一个数据的流
        Flux<Long> source = Flux.interval(Duration.ofSeconds(1))
                                .take(10); // 只取前10个元素

        source
            .window(3) // 每3个元素一个窗口
            .flatMap(window -> 
                window.collectList() // 收集窗口中的所有元素
                      .doOnNext(list -> System.out.println("Window: " + list))
            )
            .doOnComplete(latch::countDown) // 当处理完成时,减少latch计数
            .subscribe();

        // 等待所有处理完成
        latch.await();
    }
}

运行结果

Window: [0, 1, 2]
Window: [3, 4, 5]
Window: [6, 7, 8]
Window: [9]
combineLatest
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuple3;

import java.time.Duration;

public class CombineLatestExample {

    public static void main(String[] args) throws InterruptedException {
        Flux<String> flux1 = Flux.just("A", "B", "C")
                .delayElements(Duration.ofMillis(300));

        Flux<Integer> flux2 = Flux.just(1, 2, 3)
                .delayElements(Duration.ofMillis(400));

        Flux<Double> flux3 = Flux.just(10.0, 20.0, 30.0)
                .delayElements(Duration.ofMillis(500));

        Flux<Tuple3<String, Integer, Double>> combinedFlux = Flux.combineLatest(
            flux1,
            flux2,
            flux3,
            objects -> Tuples.of((String) objects[0], (Integer) objects[1], (Double) objects[2])
        );

        combinedFlux.subscribe(tuple -> System.out.println("Combined: " + tuple));

        // 为了让程序能看到输出,主线程需要等待一段时间
        Thread.sleep(5000);
    }
}

运行结果

Combined: [A,1,10.0]
Combined: [B,1,10.0]
Combined: [B,2,10.0]
Combined: [C,2,10.0]
Combined: [C,2,20.0]
Combined: [C,3,20.0]
Combined: [C,3,30.0]
groupBy

groupBy与zip组合使用案例,可以将一个group下的多个事件组装,避免不同group的事件交错组合,例如:股票数据,一定是希望同一个股票的数据拼接在一起,而不是将亚马逊的交易量与谷歌的换手率组装在一起
注意:groupBy类似shell脚本中的sort会阻塞整个管道,等管道数据全部到齐之后才会开始排序分组
三个案例

  1. groupBy -> consume -> zip
  2. groupBy -> zip -> consume
  3. zip -> groupBy -> consume

场景1

import java.util.Arrays;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.GroupedFlux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.util.function.Tuple2;

import java.time.Duration;

public class ZipWithGroupByExample {

  public static void main(String[] args) throws InterruptedException {
    Sinks.Many<String> many = Sinks.many().multicast().onBackpressureBuffer(1024);
    Flux<String> flux = many.asFlux();
    for (String s : Arrays.asList("apple", "banana", "apricot", "blueberry", "avocado")) {
      System.out.println("success case1 : " + many.tryEmitNext(s));
    }
    // 需要发送完成信号,group by才会开始消费
    many.tryEmitComplete();

    // Group the flux by the first letter of each word
    Flux<GroupedFlux<Character, String>> groupedFlux = flux.groupBy(word -> word.charAt(0));

    // Process each group
    groupedFlux.subscribe(group -> {
      // Collect all elements of the group into a list
      Mono<List<String>> listMono = group.collectList();

      // Subscribe to the Mono and print the group
      listMono.subscribe(list -> {
        System.out.println("Group " + group.key() + ": " + list);

        // Create a flux from the list
        Flux<String> groupFlux = Flux.fromIterable(list);

        // Zip the elements of the group flux with an index
        Flux<Tuple2<Long, String>> zippedFlux = Flux.zip(Flux.interval(Duration.ofMillis(100)), groupFlux);

        // Subscribe to the zipped flux and print the elements
        zippedFlux.subscribe(tuple -> System.out.println("Zipped Group " + group.key() + ": " + tuple));
      });
    });
    for (String s : Arrays.asList("apple", "banana", "apricot", "blueberry", "avocado")) {
      // 操作失败,生产者已完成所有发送:tryEmitComplete
      System.out.println("failed case1 : " + many.tryEmitNext(s));
    }
    // 操作失败,生产者已完成所有发送:tryEmitComplete
    flux = many.asFlux();
    for (String s : Arrays.asList("apple", "banana", "apricot", "blueberry", "avocado")) {
      System.out.println("failed case2 : " + many.tryEmitNext(s));
    }
    // 重新创建生产者,发送成功
    many = Sinks.many().multicast().onBackpressureBuffer(1024);
    for (String s : Arrays.asList("apple", "banana", "apricot", "blueberry", "avocado")) {
      System.out.println("success case2 : " + many.tryEmitNext(s));
    }

    // Wait for the program to complete
    Thread.sleep(5000);
  }
}

运行结果

success case1 : OK
success case1 : OK
success case1 : OK
success case1 : OK
success case1 : OK
Group a: [apple, apricot, avocado]
Group b: [banana, blueberry]
failed case1 : FAIL_TERMINATED
failed case1 : FAIL_TERMINATED
failed case1 : FAIL_TERMINATED
failed case1 : FAIL_TERMINATED
failed case1 : FAIL_TERMINATED
failed case2 : FAIL_TERMINATED
failed case2 : FAIL_TERMINATED
failed case2 : FAIL_TERMINATED
failed case2 : FAIL_TERMINATED
failed case2 : FAIL_TERMINATED
success case2 : OK
success case2 : OK
success case2 : OK
success case2 : OK
success case2 : OK
Zipped Group b: [0,banana]
Zipped Group a: [0,apple]
Zipped Group b: [1,blueberry]
Zipped Group a: [1,apricot]
Zipped Group a: [2,avocado]

场景2

import java.util.Arrays;
import reactor.core.publisher.Flux;
import reactor.core.publisher.GroupedFlux;
import reactor.core.publisher.Sinks;
import reactor.util.function.Tuple2;

public class ReactorTest {

  public static void main(String[] args) throws InterruptedException {
    Sinks.Many<String> many = Sinks.many().multicast().onBackpressureBuffer(1024);
    Flux<String> flux = many.asFlux();
    for (String s : Arrays.asList("apple", "banana", "apricot", "blueberry", "avocado")) {
      many.tryEmitNext(s);
    }
    // 需要发送完成信号,group by才会开始消费
    many.tryEmitComplete();

    Sinks.Many<String> group2 = Sinks.many().multicast().onBackpressureBuffer(1024);
    Flux<String> flux2 = group2.asFlux();
    for (String s : Arrays.asList("apple-group2", "avocado-group2", "blueberry-group2", "apricot-group2", "banana-group2")) {
      group2.tryEmitNext(s);
    }
    group2.tryEmitComplete();

    // Group the flux by the first letter of each word
    Flux<GroupedFlux<Character, String>> groupedFlux1 = flux.groupBy(word -> word.charAt(0));
    Flux<GroupedFlux<Character, String>> groupedFlux2 = flux2.groupBy(word -> word.charAt(0));

    Flux<Tuple2<GroupedFlux<Character, String>, GroupedFlux<Character, String>>> zippedFlux = Flux.zip(groupedFlux1, groupedFlux2);
    // Subscribe to the zipped flux and print the elements
    zippedFlux.subscribe(tuple -> {
      GroupedFlux<Character, String> groupedFlux11 = tuple.getT1();
      GroupedFlux<Character, String> groupedFlux22 = tuple.getT2();
      System.out.println("Zipped Group key pair " + groupedFlux11.key() + " : " + groupedFlux22.key());
      groupedFlux11.zipWith(groupedFlux22).subscribe(data -> System.out.println("Zipped Group " + ": " + data));
    });

    // Wait for the program to complete
    Thread.sleep(5000);
  }
}

运行结果

Zipped Group key pair a : a
Zipped Group : [apple,apple-group2]
Zipped Group : [apricot,avocado-group2]
Zipped Group key pair b : b
Zipped Group : [banana,blueberry-group2]
Zipped Group : [avocado,apricot-group2]
Zipped Group : [blueberry,banana-group2]

场景3

import java.util.Arrays;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.util.function.Tuple2;

public class ReactorTest {

  public static void main(String[] args) throws InterruptedException {
    Sinks.Many<String> many = Sinks.many().multicast().onBackpressureBuffer(1024);
    Flux<String> flux = many.asFlux();
    for (String s : Arrays.asList("apple", "banana", "apricot", "blueberry", "avocado")) {
      many.tryEmitNext(s);
    }
    // 需要发送完成信号,group by才会开始消费
    many.tryEmitComplete();

    Sinks.Many<String> group2 = Sinks.many().multicast().onBackpressureBuffer(1024);
    Flux<String> flux2 = group2.asFlux();
    for (String s : Arrays.asList("apple-group2", "avocado-group2", "blueberry-group2", "apricot-group2", "banana-group2")) {
      group2.tryEmitNext(s);
    }
    group2.tryEmitComplete();

    Flux.zip(flux, flux2).groupBy(tuple -> tuple.getT1().charAt(0)).subscribe(group -> {
      // Collect all elements of the group into a list
      Mono<List<Tuple2<String, String>>> listMono = group.collectList();

      // Subscribe to the Mono and print the group
      listMono.subscribe(list -> {
        System.out.println("Group " + group.key() + ": " + list);

        // Create a flux from the list
        Flux<Tuple2<String, String>> groupFlux = Flux.fromIterable(list);

        // Subscribe to the zipped flux and print the elements
        groupFlux.subscribe(tuple -> System.out.println("Zipped Group " + group.key() + ": " + tuple));
      });
    });

    // Wait for the program to complete
    Thread.sleep(5000);
  }
}

运行结果

Group a: [[apple,apple-group2], [apricot,blueberry-group2], [avocado,banana-group2]]
Zipped Group a: [apple,apple-group2]
Zipped Group a: [apricot,blueberry-group2]
Zipped Group a: [avocado,banana-group2]
Group b: [[banana,avocado-group2], [blueberry,apricot-group2]]
Zipped Group b: [banana,avocado-group2]
Zipped Group b: [blueberry,apricot-group2]

两个生产者不同步场景

在消息合并的场景下,如果使用 Flux.merge 或 Flux.zip 操作符,当一个生产者开始生成消息而另一个生产者在一段时间后才开始生成消息时,消费者接收到的消息取决于所使用的操作符。

使用 Flux.merge 的情况

Flux.merge 操作符会将多个 Flux 合并成一个 Flux,并按照消息到达的时间顺序来发射消息。如果一个生产者开始生成消息,而另一个生产者在稍后才开始生成消息,消费者会立即接收到已经有消息的生产者的消息,稍后再接收到另一个生产者的消息。

import reactor.core.publisher.Flux;

import java.time.Duration;

public class MergeExample {
    public static void main(String[] args) throws InterruptedException {
        // 生产者1,立即开始生成消息
        Flux<String> producer1 = Flux.interval(Duration.ofMillis(500))
                                     .map(i -> "Producer 1 - Message " + i);

        // 生产者2,在2秒后开始生成消息
        Flux<String> producer2 = Flux.interval(Duration.ofMillis(500))
                                     .map(i -> "Producer 2 - Message " + i)
                                     .delaySubscription(Duration.ofSeconds(2));

        // 合并两个生产者的消息流
        Flux<String> mergedFlux = Flux.merge(producer1, producer2);

        // 订阅并消费消息
        mergedFlux.subscribe(System.out::println);

        // 等待足够的时间以便查看输出结果
        Thread.sleep(5000);
    }
}
使用 Flux.zip 的情况

Flux.zip 操作符会将多个 Flux 的消息按索引位置一一配对,并且只有当所有参与合并的 Flux 都有消息时,才会发射一个配对消息。如果一个生产者开始生成消息,而另一个生产者在稍后才开始生成消息,消费者不会立即接收到消息,只有在第二个生产者也开始生成消息之后,消费者才会接收到配对好的消息。

import reactor.core.publisher.Flux;
import reactor.util.function.Tuple2;

import java.time.Duration;

public class ZipExample {
    public static void main(String[] args) throws InterruptedException {
        // 生产者1,立即开始生成消息
        Flux<String> producer1 = Flux.interval(Duration.ofMillis(500))
                                     .map(i -> "Producer 1 - Message " + i);

        // 生产者2,在2秒后开始生成消息
        Flux<String> producer2 = Flux.interval(Duration.ofMillis(500))
                                     .map(i -> "Producer 2 - Message " + i)
                                     .delaySubscription(Duration.ofSeconds(2));

        // 将两个生产者的消息流按顺序组合成一个消息流
        Flux<Tuple2<String, String>> zippedFlux = Flux.zip(producer1, producer2);

        // 订阅并消费消息
        zippedFlux.subscribe(tuple -> System.out.println("Combined: " + tuple.getT1() + " & " + tuple.getT2()));

        // 等待足够的时间以便查看输出结果
        Thread.sleep(5000);
    }
}
输出结果
Flux.merge 输出示例:
Producer 1 - Message 0
Producer 1 - Message 1
Producer 1 - Message 2
Producer 1 - Message 3
Producer 2 - Message 0
Producer 1 - Message 4
Producer 2 - Message 1
Producer 1 - Message 5
Producer 2 - Message 2
Producer 1 - Message 6
Producer 2 - Message 3
Producer 1 - Message 7
Producer 2 - Message 4
Flux.zip 输出示例:
Combined: Producer 1 - Message 0 & Producer 2 - Message 0
Combined: Producer 1 - Message 1 & Producer 2 - Message 1
Combined: Producer 1 - Message 2 & Producer 2 - Message 2
Combined: Producer 1 - Message 3 & Producer 2 - Message 3
Combined: Producer 1 - Message 4 & Producer 2 - Message 4

总结

  • Flux.merge: 消费者会接收到先产生消息的生产者的所有消息,然后接收到后产生消息的生产者的消息。消息是按时间顺序发射的。
  • Flux.zip: 消费者只有在两个生产者都有消息时,才会接收到配对好的消息。在第二个生产者开始生成消息之前,消费者不会收到任何消息。

选择哪种操作符取决于你的具体需求和希望如何处理消息流的合并。

延迟消息

在 Reactor 中,实现延迟消息传递可以通过使用 delayElements 或 delaySubscription 操作符来实现。这些操作符可以用来延迟每个元素的发布或延迟整个流的订阅。
以下是一个使用 delayElements 和 delaySubscription 的示例:

示例代码

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.time.LocalDateTime;

public class DelayedMessageExample {

    public static void main(String[] args) throws InterruptedException {
        // 示例1:使用 delayElements 延迟每个元素
        Flux<String> messageFlux = Flux.just("Message 1", "Message 2", "Message 3")
                .delayElements(Duration.ofSeconds(1))
                .doOnNext(message -> System.out.println(LocalDateTime.now() + " - Received: " + message));

        // 订阅消息流
        messageFlux.subscribe();

        // 示例2:使用 delaySubscription 延迟整个流的订阅
        Flux<String> delayedStartFlux = Flux.just("Delayed Message 1", "Delayed Message 2")
                .delaySubscription(Duration.ofSeconds(3))
                .doOnNext(message -> System.out.println(LocalDateTime.now() + " - Received: " + message));

        // 订阅延迟开始的消息流
        delayedStartFlux.subscribe();

        // 为了让程序能看到输出,主线程需要等待足够的时间
        Thread.sleep(7000);
    }
}

代码解释

  • 使用 delayElements 延迟每个元素
  • delayElements(Duration.ofSeconds(1)):每个元素的发布将被延迟 1 秒。
  • doOnNext(message -> System.out.println(LocalDateTime.now() + " - Received: " + message)):在每个元素发布时打印当前时间和消息内容。
  • 使用 delaySubscription 延迟整个流的订阅
  • delaySubscription(Duration.ofSeconds(3)):整个流的订阅将被延迟 3 秒。
  • doOnNext(message -> System.out.println(LocalDateTime.now() + " - Received: " + message)):在每个元素发布时打印当前时间和消息内容。
  • 主线程等待
  • Thread.sleep(7000):主线程等待 7 秒,以便可以看到所有消息的输出。

输出结果

运行此程序时,你会看到输出类似如下:

2024-06-10T14:23:45.123 - Received: Message 1
2024-06-10T14:23:46.123 - Received: Message 2
2024-06-10T14:23:47.123 - Received: Message 3
2024-06-10T14:23:48.123 - Received: Delayed Message 1
2024-06-10T14:23:48.123 - Received: Delayed Message 2

此示例展示了如何使用 Reactor 的延迟操作符实现延迟消息传递的需求。通过 delayElements 和 delaySubscription,可以灵活地控制消息的发布时机。

背压

onBackpressureBuffer 是 Reactor 中用于处理背压的一个策略。当生产者产生的数据速度超过消费者处理数据的速度时,onBackpressureBuffer 可以将这些数据临时存储在一个缓冲区中,以防止数据丢失或生产者被强制减速。
在响应式编程中,背压(Backpressure)是指在异步数据流中,当消费者处理能力不足以跟上生产者的速度时,需要一种机制来管理数据流量。Reactor 提供了多种处理背压的策略,其中包括:

  1. Drop: 丢弃多余的数据。
  2. Latest: 只保留最新的数据。
  3. Error: 抛出异常。
  4. Buffer: 缓冲数据,直到消费者能够处理。

onBackpressureBuffer 具体作用是创建一个缓冲区,存储那些暂时无法被消费者处理的数据。这个缓冲区可以无限制地增长,或者可以设置一个固定的容量,当缓冲区满时,可以指定进一步的处理策略,如丢弃、错误等。

关键点

  • 缓冲区: onBackpressureBuffer 创建了一个缓冲区,当消费者无法及时处理数据时,数据会被暂时存储在这个缓冲区中。
  • 背压管理: 通过这种方式,可以防止由于生产者过快而导致的数据丢失或异常,同时确保消费者能够最终处理所有数据。

在 Reactor 框架中,Sinks.MulticastSpec#onBackpressureBuffer(int) 方法允许你指定一个背压缓冲区的大小,用于处理生产者速度快于消费者的情况。如果背压缓冲区满了,根据 Reactor 文档和默认行为,消息处理方式如下:

  1. 消息丢弃:默认情况下,当背压缓冲区满了,新的消息将会被丢弃,并且会触发一个错误信号(onError),通常是一个 OverflowException。
  2. 阻塞等待:Sinks.MulticastSpec 并没有阻塞等待的机制。它的设计目的是在高并发和反应式编程中保持非阻塞性,因此不会因为背压而阻塞生产者。
    为了更好地管理这种情况,可以使用以下方法:
  3. 增加缓冲区大小:在 onBackpressureBuffer(int) 中设置一个更大的缓冲区大小来适应流量高峰。
  4. 消息丢弃策略:使用 onBackpressureBuffer(int, BufferOverflowStrategy) 来指定消息溢出策略,例如丢弃最新的消息(BufferOverflowStrategy.DROP_LATEST)、丢弃最老的消息(BufferOverflowStrategy.DROP_OLDEST)等。
  5. 背压策略:可以考虑设计应用逻辑来处理背压情况,例如减慢生产者速度,或增加消费者的消费速度。
    例子:
Sinks.many().multicast().onBackpressureBuffer(10).asFlux().onBackpressureBuffer(10, BufferOverflowStrategy.DROP_OLDEST);

上述代码示例中,如果缓冲区满了,新来的消息会丢弃最早的消息,确保缓冲区总是有最新的消息。不同的 BufferOverflowStrategy 提供了不同的背压处理策略,具体选择应根据应用需求而定。