网络编程中的服务器端,是不是每个客户端链接都需要一个独立线程来为它服务呢?虽然可以这么做,但是太浪费了,也不够灵活扩展,真实的应用中这种做法对于可用性而言简直是灾难。Reactor Pattern是通常主流的方式来处理并发多个客户端连接的架构模式。基于一些性能分析可以得到的基本现实是,一个线程如果不需要同步等待网络I/O,那么单个线程其实往往可以很容易同时处理上万个客户端连接请求。Java中的Non_Blocking IO(非阻塞IO)就是这种不需要线程阻塞起来等待IO的API。Reactor Pattern就是由一个独立线程负责和成千上万的客户端连接通信,接收和发送消息,然后其它若干线程负责解析和处理消息请求。这种模型简单佳作Reactor Pattern,当然从实现的角度,线程间的高效同步和通信是核心,Java也提供了相关底层API。本文对Java的Reactor Pattern涉及到的Java技术进行了整理,也方便大家理解NIO。

献良发布于2018/10/25 13:24

注脚

1.Practical Session 12 Reactor Pattern

2.Disadvantages of Thread per Client Its wasteful Creating a new Thread is relatively expensive. Each thread requires a fair amount of memory . Threads are blocked most of the time waiting for network IO. Its not scalable A Multi-Threaded server cant grow to accommodate hundreds of concurrent requests Its also vulnerable to Denial Of Service attacks ( an attempt to make a machine or network resource unavailable to its intended users). Poor availability I t takes a long time to create a new thread for each new client. The response time degrades as the number of clients rises. Solution The Reactor Pattern is another (better) design for handling several concurrent clients.

3.Reactor Pattern Its based on the observation that if a thread does not need to wait for Network IO, a single thread could easily handle tens of client requests alone. Uses Non-Blocking IO , so threads dont waste time waiting for Network. Have one thread in charge of the network: Accepting new connections and handling IO . As the Network is non-blocking, read, write and accept operations "take no time", and a single thread is enough for all the clients. Have a fixed number of threads , which are in charge of the protocol. These threads perform the message framing ( Tokenization ) Perform message processing (what to do with each message). Note that unlike the Multi-Threaded server, in this design a single thread may handle many clients.

4.How Is I t Done? We require non-blocking network access. We talked about: Channel (wrap a socket, decouples open and connect; supports concurrent threads, and concurrent reading and writing; only one thread reading at a time; only one thread writing at a time). Buffer (need it to read/write to channels) We will talk about: Selector

5.Selectors When we perform non-blocking IO, read (), we read currently available bytes and return the result. This means we can read or write 0 bytes, and this is what will probably happen most of the time. We would like a way for our thread to wait until any of our channels is ready, and only then perform the read(), write() or accept(), only on the ready channel . This is what the Selector class does. Tutorial: http://tutorials.jenkov.com/java-nio/selectors.html

6.Selectors When we perform non-blocking IO, read (), we read currently available bytes and return the result. This means we can read or write 0 bytes, and this is what will probably happen most of the time. We would like a way for our thread to wait until any of our channels is ready, and only then perform the read(), write() or accept(), only on the ready channel . This is what the Selector class does. Tutorial: http://tutorials.jenkov.com/java-nio/selectors.html

7.Workflow Example: Initial State Server listens to some port. Selector registered to retrieve ACCEPT events when requested. Attachment ConnectionAcceptor used to accept incoming connection.

8.Client 1: Attempts to connect to server

9.ACCEPT Event is raised

10.Server selects events, handles them.

11.Client 1: Sends data. Client 2: Connection request.

12.New events raised.

13.Events Selected and handled.

14.New Events raised – Write Events are selectable!

15.Write event for client 1 handled. For client 2 ignored

16.Client 3: Connection request. Client 2/3: Write

17.Events raised!

18.And handled…

19.Registering Selector to Channel The Selector registers itself to a Channel. Registration means that the channel informs the Selector when some event happens. This event can be: Channel is ready to be read from. [incoming message] Channel can now be written to. [outgoing message] Channel can now accept a new connection. [incoming connection request] During registration a selection key is given to the channel, and the channel is told which events to monitor: Selector selector = Selector. open (); // a new Selector ConnectionHandler connectionHandler = new ConnectionHandler (); Or ConnectionAccepter connectionAcceptor = new ConnectionAcceptor (); socketChannel.register (selector, SelectionKey.OP_READ , anAttachmemt ) ; A channel can monitor OP_READ, OP_WRITE, OP_ACCEPT, OP_CONNECT In order to support more than one command, we use bitwise or : OP_READ | OP_WRITE Attachment: Done when registering a selector to a channel. An attachment is an object that we will have access to when the event occurs Usually, this object is associated with a task that should be performed .

20.Registration Example Adding a selector for the server channel which accepts new connections:

21.Registration Example Adding a selector for the server channel which accepts new connections:

22.select() Once a Selector is registered to some channels, one can use the "select()" method. This method blocks until at least one of the channels is ready for the registered event. Then, a list of SelectionKeys is returned. Each Selectionkey is associated with one channel , and holds: The interest set (events we’re registered to) The ready set (events the channel is ready for) The Channel The Selector the attachment registered on the channel (optional).

23.Three Different Server E vents Event is isAcceptable () – OP_ACCEPT We retrieve ConnectionAcceptor object attached. We run its accept() method, which accepts a new incoming connection through that channel. Note: ConnectionAcceptor contains a reference to its channel (done upon construction). Event is isReadable () – OP_READ We retrieve ConnectionHandler object attached. We run its .read() method, which attempts to read data received by the channel from client. Event is isWritable () – OP_WRITE We retrieve ConnectionHandler object attached. We run its .write() method, which attempts to write data to the channel in order to send it from server to client.

24.ConnectionAcceptor Used only for ServerSocketChannel in order to accept new connections. Accept workflow: Selector now listens to OP_READ events on this channel in anticipation of new data.

25.Accepting a new connection ConnectionAcceptor method. In ConnectionHandler.create () we attach the ConnectionHandler to the SelectorKey . In register() : 0 indicates that initially it does not listen to any type of event. switchToReadOnlyMode () changes the Selector to listen to SelectionKey.OP_READ The “key” returned from the register() is the token [id] of this registration (one key per channel). Accessing the selector can be done via this “key”.

26.ConnectionHandler ConnectionHandler contains: outData : Vector< ByteBuffer > - contains data to be sent to client . SocketChannel it needs to read from and write to. read() from channel to (local) ByteBuffer . write() from outData in channel. MessageTokenizer in order to tokenize received data. void addBytes ( ByteBuffer buffer) – adds (the local) ByteBuffer to StringBuffer Boolean hasMessage () – returns true if a complete message exists in inData String nextMessage () – removes and returns a complete message from inDat ServerProtocol in order to process message, and create response. String processMessage (String message) – processes message, creates response. Boolean isEnd (String message) – checks whether message is shutdown request. ProtocolTask is our runnable. One runnable per one connection handler. We run the same task once per event handling. Uses MessageTokenizer in order to retrieve a complete message. Uses ServerProtocol in order to process message and create response. Response stored in outData

27.ConnectionHandler : read()

28.

29.ConnectionHandler : write() outData = Vector< ByteBuffer >(). Each message to be sent is stored in ByteBuffer . All messages to be sent are stored in outData .

30.ConnectionHandler : write() outData = Vector< ByteBuffer >(). Each message to be sent is stored in ByteBuffer . All messages to be sent are stored in outData .

31.Reactor main loop. Scans the channels and retrieves the new events. Initiates the appropriate function of the retrieved attachment depending on event type.

32.The different parts The Reactor: Our driver [main] In charge of handling reading, writing and accepting new connection events . Reading and Writing are encapsulated in the ConnectionHandler class A ccepting is encapsulated in the ConnectionAcceptor class. Holds a thread-pool of workers (an Executor). [inside ReactorData object] These worker-threads are in charge of the protocol handling . They receive the bytes read from the network, and apply the protocol on the received data. ProtocolTask : [ our runnable] The class which represents tasks submitted to the executor. The ProtocolTask object is created along with the ConnectionHandler . Task object is created once , and used every time new data is received. Checks if a complete message is found Applies protocol on received data Adds response to outData vector< ByteBuffer >

33.The different parts The Reactor: Our driver [main] In charge of handling reading, writing and accepting new connection events . Reading and Writing are encapsulated in the ConnectionHandler class A ccepting is encapsulated in the ConnectionAcceptor class. Holds a thread-pool of workers (an Executor). [inside ReactorData object] These worker-threads are in charge of the protocol handling . They receive the bytes read from the network, and apply the protocol on the received data. ProtocolTask : [ our runnable] The class which represents tasks submitted to the executor. The ProtocolTask object is created along with the ConnectionHandler . Task object is created once , and used every time new data is received. Checks if a complete message is found Applies protocol on received data Adds response to outData vector< ByteBuffer >

34.The different parts The Reactor: Our driver [main] In charge of handling reading, writing and accepting new connection events . Reading and Writing are encapsulated in the ConnectionHandler class A ccepting is encapsulated in the ConnectionAcceptor class. Holds a thread-pool of workers (an Executor). [inside ReactorData object] These worker-threads are in charge of the protocol handling . They receive the bytes read from the network, and apply the protocol on the received data. ProtocolTask : [ our runnable] The class which represents tasks submitted to the executor. The ProtocolTask object is created along with the ConnectionHandler . Task object is created once , and used every time new data is received. Checks if a complete message is found Applies protocol on received data Adds response to outData vector< ByteBuffer >

user picture
  • 献良
  • 非著名互联网公司工程师

相关Slides

  • 用最精炼的语言(图)来描绘出Akka编程的基本核心组件的概念,并点出基本原理,特别是编程方法,是akka概念性入门的非常好的参考资料。

  • Java多线程开发的基本概念和原理,包括什么是线程,什么是进程,为什么要用多线程,现代处理器多核时代,并行编程会碰到的问题,java对应的多线程并行编程的API和对应的操作系统原理是什么?如果使用java来实现多线程,线程管理,暂停,休眠,中断,以及线程间的协作同步,死锁等概念及其对应Java的编程模型。

  • 多线程编程中,特别是在多核时代,各种程序优化诸如乱序执行等,如何严格确保数据在多线程间共享访问的处理顺序,相关同步的API很关键,理解原理对于编写正确安全的多线程程序非常重要。本文对于多线程执行进行的设计模式归纳,把常用的编程模型用java语言进行了演示。