Reactor是Spring提供的非阻塞式响应式编程框架,实现了Reactive Streams规范。 它提供了可组合的异步序列API,例如Flux(用于[N]个元素)和Mono(用于[0 | 1]个元素)。
Reactor Netty项目还支持非阻塞式网络通信,非常适用于微服务架构,为HTTP(包括Websockets),TCP和UDP提供了响应式编程基础。
本文通过例子展示和源码阅读,分析Reactor中核心设计与实现机制。
文本Reactor源码基于Reactor 3.3
名词解析
响应式编程,维基百科解析为
reactive programming is an asynchronous programming paradigm concerned with data streams and the propagation of change. This means that it becomes possible to express static (e.g. arrays) or dynamic (e.g. event emitters) data streams with ease via the employed programming language(s)
响应式编程是一个专注于数据流和变化传递的异步编程范式。 这意味着使用编程语言可以很容易地表示静态(例如数组)或动态(例如事件发射器)数据流。
下面简单解释一下相关名词。
数据流与变化传递,我的理解,数据流就如同一条车间流水线,数据在上面传递,经过不同的操作台(我们定义的操作方法),可以被观测,被过滤,被调整,或者与另外一条数据流合并为一条新的流,而操作台对数据做的改变会一直向下传递给其他操作台。
java 8 lambda表达式就是一种数据流形式
lists.stream().filter(i -> i%2==0).sorted().forEach(handler);
lists.stream(),构建一个数据流,负责生产数据。
filter,sorted方法以及handler匿名类,都可以视为操作台,他们负责处 理数据。
这里还涉及两个概念
声明式编程,通过表达式直接告诉计算机我们要的结果,具体操作由底层实现,我们并不关心,如sql,html,spring spel。
对应的命令式编程,一步一步告诉计算机先做什么再做什么。我们平时编写java,c等代码就是命令式编程。
上例中通过filter,sorted等方法直接告诉计算机(Spring)执行过滤,排序操作,可以理解为声明式编程。
注意,我的理解是,声明式,命令式编程并没有明确的界限。
越是可以直接通过声明表达我们要什么,就越接近声明式编程,反之,越是需要我们编写操作过程的,就越接近命令式编程。
如Spring中的声明式事务和编程式事务。
可参考:https://www.zhihu.com/question/22285830
函数式编程,就是将函数当做一个数据类型,函数作为参数,返回值,属性。
Java不支持该模式,通过匿名类实现,如上例中forEach方法。
注意,函数式编程还有很多学术性,专业性的概念,感兴趣的同学可以自行了解。
响应式编程,主要是在上面概念加了异步支持。
这个异步支持非常有用,它可以跟Netty这些基于事件模型的异步网络框架很好地结合,下一篇文章我们通过WebFlux来说明这一点。
数据流转
下面我们来简单看一下Reactor的设计与实现吧。
首先通过一个小用例,来看一个Reactor中如何生产数据,又如何传递给订阅者。
@Test
public void range() {
// [1]
Flux flux = Flux.range(1, 10);
// [2]
Subscriber subscriber = new BaseSubscriber<Integer>() {
protected void hookOnNext(Integer value) {
System.out.println(Thread.currentThread().getName() + " -> " + value);
request(1);
}
};
// [3]
flux.subscribe(subscriber);
}
Reactor中,发布者Publisher负责生产数据,有两种发布者,Flux可以生产N个数据,Mono可以生产0~1个数据。
订阅者Subscriber负责处理,消费数据。
1
构建一个发布者Flux
注意,这时发布者还没开始生产数据。
2
构建一个订阅者Subscriber
3
创建订阅关系,这时,生产者开始生产数据,并传递给订阅者。
Flux.range,fromArray等静态方法都会返回一个Flux子类,如FluxRange,FluxArray。
Publisher#subscribe,该方法很重要,它负责创建发布者与订阅者的订阅关系。
Flux#subscribe
public final void subscribe(Subscriber<? super T> actual) {
CorePublisher publisher = Operators.onLastAssembly(this);
CoreSubscriber subscriber = Operators.toCoreSubscriber(actual);
try {
...
publisher.subscribe(subscriber);
}
catch (Throwable e) {
Operators.reportThrowInSubscribe(subscriber, e);
return;
}
}
获取内部的CorePublisher,CoreSubscriber。
Flux子类都是一个CorePublisher。
我们编写的订阅者,都会转化为一个CoreSubscriber。
CorePublisher也有一个内部的subscribe方法,由Flux子类实现。
FluxRange#subscribe
public void subscribe(CoreSubscriber<? super Integer> actual) {
...
actual.onSubscribe(new RangeSubscription(actual, st, en));
}
Subscription代表了发布者与订阅者之间的一个订阅关系,由Publisher端实现。
Flux子类subscribe方法中通常会使用CoreSubscriber创建为Subscription,并调用订阅者的onSubscribe方法,这时订阅关系已完成。
下面来看一下Subscriber端的onSubscribe方法
BaseSubscriber#onSubscribe -> hookOnSubscribe
protected void hookOnSubscribe(Subscription subscription) {
subscription.request(9223372036854775807L);
}
Subscription#request由Publisher端实现,也是核心方法,订阅者通过该方法向发布者拉取特定数量的数据。
注意,这时发布者才开始生产数据。
RangeSubscription#request -> RangeSubscription#slowPath -> Subscriber#onNext
void slowPath(long n) {
Subscriber<? super Integer> a = this.actual;
long f = this.end;
long e = 0L;
long i = this.index;
while(!this.cancelled) {
// [1]
while(e != n && i != f) {
a.onNext((int)i);
if (this.cancelled) {
return;
}
++e;
++i;
}
...
}
}
1
RangeSubscription负责生产指定范围内的整数,并调用Subscriber#onNext将数据推送到订阅者。
可以看到,
Publisher#subscribe完成订阅操作,生成Subscription订阅关系,并触发订阅者钩子方法onSubscribe。
订阅者的onSubscribe方法中,订阅者开始调用Subscription#request请求数据,这时发布者才开始生产数据,并将数据推给订阅者。
操作符方法
跟java 8 lambda表达式一样,Reactor提供了很多的声明式方法,这些方法类似于操作符,直接操作数据(下文称为操作符方法)。
合理利用这些方法,可以大量简化我们的工作。
数据处理,如skip,distinct,sort,filter
钩子方法,如doOnNext,doOnSuccess
组合操作,flatMap,zipWhen
阻塞等待,blockLast
流量控制,limitRate
数据缓存,buffer,cache
可参考官方文档:https://projectreactor.io/docs/core/release/reference/#which-operator
注意,这些操作符方法虽然是添加到Publisher端,但Reactor会将逻辑会转移到Subscriber端。
看一个简单例子
Flux.range(1, 3)
.doOnNext(i -> {
System.out.println(Thread.currentThread().getName() + " doOnNext:" + i);
})
.skip(1)
.subscribe(myHandler);
myHandler即我们实现的Subscriber。
每调用一次操作符方法,Flux都会生成一个新的Flux子类(装饰模式),最后Flux类为FluxSkip[FluxPeek[FluxRange]]
。
我们来看一下完整的Flux#subscribe方法代码
public final void subscribe(Subscriber<? super T> actual) {
CorePublisher publisher = Operators.onLastAssembly(this);
CoreSubscriber subscriber = Operators.toCoreSubscriber(actual);
try {
// [1]
if (publisher instanceof OptimizableOperator) {
OptimizableOperator operator = (OptimizableOperator)publisher;
while(true) {
// [2]
subscriber = operator.subscribeOrReturn(subscriber);
if (subscriber == null) {
return;
}
// [3]
OptimizableOperator newSource = operator.nextOptimizableSource();
if (newSource == null) {
publisher = operator.source();
break;
}
operator = newSource;
}
}
// [4]
publisher.subscribe(subscriber);
} catch (Throwable var6) {
Operators.reportThrowInSubscribe(subscriber, var6);
}
}
1
判断Flux是否由操作符方法产生。
2
OptimizableOperator#subscribeOrReturn会生成新的Subscriber,以执行操作符逻辑。如上面例子中,FluxPeek会生成PeekSubscriber,FluxSkip生成SkipSubscriber。这里将操作符逻辑转移到Subscriber端。
OptimizableOperator#subscribeOrReturn也可以直接调用被装饰Publisher的subscribe方法,从而改变流程。如下面说的FluxSubscribeOn。
3
取出上一层被装饰的Publisher作为新的Publisher,如上例的FluxSkip[FluxPeek[FluxRange]]
,会依次取出FluxPeek,FluxRange。
这个操作一直执行,直到取出真正生产数据的Publisher。
4
使用真正生产数据的Publisher,和最后包装好的Subscriber,再调用subscribe方法。
上面例子中,流程如下
push/pull
Reactor提供了push和pull两种模式。
先看一下pull模式
Flux.generate(sink -> {
int k = (int) (Math.random()*10);
if(k > 8)
sink.complete();
sink.next(k);
})
.subscribe(i -> {
System.out.println("receive:" + i);
});
Sink可以理解为数据池,负责存储数据,根据功能不同划分,如IgnoreSink,BufferAsyncSink,LatestAsyncSink。
Sink#next会将数据放入池中,由Sink缓存或直接发送给订阅者。
Flux#generate(Consumer<SynchronousSink<T>> generator)
,可以理解为pull模式,
订阅者每调用一次request方法,Publisher就会调用一次generator来生产数据,而且generator每次执行中只能调用一次Sink.next。
generator是单线程执行,生成数据后直接同步发送到订阅者。
push模式可以使用create方法
Flux.create(sink -> {
System.out.println("please entry data");
BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
while (true) {
try {
sink.next(br.readLine());
} catch (IOException e) {
}
}
}).subscribe(i -> {
System.out.println("receive:" + i);
});
Flux#create(Consumer<? super FluxSink<T>> emitter)
,可以理解为push模式。
注意,Publisher只在Flux#subscribe操作时调用一次emitter,后续request不再调用emitter。
我们可以将Sink绑定到其他的数据源,如上例的控制台,或其他事件监听器。
当数据来了,Sink就会将它推送给订阅者。
Flux#create生成的Flux可以多线程同时调用Sink.next推送数据,并且数据会先缓存到Sink,后续Sink推送给订阅者。
push方法与create类似,只是它只允许一个线程调用Sink.next推送数据。
上例中create方法使用push方法更合适,因为只有一个线程推送数据。
混合模式
假如一个消息处理器MessageProcessor需要会将普通消息直接推送给订阅者,而低级别消息由订阅者拉取。
我们可以FluxSink#onRequest实现混合模式
Flux.create(sink -> {
// [1]
messageProcessor.setHandler((msg) -> {
sink.next(msg);
});
// [2]
sink.onRequest(n -> {
List<String> messages = messageProcessor.getLowMsg();
for(String s : messages) {
sink.next(s);
}
});
})
1
普通消息直接推送
2
低级别消息由订阅者拉取
完整代码可参考:https://gitee.com/binecy/bin-springreactive/blob/master/order-service/src/test/java/com/binecy/FluxPushPullTest.java
线程与调度器
前面说了reactor是支持异步的,不过它并没有默认开启异步,我们可以通过调度器开启,如
public void parallel() throws InterruptedException {
Flux.range(0, 100)
.parallel()
.runOn(Schedulers.parallel())
.subscribe(i -> {
System.out.println(Thread.currentThread().getName() + " -> " + i);
});
new CountDownLatch(1).await();
}
parallel 将数据分成指定份数,随后调用runOn方法并行处理这些数据。
runOn 该方法参数指定的任务执行的线程环境。
最后的CountDownLatch用于阻塞主线程,以免进程停止看不到效果。
调度器相当于Reactor中的ExecutorService,不同的调度器定义不同的线程执行环境。
Schedulers提供的静态方法可以创建不同的线程执行环境。
Schedulers.immediate() 直接在当前线程执行
Schedulers.single() 在一个重复利用的线程上执行
Schedulers.boundedElastic() 在由Reactor维护的线程池上执行,该线程池中闲置时间过长(默认值为60s)的线程也将被丢弃,创建线程数量上限默认为CPU核心数x 10。线程数达到上限后,最多可提交10万个任务,这些任务在线程可用时会被执行。该线程池可以为阻塞操作提供很好的支持。阻塞操作可以执行在独立的线程上,不会占用其他资源。
Schedulers.parallel() 固定线程,对于异步IO,可以使用该方案。
Reactor另外提供了两个操作符方法来切换执行上下文,publishOn和subscribeOn。
publishOn影响当前操作符方法后面操作的线程执行环境,而subscribeOn则影响整个链路的线程执行环境。
(runOn与publishOn类似,影响该方法后续操作线程执行环境)
Flux.range(1, 3)
.doOnNext(i -> {
System.out.println(Thread.currentThread().getName() + " doOnNext:" + i);
})
.publishOn(Schedulers.newParallel("myParallel"))
.skip(1)
.subscribe(myHandler);
myHandler只是简单打印线程和数据
Consumer myHandler = i -> {
System.out.println(Thread.currentThread().getName() + " receive:" + i);
};
输出结果为
main doOnNext:1
main doOnNext:2
main doOnNext:3
myParallel-1 receive:2
myParallel-1 receive:3
publishOn后面的操作(包括skip,myHandler)都已经切换到新的线程。
再来简单看一下publishOn与subscribeOn的实现
前面说了,操作符方法的逻辑会移到Subscriber端,上例过程示意如下
线程切换是在PublishOnSubscriber中完成的,所以PublishOnSubscriber后面的操作都在新线程上。
将上面例子代码修改一下
Flux.range(1, 3)
.doOnNext(i -> {
System.out.println(Thread.currentThread().getName() + " doOnNext:" + i);
})
.subscribeOn(Schedulers.newParallel("myParallel"))
.skip(1)
.subscribe(myHandler);
输出结果为
myParallel-1 doOnNext:1
myParallel-1 doOnNext:2
myParallel-1 receive:2
myParallel-1 doOnNext:3
myParallel-1 receive:3
从数据生产到消费,所有操作都在新的线程上。
示意图如下
前面说了,Flux#subscribe中会调用OptimizableOperator#subscribeOrReturn方法,而在FluxSubscribeOn中,会直接切换任务线程,后面整个流程都执行在新线程上了。
使用publishOn还是subscribeOn,关键在于阻塞操作是在生产数据时还是消费数据时。
如果阻塞操作在生产数据时,如同步查询数据库,查询下游系统,可以使用subscribeOn
如果阻塞操作在消费数据时,如同步保存数据,可以使用publishOn。
流量控制
响应式编程中常常会出现Backpressure的概念,
它是指在push模式下,当发布者生产数据的速度大于订阅者消费数据的速度,导致出现了订阅者传递给订阅者的逆向压力。
FluxSink.OverflowStrategy定义了在这种场景下的几种处理策略。
IGNORE 完全忽略新的数据
ERROR Publisher抛出异常
DROP 抛弃数据,触发Flux#onBackpressureDrop方法
LATEST 订阅者只能获取最新的一个数据
BUFFER 缓存所有的数据,注意,该缓存没有边界,可能导致内存溢出
FluxSink.OverflowStrategy类似于线程池的任务拒绝策略。
下面来看一个例子
@Test
public void backpressure() throws InterruptedException {
Flux.<Integer>create(sink -> {
for (int i = 0; i < 50; i++) {
System.out.println("push: " + i);
sink.next(i);
try {
Thread.sleep(10);
} catch (InterruptedException e) {
}
}
}, FluxSink.OverflowStrategy.ERROR)
.publishOn(Schedulers.newSingle("receiver"), 10)
.subscribe(new BaseSubscriber<Integer>() {
protected void hookOnSubscribe(Subscription subscription) {
subscription.request(1);
}
protected void hookOnNext(Integer value) {
System.out.println("receive:" + value);
try {
Thread.sleep(12);
} catch (InterruptedException e) {
}
request(1);
}
protected void hookOnError(Throwable throwable) {
throwable.printStackTrace();
System.exit(1);
}
});
new CountDownLatch(1).await();
}
1
发布者每隔10毫秒生产一个数据
注意,FluxSink.OverflowStrategy.ERROR
参数指定了Backpressure处理策略
2
publishOn方法指定后续运行线程环境
注意下文解析的第二个参数。
3
订阅者每隔20毫秒消费一个数据
Sink中有一个关键字段,BaseSink#requested,代表订阅者请求数量。
每次订阅者调用Subscription#request(long n)
方法,BaseSink#requested都会加上对应数值n。
而每次生产数据调用Sink#next
时,BaseSink#requested都会减1。
当Sink#next执行,如果BaseSink#requested为0,就是执行FluxSink.OverflowStrategy指定策略。
publishOn(Scheduler scheduler, int prefetch)
方法会将BaseSink#requested的值初始化为prefetch。
注意,这里并不会生产prefetch个数据并发送给订阅者,只会修改BaseSink#requested。
另外,PublishOnSubscriber中会将Subscription#request
操作缓存,达到阀值后合并为一次request操作。
在上面的例子中阀值为prefetch - (prefetch >> 2)
,就是8了
所以我们会看到结果
receive:5
push: 10
push: 11
21:59:55.828 [Thread-0] DEBUG reactor.core.publisher.Operators - onNextDropped: 11
发布者发送10(prefetch)个数据后,尽管订阅者已经消费5个数据,并发起5次request操作,但被PublishOnSubscriber缓存了,并没有发送到发布者那边,这时BaseSink#requested已经为0了,抛出OverflowException异常,Sink关闭,后面的数据被抛弃。
可以将订阅者的休眠时间调整为12毫秒,这样当发布者发送10(prefetch)个数据前,PublishOnSubscriber会发起一次request(8)的操作,可以看到
push: 19
22:03:33.779 [Thread-0] DEBUG reactor.core.publisher.Operators - onNextDropped: 19
也就是到19个数据才抛出异常,抛弃数据。
到这里,我们已经基本了解Reactor的概念,核心设计与实现机制。
下一篇文章,我们通过比较WebFlux和AsyncRestTemplate,看一下响应式编程会给我们带来什么惊奇。
如果您觉得本文不错,欢迎关注我的微信公众号,系列文章持续更新中。您的关注是我坚持的动力!