Java reactive programming - Reactor 3 and Spring WebFlux

Java reactive programming - Reactor 3 and Spring WebFlux

1.Java Reactive Programming Reactor 3 and Spring WebFlux 杜万(倚贤)2018-12-22

2. 关于我 • 全栈⼯工程师 • 写了了 12 年年代码,以 JAVA 语⾔言为主 • Elixir 语⾔言爱好者,翻译了了《Elixir 程序设计》 • 前 Coding WebIDE 项⽬目负责⼈人,⽬目前负责阿 ⾥里里云函数计算⼯工具链

3. 主要内容 • Reactive • Project Reactor • Spring Webflux


5. Reactive • Reactive Manifesto • Reactive Programming • Reactive Streams


7. Reactive Manifesto 响应式宣⾔言是⼀一份构建现代云扩展架构的处⽅方 VALUE Responsive 价值 响应性 FORM Elastic Resilient 形式 弹性 韧性 MEANS Message Driven ⽅方法 消息驱动 Published on September 16 2014. (v2.0)

8. 响应式宣⾔言 响应性:快速/⼀一致的响应时间 韧性:复制/遏制/隔绝/委托 弹性:⽆无竞争点或中⼼心瓶颈/分⽚片/扩展 消息驱动:异步/松耦合/隔绝/地址透明/错误作为消息/背压/⽆无阻塞

9.Reactive Programming 响应式编程是⼀一种声明式编程范型 命令式编程 Java 9 Flow API Integer a, b, sum; SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>(); a = 3; publisher.subscribe(new Flow.Subscriber<Integer>() { b = 4; private Integer sum = 0; sum = a + b; @Override a = 6; public void onNext(Integer item) { b = 8; sum += item; } System.out.println(sum); @Override public void onComplete() { System.out.println(sum); } … }); Arrays.asList(3, 4).stream().forEach(publisher::submit); publisher.close();

10.Reactive Programming 响应式编程是⼀一种⾮非阻塞异步编程 import reactor.ipc.netty.tcp.TcpServer; import reactor.ipc.netty.tcp.TcpClient; CountDownLatch latch = new CountDownLatch(10); TcpServer server = TcpServer.create(port); TcpClient client = TcpClient.create("localhost", port); final JsonCodec<Pojo, Pojo> codec = new JsonCodec<Pojo, Pojo>(Pojo.class); server.start(input -> input.send(input.decode(codec).log("serve").map(codec), 5)).await(); client.start(input -> { input.take(10).decode(codec).log("receive").subscribe(data -> latch.countDown()); input.send(Flux.range(1, 10).map( it -> new Pojo("test" + it)).log(“send").map(codec)) .subscribe(); return Mono.never(); }).await(); latch.await(10, TimeUnit.SECONDS); client.shutdown().await(); server.shutdown().await();

11.Reactive Programming 响应式编程是⼀一种数据流编程,关注于数据流⽽而不不是控制流

12. Message-driven Event-driven 消息驱动 VS 事件驱动 • 消息有确定的⽬目标,事件是⼀一件事情希望被观察到 • 事件驱动系统关注事件源,消息驱动系统关注消息的接受者 • 在⼀一个使⽤用响应式编程实现的响应式系统⾥里里,消息擅⻓长于通 讯,事件擅⻓长于反应事实

13. Reactive Streams Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure. This encompasses efforts aimed at runtime environments (JVM and JavaScript) as well as network protocols.

14. Reactive Streams API /** A publisher is a provider of a potentially unbounded number of sequenced elements, publishing them according to the demand received from its Subscribers. */ public interface Publisher<T> { public void subscribe(Subscriber<? super T> s); }

15. Reactive Streams API /** Will receive calls to Subscriber.onSubscribe(Subscription) once after passing an instance of Subscriber to Publisher.subscribe(Subscriber). */ public interface Subscriber<T> { public void onSubscribe(Subscription s); public void onNext(T t); public void onError(Throwable t); public void onComplete(); }

16. Reactive Streams API /** A Subscription represents a one-to-one lifecycle of a Subscriber subscribing to a Publisher. */ public interface Subscription { public void request(long n); public void cancel(); }

17. Reactive Streams API /** A Processor represents a processing stage — which is both a Subscriber and a Publisher and obeys the contracts of both. */ public interface Processor<T, R> extends Subscriber<T>, Publisher<R> { }

18. Reactive Library 分代 • 第 0 代 java.util.Observable • 第 1 代 rx.NET, Reactive4Java,早期的 RxJava • 第 2 代 RxJava • 第 3 代 RxJava2, Project Reactor, Akka-Streams • 第 4 代 RxJava2,Project Reactor 2.5+

19. Reactive Library • Callback-based—where anonymous side-effecting callbacks are attached to event sources, and are being invoked when events pass through the dataflow chain. • Declarative—through functional composition, usually using well-established combinators like map, filter, fold etc.

20.Project Reactor

21. Project Reactor • fully non-blocking,backpressure-ready network engines for HTTP (including Websockets), TCP and UDP.  • implementing Reactive Streams API and Reactive Extensions • interacts with Java 8 functional API, Completable Future, Stream and Duration.

22. Reactor Example private static List<String> words = Arrays.asList( "the", "quick", "brown", "fox", "jumped", "over", "the", "lazy", "dog"); Flux.fromIterable(words) .flatMap(word -> Flux.fromArray(word.split(""))) .concatWith(Mono.just("s")).distinct().sort() .zipWith(Flux.range(1, Integer.MAX_VALUE), (string, count) -> String.format("%2d. %s", count, string) ) .subscribe(System.out::println);

23. Reactive vs Java 8 Stream • Stream is pull-based,Reactive is push-based • Stream can only be used once, Reactive can be subscribed many times • Stream#parallel() 使⽤用 fork-join 并发,Reactive 使⽤用 Event Ioop

24. Reactor Operators 300+ 1. Creating a New Sequence… 6. Working with Time 2. Transforming an Existing Sequence 7. Splitting a Flux 3. Peeking into a Sequence 8. Going Back to the Synchronous 4. Filtering a Sequence World 5. Handling Errors

25.Marble Diagrams

26.Spring Webflux

27. Spring Boot 2.0 主要变化 • 基于 Spring Framework 5(包括新模块:WebFlux)构建 • 集成 Netty 作为默认的 web 服务器器,⽀支持 reactive 应⽤用 • WebFlux 默认运⾏行行在 Netty 上

28. Spring Framework 5 主要变化 • 依赖:最低 Java 8,⽀支持 Java 9 • 提供许多⽀支持 reactive 的基础设施 • 提供⾯面向 Netty 等运⾏行行时环境的适配器器 • 新增 WebFlux 模块(集成的是Reactor 3.x)

29.WebFlux Framework