04 多线程模式下selector的使用以及IO模型概念的区分

目录
  • 1 多线程环境下selector使用

    • 1-1 为什么需要多线程优化?
    • 1-2 多线程架构模型
    • 1-3 多线程环境下利用selector进行网络通信
      • 1-3-1 多线程环境下无法获取可读事件的原因
      • 1-3-2 利用任务队列与wakeup解决可读事件无法获取的问题
    • 1-4 完整的多线程环境下selector的使用
  • 2 NIO与BIO的基础概念
    • 2-1 Java中stream与channel的区别
    • 2-2 IO 模型
      • 2-2-1 从read方法分析阻塞IO/非阻塞IO
      • 2-2-2 同步异步的角度看待IO模型
    • 2-3 IO模型中的零拷贝问题
      • 2-3-1 方式1:传统IO模型的工作流程中数据拷贝次数(结合下图)
      • 2-3-2 方式2:直接内存的优化
      • 2-3-3 方式3:进一步优化
      • 2-3-4 方式4:再进一步优化(硬件优化)
      • 2-3-5 四种方式总结
      • 2-3-6 零拷贝的总结
    • 2-4 异步IO介绍
    • 💡 守护线程
  • 参考资料

1 多线程环境下selector使用

1-1 为什么需要多线程优化?

单线程配合selector选择器虽然能够管理多个channel的事件,但仍存在以下缺点:

缺点1:多核 cpu被白白浪费

缺点2某个事件耗费时间比较长会影响其他事件的处理

  • 单线程处理多个事件适合每个事件的处理事件比较短的情况

补充::Redis采用单线程处理,如果某个操作时间较长,会影响其他操作,所以redis单个操作时间复杂度不能太高。

1-2 多线程架构模型

总体设计上基于分工思想分为二个部分,分别是boss模块和worker模块(通常是一个boss线程配合多个work线程):

boss模块(只负责接待):多线程机制(每个线程都有一个selector),专门用于处理客户端的连接事件

worker模块(只负责读写):多个worker,每个worker实际上是一个线程配合一个selector,worker专门负责数据的读写操作

  • 通常线程的数目与CPU的核心数目是一致的。

1-3 多线程环境下利用selector进行网络通信

1-3-1 多线程环境下无法获取可读事件的原因
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
@Slf4j
public class Server7 {
    public static void main(String[] args) throws IOException {
        Thread.currentThread().setName("boss");
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        /*boss selecor专门用于处理accept event*/
        Selector boss = Selector.open();
        SelectionKey bossKey = ssc.register(boss,0,null);
        bossKey.interestOps(SelectionKey.OP_ACCEPT);
        ssc.bind(new InetSocketAddress(8080));

        // 1 创建固定数量的worker并初始化
        Worker worker = new Worker("worker-0");
        worker.register();
        while(true){
            boss.select();
            Iterator<SelectionKey> iter = boss.selectedKeys().iterator();
            while(iter.hasNext()){
                SelectionKey key = iter.next();
                iter.remove();;
                if(key.isAcceptable()){
                    log.debug("accept event happen!");
                    SocketChannel sc = ssc.accept();
                    sc.configureBlocking(false);
                    log.debug("before register...{}",sc.getRemoteAddress());
                    // 2. 关联selector
                    sc.register(worker.selector,SelectionKey.OP_READ,null);
                    log.debug("after register...{}",sc.getRemoteAddress());
                }
            }
        }
    }
    // 只有内部类能够定义为static
    static class Worker implements Runnable{
        private Thread thread;
        private Selector selector;
        private String name;
        private volatile boolean start = false;

        public Worker(String name){this.name = name;}
        // 初始化线程和selector
        public void register() throws IOException {
            if(!start){   // 利用start保证这段代码只会被执行一次。
                selector = Selector.open();   // open返回:SelectorProvider.provider().openSelector()
                thread = new Thread( this,name);
                thread.start();
                start = true;
            }

        }
        @Override
        public void run() {
            while(true){
                try{
                    log.debug("begin select!");
                    selector.select();
                    Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                    while(iter.hasNext()){
                        SelectionKey key = iter.next();
                        iter.remove();
                        /*这里实际读写需要考虑消息边界,写的数据规模过大的问题,以及连接的正常/异常关闭问题详见单线程版本设计*/
                        if(key.isReadable()){
                            ByteBuffer buffer = ByteBuffer.allocate(16);
                            SocketChannel channel = (SocketChannel) key.channel();
                            channel.read(buffer);
                            buffer.flip();
                            printBytebuffer(buffer);
                        }
                    }

                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

        }
    }
    static void printBytebuffer(ByteBuffer tmp){      // 注意:传入的bytebuffer必须时写模式
        System.out.println(StandardCharsets.UTF_8.decode(tmp).toString());
    }
}

客户端建立连接并发送数据后的执行结果

14:44:05.969 [worker-0] DEBUG Server.Server7 - begin select!
14:44:14.910 [boss] DEBUG Server.Server7 - accept event happen!
14:44:14.911 [boss] DEBUG Server.Server7 - before register.../127.0.0.1:14363   // 无法获取可读事件

问题:服务端的boss模块的selector能够处理accept事件,但是work模块去无法获取可读事件

原因分析

  • 主要原因在于worker线程执行了select方法后,主线程中register方法就无法生效。造成selector没有监控读写事件(线程的异步性引发问题

代码段1:主线程让worker的selector监控读写通道(主线程执行该方法!!!)

sc.register(worker.selector,SelectionKey.OP_READ,null);

代码段2:worker线程的run方法内部select方法(worker0线程执方法!!)

public void run() {
            while(true){
                try{
                    selector.select();

单线程版处理可读事件完整逻辑

1-3-2 利用任务队列与wakeup解决可读事件无法获取的问题

解决思路:让worker线程执行注册任务,通过任务队列的方法将任务对象传递给worker线程。

服务端代码

import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
@Slf4j
public class Server8 {
    public static void main(String[] args) throws IOException {
        Thread.currentThread().setName("boss");
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        Selector boss = Selector.open();
        SelectionKey bossKey = ssc.register(boss,0,null);
        bossKey.interestOps(SelectionKey.OP_ACCEPT);
        ssc.bind(new InetSocketAddress(8080));
        Worker worker = new Worker("worker-0");
        while(true){
            boss.select();
            Iterator<SelectionKey> iter = boss.selectedKeys().iterator();
            while(iter.hasNext()){
                SelectionKey key = iter.next();
                iter.remove();
                if(key.isAcceptable()){
                    log.debug("accept event happen!");
                    SocketChannel sc = ssc.accept();
                    sc.configureBlocking(false);
                    worker.register(sc);     // 首次调用启动线程并注册,后去调用仅仅注册
                }
            }
        }
    }
    // 只有内部类能够定义为static
    static class Worker implements Runnable{
        private Thread thread;
        private Selector selector;
        private String name;
        private volatile boolean start = false;
        private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();

        public Worker(String name){this.name = name;}
        // 初始化线程和selector
        /*======改进1:boss所在线程在accept事件发生后调用该方法,将Runable对象放入消息队列 ==============================*/
        public void register(SocketChannel sc) throws IOException {
            if(!start){   // 利用start确保worker线程只有一个
                selector = Selector.open();   // open返回:SelectorProvider.provider().openSelector()
                thread = new Thread( this,name);
                thread.start();
                start = true;

            }
            // 向队列中添加注册任务(runable任务),当worker线程运行时从这个队列获取任务并执行
            // 确保channel的注册在select之前。
            queue.add(()->{
                try{
                  sc.register(selector,SelectionKey.OP_READ,null);
                  selector.selectNow();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
            /*
                让后面的第一次selection操作不再阻塞。
                Causes the first selection operation that has not yet returned to return immediately.
             */
            selector.wakeup();     // 这个方法调用让select方法立刻返回一次,确保注册的完成
            log.debug("Wake up for to register new read/write channel for the selector!");
        }
        @Override
        public void run() {
            while(true){
                try{
                    selector.select();
                    /*======改进1:将Runable对象从消息队列取出完成注册==============================*/
                    Runnable task = queue.poll();
                    if(task != null){
                        task.run();  // 执行了sc.register(selector,SelectionKey.OP_READ,null)
                        log.debug("Register successfully!");
                    }
                    Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                    while(iter.hasNext()){
                        SelectionKey key = iter.next();
                        /*这里实际读写需要考虑消息边界,写的数据规模过大的问题,详见单线程版本设计*/
                        /*对于可读事件,需要考虑三种情况:
                        * 1)正常的可读事件  2)客户端异常的关闭(需要处理异常) 3)客户端正常管理,可读的字节数为0,必须进行cancel操作。
                        * 忽视第二种情况会造成服务器程序宕机。忽视第三种情况会造成服务器的陷入死循环状态
                        * */
                        if(key.isReadable()){
                            try{
                                ByteBuffer buffer = ByteBuffer.allocate(16);
                                SocketChannel channel = (SocketChannel) key.channel();
                                int read = channel.read(buffer);
                                if(read == -1){
                                    key.cancel();
                                    channel.close();
                                }else{
                                    buffer.flip();
                                    printBytebuffer(buffer);
                                }
                            }catch (IOException e){
                                e.printStackTrace();
                            }
                        }
                        iter.remove();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    static void printBytebuffer(ByteBuffer tmp){      // 注意:传入的bytebuffer必须时写模式
        System.out.println(StandardCharsets.UTF_8.decode(tmp).toString());
    }
}

注意点

  • wakeup方法调用后能够让selector.select()立刻返回一次。
  • 利用ConcurrentLinkedQueue让boss线程将runable对象传递给work线程,从而让worker线程实现select与regiser。

Java中关于可写事件的注意点:有三种情况会触发可读事件

 1)正常的可读事件  2)客户端异常的关闭(需要处理异常) 3)客户端正常关闭,可读的字节数为0,必须进行cancel操作(否则key不会从事件集合中移除)。

这三种情况的处理统一的模板如下

 while(iter.hasNext()){
                        SelectionKey key = iter.next();
                        /*这里实际读写需要考虑消息边界,写的数据规模过大的问题,详见单线程版本设计*/
                        /*对于可读事件,需要考虑三种情况:
                        * 1)正常的可读事件  2)客户端异常的关闭(需要处理异常) 3)客户端正常管理,可读的字节数为0,必须进行cancel操作。
                        * 忽视第二种情况会造成服务器程序宕机。忽视第三种情况会造成服务器的陷入死循环状态
                        * */
                        if(key.isReadable()){
                            try{
                                ByteBuffer buffer = ByteBuffer.allocate(16);
                                SocketChannel channel = (SocketChannel) key.channel();
                                int read = channel.read(buffer);
                                if(read == -1){
                                    key.cancel();
                                    channel.close();
                                }else{
                                    buffer.flip();
                                    printBytebuffer(buffer);
                                }
                            }catch (IOException e){
                                e.printStackTrace();
                            }
                        }
                        iter.remove();
                    }

1-4 完整的多线程环境下selector的使用

程序功能:

1)定义一个boss,其selector专门去监控客户端的accept事件

2)定义实现runable接口的worker类,其selector专门用于监控客户端的读写事件

3)使用单个boss多个worker处理客户端连接。

  • worker的数量通常根据结合cpu核心数设置

4)采用round-robin(轮询)机制让多个work均匀的监控客户端连接的读写事件

  • 多线程环境下采用原子整数实现

服务端实现代码

import lombok.extern.slf4j.Slf4j;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class Server9 {
    public static void main(String[] args) throws IOException {
        new BossEventLoop().register();
    }

    @Slf4j
    static class BossEventLoop implements Runnable {
        /*============================01 重要的属性=============================================*/
        /*创建一个boss以及多个work*/
        private Selector boss;
        private WorkerEventLoop[] workers;
        private volatile boolean start = false;
        /*创建计数器,当有连接建立的时候,轮询可用的worker,将channel绑定到空闲worker的selector上*/
        AtomicInteger index = new AtomicInteger();

        /*============================02 用于初始化boss线程用于监听客户端的连接事件===========================*/
        public void register() throws IOException {
            if (!start) {
                ServerSocketChannel ssc = ServerSocketChannel.open();
                ssc.bind(new InetSocketAddress(8080));
                ssc.configureBlocking(false);
                boss = Selector.open();
                SelectionKey ssckey = ssc.register(boss, 0, null);
                ssckey.interestOps(SelectionKey.OP_ACCEPT);
                workers = initEventLoops();
                new Thread(this, "boss").start();
                log.debug("boss start...");
                start = true;
            }
        }
        /*============================03 用于初始化boss线程用于监听客户端的连接事件===========================*/
        public WorkerEventLoop[] initEventLoops() {
            // Runtime.getRuntime().availableProcessors()可以获取CPU的当前核心数,
            // 该方法存在bug,就是docker环境下无法获得所分配的CPU核心
//        EventLoop[] eventLoops = new EventLoop[Runtime.getRuntime().availableProcessors()];
            WorkerEventLoop[] workerEventLoops = new WorkerEventLoop[2];
            for (int i = 0; i < workerEventLoops.length; i++) {
                workerEventLoops[i] = new WorkerEventLoop(i);
            }
            return workerEventLoops;
        }
        /*============================04 实际运行的代码:基本流程就是boss的selector监听accept事件==============
        当有新的连接,按照轮询的分配方式分配一个worker,这个worker的selector专门用于监听这个连接的读写事件。
        这里采用轮询的策略:是为了保证每个worker监控的连接数目是均匀的。            ===========================*/
        @Override
        public void run() {
            while (true) {
                try {
                    boss.select();
                    Iterator<SelectionKey> iter = boss.selectedKeys().iterator();
                    while (iter.hasNext()) {
                        SelectionKey key = iter.next();
                        iter.remove();
                        if (key.isAcceptable()) {
                            ServerSocketChannel c = (ServerSocketChannel) key.channel();
                            SocketChannel sc = c.accept();
                            sc.configureBlocking(false);
                            log.debug("{} connected", sc.getRemoteAddress());
                            workers[index.getAndIncrement() % workers.length].register(sc);
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /*=============================04 处理读写事件的worker类的定义============================== */
    @Slf4j
    static class WorkerEventLoop implements Runnable {
        private Selector worker;
        private volatile boolean start = false;
        private int index;
        private final ConcurrentLinkedQueue<Runnable> tasks = new ConcurrentLinkedQueue<>();

        public WorkerEventLoop(int index) {
            this.index = index;
        }

        public void register(SocketChannel sc) throws IOException {
            if (!start) {
                worker = Selector.open();
                new Thread(this, "worker-" + index).start();
                start = true;
            }
            tasks.add(() -> {
                try {
                    SelectionKey sckey = sc.register(worker, 0, null);
                    sckey.interestOps(SelectionKey.OP_READ);
                    worker.selectNow();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
            worker.wakeup();
        }

        @Override
        public void run() {
            while (true) {
                try {
                    worker.select();
                    Runnable task = tasks.poll();
                    if (task != null) {
                        task.run();
                    }
                    Set<SelectionKey> keys = worker.selectedKeys();
                    Iterator<SelectionKey> iter = keys.iterator();
                    while (iter.hasNext()) {
                        SelectionKey key = iter.next();
                        if (key.isReadable()) {
                            SocketChannel sc = (SocketChannel) key.channel();
                            ByteBuffer buffer = ByteBuffer.allocate(128);
                            try {
                                int read = sc.read(buffer);
                                if (read == -1) {
                                    key.cancel();
                                    sc.close();
                                } else {
                                    buffer.flip();
                                    log.debug("{} message:", sc.getRemoteAddress());
                                    printBytebuffer(buffer);
                                }
                            } catch (IOException e) {
                                e.printStackTrace();
                                key.cancel();
                                sc.close();
                            }
                        }
                        iter.remove();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    static void printBytebuffer(ByteBuffer tmp){      // 注意:传入的bytebuffer必须时写模式
        System.out.println(StandardCharsets.UTF_8.decode(tmp).toString());
    }
}

2 NIO与BIO的基础概念

2-1 Java中stream与channel的区别

定义

stream(流):Classes to support functional-style operations on streams of elements, such as map-reduce transformations on collections.

channel(通道):A channel represents an open connection to an entity such as a hardware device, a file, a network socket, or a program component that is capable of performing one or more distinct I/O operations, for example reading or writing

A channel is either open or closed. A channel is open upon creation, and once closed it remains closed. Once a channel is closed, any attempt to invoke an I/O operation upon it will cause a ClosedChannelException to be thrown. Whether or not a channel is open may be tested by invoking its isOpen method.Channels are, in general, intended to be safe for multithreaded access as described in the specifications of the interfaces and classes that extend and implement this interface.

总结:流是更为抽象的概念,泛指实体的流,而channel表示对具体事事务(硬件设备/文件/网络套接字)的连接

不同点

  • stream 不会自动缓冲数据,channel 会利用系统提供的发送缓冲区、接收缓冲区(更为底层)
  • stream 仅支持阻塞 API,channel 同时支持阻塞、非阻塞 API网络 channel 可配合 selector 实现多路复用(文件channel没有多路复用这个说法)

共同点

  • 二者均为全双工,即读写可以同时进行

Java 8 Stream 总结

Interface Channel

2-2 IO 模型

2-2-1 从read方法分析阻塞IO/非阻塞IO

当调用一次 channel.read 或 stream.read 后,会切换至操作系统内核态来完成真正数据读取,而读取又分为两个阶段,分别为:

  • 等待数据阶段
  • 复制数据阶段

总结:读取操作必须需要操作系统的支持, Java的读取需要操作系统的支持

等待阶段通常由操作系统切换到内核态,然后从硬件中获取数据到内存中的这段时间

名称 调用 结合数据读取2个阶段进行区分
阻塞IO read 阻塞IO会让用户线程在等待数据阶段与复制数据阶段都停止运行(阻塞)
非阻塞IO read 非阻塞IO在等待数据阶段不被阻塞(没有数据立刻返回),在复制数据阶段线程还是阻塞的

总结:可以看到阻塞IO模型与多路复用IO模型从图中表现十分类似,在等待数据与复制数据阶段都会阻塞,这二者的主要区别如下图所示:

可以看到selector与阻塞IO的最大区别:

  • selector对于多种类型的事件只要任意类型事件就绪会返回,有多个事件,则会返回多个事件。
  • 阻塞IO只能逐个处理accept/read/write等不同类型的事件
2-2-2 同步异步的角度看待IO模型
  • 同步:线程自己去获取结果(一个线程)
  • 异步:线程自己不去获取结果,而是由其它线程送结果(至少两个线程)

同步阻塞、同步非阻塞、同步多路复用、异步阻塞(没有此情况)、异步非阻塞(从网络IO模型理解这些概念

名称/IO模型 同步/异步的区分 阻塞/非阻塞区分
同步阻塞(阻塞IO) 调用read方法自己获取结果,只不过等待的过程阻塞用户线程 等待数据阶段与复制数据阶段都停止运行(阻塞)
同步非阻塞(非阻塞IO) 调用read方法自己获取结果,只不过等待的过程不阻塞用户线程等待数据阶段与复制数据阶段都停止运行(阻塞) 等待数据阶段不会阻塞但复制数据阶段会阻塞
同步多路复用(IO多路复用) 调用read方法自己获取结果,有事件发生返回结果处理事件,其余情况阻塞 等待数据阶段与复制数据阶段都停止运行(阻塞)
异步非阻塞(异步IO) 调用方法但自己不获取结果,结果是其他线程通过调用回调函数送了过来 等待数据阶段与复制数据阶段都不阻塞

注意没有异步阻塞这种说法!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!

  • 异步已经是让其他线程去送结果了,根本没必要去阻塞

回调方法的理解

上图中是异步IO,关键点:

1)用户线程提供回调方法参数给其他线程。
2)其他线程在满足条件调用回调函数将结果传输给用户线程。

2-3 IO模型中的零拷贝问题

2-3-1 方式1:传统IO模型的工作流程中数据拷贝次数(结合下图)
File f = new File("helloword/data.txt");
RandomAccessFile file = new RandomAccessFile(file, "r");
byte[] buf = new byte[(int)f.length()];
file.read(buf);
Socket socket = ...;
socket.getOutputStream().write(buf);

情景:从磁盘中文件读取内容然后通过网络套接字给发送出去?

数据拷贝次数(4次):磁盘=>内核缓冲去(DMA,内核态),内核缓冲区=>用户缓冲区(cpu,用户态),用户缓冲区=>socket缓冲区(cpu,用户态),socket缓冲区=>网卡(DMA,内核态)

  1. read 方法调用后,要从 java 程序的用户态切换至内核态,去调用操作系统(Kernel)的读能力,将数据从磁盘读入内核缓冲区。这期间用户线程阻塞,操作系统使用 DMA(Direct Memory Access)来实现文件读,不会使用 cpu。

    DMA 也可以理解为硬件单元,用来解放 cpu 完成文件 IO

  2. 内核态切换回用户态,将数据从内核缓冲区读入用户缓冲区(即 byte[] buf),这期间 cpu 会参与拷贝,无法利用 DMA

  3. 调用 write 方法,这时将数据从用户缓冲区(byte[] buf)写入 socket 缓冲区,cpu 会参与拷贝

  4. 接下来要向网卡写数据,再次从用户态切换至内核态,调用操作系统的写能力,使用 DMA 将 socket 缓冲区的数据写入网卡,不使用 cpu

2-3-2 方式2:直接内存的优化

通过分配直接内存将内核缓冲区与用户缓冲区合并到一起减少了文件数据读取时的一次拷贝

  • 数据拷贝了3次
  • 发生2次内核态与用户态的区分

Java的直接内存详解

2-3-3 方式3:进一步优化
  1. java 调用 transferTo 方法后,要从 java 程序的用户态切换至内核态,使用 DMA将数据读入内核缓冲区,不会使用 cpu
  2. 数据从内核缓冲区传输到 socket 缓冲区,cpu 会参与拷贝
  3. 最后使用 DMA 将 socket 缓冲区的数据写入网卡,不会使用 cpu

可以看到

  • 只发生了一次用户态与内核态的切换
  • 数据拷贝了 3 次
2-3-4 方式4:再进一步优化(硬件优化)
  1. java 调用 transferTo 方法后,要从 java 程序的用户态切换至内核态,使用 DMA将数据读入内核缓冲区,不会使用 cpu
  2. 只会将一些 offset 和 length 信息拷入 socket 缓冲区,几乎无消耗
  3. 使用 DMA 将 内核缓冲区的数据写入网卡,不会使用 cpu(数据的拷贝不会经过socket缓冲区

可以看到

  • 一次用户态与内核态的切换
  • 数据拷贝了 2 次。
2-3-5 四种方式总结
数据拷贝次数 内核态与用户态切换次数 特点
方式1:传统IO模型的工作流程 4 2 磁盘->内核缓冲区->用户缓冲区->socket缓冲区->网卡
方式2:采用JJava内存优化工作流程 3 2 磁盘-->直接内存->socket缓冲区->网卡
方式3 3 1 磁盘-->内核缓冲区->socket缓冲区->网卡
方式4 2 1 磁盘-->内核缓冲区->网卡

总结:

1)方式3与方式4都属于零拷贝,数据没有放入到用户缓冲区,最大的特点就是内核态与用户态只需要进行一次(节约上下文开销)

2)方式4与方式3有各自的适用场景,方式4比方式3少拷贝一次数据。


2-3-6 零拷贝的总结

零拷贝:不是真正无拷贝,而是在不会拷贝重复数据到 jvm 内存中(用户缓冲区)方式3与方式4都是属于零拷贝

零拷贝的优点

  • 更少的用户态与内核态的切换
  • 不利用 cpu 计算,减少 cpu 缓存伪共享
  • 零拷贝适合频繁的,小文件传输

2-4 异步IO介绍

  • Netty5废弃,linux支持的不好,windows支持的比较好,了解即可

AIO 用来解决数据复制阶段的阻塞问题

  • 同步意味着,在进行读写操作时,线程需要等待结果,还是相当于闲置
  • 异步意味着,在进行读写操作时,线程不必等待结果,而是将来由操作系统来通过回调方式由另外的线程来获得结果

异步模型需要底层操作系统(Kernel)提供支持

  • Windows 系统通过 IOCP 实现了真正的异步 IO
  • Linux 系统异步 IO 在 2.6 版本引入,但其底层实现还是用多路复用模拟了异步 IO,性能没有优势

💡 守护线程

默认文件 AIO 使用的线程都是守护线程,所以最后要执行 System.in.read() 以避免守护线程意外结束

参考资料

01 Netty基础课程

(0)

相关推荐