唐山哪里有做网站的,上海市建交人才网,济源建设网站,wordpress pot 汉化字节开源的netPoll底层LinkBuffer设计与实现 为什么需要LinkBuffer介绍设计思路数据结构LinkBufferNodeAPI LinkBuffer读 API写 APIbook / bookAck api 小结 本文基于字节开源的NetPoll版本进行讲解#xff0c;对应官方文档链接为: Netpoll对应官方文档链接
netPoll底层有一个… 字节开源的netPoll底层LinkBuffer设计与实现 为什么需要LinkBuffer介绍设计思路数据结构LinkBufferNodeAPI LinkBuffer读 API写 APIbook / bookAck api 小结 本文基于字节开源的NetPoll版本进行讲解对应官方文档链接为: Netpoll对应官方文档链接
netPoll底层有一个非常核心的数据结构叫LinkBuffer , 本文作为netPoll正式源码分析的前导篇 , 主要来看看netPoll底层使用到的LinkBuffer的源码实现。 为什么需要LinkBuffer
我们先来看一段官方对NetPoll的定义:
Netpoll 是由 字节跳动 开发的高性能 NIO(Non-blocking I/O)网络库专注于 RPC 场景。RPC 通常有较重的处理逻辑因此无法串行处理 I/O。而 Go 的标准库 net 设计了 BIO(Blocking I/O) 模式的 API使得 RPC 框架设计上只能为每个连接都分配一个 goroutine。 这在高并发下会产生大量的 goroutine大幅增加调度开销。此外net.Conn 没有提供检查连接活性的 API因此 RPC 框架很难设计出高效的连接池池中的失效连接无法及时清理。
NetPoll对标的其实就是java中的Netty框架 , 而对于这一类多路IO复用框架来说他们底层实现都依赖于epollkqueue等底层操作系统向上提供的多路复用API 在多路复用模型设计中底层epoll等API的事件触发方式会影响I/O和buffer的设计这也是netpoll推出LinkBuffer的原因。
Linux提供的epoll有两种触发方式:
水平触发(LT) : 由于I/O就绪事件会持续触发直到无数据可读可写 , 所以需要同步的在事件触发后主动完成I/O , 并向上层代码直接提供buffer边沿触发(ET) : 由于I/O就绪事件只会通知一次所以可选择只负责处理事件通知转发由上层代码完成I/O并管理Buffer
go原生网络库采用边沿触发(ET)模式而netpoll采用水平触发(LT)模式LT模式实效性更好主动I/O可以集中内存使用和管理并且还可以像netpoll这样提供nocpoy操作同时还能减少GC 。 目前一些热门开源网络库同样也是采用的LT模式如easygoevio和gnet等 这里主动IO是指由netpoll提供一个缓冲区当监听到fd上的读事件时就主动将数据读取到该缓冲区中至于什么时候从netpoll提供的缓冲区读出数据则是用户的事情了。 主动I/O需要网络库自身提供一个数据缓冲区这会引入上层代码并发操作buffer的问题同时网络库自身也需要对该缓冲区进行I/O读写因此为了保证数据正确性同时又避免加锁带来的低性能目前开源的网络库通常都会采取同步处理buffer (easygo , evio) 或将 buffer copy (gent) 一份提供给上层代码的方式来实现。
已有的实现方式不适合大流量环境下的业务处理或存在copy开销同时常见的bytes , bufio , ringbuffer 等 buffer 库 均存在扩容需要拷贝原数组数据以及只能扩容无法缩容导致占用大量内存等问题。因此LinkBuffer的提出就是为了解决上面提出的两个问题 介绍
相比于常见的Buffer库LinkBuffer的优势有以下几点:
读写并行无锁支持零拷贝的流式读写 链式buffer存在读写两个指针实现读写并行效果 高效扩缩容 由于采用链式buffer实现扩容时直接在尾部添加新的Node节点即可缩容时借助头指针直接释放掉那些多余的Node节点占用的空间同时给每个节点建立一个单独的引用计数确保只在Node节点上的引用计数为0时才会回收其占用的内存 灵活切片和拼接buffer 支持读取LinkBuffer中任意段数据上层代码可以nocopy地并行处理数据流分段无需关心生命周期通过引用计数GC支持任意拼接(nocopy) , 写buffer借助追加Node到链表尾部实现无需copy同时保证数据只会写一次 nocpy buffer 池化减少GC 将每个字节数组看作Node节点构建对象池维护空闲Node节点用于实现Node对象的复用减少内存占用和GC 设计思路
LinkBuffer 的设计思路如下图所示: LinkBuffer 通过将 node 串接成链表的形式实现逻辑上的整体 buffer 。其中 node 是大小固定的内存块 (默认 4k) 。LinkBuffer 拥有读写两个游标 , 从链表头部读数据链表尾部写数据由此实现读写并行无锁。对于读操作由于切片特性可以灵活读取一个LinkBuffer切片 (如: arr[1:10]) , 同时对每个Node都有引用计数(切片多少次就标记多少次) 当所有的切片均使用完释放后用完的Node会自动回收到Node内存池。对于写操作可以直接在链表尾部添加新的Node实现零拷贝扩容同时支持多个LinkBuffer按顺序拼接实现zerocopy的buffer写操作(把一个链表挂接到另一个链表的末尾)node pool 为预先开辟的node池为全局所有的LinkBuffer提供node并回收用完的node减少了分配新内存和系统GC的开销 数据结构
LinkBufferNode
LinkBuffer 中的 LinkBufferNode 节点结构如下:
type linkBufferNode struct {buf []byte // 字节缓冲区off int // 读偏移量malloc int // 写偏移量refer int32 // 引用计数readonly bool // 只读节点,表示底层buf中的内存是不是自己控制的,为真表示不是,自己不能释放origin *linkBufferNode // 当我们从某个slice中切分出其中一部分返回时,此时会用origin指针记录其原本的切片对象next *linkBufferNode // next指针
}LinkBufferNode的构造函数如下:
var linkedPool sync.Pool{New: func() interface{} {return linkBufferNode{refer: 1, // 自带 1 引用}},
}func newLinkBufferNode(size int) *linkBufferNode {// 从缓冲池中拿到一个空闲的node节点var node linkedPool.Get().(*linkBufferNode)// 重置节点的读写偏移量,引用计数和只读属性node.off, node.malloc, node.refer, node.readonly 0, 0, 1, false// 节点大小小于等于0,表示为只读节点if size 0 {node.readonly truereturn node}// LinkBufferCap表示每个node节点的最小的大小if size LinkBufferCap {size LinkBufferCap}// 分配len(slice)0 , len(cap)size大小的切片node.buf malloc(0, size)return node
}// malloc 底层调用的是字节开源的mache库
func malloc(size, capacity int) []byte {if capacity mallocMax {return make([]byte, size, capacity)}return mcache.Malloc(size, capacity)
}LinkBufferNode 中最重要的属性便是buf了buf 是整个网络读写最终的存储变量这段内存是单独管理的且大小不固定与buf相关的操作有如下几种:
创建时申请了一块内存后buf : buf[:0] 来保存内存的引用此时 len(buf) 0Malloc 时从buf中申请了一段切片 buf[:malloc] , 此处申请的是切片引用而不是底层实际内存此时 len(buf) 0 malloc - len(buf) writeable 调用方法需要做长度检查以在Node Malloc时底层数据访问不越界Flush 时 buf buf[:malloc] len(buf) malloc 由于底层内存是重用的且放回时并不会reset底层数组所以严格依赖 buf buf[:malloc] 来确保底层内存中的内容的确时我们已经写入到的LinkBufferNode 中哪部分数据对外可见也是依赖于len(buf)属性大小的因为读取数据的时候都是读取buf[:len(buf)]区间范围内的数据
buf 内存分配有以下三种情况:
分配至mcache需要手动free当分配内存大于mallocMax时直接make创建被runtime自动管理外部直接赋值由外部进行管理
buf可读数据范围: readable buf[off:len(buf)] (off 读指针)
buf可写数据范围: writeable buf[len(buf):malloc] 如果node的readonly属性为true表示底层buf中的内存不是自己控制的不能去主动释放Node对象readonly属性为true有以下两种情况:
外部bytes直接写入: WriteBinary该Node属于引用类型 有origin节点 API
这里简单看看LinkBufferNode提供的一些常用的API实现:
Len : 返回剩余可读数据量
// Len 剩余可读数据量
func (node *linkBufferNode) Len() (l int) {return len(node.buf) - node.off
}IsEmpty : 返回当前节点可读数据量是否为空
// IsEmpty 当前节点可读数据量是否为空
func (node *linkBufferNode) IsEmpty() (ok bool) {return node.off len(node.buf)
}Reset : 重置节点状态
// Reset 重置节点状态
func (node *linkBufferNode) Reset() {// 如果当前节点拥有的切片是个子切片或者当前切片的引用计数不等于1说明当前节点不能重置if node.origin ! nil || atomic.LoadInt32(node.refer) ! 1 {return}// 重置读写指针node.off, node.malloc 0, 0// 重置缓冲区len大小cap不变node.buf node.buf[:0]return
}Next: 往后读取n个字节数据,并移动读指针
// Next 往后读取n个字节数据,并移动读指针
// 调用方需要检查传入的长度n确保其不超过malloc-off 如果超过了可能会读到buf重用产生的脏数据
func (node *linkBufferNode) Next(n int) (p []byte) {off : node.offnode.off nreturn node.buf[off:node.off]
}Peek: 不移动读指针只是预览数据
// Peek 不移动读指针只是预览数据
func (node *linkBufferNode) Peek(n int) (p []byte) {return node.buf[node.off : node.offn]
}Malloc: 申请一段内存来写入数据在没有flush(buf:buf[:malloc])前,不会读到这段内存
// Malloc 申请一段内存来写入数据在没有flush(buf:buf[:malloc])前,不会读到这段内存
// 注意Node上的Malloc不会真正去申请内存Node的内存在buf创建时就已经申请好了
func (node *linkBufferNode) Malloc(n int) (buf []byte) {malloc : node.mallocnode.malloc nreturn node.buf[malloc:node.malloc]
}Refer: 返回一个新的Node对象,并设置origin父对象,此处指向的origin是根origin
// Refer 返回一个新的Node对象,并设置origin父对象,此处指向的origin是根origin -- linkBufferNode为两级结构
// 将 [read,readn]范围的切片切分出来由一个新的node节点引用同时增加当前节点的引用计数
func (node *linkBufferNode) Refer(n int) (p *linkBufferNode) {// 创建一个只读节点p newLinkBufferNode(0)// 当前节点p指向[read,readn]范围的切片p.buf node.Next(n)// 如果当前节点本身指向的也是一个子切片这边不会形成一个树状结构而是指向根节点if node.origin ! nil {p.origin node.origin} else {p.origin node}// 增加根节点的引用计数atomic.AddInt32(p.origin.refer, 1)return p
}Release: 如果当前节点不存在其他引用了重置node各属性放回节点池等待重用
// Release 如果有原始节点先释放原始节点
// 如果当前节点不存在其他引用了重置node各属性放回节点池等待重用
func (node *linkBufferNode) Release() (err error) {// 如果当前节点指向的是子切片先释放父切片if node.origin ! nil {node.origin.Release()}// release self// 递减根节点引用计数 (计数只会在根节点上递增所以这里只关心根节点上的递减即可)if atomic.AddInt32(node.refer, -1) 0 {// readonly nodes cannot recycle node.buf, other node.buf are recycled to mcache.// 释放根节点占用的buf空间if !node.readonly {free(node.buf)}// 将相关属性设置为nullnode.buf, node.origin, node.next nil, nil, nil// 将node重新放回节点池中linkedPool.Put(node)}return nil
}LinkBuffer LinkBuffer 抽象来看属于一个二维切片如果使用传统的read/write系统调用仅支持传入一维切片需要反复调用才能处理完整个二维切片的数据所以LinkBuffer这里对外提供readv/writev系统调用用来一次性传输多个数组的数据:
// writev 包装 writev 系统调用
// writev以顺序iov[0]、iov[1]至iov[iovcnt-1]从各缓冲区中聚集输出数据到fd
func writev(fd int, bs [][]byte, ivs []syscall.Iovec) (n int, err error) {// 将ivs[i].base 指向 bs[i] , 也就是将bs作为写缓冲区数据来源iovLen : iovecs(bs, ivs)if iovLen 0 {return 0, nil}// 执行writev系统调用,将ivs[i].base指针指向的缓冲区数据写入fd代表的文件中r, _, e : syscall.RawSyscall(syscall.SYS_WRITEV, uintptr(fd), uintptr(unsafe.Pointer(ivs[0])), uintptr(iovLen))// 清空ivs和bs缓冲区数据resetIovecs(bs, ivs[:iovLen])if e ! 0 {return int(r), syscall.Errno(e)}// 返回成功写入的字节数量return int(r), nil
}// readv 包装readv系统调用 , 返回 0 或 nil 表示数据读完了
// readv则将从fd读入的数据按同样的顺序散布到各缓冲区中readv总是先填满一个缓冲区然后再填下一个
func readv(fd int, bs [][]byte, ivs []syscall.Iovec) (n int, err error) {// 将ivs[i].base 指向 bs[i] , 也就是将bs作为最终接收数据的缓冲区iovLen : iovecs(bs, ivs)if iovLen 0 {return 0, nil}// 执行readv系统调用,将数据读取到ivs[i].base指针指向的缓冲区中r, _, e : syscall.RawSyscall(syscall.SYS_READV, uintptr(fd), uintptr(unsafe.Pointer(ivs[0])), uintptr(iovLen))// 之所以要reset是因为 barrier 不能持有原始 []byte 的引用否则这段 []byte 永远不能被 GCresetIovecs(bs, ivs[:iovLen])if e ! 0 {return int(r), syscall.Errno(e)}// 返回成功读取到的字节数量return int(r), nil
}此处使用到了Linux相关的IO系统调用: Unix/Linux编程分散输入和集中输出------readv() 、 writev() 关于TestReadv函数实现bug的pr链接: fix: 修复sys_exec_test.go函数中TestReadv测试函数使用错误 #297 LinkBuffer 具体的数据结构如下所示:
// LinkBuffer implements ReadWriter.
type LinkBuffer struct {length int64 // 可读数据量mallocSize int // 已写数据量head *linkBufferNode // release head 头结点read *linkBufferNode // read head 读指针flush *linkBufferNode // malloc head 写开始指针write *linkBufferNode // malloc tail 写结束指针caches [][]byte // buf allocated by Next when cross-package, which should be freed when release
}head - read 这一段表示可以释放的Node节点范围因为该范围内的Node节点持有的数据都已经被读取了read - flush 这一段表示已经写入但是还没有读取的Node节点范围flush - write 这一段表示已经创建但是未真正写入的可写空间因为在没有调用Flush前这段空间内的数据是不可读的因此这段空间内buf中的数据是可能出现无效数据的因为用户可能分配了空间但是还没有往里面写入数据。 读 API
这里只对Next和Slice方法展开进行讲解其他读API大家自行阅读源码学习即可实现思路大同小异。
Next 函数存在两种实现场景:
单节点读取数据采用的是zero-copy实现跨节点读取数据会copy出一个一维切片返回所以不是zero-copy的实现
// Next implements Reader.
func (b *LinkBuffer) Next(n int) (p []byte, err error) {... // 递减总的可读数据量b.recalLen(-n)// 是否需要跨节点读取if b.isSingleNode(n) {// 读取当前read指向节点的可读数据同时推进当前节点上的read指针return b.read.Next(n), nil}// 跨节点读取var pIdx intif block1k n n mallocMax {// 要在release的时候释放p malloc(n, n)b.caches append(b.caches, p)} else {p make([]byte, n)}var l intfor ack : n; ack 0; ack ack - l {l b.read.Len()if l ack {pIdx copy(p[pIdx:], b.read.Next(ack))break} else if l 0 {pIdx copy(p[pIdx:], b.read.Next(l))}b.read b.read.next}_ pIdxreturn p, nil
}const mallocMax block8k * block1kfunc malloc(size, capacity int) []byte {if capacity mallocMax {return make([]byte, size, capacity)}return mcache.Malloc(size, capacity)
}// 增加或减少b.length大小
func (b *LinkBuffer) recalLen(delta int) (length int) {return int(atomic.AddInt64(b.length, int64(delta)))
}此处必须返回一维切片是因为协议层反序列化时需要组装出定义的结构体字段。
如果都是小读取那只有小概率会触发到跨节点读取对于大读取还是优先考虑Slice与Next的区别是Slice会返回一个新的LinkBuffer无论大小都是zero-copy缺点是用户需要手动管理Buffer :
func (b *LinkBuffer) Slice(n int) (r Reader, err error) {// 递减剩余可读取数据量b.recalLen(-n)// 创建一个新的LinkBufferp : LinkBuffer{length: int64(n),}defer func() {p.flush p.flush.nextp.write p.flush}()// 如果是单节点读取那正好zero-copyif b.isSingleNode(n) {// 从 Slice() 返回的 LinkBuffer 是只读的node : b.read.Refer(n)p.head, p.read, p.flush node, node, nodereturn p, nil}// 如果是跨节点读取// 先基于当前读节点给新 LinkBuffer 赋予第一个头节点var l b.read.Len()node : b.read.Refer(l)// 读指针前进一个节点b.read b.read.nextp.head, p.read, p.flush node, node, nodefor ack : n - l; ack 0; ack ack - l {l b.read.Len()// 表示是新 LinkBuffer 的最后一个 Node// 从当前读节点引用出一个需要长度的 Nodeif l ack {p.flush.next b.read.Refer(ack)p.flush p.flush.nextbreak} else if l 0 {// 表示需要创建一个完整大小的 Nodeflush 指针前进p.flush.next b.read.Refer(l)p.flush p.flush.next}b.read b.read.next}// b.Release() 只会 release 已读的内容即返回的 slice 的内容// 由于有引用计数的存在所以底部内存并不会被回收return p, b.Release()
}写 API
Malloc: 预先分配一块内存,这块内存不可读,直到我们调用了Flush
// Malloc 预先分配一块内存,这块内存不可读,直到我们调用了Flush
func (b *LinkBuffer) Malloc(n int) (buf []byte, err error) {if n 0 {return}// 累加写入数据量计数b.mallocSize n// 如果当前节点剩余空间不足,则进行扩容,也就是创建一个新节点挂载到链表尾部b.growth(n)// 分配n大小的切片空间返回return b.write.Malloc(n), nil
}MallocAck: 缩容操作,保留malloc api预分配的前n个字节数据,丢弃剩余的数据
// MallocAck 缩容操作,保留malloc api预分配的前n个字节数据,丢弃剩余的数据
func (b *LinkBuffer) MallocAck(n int) (err error) {if n 0 {return fmt.Errorf(link buffer malloc ack[%d] invalid, n)}// 将已分配数量缩小到nb.mallocSize n// 从flush节点开始定位n个byte,丢弃剩余byteb.write b.flushvar l int // l 代表当前节点剩余的已分配数据量for ack : n; ack 0; ack ack - l {// 计算当前节点已分配数据量// len(b.write.buf) 表示当前node已经flush的数据量大小l b.write.malloc - len(b.write.buf)// 如果当前节点已经分配出去的数据量比当前ack大则丢弃分配的多余空间if l ack {b.write.malloc ack len(b.write.buf)break}b.write b.write.next}// 将多分配的空间全部回收for node : b.write.next; node ! nil; node node.next {node.off, node.malloc, node.refer, node.buf 0, 0, 1, node.buf[:0]}return nil
}Flush: 默认认为当前malloc的内容都为有效数据 , 调用该函数前用户需要确保已经写入了Malloc的所有数据
// Flush 默认认为当前malloc的内容都为有效数据 , 调用该函数前用户需要确保已经写入了Malloc的所有数据
func (b *LinkBuffer) Flush() (err error) {b.mallocSize 0// FIXME: The tail node must not be larger than 8KB to prevent Out Of Memory.if cap(b.write.buf) pagesize {b.write.next newLinkBufferNode(0)b.write b.write.next}var n int// 从flush指针指向的节点遍历到write指针指向的节点for node : b.flush; node ! b.write.next; node node.next {// 计算当前节点已分配数据量delta : node.malloc - len(node.buf)if delta 0 {// 累加已分配数据量计数n delta// 更新buf的len大小,[0,len]区间代表当前node节点上已经flush的数据范围node.buf node.buf[:node.malloc]}}// 移动flush指针到当前write指针指向的节点b.flush b.write// n 代表总的已经malloc出去的数据量,此处让所有数据都对外可见b.recalLen(n)return nil
}book / bookAck api
book: 申请最少min大小内存并存放在p切片内
func (b *LinkBuffer) book(bookSize, maxSize int) (p []byte) {// 计算当前写入节点剩余空间还有多少l : cap(b.write.buf) - b.write.malloc// 没有空间了那么新创建一个LinkBufferNode , 挂载到链表尾部if l 0 {l maxSizeb.write.next newLinkBufferNode(maxSize)b.write b.write.next}// 当前节点,剩余空间比当前需要的空间还大if l bookSize {l bookSize}// 分配l大小的空间return b.write.Malloc(l)
}与malloc区别: book 用来支持 readv/writev 这类二维切片参数的API , 此外与Malloc相比也不存在内存浪费的情况。 bookAck : 确认写入移动写指针malloc
// bookAck 保留book预留的前n个字符,丢弃多余的book空间
func (b *LinkBuffer) bookAck(n int) (length int, err error) {// 缩小malloc大小b.write.malloc n len(b.write.buf)// 更新len(buf) malloc -- 更新后,数据将可以被读取到// 和mallocAck不同的一点在于,bookAck会更新len(buf)大小,相当于调用了一次flushb.write.buf b.write.buf[:b.write.malloc]b.flush b.write// 增加可读数量length b.recalLen(n)return length, nil
}小结
本文带领大家详细研究了一下netpoll底层使用的LinkBuffer实现其中还有诸多细节由于时间关系不能一一到来这些内容大家可以自行阅读源码进行学习。
LinkBuffer 底层还使用到了字节开源的Mcache和GoPool实现感兴趣的同学可以去了解一下如果本篇文章有讲的错误之处也欢迎在评论区指出或私信与我讨论。