亲宝软件园·资讯

展开

Reactive响应式编程

红帽海绵宝宝 人气:0

1 什么是响应式编程

一句话总结:响应式编程是一种编程范式,通用和专注于数据流和变化的,并且是异步的。

维基百科原文:

In computing, reactive programming is an asynchronous programming paradigm concerned with data streams and the propagation of change. This means that it becomes possible to express static (e.g. arrays) or dynamic (e.g. event emitters) data streams with ease via the employed programming language(s), and that an inferred dependency within the associated execution model exists, which facilitates the automatic propagation of the change involved with data flow.

翻译:

在计算机领域,响应式编程是一个专注于数据流和变化传递的**异步编程范式。**这意味着可以使用编程语言很容易地表示静态(例如数组)或动态(例如事件发射器)数据流,并且在关联的执行模型中,存在着可推断的依赖关系,这个关系的存在有利于自动传播与数据流有关的更改。

举例:

例如,在命令式编程环境中, a:=b+c 表示将表达式的结果赋给a ,而之后改变b 或c 的值不会影响 。但在响应式编程中,a的值会随着b或c 的更新而更新。电子表格程序就是响应式编程的一个例子。单元格可以包含字面值或类似"=B1+C1"的公式,而包含公式的单元格的值会依据其他单元格的值的变化而变化 。

响应式编程最初是为了简化交互式用户界面的创建和实时系统动画的绘制而提出来的一种方法,但它本质上是一种通用的编程范式。

2 回顾Reactor

2.1 什么是Reactor

还是维基百科:

The reactor design pattern is an event handling pattern for handling service requests delivered concurrently to a service handler by one or more inputs. The service handler then demultiplexes the incoming requests and dispatches them synchronously to the associated request handlers.

翻译:

反应器(reactor)设计模式是一种事件处理模式,用于处理由一个或多个输入并发交付给服务处理程序的服务请求。然后,服务处理程序将传入的请求解复用,并将它们同步地分派给相关的请求处理程序。

2.2 为什么是Reactor

Reactor来源于网络IO中同步非阻塞的I/O多路复用机制的模式。

若数据的read都由kernel内核完成了(在内核read数据的过程中,应用进程依旧可以执行其他的任务),这就是异步操作。

换句话说,

NIO一个重要的特点是:socket主要的读、写、注册和接收函数,在等待就绪阶段都是非阻塞的,真正的I/O操作是同步阻塞的(消耗CPU但性能非常高)。
NIO是一种同步非阻塞的I/O模型,也是I/O多路复用的基础。

Reactor模式基本结构:

2.3 Reactor模式的经典实现—Netty

Netty的线程模式就是一个实现了Reactor模式的经典模式。

结构对应:

NioEventLoop ———— Initiation Dispatcher
Synchronous EventDemultiplexer ———— Selector
Evnet Handler ———— ChannelHandler
ConcreteEventHandler ———— 具体的ChannelHandler的实现

模式对应:

Netty服务端使用了“多Reactor线程模式”
mainReactor ———— bossGroup(NioEventLoopGroup) 中的某个NioEventLoop
subReactor ———— workerGroup(NioEventLoopGroup) 中的某个NioEventLoop
acceptor ———— ServerBootstrapAcceptor
ThreadPool ———— 用户自定义线程池

流程:

① 当服务器程序启动时,会配置ChannelPipelineChannelPipeline中是一个ChannelHandler链,所有的事件发生时都会触发Channelhandler中的某个方法,这个事件会在ChannelPipeline中的ChannelHandler链里传播。然后,从bossGroup事件循环池中获取一个NioEventLoop来现实服务端程序绑定本地端口的操作,将对应的ServerSocketChannel注册到该NioEventLoop中的Selector上,并注册ACCEPT事件为ServerSocketChannel所感兴趣的事件。
② NioEventLoop事件循环启动,此时开始监听客户端的连接请求。
③ 当有客户端向服务器端发起连接请求时,NioEventLoop的事件循环监听到该ACCEPT事件,Netty底层会接收这个连接,通过accept()方法得到与这个客户端的连接(SocketChannel),然后触发ChannelRead事件(即,ChannelHandler中的channelRead方法会得到回调),该事件会在ChannelPipeline中的ChannelHandler链中执行、传播。
ServerBootstrapAcceptor的readChannel方法会该SocketChannel(客户端的连接)注册到workerGroup(NioEventLoopGroup) 中的某个NioEventLoop的Selector上,并注册READ事件为SocketChannel所感兴趣的事件。启动SocketChannel所在NioEventLoop的事件循环,接下来就可以开始客户端和服务器端的通信了。

3 Spring5中多Reactive的支持

3.1 Spring Webflux

3.1.1 依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

3.1.2 Controller代码

@RestController
public class HelloController {

    @GetMapping("/hello")
    public Mono<String> hello() {
        return Mono.just("Hello Spring Webflux");
    }
    
}

3.1.3 测试

C:\Users\xxxx\Desktop\sb-reactive>curl http://localhost:8080/hello
Hello Spring Webflux

3.1.4 Spring MVC和Spring WebFlux模式上的不同

3.2 Spring Data Reactive Respositories

3.2.1 依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>

3.2.2 配置

public class RedisReactiveConfig {

    @Bean
    public ReactiveRedisConnectionFactory connectionFactory() {
        return new LettuceConnectionFactory("127.0.0.1", 6379);
    }

    @Bean
    public ReactiveStringRedisTemplate reactiveStringRedisTemplate(ReactiveRedisConnectionFactory factory) {
        return new ReactiveStringRedisTemplate(factory);
    }

}

3.3.3 测试

@SpringBootTest
class SbReactiveApplicationTests {

    @Autowired
    private ReactiveStringRedisTemplate reactiveRedisTemplate;

    @Test
    void contextLoads() {
        reactiveRedisTemplate
                .opsForValue().set("1", "zs")
                .subscribe(b -> System.out.println("success"),
                        e -> System.out.println("error"));
    }

}

4 如何理解Reactive响应式编程?

概念有很多,但是它相较我们的一般请求处理到底有什么更好的价值体现?

解释:

Reactive Programming 作为观察者模式(Observer) 的延伸,不同于传统的命令编程方式( Imperative programming)同步拉取数据的方式,如迭代器模式(Iterator) 。而是采用数据发布者同步或异步地推送到数据流(Data Streams)的方案。当该数据流(Data Steams)订阅者监听到传播变化时,立即作出响应动作。在实现层面上,Reactive Programming 可结合函数式编程简化面向对象语言语法的臃肿性,屏蔽并发实现的复杂细节,提供数据流的有序操作,从而达到提升代码的可读性,以及减少 Bugs 出现的目的。同时,Reactive Programming 结合背压(Backpressure)的技术解决发布端生成数据的速率高于订阅端消费的问题。

加载全部内容

相关教程
猜你喜欢
用户评论