佳木斯湛栽影视文化发展公司

主頁(yè) > 知識(shí)庫(kù) > golang將多路復(fù)異步io轉(zhuǎn)成阻塞io的方法詳解

golang將多路復(fù)異步io轉(zhuǎn)成阻塞io的方法詳解

熱門標(biāo)簽:鐵路電話系統(tǒng) 呼叫中心市場(chǎng)需求 地方門戶網(wǎng)站 服務(wù)外包 Linux服務(wù)器 網(wǎng)站排名優(yōu)化 AI電銷 百度競(jìng)價(jià)排名

前言

本文主要給大家介紹了關(guān)于golang 如何將多路復(fù)異步io轉(zhuǎn)變成阻塞io的相關(guān)內(nèi)容,分享出來(lái)供大家參考學(xué)習(xí),下面話不多說(shuō)了,來(lái)一起看看詳細(xì)的介紹:

package main

import (
 "net"
)

func handleConnection(c net.Conn) {
 //讀寫(xiě)數(shù)據(jù)
 buffer := make([]byte, 1024)
 c.Read(buffer)
 c.Write([]byte("Hello from server"))
}

func main() {
 l, err := net.Listen("tcp", "host:port")
 if err != nil {
 return
 }
 defer l.Close()
 for {
 c, err := l.Accept()
 if err!= nil {
 return
 }
 go handleConnection(c)
 }
}

對(duì)于我們都會(huì)寫(xiě)上面的代碼,很簡(jiǎn)單,的確golang的網(wǎng)絡(luò)部分對(duì)于我們隱藏了太多東西,我們不用像c++一樣去調(diào)用底層的socket函數(shù),也不用去使用epoll等復(fù)雜的io多路復(fù)用相關(guān)的邏輯,但是上面的代碼真的就像我們看起來(lái)的那樣在調(diào)用accept和read時(shí)阻塞嗎?

// Multiple goroutines may invoke methods on a Conn simultaneously.
//官方注釋:多個(gè)goroutines可能同時(shí)調(diào)用方法在一個(gè)連接上,我的理解就是所謂的驚群效應(yīng)吧
//換句話說(shuō)就是你多個(gè)goroutines監(jiān)聽(tīng)同一個(gè)連接同一個(gè)事件,所有的goroutines都會(huì)觸發(fā),
//這只是我的猜測(cè),有待驗(yàn)證。
type Conn interface {
 Read(b []byte) (n int, err error)
 Write(b []byte) (n int, err error)
 Close() error
 LocalAddr() Addr
 RemoteAddr() Addr
 SetDeadline(t time.Time) error
 SetReadDeadline(t time.Time) error
 SetWriteDeadline(t time.Time) error
}

type conn struct {
 fd *netFD
}

這里面又一個(gè)Conn接口,下面conn實(shí)現(xiàn)了這個(gè)接口,里面只有一個(gè)成員netFD.

// Network file descriptor.
type netFD struct {
 // locking/lifetime of sysfd + serialize access to Read and Write methods
 fdmu fdMutex

 // immutable until Close
 sysfd  int
 family  int
 sotype  int
 isConnected bool
 net   string
 laddr  Addr
 raddr  Addr

 // wait server
 pd pollDesc
}

func (fd *netFD) accept() (netfd *netFD, err error) {
 //................
 for {
 s, rsa, err = accept(fd.sysfd)
 if err != nil {
 nerr, ok := err.(*os.SyscallError)
 if !ok {
 return nil, err
 }
 switch nerr.Err {
 /* 如果錯(cuò)誤是EAGAIN說(shuō)明Socket的緩沖區(qū)為空,未讀取到任何數(shù)據(jù)
    則調(diào)用fd.pd.WaitRead,*/
 case syscall.EAGAIN:
 if err = fd.pd.waitRead(); err == nil {
  continue
 }
 case syscall.ECONNABORTED:
 continue
 }
 return nil, err
 }
 break
 }
 //.........
 //代碼過(guò)長(zhǎng)不再列出,感興趣看go的源碼,runtime 下的fd_unix.go
 return netfd, nil
}

上面代碼段是accept部分,這里我們注意當(dāng)accept有錯(cuò)誤發(fā)生的時(shí)候,會(huì)檢查這個(gè)錯(cuò)誤是否是syscall.EAGAIN,如果是,則調(diào)用WaitRead將當(dāng)前讀這個(gè)fd的goroutine在此等待,直到這個(gè)fd上的讀事件再次發(fā)生為止。當(dāng)這個(gè)socket上有新數(shù)據(jù)到來(lái)的時(shí)候,WaitRead調(diào)用返回,繼續(xù)for循環(huán)的執(zhí)行,這樣以來(lái)就讓調(diào)用netFD的Read的地方變成了同步“阻塞”。有興趣的可以看netFD的讀和寫(xiě)方法,都有同樣的實(shí)現(xiàn)。

到這里所有的疑問(wèn)都集中到了pollDesc上,它到底是什么呢?

const (
 pdReady uintptr = 1
 pdWait uintptr = 2
)

// Network poller descriptor.
type pollDesc struct {
 link *pollDesc // in pollcache, protected by pollcache.lock
 lock mutex // protects the following fields
 fd  uintptr
 closing bool
 seq  uintptr // protects from stale timers and ready notifications
 rg  uintptr // pdReady, pdWait, G waiting for read or nil
 rt  timer // read deadline timer (set if rt.f != nil)
 rd  int64 // read deadline
 wg  uintptr // pdReady, pdWait, G waiting for write or nil
 wt  timer // write deadline timer
 wd  int64 // write deadline
 user uint32 // user settable cookie
}

type pollCache struct {
 lock mutex
 first *pollDesc
}

pollDesc網(wǎng)絡(luò)輪詢器是Golang中針對(duì)每個(gè)socket文件描述符建立的輪詢機(jī)制。 此處的輪詢并不是一般意義上的輪詢,而是Golang的runtime在調(diào)度goroutine或者GC完成之后或者指定時(shí)間之內(nèi),調(diào)用epoll_wait獲取所有產(chǎn)生IO事件的socket文件描述符。當(dāng)然在runtime輪詢之前,需要將socket文件描述符和當(dāng)前goroutine的相關(guān)信息加入epoll維護(hù)的數(shù)據(jù)結(jié)構(gòu)中,并掛起當(dāng)前goroutine,當(dāng)IO就緒后,通過(guò)epoll返回的文件描述符和其中附帶的goroutine的信息,重新恢復(fù)當(dāng)前goroutine的執(zhí)行。這里我們可以看到pollDesc中有兩個(gè)變量wg和rg,其實(shí)我們可以把它們看作信號(hào)量,這兩個(gè)變量有幾種不同的狀態(tài):

  • pdReady:io就緒 
  • pdWait:當(dāng)前的goroutine正在準(zhǔn)備掛起在信號(hào)量上,但是還沒(méi)有掛起。
  • G pointer:當(dāng)我們把它改為指向當(dāng)前goroutine的指針時(shí),當(dāng)前goroutine掛起  

繼續(xù)接著上面的WaitRead調(diào)用說(shuō)起,go在這里到底做了什么讓當(dāng)前的goroutine掛起了呢。

func net_runtime_pollWait(pd *pollDesc, mode int) int {
 err := netpollcheckerr(pd, int32(mode))
 if err != 0 {
 return err
 }
 // As for now only Solaris uses level-triggered IO.
 if GOOS == "solaris" {
 netpollarm(pd, mode)
 }
 for !netpollblock(pd, int32(mode), false) {
 err = netpollcheckerr(pd, int32(mode))
 if err != 0 {
 return err
 }
 // Can happen if timeout has fired and unblocked us,
 // but before we had a chance to run, timeout has been reset.
 // Pretend it has not happened and retry.
 }
 return 0
}


// returns true if IO is ready, or false if timedout or closed
// waitio - wait only for completed IO, ignore errors
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
 //根據(jù)讀寫(xiě)模式獲取相應(yīng)的pollDesc中的讀寫(xiě)信號(hào)量
 gpp := pd.rg
 if mode == 'w' {
 gpp = pd.wg
 }

 for {
 old := *gpp
 //已經(jīng)準(zhǔn)備好直接返回true
 if old == pdReady {
 *gpp = 0
 return true
 }
 if old != 0 {
 throw("netpollblock: double wait")
 }
  //設(shè)置gpp pdWait
 if atomic.Casuintptr(gpp, 0, pdWait) {
 break
 }
 }

 if waitio || netpollcheckerr(pd, mode) == 0 {
 gopark(netpollblockcommit, unsafe.Pointer(gpp), "IO wait", traceEvGoBlockNet, 5)
 }

 old := atomic.Xchguintptr(gpp, 0)
 if old > pdWait {
 throw("netpollblock: corrupted state")
 }
 return old == pdReady
}

當(dāng)調(diào)用WaitRead時(shí)經(jīng)過(guò)一段匯編最重調(diào)用了上面的net_runtime_pollWait函數(shù),該函數(shù)循環(huán)調(diào)用了netpollblock函數(shù),返回true表示io已準(zhǔn)備好,返回false表示錯(cuò)誤或者超時(shí),在netpollblock中調(diào)用了gopark函數(shù),gopark函數(shù)調(diào)用了mcall的函數(shù),該函數(shù)用匯編來(lái)實(shí)現(xiàn),具體功能就是把當(dāng)前的goroutine掛起,然后去執(zhí)行其他可執(zhí)行的goroutine。到這里整個(gè)goroutine掛起的過(guò)程已經(jīng)結(jié)束,那當(dāng)goroutine可讀的時(shí)候是如何通知該goroutine呢,這就是epoll的功勞了。

func netpoll(block bool) *g {
 if epfd == -1 {
 return nil
 }
 waitms := int32(-1)
 if !block {
 waitms = 0
 }
 var events [128]epollevent
retry:
 //每次最多監(jiān)聽(tīng)128個(gè)事件
 n := epollwait(epfd, events[0], int32(len(events)), waitms)
 if n  0 {
 if n != -_EINTR {
 println("runtime: epollwait on fd", epfd, "failed with", -n)
 throw("epollwait failed")
 }
 goto retry
 }
 var gp guintptr
 for i := int32(0); i  n; i++ {
 ev := events[i]
 if ev.events == 0 {
 continue
 }
 var mode int32
 //讀事件
 if ev.events(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
 mode += 'r'
 }
 //寫(xiě)事件
 if ev.events(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
 mode += 'w'
 }
 if mode != 0 {
  //把epoll中的data轉(zhuǎn)換成pollDesc
 pd := *(**pollDesc)(unsafe.Pointer(ev.data))
 netpollready(gp, pd, mode)
 }
 }
 if block  gp == 0 {
 goto retry
 }
 return gp.ptr()
}

這里就是熟悉的代碼了,epoll的使用,看起來(lái)親民多了。pd:=*(**pollDesc)(unsafe.Pointer(ev.data))這是最關(guān)鍵的一句,我們?cè)谶@里拿到當(dāng)前可讀時(shí)間的pollDesc,上面我們已經(jīng)說(shuō)了,當(dāng)pollDesc的讀寫(xiě)信號(hào)量保存為G pointer時(shí)當(dāng)前goroutine就會(huì)掛起。而在這里我們調(diào)用了netpollready函數(shù),函數(shù)中把相應(yīng)的讀寫(xiě)信號(hào)量G指針擦出,置為pdReady,G-pointer狀態(tài)被抹去,當(dāng)前goroutine的G指針就放到可運(yùn)行隊(duì)列中,這樣goroutine就被喚醒了。

可以看到雖然我們?cè)趯?xiě)tcp server看似一個(gè)阻塞的網(wǎng)絡(luò)模型,在其底層實(shí)際上是基于異步多路復(fù)用的機(jī)制來(lái)實(shí)現(xiàn)的,只是把它封裝成了跟阻塞io相似的開(kāi)發(fā)模式,這樣是使得我們不用去關(guān)注異步io,多路復(fù)用等這些復(fù)雜的概念以及混亂的回調(diào)函數(shù)。

總結(jié)

以上就是這篇文章的全部?jī)?nèi)容了,希望本文的內(nèi)容對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,如果有疑問(wèn)大家可以留言交流,謝謝大家對(duì)腳本之家的支持。

您可能感興趣的文章:
  • Go語(yǔ)言同步與異步執(zhí)行多個(gè)任務(wù)封裝詳解(Runner和RunnerAsync)

標(biāo)簽:蘭州 仙桃 湖南 銅川 湘潭 黃山 崇左 衡水

巨人網(wǎng)絡(luò)通訊聲明:本文標(biāo)題《golang將多路復(fù)異步io轉(zhuǎn)成阻塞io的方法詳解》,本文關(guān)鍵詞  ;如發(fā)現(xiàn)本文內(nèi)容存在版權(quán)問(wèn)題,煩請(qǐng)?zhí)峁┫嚓P(guān)信息告之我們,我們將及時(shí)溝通與處理。本站內(nèi)容系統(tǒng)采集于網(wǎng)絡(luò),涉及言論、版權(quán)與本站無(wú)關(guān)。
  • 相關(guān)文章
  • 收縮
    • 微信客服
    • 微信二維碼
    • 電話咨詢

    • 400-1100-266
    辉南县| 大竹县| 建平县| 仪征市| 正安县| 咸阳市| 攀枝花市| 若羌县| 友谊县| 延寿县| 玉林市| 铜川市| 响水县| 波密县| 澜沧| 南漳县| 绥化市| 炉霍县| 东丰县| 册亨县| 仁怀市| 于都县| 兴国县| 正定县| 大宁县| 富顺县| 宾阳县| 图片| 荆州市| 盖州市| 桐梓县| 陇川县| 万全县| 金秀| 梓潼县| 攀枝花市| 碌曲县| 汤原县| 康马县| 乐平市| 德保县|