NIO-Selector

解析 Selector 源码

Posted by Allen Vork on February 13, 2019

Sample

selector = Selector.open();  
  
ServerSocketChannel ssc = ServerSocketChannel.open();  
ssc.configureBlocking(false);  
ssc.socket().bind(new InetSocketAddress(port));  
  
ssc.register(selector, SelectionKey.OP_ACCEPT);  
  
while (true) {  
  
    // select()阻塞,等待有事件发生唤醒  
    int selected = selector.select();  
  
    if (selected > 0) {  
        Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();  
        while (selectedKeys.hasNext()) {  
            SelectionKey key = selectedKeys.next();  
            if ((key.readyOps() & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT) {  
                // 处理 accept 事件  
            } else if ((key.readyOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) {  
                // 处理 read 事件  
            } else if ((key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) {  
                // 处理 write 事件  
            }  
            selectedKeys.remove();  
        }  
    }  
}

下面开始详细讲解 selector 的核心代码。

Selector.open()

public abstract class Selector implements Closeable {

    public static Selector open() throws IOException {
        return SelectorProvider.provider().openSelector();
    }
}

继续查看 SelectorProvider.provider():

    public static SelectorProvider provider() {
        synchronized (lock) {
            if (provider != null)
                return provider;
            return AccessController.doPrivileged(
                new PrivilegedAction<SelectorProvider>() {
                    public SelectorProvider run() {
                            if (loadProviderFromProperty())
                                return provider;
                            if (loadProviderAsService())
                                return provider;
                            provider = sun.nio.ch.DefaultSelectorProvider.create();
                            return provider;
                        }
                    });
        }
    }

它通过 DefaultSelectorProvider.create() 来创建一个 SelectorProvider:

public class DefaultSelectorProvider {

    public static SelectorProvider create() {
        return new WindowsSelectorProvider();
    }
}

public class WindowsSelectorProvider extends SelectorProviderImpl {
    public WindowsSelectorProvider() {
    }

    public AbstractSelector openSelector() throws IOException {
        return new WindowsSelectorImpl(this);
    }
}

可以看出 SelectorProvider.provider().openSelector() 是创建了一个 WindowsSelectorImpl:

final class WindowsSelectorImpl extends SelectorImpl {
    ...
    private PollArrayWrapper pollWrapper = new PollArrayWrapper(8);
    private final Pipe wakeupPipe = Pipe.open(); // 创建管道
    private final int wakeupSourceFd; // source 端的文件描述符
    private final int wakeupSinkFd;

    WindowsSelectorImpl(SelectorProvider var1) throws IOException {
        super(var1);
        // 获得上面创建好的管道的 source 端的文件描述符
        this.wakeupSourceFd = ((SelChImpl)this.wakeupPipe.source()).getFDVal();
        SinkChannelImpl var2 = (SinkChannelImpl)this.wakeupPipe.sink();
        var2.sc.socket().setTcpNoDelay(true);

        // 获得上面创建好的管道的 sink 端的文件描述符
        this.wakeupSinkFd = var2.getFDVal();

        // 将 source 文件描述符放到 pollWrapper 中
        this.pollWrapper.addWakeupSocket(this.wakeupSourceFd, 0);
    }
}

它首先会通过 Pipe.open 打开一个管道,然后获得该通道的 source 端和 sink 端的文件描述符,并将 source 端的文件描述符存到 pollWrapper 中。我们来看下 Pipe.open 做了什么:

public abstract class Pipe {

    public abstract Pipe.SourceChannel source();

    public abstract Pipe.SinkChannel sink();

    public static Pipe open() throws IOException {
        return SelectorProvider.provider().openPipe();
    }
}

可以看出它也是调用的 SelectorProvider.provider(),通过上面可知它最终调用的是 WindowsSelectorProvider 的 openPipe 方法:

public abstract class SelectorProviderImpl extends SelectorProvider {

    public Pipe openPipe() throws IOException {
        return new PipeImpl(this);
    }
}

class PipeImpl extends Pipe {
    private static final int NUM_SECRET_BYTES = 16;
    private static final Random RANDOM_NUMBER_GENERATOR = new SecureRandom();
    private SourceChannel source;
    private SinkChannel sink;

    PipeImpl(SelectorProvider var1) throws IOException {
        try {
            AccessController.doPrivileged(new PipeImpl.Initializer(var1));
        } catch (PrivilegedActionException var3) {
            throw (IOException)var3.getCause();
        }
    }
}

可以看出,整个过程最终调用的是 PipeImpl 的 Initializer 方法:

    private class Initializer implements PrivilegedExceptionAction<Void> {
        private final SelectorProvider sp;
        private IOException ioe;

        private Initializer(SelectorProvider var2) {
            this.ioe = null;
            this.sp = var2;
        }

        public Void run() throws IOException {
            // 初始化管道,创建管道的 source 和 sink 端。下面会详细讲
            PipeImpl.Initializer.LoopbackConnector var1 = new PipeImpl.Initializer.LoopbackConnector();
            var1.run();
            ...
        }

        // 下面详细讲
        private class LoopbackConnector implements Runnable {
            private LoopbackConnector() {
            }

            // 创建2个本地的 SocketChannel,然后链接(通过写随机数来做两个 socket 的链接校验)。
            // 2个 SocketChannel 分别实现了管道的 source 和 sink 端。source 端由前面的 WindowsSelectorImpl 
            // 放到 PollWrapper 中。
            public void run() {
                ServerSocketChannel var1 = null;
                SocketChannel var2 = null;
                SocketChannel var3 = null;

                try {
                    ByteBuffer var4 = ByteBuffer.allocate(16);
                    ByteBuffer var5 = ByteBuffer.allocate(16);
                    InetAddress var6 = InetAddress.getByName("127.0.0.1");// 本机地址

                    assert var6.isLoopbackAddress();

                    InetSocketAddress var7 = null;

                    while(true) {
                        if (var1 == null || !var1.isOpen()) {
                            // 将ServerSocketChannel绑定本地环回地址,端口号为 0
                            var1 = ServerSocketChannel.open();
                            var1.socket().bind(new InetSocketAddress(var6, 0));

                            // 根据本机地址 var6 和 ServerSocketChannel 的端口号创建套接字地址 var7
                            var7 = new InetSocketAddress(var6, var1.socket().getLocalPort());
                        }

                        // 建立连接
                        var2 = SocketChannel.open(var7); 

                        // 往 ByteBuffer var4 中填充随机数
                        PipeImpl.RANDOM_NUMBER_GENERATOR.nextBytes(var4.array());

                        do {
                            var2.write(var4); // 将 var4 写到 SocketChannel 中
                        } while(var4.hasRemaining());

                        var4.rewind();
                        var3 = var1.accept(); // 获得 sink 端的 SocketChannel

                        do {
                            var3.read(var5); // 将 var2 传过来的数据读到 var5 中
                        } while(var5.hasRemaining());

                        var5.rewind();
                        if (var5.equals(var4)) { // 校验成功
                            // 通过2个 SocketChannel 创建管道的 source 和 sink 端
                            PipeImpl.this.source = new SourceChannelImpl(Initializer.this.sp, var2);
                            PipeImpl.this.sink = new SinkChannelImpl(Initializer.this.sp, var3);
                            break;
                        }

                        var3.close();
                        var2.close();
                    }
                } catch (IOException var18) {
                    try {
                        if (var2 != null) {
                            var2.close();
                        }

                        if (var3 != null) {
                            var3.close();
                        }
                    } catch (IOException var17) {
                        ;
                    }

                    Initializer.this.ioe = var18;
                } finally {
                    try {
                        if (var1 != null) {
                            var1.close();
                        }
                    } catch (IOException var16) {
                        ;
                    }

                }

            }
        }
    }

可以看出它是创建2个 SocketChannel,然后创建一个16个字节的 ByteBuffer,并用随机数来填充。然后 var2 来发送该数据,var3 来接收。如果发送的数据和接收的数据相同,则校验成功,即可通过这两个 SocketChannel 来创建管道的 source 和 sink 端。
这样 Pipe.open() 就分析完了,再接着看将管道的 source 端放到 pollWrapper 是做了什么:

final class WindowsSelectorImpl extends SelectorImpl {
    // 创建 PollArrayWrapper
    private PollArrayWrapper pollWrapper = new PollArrayWrapper(8);

    WindowsSelectorImpl(SelectorProvider var1) throws IOException {
        super(var1);
        ...
        // 将管道的 source 端文件描述符放进去
        this.pollWrapper.addWakeupSocket(this.wakeupSourceFd, 0);
    }
}

class PollArrayWrapper {
    private AllocatedNativeObject pollArray;
    long pollArrayAddress;
    private static final short FD_OFFSET = 0;
    private static final short EVENT_OFFSET = 4;
    static short SIZE_POLLFD = 8;
    private int size;

    PollArrayWrapper(int var1) {
        int var2 = var1 * SIZE_POLLFD; // 8 * 8
        // 创建 AllocatedNativeObject
        this.pollArray = new AllocatedNativeObject(var2, true);
        this.pollArrayAddress = this.pollArray.address();
        this.size = var1;
    }

    ...

    // var1 为 source 文件描述符,var2 为 0
    void addWakeupSocket(int fdVal, int index) {

        // 将文件描述符保存到 native 中,下面会详细讲
        this.putDescriptor(index, fdVal);

        // 将 source 的 POLLIN 事件标志位感兴趣的,那么当 sink 端有数据写入时,
        // source 对应的文件描述符 wakeupSourceFd 就会处于就绪状态
        this.putEventOps(index, Net.POLLIN);
    }

    // 将 Descripter 放到 AllocatedNativeObject 中
    void putDescriptor(int index, int fdVal) {
        this.pollArray.putInt(SIZE_POLLFD * index + 0, fdVal);
    }

    void putEventOps(int index, int var2) {
        this.pollArray.putShort(SIZE_POLLFD * index + 4, (short)var2);
    }
}

可以看出它是将文件描述符放到 AllocatedNativeObject 中。

class AllocatedNativeObject extends NativeObject {
    AllocatedNativeObject(int var1, boolean var2) {
        super(var1, var2);
    }
}

class NativeObject {
    protected static final Unsafe unsafe = Unsafe.getUnsafe();
    protected long allocationAddress;
    private final long address;
    private static ByteOrder byteOrder = null;
    private static int pageSize = -1;
    
    // size = 8; pageAligned = true
    protected NativeObject(int size, boolean pageAligned) {  
        if (!pageAligned) {  
            this.allocationAddress = unsafe.allocateMemory(size);  
            this.address = this.allocationAddress;  
        } else {  
            int ps = pageSize();  
            long a = unsafe.allocateMemory(size + ps);  
            this.allocationAddress = a;  
            this.address = a + ps - (a & (ps - 1));  
        }  
    }  
}

可以看出 pollArray 是通过 unsafe.allocateMemory((long)(size + ps)) 分配的一块系统内存。

到这里,Selector.open() 就执行完成了。它主要是通过2个链接的 SocketChannel 建立了 Pipe,并将 pipe 的 wakeupSourceFd 放到 pollArray 中。 pollArray 是 Selector 的枢纽。最终返回 WindowsSelectorImpl 类型的 Selector。

2. serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT)

创建了 Selector 后,就要创建一个 ServerSocketChannel,然后将该 Channel 注册到 Selector 中:

ServerSocketChannel ssc = ServerSocketChannel.open();  
ssc.configureBlocking(false);  
ssc.socket().bind(new InetSocketAddress(port));  
  
ssc.register(selector, SelectionKey.OP_ACCEPT);  

2.1 ServerSocketChannel.open()

public abstract class ServerSocketChannel extends AbstractSelectableChannel
    implements NetworkChannel {
    public static ServerSocketChannel open() throws IOException {
        // 由前面可知,它调用的是 WindowsSelectorProvider 的 openServerSocketChannel()
        return SelectorProvider.provider().openServerSocketChannel();
    }
}

// WindowsSelectorProvider 是 SelectorProviderImpl 的子类
public abstract class SelectorProviderImpl extends SelectorProvider {
    public ServerSocketChannel openServerSocketChannel() throws IOException {
        return new ServerSocketChannelImpl(this);
    }
}

可以看出,open 方法返回一个 ServerSocketChannelImpl 实例。

2.2 ssc.register(selector, SelectionKey.OP_ACCEPT)

由2.1看出,它调用的是 ServerSocketChannelImpl#register,具体的实现在父类:

public abstract class SelectableChannel extends AbstractInterruptibleChannel implements Channel {
    public final SelectionKey register(Selector var1, int var2) throws ClosedChannelException {
        return this.register(var1, var2, (Object)null);
    }
}

// 最终调用到 SelectableChannel 的子类的 register 方法
public abstract class AbstractSelectableChannel extends SelectableChannel {

    // var1 为 WindowsSelectorImpl 对象;var2 为 SelectionKey.OP_ACCEPT;var3 为 null
    public final SelectionKey register(Selector var1, int var2, Object var3) throws ClosedChannelException {
        Object var4 = this.regLock;
        synchronized(this.regLock) {
            if (!this.isOpen()) {
                throw new ClosedChannelException();
            } else if ((var2 & ~this.validOps()) != 0) {
                throw new IllegalArgumentException();
            } else if (this.blocking) {
                throw new IllegalBlockingModeException();
            } else {
                SelectionKey var5 = this.findKey(var1); // 由于是首次注册,所以为空
                if (var5 != null) {
                    var5.interestOps(var2);
                    var5.attach(var3);
                }

                if (var5 == null) {
                    Object var6 = this.keyLock;
                    synchronized(this.keyLock) {
                        if (!this.isOpen()) {
                            throw new ClosedChannelException();
                        }
                        // 调用 WindowsSelectorImpl 的 register 方法
                        var5 = ((AbstractSelector)var1).register(this, var2, var3);
                        this.addKey(var5);
                    }
                }

                return var5;
            }
        }
    }
}

可以看出 Channel 调用 register 方法,实际是调用的 WindowSelectorImpl 的父类的 register 方法,将 Channel 和 option 传进去:

    // var2:SelectionKey.OP_ACCEPT;  var3: null
    protected final SelectionKey register(AbstractSelectableChannel var1, int var2, Object var3) {
        if (!(var1 instanceof SelChImpl)) {
            throw new IllegalSelectorException();
        } else {
            // 将 Selector 和 Channel 的实现类都封装到 SelectionKeyImpl 中
            SelectionKeyImpl var4 = new SelectionKeyImpl((SelChImpl)var1, this);
            var4.attach(var3);
            Set var5 = this.publicKeys;
            synchronized(this.publicKeys) {
                this.implRegister(var4); // 调用 implRegister(var4),并传入 SelectionKeyImpl
            }

            var4.interestOps(var2);
            return var4;
        }
    }

register 方法会创建一个 SelectionKeyImpl,然后传给 WindowsSelectorImpl#implRegister 方法:

final class WindowsSelectorImpl extends SelectorImpl {
    private SelectionKeyImpl[] channelArray = new SelectionKeyImpl[8];
    private final WindowsSelectorImpl.FdMap fdMap = new WindowsSelectorImpl.FdMap();
    protected HashSet<SelectionKey> keys = new HashSet(); // 它是父类的

    protected void implRegister(SelectionKeyImpl var1) {
        Object var2 = this.closeLock;
        synchronized(this.closeLock) {
            if (this.pollWrapper == null) {
                throw new ClosedSelectorException();
            } else {
                this.growIfNeeded();
                this.channelArray[this.totalChannels] = var1;
                var1.setIndex(this.totalChannels);
                this.fdMap.put(var1);
                this.keys.add(var1);
                this.pollWrapper.addEntry(this.totalChannels, var1);
                ++this.totalChannels;
            }
        }
    }
}

将 SelectionKeyImpl 放到 channelArray、fdMap 和 pollWrapper 中。看下 pollWrapper:

class PollArrayWrapper {
    void addEntry(int index, SelectionKeyImpl ski) {
        this.putDescriptor(index, ski.channel.getFDVal());
    }
}

putDescriptor 上面讲过,就是将 socketChannel 的文件描述符放到 pollArray 中。

总结

  1. ServerSocketChannel.open() 会创建一个 ServerSocketChannelImpl 的 Channel 对象
  2. ServerSocketChannel#register 方法会调用的 WindowSelectorImpl#register 方法,它会创建一个 SelectionKeyImpl 来存放 Channel(ServerSocketChannelImpl)和 Selector(WindowSelectorImpl)
  3. 调用 implRegister(SelectionKeyImpl) 来将 SelectionKeyImpl 保存到各个列表中,并调用 pollWrapper.addEntry 来将 Channel 的文件描述符保存到 pollArray 中

3. selector.select()

这个 selector 就是 WindowSelectorImpl 的实例,调用的其实是他的父类的 select 方法:

public abstract class SelectorImpl extends AbstractSelector {

    public int select() throws IOException {
        return this.select(0L);
    }

    public int select(long timeout) throws IOException {
        return lockAndDoSelect((timeout == 0) ? -1 : timeout);  
    }  
  
    private int lockAndDoSelect(long timeout) throws IOException {  
        synchronized (this) {  
            ...
            synchronized (publicKeys) {  
                synchronized (publicSelectedKeys) {  
                    return doSelect(timeout);  
                }  
            }  
        }  
    }  
}

可以看出它又回到 WindowsSelectorImpl 中的 doSelect 方法:

    private final WindowsSelectorImpl.SubSelector subSelector = new WindowsSelectorImpl.SubSelector();

    protected int doSelect(long timeout) throws IOException {
        if (this.channelArray == null) {
            throw new ClosedSelectorException();
        } else {
            this.timeout = timeout;
            this.processDeregisterQueue();
            if (this.interruptTriggered) {
                this.resetWakeupSocket();
                return 0;
            } else {
                this.adjustThreadsCount();
                this.finishLock.reset();
                this.startLock.startThreads();

                try {
                    this.begin();

                    try {
                        this.subSelector.poll();
                    } catch (IOException var7) {
                        this.finishLock.setException(var7);
                    }

                    if (this.threads.size() > 0) {
                        this.finishLock.waitForHelperThreads();
                    }
                } finally {
                    this.end();
                }

                this.finishLock.checkForException();
                this.processDeregisterQueue();
                int var3 = this.updateSelectedKeys();
                this.resetWakeupSocket();
                return var3;
            }
        }
    }

    private final class SubSelector {
        private final int pollArrayIndex;
        private final int[] readFds;
        private final int[] writeFds;
        private final int[] exceptFds;

        private SubSelector() {
            this.readFds = new int[1025];
            this.writeFds = new int[1025];
            this.exceptFds = new int[1025];
            this.pollArrayIndex = 0;
        }

        private int poll() throws IOException {
            return this.poll0(WindowsSelectorImpl.this.pollWrapper.pollArrayAddress, Math.min(WindowsSelectorImpl.this.totalChannels, 1024), this.readFds, this.writeFds, this.exceptFds, WindowsSelectorImpl.this.timeout);
        }

        ...

        private native int poll0(long var1, int var3, int[] var4, int[] var5, int[] var6, long var7);
    }

select 就是轮询 pollArray 中的 FD,看有没有事件发生,有的话就收集所有发生事件的 FD,然后退出阻塞。

4. selector.wakeup()


WindowsSelectorImpl.java
----

public Selector wakeup() {
	synchronized (interruptLock) {
		if (!interruptTriggered) {
			setWakeupSocket();
			interruptTriggered = true;
		}
	}
	return this;
}
 
// Sets Windows wakeup socket to a signaled state.
private void setWakeupSocket() {
	setWakeupSocket0(wakeupSinkFd);
}
 
WindowsSelectorImpl.c
----
Java_sun_nio_ch_WindowsSelectorImpl_setWakeupSocket0(JNIEnv *env, jclass this,
                                                jint scoutFd)
{
    /* Write one byte into the pipe */
    send(scoutFd, (char*)&POLLIN, 1, 0);
}

可以看出 wakeup 调用了 native 的方法,用于往最开始建立的 pipe 的 sink 端写一个字节,那么 source 端的文件描述符就会处于就绪状态,然后 poll 方法就会返回,从而导致 select 方法返回。所以可以看出管道的意义就是让文件描述符处于就绪状态。

参考文献