亲宝软件园·资讯

展开

Netty实战源码解析NIO编程

Zhongger 人气:0

1 前言

很久之前就想写与Netty相关的博客了,但由于个人时间安排的问题一直拖到了现在,借助这个机会,重新温习Java高级编程的同时,也把Netty实战以及源码剖析分享给各位读者。

2 Netty是什么?

Netty is a NIO client server framework which enables quick and easy development of network applications such as protocol servers and clients. It greatly simplifies and streamlines network programming such as TCP and UDP socket server. 'Quick and easy' doesn't mean that a resulting application will suffer from a maintainability or a performance issue. Netty has been designed carefully with the experiences earned from the implementation of a lot of protocols such as FTP, SMTP, HTTP, and various binary and text-based legacy protocols. As a result, Netty has succeeded to find a way to achieve ease of development, performance, stability, and flexibility without a compromise.

摘自官网,翻译过来就是:Netty是一个基于NIO的客户端-服务端框架,能过快速而简单地开发像客户端-服务端协议的网络应用。它极大地精简了 TCP 和 UDP 套接字服务器等网络编程。“快速而简单”并不意味着生成的应用程序会受到可维护性或性能问题的影响。Netty 是根据从许多协议(如 FTP、SMTP、HTTP 以及各种二进制和基于文本的遗留协议)的实现中获得的经验精心设计的。结果,Netty 成功地找到了一种方法,可以在不妥协的情况下实现易于开发、性能、稳定性和灵活性。

3 Java I/O模型简介

要说到网络通信,就离不开I/O模型,可以把I/O模型简单理解为使用什么通道进行数据的发送和接收。

Java共支持三种网络编程模型:BIO、NIO、AIO

Netty其实就是基于Java的NIO的。接下来,我们通过编写代码来体验一下这三种IO模型吧

3.1 BIO代码实现

package com.Zhongger;
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * @author zhongmingyi
 * @date 2021/9/15 1:29 下午
 */
public class BIOServer {
    public static void main(String[] args) throws IOException {
        ExecutorService threadPool = Executors.newCachedThreadPool();
        ServerSocket serverSocket = new ServerSocket(8989);
        System.out.println("服务端已启动");
        while (true) {
            Socket socket = serverSocket.accept();
            threadPool.execute(new Runnable() {
                @Override
                public void run() {
                    handle(socket);
                }
            });
        }
    }
    public static void handle(Socket socket) {
        byte[] bytes = new byte[1024];
        try {
            InputStream inputStream = socket.getInputStream();
            int read = 0;
            while (true) {
                read = inputStream.read(bytes);
                if (read != -1) {
                    System.out.println("客户端发送给服务端的数据:" + new String(bytes, 0, read));
                } else {
                    break;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

上述代码中:

可以看到,BIO模型里,服务器的实现模式为一个Socket连接对应一个线程。 BIO的知识点就介绍到这里,相信大家在【计算机网络】课程的学习中,肯定有接触过Socket编程,实现一个简易版的聊天工具。

4 Java NIO

4.1 基本介绍

JDK 1.4中的java.nio.*包中引入新的Java I/O库,NIO其实有两种解释:

NIO有三个核心组件:

NIO是面向块(缓冲区)的处理,数据读取到一个它稍后处理的缓冲区,需要时可以在缓冲区里前后移动,增加了在处理过程中的灵活性,使用NIO可以提供非阻塞式的高伸缩性网络。这使得一个线程可以在Buffer里有数据的时候去读取,没有可用数据时就可以去做其他事情,不会阻塞读;线程也可以写入一些数据到Buffer中,无需等待写入所有的数据,也可以去做别的事情,不会阻塞写。

4.2 三大核心组件的关系

三大核心组件的关系简单描述图如下:

如图所示:

4.3 Buffer缓冲区

Buffer:缓冲区本质上是一个可以读写数据的内存块,可以理解为一个容器对象(含数组),该对象提供了一组方法,可以更轻松地使用内存块,缓冲区对象内置了一些机制,能过跟踪和记录缓冲区的变化情况。Channel提供了从网络、文件读取数据的渠道,但读取或者写入数据都需要经过Buffer。

Buffer是一个顶层的抽象类,它的子类有多种实现,常用的子类如下:

上述缓冲区的管理方式基本上一致,都可以用类的allocate(int capacity) 方法去获取缓冲区对象。前面说到,Buffer是和数据打交道的载体,也就是读取缓冲区的数据或者把写数据到缓冲区中。所以,Buffer缓冲区的核心方法就是 put()方法和get()方法以及对应的重载方法、扩展方法等。

Buffer类中有以下四个属性:

// Invariants: mark <= position <= limit <= capacity
private int mark = -1; 
private int position = 0;
private int limit;
private int capacity;

简单看下ByteBuffer的使用,体会下往其中写入数据、切换成读模式后上面这四个值的变换:

		ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        System.out.println("初始时-->limit--->" + byteBuffer.limit());
        System.out.println("初始时-->position--->" + byteBuffer.position());
        System.out.println("初始时-->capacity--->" + byteBuffer.capacity());
        System.out.println("初始时-->mark--->" + byteBuffer.mark());
        System.out.println("--------------------------------------");
        // 添加一些数据到缓冲区中
        String s = "后端Dancer";
        byteBuffer.put(s.getBytes());
        // 看一下初始时4个核心变量的值
        System.out.println("put完之后-->limit--->" + byteBuffer.limit());
        System.out.println("put完之后-->position--->" + byteBuffer.position());
        System.out.println("put完之后-->capacity--->" + byteBuffer.capacity());
        System.out.println("put完之后-->mark--->" + byteBuffer.mark());
        System.out.println("--------------------------------------");
        byteBuffer.flip();
        System.out.println("flip完之后-->limit--->" + byteBuffer.limit());
        System.out.println("flip完之后-->position--->" + byteBuffer.position());
        System.out.println("flip完之后-->capacity--->" + byteBuffer.capacity());
        System.out.println("flip完之后-->mark--->" + byteBuffer.mark());

这里介绍一个比较高效的ByteBuffer,MappedByteBuffer 它可以实现文件在堆外内存(非JVM内存的系统内存)的修改:

	public static void mappedByteBufferTest() throws IOException {
        RandomAccessFile file = new RandomAccessFile("/Users/bytedance/Desktop/file.txt", "rw");
        FileChannel fileChannel = file.getChannel();
        //0~3的位置是直接映射到内存中的,可以修改文件中的这部分内容
        MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 3);
        mappedByteBuffer.put(0, (byte) 'Y');
        mappedByteBuffer.put(2, (byte) 'K');
        file.close();
    }

4.4 Channel通道

BIO中流是单向的,要么是输入流,要么是输出流。然后NIO中,Channel作为运输数据的通道,是双向的。Channel是一个抽象类,常用的实现有:ServerSocketChannel、SocketChannel、FileChannel、DatagramChannel,其中DatagramChannel是用于UDP数据,而其他三者用于TCP数据。

FileChannel类主要用于对本地文件进行IO操作,常用的方法有:

下面简单看下这个例子,将字符串输出到文件:

import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
/**
 * @author zhongmingyi
 * @date 2021/9/16 10:38 下午
 */
public class FileChannelTest {
    public static void main(String[] args) throws IOException {
        String text = "Hello, Zhongger!";
        //创建一个文件输出流
        FileOutputStream fileOutputStream = new FileOutputStream("/Users/bytedance/Desktop/file.txt");
        //通过文件输出流获取到FileChannel
        FileChannel fileChannel = fileOutputStream.getChannel();
        //创建一个ByteBuffer,将数据写入ByteBuffer中
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        buffer.put(text.getBytes());
        //切换成读模式
        buffer.flip();
        //把ByteBuffer里到数据写入到FileChannel中
        fileChannel.write(buffer);
        //关闭文件输出流
        fileOutputStream.close();
    }
}

再看下从本地文件读取数据的例子

 	public static void readFromFile() throws IOException {
        //创建一个文件输入流
        FileInputStream fileInputStream = new FileInputStream("/Users/bytedance/Desktop/file.txt");
        //通过文件输入流获取到FileChannel
        FileChannel fileChannel = fileInputStream.getChannel();
        //创建一个ByteBuffer,将Channel的数据读取到ByteBuffer中
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        fileChannel.read(buffer);
        //输出ByteBuffer中的数据
        System.out.println(new String(buffer.array()));
        //关闭文件输入流
        fileInputStream.close();
    }

从一个文件读取数据到Buffer,再把Buffer写入到另外一个文件

 	public static void readFromOneFileWriteToOtherFile() throws IOException {
        FileInputStream fileInputStream = new FileInputStream("/Users/bytedance/Desktop/file.txt");
        FileOutputStream fileOutputStream = new FileOutputStream("/Users/bytedance/Desktop/file2.txt");
        FileChannel fileInputStreamChannel = fileInputStream.getChannel();
        FileChannel fileOutputStreamChannel = fileOutputStream.getChannel();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        while (true) {
        	//Buffer复位,防止越界
            buffer.clear();
            int read = fileInputStreamChannel.read(buffer);
            if (read == -1) {
                break;
            }
            //切读
            buffer.flip();
            fileOutputStreamChannel.write(buffer);
        }
        fileInputStream.close();
        fileOutputStream.close();
    }

把一个文件复制到另一个文件:

 	public static void transferFrom() throws IOException {
        FileInputStream fileInputStream = new FileInputStream("/Users/bytedance/Desktop/file.txt");
        FileOutputStream fileOutputStream = new FileOutputStream("/Users/bytedance/Desktop/file_copy.txt");
        FileChannel fileInputStreamChannel = fileInputStream.getChannel();
        FileChannel fileOutputStreamChannel = fileOutputStream.getChannel();
        fileOutputStreamChannel.transferFrom(fileInputStreamChannel, 0, fileInputStreamChannel.size());
        fileInputStream.close();
        fileOutputStream.close();
    }

再简单介绍下:FileChannel提供了map方法把文件映射到虚拟内存,通常情况可以映射整个文件,如果文件比较大,可以进行分段映射。简单地说就是通过映射的方式来减少一次内核态到用户态之间的拷贝,因此可以提高复制的性能。

	public static void mappedByteBufferTest() throws IOException {
        RandomAccessFile file = new RandomAccessFile("/Users/bytedance/Desktop/file.txt", "rw");
        FileChannel fileChannel = file.getChannel();
        MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 3);
        mappedByteBuffer.put(0, (byte) 'Y');
        mappedByteBuffer.put(2, (byte) 'K');
        file.close();
    }

4.5 Selector选择器

Selector选择器是NIO中的多路复用器,一个线程对应一个Selector,而Selector中可以注册多个Channel,当Channel中有事件发生时,线程就可以去处理这个事件,若没有事件发生时,线程可以空出来去做其他事情。使用Selector的好处在于: 使用更少的线程来就可以来处理通道了, 相比使用多个线程,避免了线程上下文切换带来的开销。

4.5.1 Selector的创建

通过调用Selector.open()方法创建一个Selector对象,如下:

Selector selector = Selector.open();

4.5.2 注册Channel到Selector

Channel必须是非阻塞的,否则会抛出IllegalBlockingModeException 异常

channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_ACCEPT);

该方法可以将Channel设置为非阻塞的

abstract SelectableChannel configureBlocking(boolean block)  

注意: SelectableChannel抽象类的configureBlocking()方法是由AbstractSelectableChannel抽象类实现的,SocketChannel、ServerSocketChannel、DatagramChannel都是直接继承了AbstractSelectableChannel抽象类,因此它们可以调用configureBlocking方法设置为非阻塞的模式。

register() 方法的第二个参数,是一个“ interest集合 ”,意思是在通过Selector监听Channel时对什么事件感兴趣。可以监听四种不同类型的事件:

SelectionKey.OP_ACCEPT
SelectionKey.OP_WRITE
SelectionKey.OP_READ
SelectionKey.OP_CONNECT

通道触发了一个事件意思是该事件已经就绪。比如某个Channel成功连接到另一个服务器称为“连接就绪(connect)”。一个ServerSocketChannel准备好接收新进入的连接称为“接收就绪(accept)”。一个有数据可读的通道可以说是“ 读就绪(read)”。等待写数据的通道可以说是“ 写就绪(write) ”。

4.5.3 SelectionKey

一个SelectionKey键表示了一个特定的通道对象(Channel)和一个特定的选择器对象(Selector)之间的注册关系。

key.attachment(); //返回SelectionKey的attachment,attachment可以在注册channel的时候指定。
key.channel(); // 返回该SelectionKey对应的channel。
key.selector(); // 返回该SelectionKey对应的Selector。
key.interestOps(); //返回代表需要Selector监控的IO操作的bit mask
key.readyOps(); // 返回一个bit mask,代表在相应channel上可以进行的IO操作。```

4.5.4 从Selector中选择Channel

Selector维护注册过的Channel集合,并且这种注册关系都被封装在SelectionKey当中。 Selector维护的三种类型SelectionKey集合:

注意: 当键被取消( 可以通过isValid( ) 方法来判断)时,它将被放在相关的选择器的已取消的键的集合里。注册不会立即被取消,但键会立即失效。当再次调用 select( ) 方法时(或者一个正在进行的select()调用结束时),已取消的键的集合中的被取消的键将被清理掉,并且相应的注销也将完成。通道会被注销,而新的SelectionKey将被返回。当通道关闭时,所有相关的键会自动取消(记住,一个通道可以被注册到多个选择器上)。当选择器关闭时,所有被注册到该选择器的通道都将被注销,并且相关的键将立即被无效化(取消)。一旦键被无效化,调用它的与选择相关的方法就将抛出CancelledKeyException。

select()方法介绍:

在刚初始化的Selector对象中,这三个集合都是空的。 通过Selector的select()方法可以选择已经准备就绪的通道 (这些通道包含你感兴趣的的事件)。比如你对读就绪的通道感兴趣,那么select()方法就会返回读事件已经就绪的那些通道。下面是Selector几个重载的select()方法:

int select():阻塞到至少有一个通道在你注册的事件上就绪了。 int select(long timeout):和select()一样,但最长阻塞时间为timeout毫秒。 int selectNow():非阻塞,只要有通道就绪就立刻返回。 select()方法返回的int值表示有多少通道已经就绪,是自上次调用select()方法后有多少通道变成就绪状态。之前在select()调用时进入就绪的通道不会在本次调用中被记入,而在前一次select()调用进入就绪但现在已经不在处于就绪的通道也不会被记入。例如:首次调用select()方法,如果有一个通道变成就绪状态,返回了1,若再次调用select()方法,如果另一个通道就绪了,它会再次返回1。如果对第一个就绪的channel没有做任何操作,现在就有两个就绪的通道,但在每次select()方法调用之间,只有一个通道就绪了。

一旦调用select()方法,并且返回值不为0时,则 可以通过调用Selector的selectedKeys()方法来访问已选择键集合 。如下: Set selectedKeys=selector.selectedKeys(); 进而可以放到和某SelectionKey关联的Selector和Channel。如下所示:

		while (true) {
            if (selector.select(1000) == 0) {
                System.out.println("服务器未连接到客户端。。。");
                continue;
            }
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey selectionKey = iterator.next();
                if (selectionKey.isAcceptable()) {
                    System.out.println("服务端接受到了客户端请求");
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    socketChannel.configureBlocking(false);
                    socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
                }
                if (selectionKey.isReadable()) {
                    SocketChannel channel = (SocketChannel) selectionKey.channel();
                    ByteBuffer byteBuffer = (ByteBuffer) selectionKey.attachment();
                    channel.read(byteBuffer);
                    System.out.println("from 客户端 " + new String(byteBuffer.array()));
                }
                iterator.remove();
            }
        }

4.5.5 停止选择的方法

选择器执行选择的过程,系统底层会依次询问每个通道是否已经就绪,这个过程可能会造成调用线程进入阻塞状态,那么我们有以下三种方式可以唤醒在select()方法中阻塞的线程。

4.5.6 NIO客户端、服务端

服务端代码:

package com.Zhongger;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;
/**
 * @author zhongmingyi
 * @date 2021/9/15 3:00 下午
 */
public class NIOServer {
    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(8886));
        serverSocketChannel.configureBlocking(false);
        Selector selector = Selector.open();
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        while (true) {
            if (selector.select(1000) == 0) {
                System.out.println("服务器未连接到客户端。。。");
                continue;
            }
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey selectionKey = iterator.next();
                if (selectionKey.isAcceptable()) {
                    System.out.println("服务端接受到了客户端请求");
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    socketChannel.configureBlocking(false);
                    socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
                }
                if (selectionKey.isReadable()) {
                    SocketChannel channel = (SocketChannel) selectionKey.channel();
                    ByteBuffer byteBuffer = (ByteBuffer) selectionKey.attachment();
                    channel.read(byteBuffer);
                    System.out.println("from 客户端 " + new String(byteBuffer.array()));
                }
                iterator.remove();
            }
        }
    }
}

客户端代码:

package com.Zhongger;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
/**
 * @author zhongmingyi
 * @date 2021/9/24 1:27 下午
 */
public class NIOClient {
    public static void main(String[] args) throws IOException {
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);
        InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 8886);
        //连接服务器
        if (!socketChannel.connect(inetSocketAddress)) {
            while (!socketChannel.finishConnect()) {
                System.out.println("连接需要时间,客户端不会阻塞,可以做其他工作");
            }
        }
        //连接成功,发送数据
        String str = "Hello,Zhongger!";
        ByteBuffer byteBuffer = ByteBuffer.wrap(str.getBytes());
        socketChannel.write(byteBuffer);
        System.in.read();
    }
}

5 Java NIO 小结

加载全部内容

相关教程
猜你喜欢
用户评论