本文目标
- 理解响应式编程
前言
之前的 《聊聊 IO 多路复用》 中,我们理解了非阻塞 IO 的意义。但是 Spring MVC 并不能完美的应用非阻塞编程,于是 Spring 团队开发了 WebFlux,而 WebFlux 的基础正是本文要讲到的 Project Reactor(下文简称为 Reactor)
本文以 Reactor 为例带大家入门响应式编程
版本
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.4.6</version>
</dependency>
什么是 Reactor
Reactor 是 JVM 的非阻塞响应式编程基础,支持背压。 它直接与 Java 8 函数式 API 集成,特别是 CompletableFuture、Stream 和 Duration。 它提供了可组合的异步序列 API — Flux(用于 [N] 个元素)和 Mono(用于 [0|1] 个元素),并实现了 Reactive Streams 规范。 在 Reactor 的基础上还演化出了适合微服务架构的 Reactor Netty 。为 HTTP(包括 Websockets)、TCP 和 UDP 提供支持背压和响应式的网络引擎。
上面是对于官方文档的翻译。下面来说说我自己对 Reactor 和响应式编程的理解。
回想一下之前的非阻塞 IO 编程,例如我们现在要用非阻塞的方式调用一个远程服务,当远程接口数据可用时去做一些业务处理。这时候代码怎么写呢?我们需要提供一个回调函数,然后在响应就绪的时候,去调用我们的回调函数。
从逻辑上来看,这完全没有问题。但是如果我们的回调很复杂,代码看起来会是什么样呢?
// 以下案例来自 Reactor 官网
userService.getFavorites(userId, new Callback<List<String>>() {
public void onSuccess(List<String> list) {
if (list.isEmpty()) {
suggestionService.getSuggestions(new Callback<List<Favorite>>() {
public void onSuccess(List<Favorite> list) {
UiUtils.submitOnUiThread(() -> {
list.stream()
.limit(5)
.forEach(uiList::show);
});
}
public void onError(Throwable error) {
UiUtils.errorPopup(error);
}
});
} else {
list.stream()
.limit(5)
.forEach(favId -> favoriteService.getDetails(favId,
new Callback<Favorite>() {
public void onSuccess(Favorite details) {
UiUtils.submitOnUiThread(() -> uiList.show(details));
}
public void onError(Throwable error) {
UiUtils.errorPopup(error);
}
}
));
}
}
public void onError(Throwable error) {
UiUtils.errorPopup(error);
}
});
这个代码说实话已经有点回调地狱那味儿了,让一段不是很复杂的逻辑变得很难读了。但是如果用 Reactor 写呢?
// 以下案例来自 Reactor 官网
userService.getFavorites(userId)
.flatMap(favoriteService::getDetails)
.switchIfEmpty(suggestionService.getSuggestions())
.take(5)
.publishOn(UiUtils.uiThreadScheduler())
.subscribe(uiList::show, UiUtils::errorPopup);
可以看到,代码变得非常的简洁。唯一带来的困扰就是,我们不知道这些函数到底是啥意思
响应式编程虽然有非常多的特性,但是它并不是什么神奇的技术,他也是建立在传统命令式编程的基础上。只不过它所提供的 API 以及规范更适合在非阻塞 IO 中使用。虽然在非阻塞 IO 框架中几乎只使用响应式编程(Vertx,WebFlux),只是因为这样做更合适,并不是说没了响应式编程,就玩不了非阻塞 IO 了。
响应式编程内幕
Reactor 实现了 org.reactivestreams
提供的 Java 响应式编程规范,我们只要了解 reactivestreams 中代码是如何运转的,再看 Reactor 相关的代码就容易多了。
下图展示了 reactivestreams 中的核心接口
- Publisher:发布者
- Subscriber:订阅者
- Subscription:这个单词中文翻译为名词的订阅,在代码中它是发布者和订阅者之间的媒介
- Processor:该接口继承了发布者和订阅者,可以理解为发布者和订阅者的中间操作(但是 Reactor 的中间操作并没有实现 Processor,在最新版本的 Reactor 中,Processor 的相关实现接口已经被弃用)
在了解了响应式编程的核心接口之后,我们来看下响应式编程是如何运作的
在 Reactor 中大部分实现都是按照上图的逻辑来执行的
- 首先是Subscriber(订阅者)主动订阅 Publisher(发布者),通过调用 Publisher 的 subscribe 方法
- Publisher 在向下游发送数据之前,会先调用 Subscriber 的 onSubscribe 方法,传递的参数为 Subscription(订阅媒介)
- Subscriber 通过 Subscription#request 来请求数据,或者 Subscription#cancel 来取消数据发布(这就是响应式编程中的背压,订阅者可以控制数据发布)
- Subscription 在接收到订阅者的调用后,通过 Subscriber#onNext 向下游订阅者传递数据。
- 在数据发布完成后,调用 Subscriber#onComplete 结束本次流,如果数据发布或者处理遇到错误会调用 Subscriber#onError
调用 Subscriber#onNext,onComplete,onError 这三个方法,可能是在 Publisher 中做的,也可能是在 Subscription 中做的,根据不同的场景有不同的实现方式,并没有什么严格的要求。可以认为 Publisher 和 Subscription 共同配合完成了数据发布
其实 Reactor 中 API 实现原理也都是这个套路,我这边也自己写了个例子便于让读者加深对响应式编程的理解
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
/**
* @author tianwen.yin
*/
public class SimpleReactiveStream {
/**
* 实现一个简单的响应式编程发布者
* 逻辑:当订阅者发起订阅时,像下游发送一个 HelloWorld,发布逻辑由 SimpleSubscription 完成
*/
static class SimplePublisher implements Publisher {
@Override
public void subscribe(Subscriber s) {
// 2. Publisher 发布数据之前,调用 Subscriber 的 onSubscribe
s.onSubscribe(new SimpleSubscription(data(), s));
}
private String data() {
return "Hello World";
}
}
static class SimpleSubscriber implements Subscriber {
@Override
public void onSubscribe(Subscription s) {
// 3. Subscriber 通过 Subscription#request 来请求数据
// 或者 Subscription#cancel 来取消数据发布
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Object o) {
System.out.println(o);
}
@Override
public void onError(Throwable t) {
System.out.println("error");
}
@Override
public void onComplete() {
System.out.println("complete");
}
}
static class SimpleSubscription implements Subscription {
String data;
Subscriber actual;
boolean isCanceled;
public SimpleSubscription(String data, Subscriber actual) {
this.data = data;
this.actual = actual;
}
@Override
public void request(long n) {
if (!isCanceled) {
try {
// 4. Subscription 在接收到订阅者的调用后
// 通过 Subscriber#onNext 向下游订阅者传递数据
actual.onNext(data);
// 5. 在数据发布完成后,调用 Subscriber#onComplete 结束本次流
actual.onComplete();
} catch (Exception e) {
// 如果数据发布或者处理遇到错误会调用 Subscriber#onError
actual.onError(e);
}
}
}
@Override
public void cancel() {
isCanceled = true;
}
}
public static void main(String[] args) {
// 1. Subscriber ”订阅“ Publisher
new SimplePublisher().subscribe(new SimpleSubscriber());
}
}
响应式编程思想
响应式编程,就像装配一条流水线。Publisher 规定了数据如何生产,中间会有 Operators(操作符)对流水线的数据进行解析,校验,转换等等操作,最终处理好的数据流转到 Subscriber。
这条流水线还有一个特点。大部分情况下当 Publisher 的 subscribe 方法被调用之前,什么都不会发生。在被订阅之前我们只是在定义流水线该如何工作,直到真正有人需要的时候,流水线才会启动。
Reactor 中的 Operator
Operators 怎么理解呢?对于上游来说,Operators 像一个订阅者,而对于它的下游来说,它像一个发布者(我们上文说过了 Reactor 中的中间操作并没有实现 Processor 接口)
Mono.just("hello")
.map(a -> a + "world")
.subscribe(System.out::println);
举个简单的例子,在上面的代码中,map 就是一个 Operator,它的实现思路是什么?来看下面的代码
// 注意,这是我基于 Reactor API 实现的伪代码!
public static class MonoMap implements Publisher {
// 我们自定义的转换逻辑
private Function mapper;
// source 代表当前操作符的上游发布者
private Publisher source;
public MonoMap(Publisher source, Function mapper) {
this.source = source;
this.mapper = mapper;
}
@Override
public void subscribe(Subscriber actual) {
source.subscribe(new MonoMapSubscriber(mapper, actual));
}
}
public static class MonoMapSubscriber implements Subscriber {
// 我们自定义的转换逻辑
private Function mapper;
// 真正的下游
private Subscriber actual;
public MonoMapSubscriber(Function mapper, Subscriber actual) {
this.mapper = mapper;
this.actual = actual;
}
@Override
public void onSubscribe(Subscription s) {
actual.onSubscribe(s);
}
@Override
public void onNext(Object o) {
// 当上游数据发送过来时,先进行转换再发送给下游
Object result = mapper.apply(o);
actual.onNext(result);
}
@Override
public void onError(Throwable t) {
actual.onError(t);
}
@Override
public void onComplete() {
actual.onComplete();
}
}
上述代码是我自己实现的一个伪代码,用于让大家理解操作符的实现思路,实际 Reactor 代码也是这个思路,只不过实现的更加巧妙和严谨
我们首先来分析一下 Mono.just("hello").map(a -> a + "world")
这句话
- 当执行到 Mono.just 时,会新建一个 MonoJust 对象作为当前的 Publisher。该发布者的逻辑是,当订阅时,向下游发送数据 "hello"
- 当执行到 map 方法时,会新建一个 MonoMap 对象替作为当前的 Publisher,MonoJust 成为了 MonoMap 中的一个属性 source(实际的上游)
- 当 MonoMap 被订阅时,会先将它的下游 actual 做一层包装,也就是我们上面的 MonoMapSubscriber。然后去调用 source 的 subscribe 方法。上游发布数据时,MonoMapSubscriber 先对数据进行转换(我们上面的拼接字符串操作),然后再发送给 actual(它的下游)
- 当 MonoMap 被再次转换时,MonoMap 就变成了下游操作符的 source...
最后通过一张图来总结一下
Reactor 该如何学习
本文并没有介绍太多 Reactor 的细节,因为这些东西实在是太多了。我想聊聊我自己是如何学习 Reactor 的
如果你已经通过本文理解了响应式编程的核心接口是如何工作的了,那恭喜你已经迈向了成功的第一步了。接下来就是阅读官方文档,不断的练习和阅读 Reactor 的源码。源码追踪的方向已经很明确了,当我们想了解一个发布者的实现原理是什么,我就要去关注这个发布者的 subscribe 方法和 Subscription 都做了什么。想了解消费者的逻辑,就看它的 onNext,onComplete,onError。
最后
如果觉得我的文章对你有帮助,动动小手点下关注,你的支持是对我最大的帮助