33人参与 • 2025-02-14 • Golang
在学习完了socket编程的基础知识、linux系统提供的i/o多路复用的实现以及golang的gmp调度模型之后,我们进而学习golang的网络模型——netpoll。本文将从为什么需要使用netpoll模型,以及netpoll的具体流程实现两个主要角度来展开学习。当前使用的go的版本为1.22.4,linux系统。
首先,什么是多路复用?
多路,指的是存在着多个需要服务的对象;复用,指的是重复利用一个单元来为上述的多个目标提供服务。
我们知道,linux系统为用户提供了三个内核实现的io多路复用技术的系统调用,用发展时间来排序分别为:select->poll->epoll
。其中,epoll
在当今使用的最为广泛,对比与select
调用,它有以下的优势:
fd
数量灵活:可监听的fd
数量上限灵活,使用方可以在调用epoll_create
操作时自行指定。fd
,相比与调用select
每次需要将所有的fd
拷贝进内核,监听到事件后再全部拷贝回用户态,epoll
只需要将需要监听的fd
添加到事件表后,即可多次监听。epoll
运行将就绪事件添加到就绪事件列表中,当用户调用epoll_wait
操作时,内核只返回就绪事件,而select
返回的是所有的事件,需要用户再进行一次遍历,找到就绪事件再处理。需要注意的是,在不同的条件环境下,epoll的优势可能反而作用不明显。epoll只适用在监听fd基数较大且活跃度不高的场景,如此epoll事件表的空间复用和epoll_wait操作的精准才能体现出其优势;而当处在fd基数较小且活跃度高的场景下,select反而更加简单有效,构造epoll的红黑树结构的消耗会成为其累赘。
考虑到场景的多样性,我们会选择使用epoll
去完成内核事件监听的操作,那么如何将golang
和epoll
结合起来呢?
在 go 语言的并发模型中,gmp 框架实现了一种高效的协程调度机制,它屏蔽了操作系统线程的细节,用户可以通过轻量级的 goroutine 来实现细粒度的并发操作。然而,底层的 io 多路复用机制(如 linux 的 epoll)调度的单位仍然是线程(m)。为了将 io 调度从线程层面提升到协程层面,充分发挥 goroutine 的高并发优势,netpoll 应运而生。
接下来我们就来学习netpoll
框架的实现。
为了将io调度从线程提升到协程层面,netpoll
框架有个重要的核心结构polldesc
,它有两个,一个为表层,含有指针指向了里层的polldesc
。本文中讲到的polldesc
都为里层polldesc
。
表层polldesc
定位在internel/poll/fd_poll_runtime.go
文件中:
type polldesc struct { runtimectx uintptr }
使用一个runtimectx
指针指向其底层实现实例。
里层的位于runtime/netpoll.go
中。
//网络poller描述符 type polldesc struct { //next指针,指向在pollcache链表结构中,以下个polldesc实例。 link *polldesc //指向fd fd uintptr //读事件状态标识器,状态有四种: //1、pdready:表示读操作已就绪,等待处理 //2、pdwait:表示g将要被阻塞等待读操作就绪,此时还未阻塞 //3、g:读操作的g已经被阻塞,rg指向阻塞的g实例 //4、pdnil:空 rg atomic.uintptr wg atomic.uintptr //... }
polldesc
的核心字段是读/写标识器rg/wg
,它用于标识fd的io事件状态,并且持有被阻塞的g实例。当后续需要唤醒这个g处理读写事件的时候,可以通过polldesc
追溯得到g的实例进行操作。有了polldesc
这个数据结构,golang就能将对处理socket的调度单位从线程thread
转换成协程g
。
pollcache
缓冲池采用了单向链表的方式存储多个polldesc
实例。
type pollcache struct { lock mutex first *polldesc }
其包含了两个核心方法,分别是alloc()
和free()
//从pollcache中分配得到一个polldesc实例 func (c *pollcache) alloc() *polldesc { lock(&c.lock) //如果链表为空,则进行初始化 if c.first == nil { //pdsize = 248 const pdsize = unsafe.sizeof(polldesc{}) //4096 / 248 = 16 n := pollblocksize / pdsize if n == 0 { n = 1 } //分配指定大小的内存空间 mem := persistentalloc(n*pdsize, 0, &memstats.other_sys) //完成指定数量的polldesc创建 for i := uintptr(0); i < n; i++ { pd := (*polldesc)(add(mem, i*pdsize)) pd.link = c.first c.first = pd } } pd := c.first c.first = pd.link lockinit(&pd.lock, lockrankpolldesc) unlock(&c.lock) return pd }
//free用于将一个polldesc放回pollcache func (c *pollcache) free(pd *polldesc) { //... lock(&c.lock) pd.link = c.first c.first = pd unlock(&c.lock) }
在宏观的角度下,netpoll框架主要涉及了以下的几个流程:
poll_init
:底层调用epoll_create
指令,在内核态中开辟epoll事件表。poll_open
:先构造一个polldesc实例,然后通过epoll_ctl(add)
指令,向内核中添加要监听的socket,并将这一个fd绑定在polldesc中。polldesc含有状态标识器rg/wg
,用于标识事件状态以及存储阻塞的g。poll_wait
:当g依赖的事件未就绪时,调用gopark
方法,将g置为阻塞态存放在polldesc中。net_poll
:gmp调度器会轮询netpoll流程,通常会用非阻塞的方式发起epoll_wait
指令,取出就绪的polldesc,提前出其内部陷入阻塞态的g然后将其重新添加到gmp的调度队列中。(以及在sysmon流程和gc流程都会触发netpoll)我们参考以下的简易tcp服务器实现框架,走进netpoll框架的具体源码实现。
// 启动 tcp server 代码示例 func main() { //创建tcp端口监听器,涉及以下事件: //1:创建socket fd,调用bind和accept系统接口函数 //2:调用epoll_create,创建eventpool //3:调用epoll_ctl(add),将socket fd注册到epoll事件表 l, _ := net.listen("tcp", ":8080") // eventloop reactor 模型 for { //等待tcp连接到达,涉及以下事件: //1:循环+非阻塞调用accept //2:若未就绪,则调用gopark进行阻塞 //3:等待netpoller轮询唤醒 //4:获取到conn fd后注册到eventpool //5:返回conn conn, _ := l.accept() // goroutine per conn go serve(conn) } } // 处理一笔到来的 tcp 连接 func serve(conn net.conn) { //关闭conn,从eventpool中移除fd defer conn.close() var buf []byte //读取conn中的数据,涉及以下事件: //1:循环+非阻塞调用recv(read) //2:若未就绪,通过gopark阻塞,等待netpoll轮询唤醒 _, _ = conn.read(buf) //向conn中写入数据,涉及以下事件: //1:循环+非阻塞调用writev (write) //2:若未就绪,通过gopark阻塞,等待netpoll轮询唤醒 _, _ = conn.write(buf) }
以net.listen
方法为入口,进行创建socket fd
,调用的方法栈如下:
方法 | 文件 |
---|---|
net.listen() | net/dial.go |
net.listenconfig.listen() | net/dial.go |
net.syslistener.listentcp() | net/tcpsock_posix.go |
net.internetsocket() | net/ipsock_posix.go |
net.socket() | net/sock_posix.go |
核心的调用在net.socket()
方法内,源码核心流程如下:
func socket(ctx context.context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlctxfn func(context.context, string, string, syscall.rawconn) error) (fd *netfd, err error) { //进行socket系统调用,创建一个socket s, err := syssocket(family, sotype, proto) //绑定socket fd fd, err = newfd(s, family, sotype, net); //... //进行了以下事件: //1、通过syscall bind指令绑定socket的监听地址 //2、通过syscall listen指令发起对socket的监听 //3、完成epollevent表的创建(全局执行一次) //4、将socket fd注册到epoll事件表中,监听读写就绪事件 err := fd.listenstream(ctx, laddr, listenerbacklog(), ctrlctxfn); }
首先先执行了syssocket
系统调用,创建一个socket
,它是一个整数值,用于标识操作系统中打开的文件或网络套接字;接着调用newfd
方法包装成netfd
对象,以便实现更高效的异步 io 和 goroutine 调度。
紧接3.2中的net.socket
方法,在内部还调用了net.netfd.listenstream()
,poll_init
的调用栈如下:
方法 | 文件 |
---|---|
net.netfd.listenstream() | net/sock_posix.go |
net.netfd.init() | net/fd_unix.go |
poll.fd.init() | internal/poll/fd_unix.go |
poll.polldesc.init() | internal/poll/fd_poll_runtime.go |
runtime.poll_runtime_pollserverinit() | runtime/netpoll.go |
runtime.netpollinit() | runtime/netpoll_epoll.go |
net.netfd.listenstream()
核心步骤如下:
func (fd *netfd) listenstream(ctx context.context, laddr sockaddr, backlog int, ctrlctxfn func(context.context, string, string, syscall.rawconn) error) error { //.... //通过bind系统调用绑定监听地址 if err = syscall.bind(fd.pfd.sysfd, lsa); err != nil { return os.newsyscallerror("bind", err) } //通过listen系统调用对socket进行监听 if err = listenfunc(fd.pfd.sysfd, backlog); err != nil { return os.newsyscallerror("listen", err) } //fd.init()进行了以下操作: //1、完成eventpool的创建 //2、将socket fd注册到epoll事件表中 if err = fd.init(); err != nil { return err } //... return nil }
bind
系统调用绑定需要监听的地址listen
系统调用监听socketfd.init
完成eventpool
的创建以及fd的注册net.netfd.init()
方法在内部转而调用poll.fd.init()
func (fd *netfd) init() error { return fd.pfd.init(fd.net, true) } func (fd *fd) init(net string, pollable bool) error { fd.sysfile.init() // we don't actually care about the various network types. if net == "file" { fd.isfile = true } if !pollable { fd.isblocking = 1 return nil } err := fd.pd.init(fd) if err != nil { // if we could not initialize the runtime poller, // assume we are using blocking mode. fd.isblocking = 1 } return err }
然后又转入到poll.polldesc.init()
的调用中。
func (pd *polldesc) init(fd *fd) error { //通过sysonce结构,完成epoll事件表的唯一一次创建 serverinit.do(runtime_pollserverinit) //完成init后,进行poll_open ctx, errno := runtime_pollopen(uintptr(fd.sysfd)) //... //绑定里层的polldesc实例 pd.runtimectx = ctx return nil }
这里的poll.polldesc
是表层polldesc
,表层pd的init是poll_init
和poll_open
流程的入口:
serverinit.do(runtime_pollserverinit)
,其中serverinit
是名为sysonce
的特殊结构,它会保证执行的方法在全局只会被执行一次,然后执行runtime_pollserverinit
,完成poll_init
操作poll_init
后,调用runtime_pollopen(uintptr(fd.sysfd))
将fd加入到eventpool
中,完成poll_open
操作polldesc
实例我们先来关注serverinit.do(runtime_pollserverinit)
中,执行的runtime_pollserverinit
方法,它定位在runtime/netpoll.go
下:
//go:linkname poll_runtime_pollserverinit internal/poll.runtime_pollserverinit func poll_runtime_pollserverinit() { netpollgenericinit() } func netpollgenericinit() { if netpollinited.load() == 0 { lockinit(&netpollinitlock, lockranknetpollinit) lock(&netpollinitlock) if netpollinited.load() == 0 { //进入netpollinit调用 netpollinit() netpollinited.store(1) } unlock(&netpollinitlock) } }
func netpollinit() { var errno uintptr //进行epollcreate系统调用,创建epoll事件表 epfd, errno = syscall.epollcreate1(syscall.epoll_cloexec) //... //创建pipe管道,接收信号,如程序终止: //r:信号接收端,会注册对应的read事件到epoll事件表中 //w:信号发送端,有信号到达的时候,会往w发送信号,并对r产生读就绪事件 r, w, errpipe := nonblockingpipe() //... //在epollevent中注册监听r的读就绪事件 ev := syscall.epollevent{ events: syscall.epollin, } *(**uintptr)(unsafe.pointer(&ev.data)) = &netpollbreakrd errno = syscall.epollctl(epfd, syscall.epoll_ctl_add, r, &ev) //... //使用全局变量缓存pipe的读写端 netpollbreakrd = uintptr(r) netpollbreakwr = uintptr(w) }
在netpollinit()
方法内部,进行了以下操作:
执行epoll_create
指令创建了epoll事件表,并返回epoll文件描述符epfd
。
创建了两个pipe管道,当向w端写入信号的时候,r端会发生读就绪事件。
注册监听r的读就绪事件。
缓存管道。
在这里,我们创建了两个管道r
以及w
,并且在eventpool
中注册了r的读就绪事件的监听,当我们向w管道写入数据的时候,r管道就会产生读就绪事件,从而打破阻塞的epoll_wait操作,进而执行其他的操作。
方法 | 文件 |
---|---|
net.netfd.listenstream() | net/sock_posix.go |
net.netfd.init() | net/fd_unix.go |
poll.fd.init() | internal/poll/fd_unix.go |
poll.polldesc.init() | internal/poll/fd_poll_runtime.go |
runtime.poll_runtime_pollopen() | runtime/netpoll.go |
runtime.netpollopen | runtime/netpoll_epoll.go |
在poll.polldesc.init()
方法中,完成了poll_init
流程后,就会进入到poll_open
流程,执行runtime.poll_runtime_pollopen()
。
//go:linkname poll_runtime_pollopen internal/poll.runtime_pollopen func poll_runtime_pollopen(fd uintptr) (*polldesc, int) { //获取一个polldesc实例 pd := pollcache.alloc() lock(&pd.lock) wg := pd.wg.load() if wg != pdnil && wg != pdready { throw("runtime: blocked write on free polldesc") } rg := pd.rg.load() if rg != pdnil && rg != pdready { throw("runtime: blocked read on free polldesc") } //绑定socket fd到polldesc中 pd.fd = fd //... //初始化读写状态标识器为无状态 pd.rg.store(pdnil) pd.wg.store(pdnil) //... unlock(&pd.lock) //将fd添加进epoll事件表中 errno := netpollopen(fd, pd) //... //返回polldesc实例 return pd, 0 }
func netpollopen(fd uintptr, pd *polldesc) uintptr { var ev syscall.epollevent //通过epollctl操作,在epollevent中注册针对fd的监听事件 //操作类型宏指令:epoll_ctl_add——添加fd并注册监听事件 //事件类型:epollevent.events: //1、epollin:监听读就绪事件 //2、epollout:监听写就绪事件 //3、epollrdhup:监听中断事件 //4、epollet:使用边缘触发模式 ev.events = syscall.epollin | syscall.epollout | syscall.epollrdhup | syscall.epollet tp := taggedpointerpack(unsafe.pointer(pd), pd.fdseq.load()) *(*taggedpointer)(unsafe.pointer(&ev.data)) = tp return syscall.epollctl(epfd, syscall.epoll_ctl_add, int32(fd), &ev) }
不仅在net.listen()
流程中会触发poll open
,在net.listener.accept
流程中也会,当我们获取到了连接之后,也需要为这个连接封装成一个polldesc
实例,然后执行poll_open
流程将其注册到epoll事件表中。
func (fd *netfd) accept()(netfd *netfd, err error){ // 通过 syscall accept 接收到来的 conn fd d, rsa, errcall, err := fd.pfd.accept() // ... // 封装到来的 conn fd netfd, err = newfd(d, fd.family, fd.sotype, fd.net) // 将 conn fd 注册到 epoll 事件表中 err = netfd.init() // ... return netfd,nil }
当连接conn需要关闭的时候,最终会进入到poll_close
流程,执行epoll_ctl(delete)
删除对应的fd。
方法 | 文件 |
---|---|
net.conn.close | net/net.go |
net.netfd.close | net/fd_posix.go |
poll.fd.close | internal/poll/fd_unix.go |
poll.fd.decref | internal/poll/fd_mutex.go |
poll.fd.destroy | internal/poll/fd_unix.go |
poll.polldesc.close | internal/poll/fd_poll_runtime.go |
poll.runtime_pollclose | internal/poll/fd_poll_runtime.go |
runtime.poll_runtime_pollclose | runtime/netpoll.go |
runtime.netpollclose | runtime/netpoll_epoll.go |
syscall.epollctl | runtime/netpoll_epoll.go |
//go:linkname poll_runtime_pollclose internal/poll.runtime_pollclose func poll_runtime_pollclose(pd *polldesc) { if !pd.closing { throw("runtime: close polldesc w/o unblock") } wg := pd.wg.load() if wg != pdnil && wg != pdready { throw("runtime: blocked write on closing polldesc") } rg := pd.rg.load() if rg != pdnil && rg != pdready { throw("runtime: blocked read on closing polldesc") } netpollclose(pd.fd) pollcache.free(pd) }
func netpollclose(fd uintptr) uintptr { var ev syscall.epollevent return syscall.epollctl(epfd, syscall.epoll_ctl_del, int32(fd), &ev) }
poll_wait
流程最终会执行gopark
将g陷入到用户态阻塞。
方法 | 文件 |
---|---|
poll.polldesc.wait | internal/poll/fd_poll_runtime.go |
poll.runtime_pollwait | internal/poll/fd_poll_runtime.go |
runtime.poll_runtime_pollwait | runtime/netpoll.go |
runtime.netpollblock | runtime/netpoll.go |
runtime.gopark | runtime/proc.go |
runtime.netpollblockcommit | runtime/netpoll.go |
在表层polldesc
中,会通过其内部的里层polldesc
指针,调用到runtime
下的netpollblock
方法。
/* 针对某个 polldesc 实例,监听指定的mode 就绪事件 - 返回true——已就绪 返回false——因超时或者关闭导致中断 - 其他情况下,会通过 gopark 操作将当前g 阻塞在该方法中 */ func netpollblock(pd *polldesc, mode int32, waitio bool) bool { //针对mode事件,获取相应的状态 gpp := &pd.rg if mode == 'w' { gpp = &pd.wg } for { //关心的io事件就绪,直接返回 if gpp.compareandswap(pdready, pdnil) { return true } //关心的io事件未就绪,则置为等待状态,g将要被阻塞 if gpp.compareandswap(pdnil, pdwait) { break } //... } //... //将g置为阻塞态 gopark(netpollblockcommit, unsafe.pointer(gpp), waitreasoniowait, traceblocknet, 5) //当前g从阻塞态被唤醒,重置标识器 old := gpp.swap(pdnil) if old > pdwait { throw("runtime: corrupted polldesc") } //判断是否是因为所关心的事件触发而唤醒 return old == pdready }
在gopark方法中,会闭包调用netpollblockcommit
方法,其中会根据g关心的事件类型,将其实例存储到polldesc的rg或wg容器
中。
// 将 gpp 状态标识器的值由 pdwait 修改为当前 g func netpollblockcommit(gp *g, gpp unsafe.pointer) bool { r := atomic.casuintptr((*uintptr)(gpp), pdwait, uintptr(unsafe.pointer(gp))) if r { //增加等待轮询器的例程计数。 //调度器使用它来决定是否阻塞 //如果没有其他事情可做,则等待轮询器。 netpolladjustwaiters(1) } return r }
接着我们来关注何时会触发poll_wait
流程。
首先是在listener.accept
流程中,如果当前尚未有连接到达,则执行poll wait
将当前g阻塞挂载在该socket fd对应polldesc的rg
中。
// accept wraps the accept network call. func (fd *fd) accept() (int, syscall.sockaddr, string, error) { //... for { //以非阻塞模式发起一次accept,尝试接收conn s, rsa, errcall, err := accept(fd.sysfd) if err == nil { return s, rsa, "", err } switch err { //忽略中断类错误 case syscall.eintr: continue //尚未有到达的conn case syscall.eagain: //进入poll_wait流程,监听fd的读就绪事件,当有conn到达表现为fd可读。 if fd.pd.pollable() { //假如读操作未就绪,当前g会被阻塞在方法内部,直到因为超时或者就绪被netpoll ready唤醒。 if err = fd.pd.waitread(fd.isfile); err == nil { continue } } //... } }
// 指定 mode 为 r 标识等待的是读就绪事件,然后走入更底层的 poll_wait 流程 func (pd *polldesc) waitread(isfile bool) error { return pd.wait('r', isfile) }
其次分别是在conn.read
/conn.write
流程中,假若conn fd下读操作未就绪(无数据到达)/写操作未就绪(缓冲区空间不足),则会执行poll wait将g阻塞并挂载在对应的polldesc中的rg/wg
中。
func (fd *fd) read(p []byte) (int, error) { //... for { //非阻塞模式进行一次read调用 n, err := ignoringeintrio(syscall.read, fd.sysfd, p) if err != nil { n = 0 //进入poll_wait流程,并标识关心读就绪事件 if err == syscall.eagain && fd.pd.pollable() { if err = fd.pd.waitread(fd.isfile); err == nil { continue } } } err = fd.eoferror(n, err) return n, err } }
func (fd *fd)write(p []byte)(int,error){ // ... for{ // ... // 以非阻塞模式执行一次syscall write操作 n, err := ignoringeintrio(syscall.write, fd.sysfd, p[nn:max]) if n >0{ nn += n } // 缓冲区内容都已写完,直接退出 if nn ==len(p){ return nn, err } // 走入 poll_wait 流程,并标识关心的是该 fd 的写就绪事件 if err == syscall.eagain && fd.pd.pollable(){ // 倘若写操作未就绪,当前g 会 park 阻塞在该方法内部,直到因超时或者事件就绪而被 netpoll ready 唤醒 if err = fd.pd.waitwrite(fd.isfile); err ==nil{ continue } } // ... }
netpoll
流程至关重要,它会在底层调用系统的epoll_wait
操作,找到触发事件的fd,然后再逆向找到绑定fd的polldesc
实例,返回内部阻塞的g叫给上游处理唤醒。其调用栈如下:
方法 | 文件 |
---|---|
runtime.netpoll | runtime/netpoll_epoll.go |
runtime.netpollready | runtime/netpoll.go |
runtime.netpollunblock | runtime/netpoll.go |
netpoll
具体的源码如下:
//netpoll用于轮询检查是否有就绪的io事件 //若发现了就绪的io事件,检查是否有polldesc中的g关心其事件 //若找到了关心其io事件就绪的g,添加到list返回给上游处理 func netpoll(delay int64) (glist, int32) { if epfd == -1 { return glist{}, 0 } var waitms int32 //根据传入的delay参数,决定调用epoll_wait的模式: //delay < 0:设为阻塞模式(在 gmp 调度流程中,如果某个 p 迟迟获取不到可执行的 g 时,会通过该模式,使得 thread 陷入阻塞态,但该情况全局最多仅有一例) //delay = 0:设为非阻塞模式(通常情况下为此模式,包括 gmp 常规调度流程、gc 以及全局监控线程 sysmon 都是以此模式触发的 netpoll 流程) //delay > 0:设为超时模式(在 gmp 调度流程中,如果某个 p 迟迟获取不到可执行的 g 时,并且通过 timer 启动了定时任务时,会令 thread 以超时模式执行 epoll_wait 操作) if delay < 0 { waitms = -1 } else if delay == 0 { waitms = 0 } else if delay < 1e6 { waitms = 1 } else if delay < 1e15 { waitms = int32(delay / 1e6) } else { waitms = 1e9 } //最多接收128个io就绪事件 var events [128]syscall.epollevent retry: //以指定模式调用epoll_wait n, errno := syscall.epollwait(epfd, events[:], int32(len(events)), waitms) //... //存储关心io事件就绪的g实例 var torun glist delta := int32(0) //遍历返回的就绪事件 for i := int32(0); i < n; i++ { ev := events[i] if ev.events == 0 { continue } //pipe接收端的信号处理,检查是否需要退出netpoll if *(**uintptr)(unsafe.pointer(&ev.data)) == &netpollbreakrd { if ev.events != syscall.epollin { println("runtime: netpoll: break fd ready for", ev.events) throw("runtime: netpoll: break fd ready for something unexpected") } //... continue } var mode int32 //记录io就绪事件的类型 if ev.events&(syscall.epollin|syscall.epollrdhup|syscall.epollhup|syscall.epollerr) != 0 { mode += 'r' } if ev.events&(syscall.epollout|syscall.epollhup|syscall.epollerr) != 0 { mode += 'w' } // 根据 epollevent.data 获取到监听了该事件的 polldesc 实例 if mode != 0 { tp := *(*taggedpointer)(unsafe.pointer(&ev.data)) pd := (*polldesc)(tp.pointer()) //... //检查是否为g所关心的事件 delta += netpollready(&torun, pd, mode) } } return torun, delta }
func netpollready(torun *glist, pd *polldesc, mode int32) int32 { delta := int32(0) var rg, wg *g if mode == 'r' || mode == 'r'+'w' { //就绪事件包含读就绪,尝试唤醒pd内部的rg rg = netpollunblock(pd, 'r', true, &delta) } if mode == 'w' || mode == 'r'+'w' { //就绪事件包含读就绪,尝试唤醒pd内部的wg wg = netpollunblock(pd, 'w', true, &delta) } //存在g实例,则加入list中 if rg != nil { torun.push(rg) } if wg != nil { torun.push(wg) } return delta }
func netpollunblock(pd *polldesc, mode int32, ioready bool, delta *int32) *g { //获取存储的g实例 gpp := &pd.rg if mode == 'w' { gpp = &pd.wg } for { old := gpp.load() //... new := pdnil if ioready { new = pdready } //将gpp的值从g置换成pdready if gpp.compareandswap(old, new) { if old == pdwait { old = pdnil } else if old != pdnil { *delta -= 1 } //返回需要唤醒的g实例 return (*g)(unsafe.pointer(old)) } } }
那么,我们也同样需要关注在哪个环节进入了net_poll
流程。
首先,是在gmp调度器中的findrunnable
方法中被调用,用于找到可执行的g实例。具体的实现在之前的gmp调度文章中有讲解,这里只关心涉及到net_poll
方面的源码。
findrunnable
方法定位在runtime/proc.go
中
func findrunnable()(gp *g, inherittime, trywakep bool){ // .. /* 同时满足下述三个条件,发起一次【非阻塞模式】的 netpoll 流程: - epoll事件表初始化过 - 有 g 在等待io 就绪事件 - 没有空闲 p 在以【阻塞或超时】模式发起 netpoll 流程 */ if netpollinited()&& atomic.load(&netpollwaiters)>0&& atomic.load64(&sched.lastpoll)!=0{ // 以非阻塞模式发起一轮 netpoll,如果有 g 需要唤醒,一一唤醒之,并返回首个 g 给上层进行调度 if list := netpoll(0);!list.empty(){// non-blocking // 获取就绪 g 队列中的首个 g gp := list.pop() // 将就绪 g 队列中其余 g 一一置为就绪态,并添加到全局队列 injectglist(&list) // 把首个g 也置为就绪态 casgstatus(gp,_gwaiting,_grunnable) // ... //返回 g 给当前 p进行调度 return gp,false,false } } // ... /* 同时满足下述三个条件,发起一次【阻塞或超时模式】的 netpoll 流程: - epoll事件表初始化过 - 有 g 在等待io 就绪事件 - 没有空闲 p 在以【阻塞或超时】模式发起 netpoll 流程 */ if netpollinited()&&(atomic.load(&netpollwaiters)>0|| polluntil !=0)&& atomic.xchg64(&sched.lastpoll,0)!=0{ // 默认为阻塞模式 delay :=int64(-1) // 存在定时时间,则设为超时模式 if polluntil !=0{ delay = polluntil - now // ... } // 以【阻塞或超时模式】发起一轮 netpoll list := netpoll(delay)// block until new work is available } // ... }
其次,是位于同文件下的sysmon
方法中,它会被一个全局监控者g执行,每隔10ms发一次非阻塞的net_poll流程。
// the main goroutine. func main(){ // ... // 新建一个 m,直接运行 sysmon 函数 systemstack(func(){ newm(sysmon,nil,-1) }) // ... } // 全局唯一监控线程的执行函数 func sysmon(){ // ... for{ // ... /* 同时满足下述三个条件,发起一次【非阻塞模式】的 netpoll 流程: - epoll事件表初始化过 - 没有空闲 p 在以【阻塞或超时】模式发起 netpoll 流程 - 距离上一次发起 netpoll 流程的时间间隔已超过 10 ms */ lastpoll :=int64(atomic.load64(&sched.lastpoll)) if netpollinited()&& lastpoll !=0&& lastpoll+10*1000*1000< now { // 以非阻塞模式发起 netpoll list := netpoll(0)// non-blocking - returns list of goroutines // 获取到的 g 置为就绪态并添加到全局队列中 if!list.empty(){ // ... injectglist(&list) // ... } } // ... } }
最后,还会发生在gc流程中。
func pollwork() bool{ // ... // 若全局队列或 p 的本地队列非空,则提前返回 /* 同时满足下述三个条件,发起一次【非阻塞模式】的 netpoll 流程: - epoll事件表初始化过 - 有 g 在等待io 就绪事件 - 没有空闲 p 在以【阻塞或超时】模式发起 netpoll 流程 */ if netpollinited()&& atomic.load(&netpollwaiters)>0&& sched.lastpoll !=0{ // 所有取得 g 更新为就绪态并添加到全局队列 if list := netpoll(0);!list.empty(){ injectglist(&list) return true } } // ... }
感谢观看,本篇博文参考了小徐先生的文章,非常推荐大家去观看并且进入到源码中学习,链接如下:
到此这篇关于golang网络模型netpoll源码解析的文章就介绍到这了,更多相关golang网络模型netpoll内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
您想发表意见!!点此发布评论
版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。
发表评论