百度网站建设电话销售话术,网站开发谢辞,单页面 网站 模板,wordpress 评论美化转载自 Selector 实现原理概述
Selector是NIO中实现I/O多路复用的关键类。Selector实现了通过一个线程管理多个Channel#xff0c;从而管理多个网络连接的目的。
Channel代表这一个网络连接通道#xff0c;我们可以将Channel注册到Selector中以实现Selector对其的管理。一个C…转载自 Selector 实现原理概述
Selector是NIO中实现I/O多路复用的关键类。Selector实现了通过一个线程管理多个Channel从而管理多个网络连接的目的。
Channel代表这一个网络连接通道我们可以将Channel注册到Selector中以实现Selector对其的管理。一个Channel可以注册到多个不同的Selector中。
当Channel注册到Selector后会返回一个SelectionKey对象该SelectionKey对象则代表这这个Channel和它注册的Selector间的关系。并且SelectionKey中维护着两个很重要的属性interestOps、readyOps
interestOps是我们希望Selector监听Channel的哪些事件。我们将我们感兴趣的事件设置到该字段这样在selection操作时当发现该Channel有我们所感兴趣的事件发生时就会将我们感兴趣的事件再设置到readyOps中这样我们就能得知是哪些事件发生了以做相应处理。
Selector的中的重要属性
Selector中维护3个特别重要的SelectionKey集合分别是
keys所有注册到Selector的Channel所表示的SelectionKey都会存在于该集合中。keys元素的添加会在Channel注册到Selector时发生。selectedKeys该集合中的每个SelectionKey都是其对应的Channel在上一次操作selection期间被检查到至少有一种SelectionKey中所感兴趣的操作已经准备好被处理。该集合是keys的一个子集。cancelledKeys执行了取消操作的SelectionKey会被放入到该集合中。该集合是keys的一个子集。
下面的源码解析会说明上面3个集合的用处。
Selector 源码解析
下面我们通过一段对Selector的使用流程讲解来进一步深入其实现原理。首先先来段Selector最简单的使用片段
1234567891011121314151617ServerSocketChannel serverChannel ServerSocketChannel.open(); serverChannel.configureBlocking(false); intport 5566; serverChannel.socket().bind(newInetSocketAddress(port)); Selector selector Selector.open(); serverChannel.register(selector, SelectionKey.OP_ACCEPT); while(true){ intn selector.select(); if(n 0) { IteratorSelectionKey iter selector.selectedKeys().iterator(); while(iter.hasNext()) { SelectionKey selectionKey iter.next(); ...... iter.remove(); } } }1、Selector的构建
SocketChannel、ServerSocketChannel和Selector的实例初始化都通过SelectorProvider类实现。
ServerSocketChannel.open();
123publicstatic ServerSocketChannel open() throwsIOException { returnSelectorProvider.provider().openServerSocketChannel();}SocketChannel.open();
123publicstatic SocketChannel open() throwsIOException { returnSelectorProvider.provider().openSocketChannel();}Selector.open();
123publicstatic Selector open() throwsIOException { returnSelectorProvider.provider().openSelector();}我们来进一步的了解下SelectorProvider.provider()
1234567891011121314151617publicstatic SelectorProvider provider() { synchronized(lock) { if(provider ! null) returnprovider; returnAccessController.doPrivileged( newPrivilegedAction() { publicSelectorProvider run() { if(loadProviderFromProperty()) returnprovider; if(loadProviderAsService()) returnprovider; provider sun.nio.ch.DefaultSelectorProvider.create(); returnprovider; } }); } }① 如果配置了“java.nio.channels.spi.SelectorProvider”属性则通过该属性值load对应的SelectorProvider对象如果构建失败则抛异常。② 如果provider类已经安装在了对系统类加载程序可见的jar包中并且该jar包的源码目录META-INF/services包含有一个java.nio.channels.spi.SelectorProvider提供类配置文件则取文件中第一个类名进行load以构建对应的SelectorProvider对象如果构建失败则抛异常。③ 如果上面两种情况都不存在则返回系统默认的SelectorProvider即sun.nio.ch.DefaultSelectorProvider.create();④ 随后在调用该方法即SelectorProvider.provider()。则返回第一次调用的结果。
不同系统对应着不同的sun.nio.ch.DefaultSelectorProvider这里我们看linux下面的sun.nio.ch.DefaultSelectorProvider
123456789101112131415publicclass DefaultSelectorProvider { /** * Prevent instantiation. */ privateDefaultSelectorProvider() { } /** * Returns the default SelectorProvider. */ publicstatic SelectorProvider create() { returnnew sun.nio.ch.EPollSelectorProvider(); }}可以看见linux系统下sun.nio.ch.DefaultSelectorProvider.create(); 会生成一个sun.nio.ch.EPollSelectorProvider类型的SelectorProvider这里对应于linux系统的epoll
接下来看下 selector.open()
12345678910111213141516/** * Opens a selector. * * p The new selector is created by invoking the {link * java.nio.channels.spi.SelectorProvider#openSelector openSelector} method * of the system-wide default {link * java.nio.channels.spi.SelectorProvider} object. /p * * return A new selector * * throws IOException * If an I/O error occurs */ publicstatic Selector open() throwsIOException { returnSelectorProvider.provider().openSelector(); }在得到sun.nio.ch.EPollSelectorProvider后调用openSelector()方法构建Selector这里会构建一个EPollSelectorImpl对象。
EPollSelectorImpl
12345678910111213classEPollSelectorImpl extendsSelectorImpl{ // File descriptors used for interrupt protectedint fd0; protectedint fd1; // The poll object EPollArrayWrapper pollWrapper; // Maps from file descriptors to keys privateMapInteger,SelectionKeyImpl fdToKey;1234567891011121314151617181920212223EPollSelectorImpl(SelectorProvider sp) throwsIOException { super(sp); longpipeFds IOUtil.makePipe(false); fd0 (int) (pipeFds 32); fd1 (int) pipeFds; try{ pollWrapper newEPollArrayWrapper(); pollWrapper.initInterrupt(fd0, fd1); fdToKey newHashMap(); }catch(Throwable t) { try{ FileDispatcherImpl.closeIntFD(fd0); }catch(IOException ioe0) { t.addSuppressed(ioe0); } try{ FileDispatcherImpl.closeIntFD(fd1); }catch(IOException ioe1) { t.addSuppressed(ioe1); } throwt; } }EPollSelectorImpl构造函数完成
① EPollArrayWrapper的构建EpollArrayWapper将Linux的epoll相关系统调用封装成了native方法供EpollSelectorImpl使用。② 通过EPollArrayWrapper向epoll注册中断事件
12345voidinitInterrupt(intfd0, intfd1) { outgoingInterruptFD fd1; incomingInterruptFD fd0; epollCtl(epfd, EPOLL_CTL_ADD, fd0, EPOLLIN); }③ fdToKey构建文件描述符-SelectionKeyImpl映射表所有注册到selector的channel对应的SelectionKey和与之对应的文件描述符都会放入到该映射表中。
EPollArrayWrapper
EPollArrayWrapper完成了对epoll文件描述符的构建以及对linux系统的epoll指令操纵的封装。维护每次selection操作的结果即epoll_wait结果的epoll_event数组。EPollArrayWrapper操纵了一个linux系统下epoll_event结构的本地数组。
1234567891011* typedef union epoll_data {* void*ptr;* intfd;* __uint32_t u32;* __uint64_t u64;* } epoll_data_t;** struct epoll_event {* __uint32_t events;* epoll_data_t data;* };epoll_event的数据成员(epoll_data_t data)包含有与通过epoll_ctl将文件描述符注册到epoll时设置的数据相同的数据。这里data.fd为我们注册的文件描述符。这样我们在处理事件的时候持有有效的文件描述符了。
EPollArrayWrapper将Linux的epoll相关系统调用封装成了native方法供EpollSelectorImpl使用。
1234privatenative int epollCreate(); privatenative void epollCtl(intepfd, intopcode, intfd, intevents); privatenative int epollWait(longpollAddress, intnumfds, longtimeout, intepfd) throwsIOException;上述三个native方法就对应Linux下epoll相关的三个系统调用12345678// The fd of the epoll driver privatefinal int epfd; // The epoll_event array for results from epoll_wait privatefinal AllocatedNativeObject pollArray; // Base address of the epoll_event array privatefinal long pollArrayAddress;123// 用于存储已经注册的文件描述符和其注册等待改变的事件的关联关系。在epoll_wait操作就是要检测这里文件描述法注册的事件是否有发生。 privatefinal byte[] eventsLow newbyte[MAX_UPDATE_ARRAY_SIZE]; privatefinal MapInteger,Byte eventsHigh newHashMap();123456789EPollArrayWrapper()throwsIOException { // creates the epoll file descriptor epfd epollCreate(); // the epoll_event array passed to epoll_wait intallocationSize NUM_EPOLLEVENTS * SIZE_EPOLLEVENT; pollArray newAllocatedNativeObject(allocationSize, true); pollArrayAddress pollArray.address(); }EPoolArrayWrapper构造函数创建了epoll文件描述符。构建了一个用于存放epoll_wait返回结果的epoll_event数组。
ServerSocketChannel的构建
ServerSocketChannel.open();
返回ServerSocketChannelImpl对象构建linux系统下ServerSocket的文件描述符。
123456// Our file descriptor privatefinal FileDescriptor fd; // fd value needed for dev/poll. This value will remain valid // even after the value in the file descriptor object has been set to -1 privateint fdVal;123456ServerSocketChannelImpl(SelectorProvider sp) throwsIOException { super(sp); this.fd Net.serverSocket(true); this.fdVal IOUtil.fdVal(fd); this.state ST_INUSE; }将ServerSocketChannel注册到Selector
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
12345678910111213141516171819202122232425262728publicfinal SelectionKey register(Selector sel, intops, Object att) throwsClosedChannelException { synchronized(regLock) { if(!isOpen()) thrownew ClosedChannelException(); if((ops ~validOps()) ! 0) thrownew IllegalArgumentException(); if(blocking) thrownew IllegalBlockingModeException(); SelectionKey k findKey(sel); if(k ! null) { k.interestOps(ops); k.attach(att); } if(k null) { // New registration synchronized(keyLock) { if(!isOpen()) thrownew ClosedChannelException(); k ((AbstractSelector)sel).register(this, ops, att); addKey(k); } } returnk; } }1234567891011121314protectedfinal SelectionKey register(AbstractSelectableChannel ch, intops, Object attachment) { if(!(ch instanceofSelChImpl)) thrownew IllegalSelectorException(); SelectionKeyImpl k newSelectionKeyImpl((SelChImpl)ch, this); k.attach(attachment); synchronized(publicKeys) { implRegister(k); } k.interestOps(ops); returnk; }① 构建代表channel和selector间关系的SelectionKey对象② implRegister(k)将channel注册到epoll中③ k.interestOps(int) 完成下面两个操作a) 会将注册的感兴趣的事件和其对应的文件描述存储到EPollArrayWrapper对象的eventsLow或eventsHigh中这是给底层实现epoll_wait时使用的。b) 同时该操作还会将设置SelectionKey的interestOps字段这是给我们程序员获取使用的。
EPollSelectorImpl. implRegister
123456789protectedvoid implRegister(SelectionKeyImpl ski) { if(closed) thrownew ClosedSelectorException(); SelChImpl ch ski.channel; intfd Integer.valueOf(ch.getFDVal()); fdToKey.put(fd, ski); pollWrapper.add(fd); keys.add(ski); }① 将channel对应的fd(文件描述符)和对应的SelectionKeyImpl放到fdToKey映射表中。② 将channel对应的fd(文件描述符)添加到EPollArrayWrapper中并强制初始化fd的事件为0 ( 强制初始更新事件为0因为该事件可能存在于之前被取消过的注册中。)③ 将selectionKey放入到keys集合中。
Selection操作
selection操作有3中类型
① select()该方法会一直阻塞直到至少一个channel被选择(即该channel注册的事件发生了)为止除非当前线程发生中断或者selector的wakeup方法被调用。② select(long time)该方法和select()类似该方法也会导致阻塞直到至少一个channel被选择(即该channel注册的事件发生了)为止除非下面3种情况任意一种发生a) 设置的超时时间到达b) 当前线程发生中断c) selector的wakeup方法被调用③ selectNow()该方法不会发生阻塞如果没有一个channel被选择也会立即返回。
我们主要来看看select()的实现 int n selector.select();
123publicint select() throwsIOException { returnselect(0);}最终会调用到EPollSelectorImpl的doSelect
1234567891011121314151617181920212223protectedint doSelect(longtimeout) throwsIOException { if(closed) thrownew ClosedSelectorException(); processDeregisterQueue(); try{ begin(); pollWrapper.poll(timeout); }finally{ end(); } processDeregisterQueue(); intnumKeysUpdated updateSelectedKeys(); if(pollWrapper.interrupted()) { // Clear the wakeup pipe pollWrapper.putEventOps(pollWrapper.interruptedIndex(),0); synchronized(interruptLock) { pollWrapper.clearInterrupted(); IOUtil.drain(fd0); interruptTriggered false; } } returnnumKeysUpdated; }① 先处理注销的selectionKey队列② 进行底层的epoll_wait操作③ 再次对注销的selectionKey队列进行处理④ 更新被选择的selectionKey
先来看processDeregisterQueue():
1234567891011121314151617181920212223voidprocessDeregisterQueue() throwsIOException { Set var1 this.cancelledKeys(); synchronized(var1) { if(!var1.isEmpty()) { Iterator var3 var1.iterator(); while(var3.hasNext()) { SelectionKeyImpl var4 (SelectionKeyImpl)var3.next(); try{ this.implDereg(var4); }catch(SocketException var12) { IOException var6 newIOException(Error deregistering key); var6.initCause(var12); throwvar6; }finally{ var3.remove(); } } } } }从cancelledKeys集合中依次取出注销的SelectionKey执行注销操作将处理后的SelectionKey从cancelledKeys集合中移除。执行processDeregisterQueue()后cancelledKeys集合会为空。
1234567891011121314protectedvoid implDereg(SelectionKeyImpl ski) throwsIOException { assert(ski.getIndex() 0); SelChImpl ch ski.channel; intfd ch.getFDVal(); fdToKey.remove(Integer.valueOf(fd)); pollWrapper.remove(fd); ski.setIndex(-1); keys.remove(ski); selectedKeys.remove(ski); deregister((AbstractSelectionKey)ski); SelectableChannel selch ski.channel(); if(!selch.isOpen() !selch.isRegistered()) ((SelChImpl)selch).kill(); }注销会完成下面的操作① 将已经注销的selectionKey从fdToKey( 文件描述与SelectionKeyImpl的映射表 )中移除② 将selectionKey所代表的channel的文件描述符从EPollArrayWrapper中移除③ 将selectionKey从keys集合中移除这样下次selector.select()就不会再将该selectionKey注册到epoll中监听④ 也会将selectionKey从对应的channel中注销⑤ 最后如果对应的channel已经关闭并且没有注册其他的selector了则将该channel关闭
完成上面的的操作后注销的SelectionKey就不会出现先在keys、selectedKeys以及cancelKeys这3个集合中的任何一个。
接着我们来看EPollArrayWrapper.poll(timeout)
123456789101112intpoll(longtimeout) throwsIOException { updateRegistrations(); updated epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd); for(inti0; iupdated; i) { if(getDescriptor(i) incomingInterruptFD) { interruptedIndex i; interrupted true; break; } } returnupdated; }updateRegistrations()方法会将已经注册到该selector的事件(eventsLow或eventsHigh)通过调用epollCtl(epfd, opcode, fd, events); 注册到linux系统中。
这里epollWait就会调用linux底层的epoll_wait方法并返回在epoll_wait期间有事件触发的entry的个数
再看updateSelectedKeys()
123456789101112131415161718192021222324privateint updateSelectedKeys() { intentries pollWrapper.updated; intnumKeysUpdated 0; for(inti0; ientries; i) { intnextFD pollWrapper.getDescriptor(i); SelectionKeyImpl ski fdToKey.get(Integer.valueOf(nextFD)); // ski is null in the case of an interrupt if(ski ! null) { intrOps pollWrapper.getEventOps(i); if(selectedKeys.contains(ski)) { if(ski.channel.translateAndSetReadyOps(rOps, ski)) { numKeysUpdated; } }else{ ski.channel.translateAndSetReadyOps(rOps, ski); if((ski.nioReadyOps() ski.nioInterestOps()) ! 0) { selectedKeys.add(ski); numKeysUpdated; } } } } returnnumKeysUpdated; }该方法会从通过EPollArrayWrapper pollWrapper 以及 fdToKey( 构建文件描述符-SelectorKeyImpl映射表 )来获取有事件触发的SelectionKeyImpl对象然后将SelectionKeyImpl放到selectedKey集合( 有事件触发的selectionKey集合可以通过selector.selectedKeys()方法获得 )中即selectedKeys。并重新设置SelectionKeyImpl中相关的readyOps值。
但是这里要注意两点
① 如果SelectionKeyImpl已经存在于selectedKeys集合中并且发现触发的事件已经存在于readyOps中了则不会使numKeysUpdated这样会使得我们无法得知该事件的变化。
上面这点说明了为什么我们要在每次从selectedKey中获取到Selectionkey后将其从selectedKey集合移除就是为了当有事件触发使selectionKey能正确到放入selectedKey集合中并正确的通知给调用者。
② 如果发现channel所发生I/O事件不是当前SelectionKey所感兴趣则不会将SelectionKeyImpl放入selectedKeys集合中也不会使numKeysUpdated
epoll原理
epoll是Linux下的一种IO多路复用技术可以非常高效的处理数以百万计的socket句柄。先看看使用c封装的3个epoll系统调用:
int epoll_create(int size)
epoll_create建立一个epoll对象。参数size是内核保证能够正确处理的最大句柄数多于这个最大数时内核可不保证效果。
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
epoll_ctl可以操作epoll_create创建的epoll如将socket句柄加入到epoll中让其监控或把epoll正在监控的某个socket句柄移出epoll。
int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout)
epoll_wait在调用时在给定的timeout时间内所监控的句柄中有事件发生时就返回用户态的进程。
大概看看epoll内部是怎么实现的
epoll初始化时会向内核注册一个文件系统用于存储被监控的句柄文件调用epoll_create时会在这个文件系统中创建一个file节点。同时epoll会开辟自己的内核高速缓存区以红黑树的结构保存句柄以支持快速的查找、插入、删除。还会再建立一个list链表用于存储准备就绪的事件。当执行epoll_ctl时除了把socket句柄放到epoll文件系统里file对象对应的红黑树上之外还会给内核中断处理程序注册一个回调函数告诉内核如果这个句柄的中断到了就把它放到准备就绪list链表里。所以当一个socket上有数据到了内核在把网卡上的数据copy到内核中后就把socket插入到就绪链表里。当epoll_wait调用时仅仅观察就绪链表里有没有数据如果有数据就返回否则就sleep超时时立刻返回。
epoll的两种工作模式
LTlevel-trigger水平触发模式只要某个socket处于readable/writable状态无论什么时候进行epoll_wait都会返回该socket。ETedge-trigger边缘触发模式只有某个socket从unreadable变为readable或从unwritable变为writable时epoll_wait才会返回该socket。
socket读数据socket写数据最后顺便说下在Linux系统中JDK NIO使用的是 LT 而Netty epoll使用的是 ET。
后记
因为本人对计算机系统组成以及C语言等知识比较欠缺因为文中相关知识点的表示也相当“肤浅”如有不对不妥的地方望读者指出。同时我也会继续加强对该方面知识点的学习~
参考
http://www.jianshu.com/p/0d497fe5484ahttp://remcarpediem.com/2017/04/02/Netty源码-三-I-O模型和Java-NIO底层原理/圣思园netty课程