佳木斯湛栽影视文化发展公司

主頁 > 知識庫 > 關(guān)于Redis網(wǎng)絡(luò)模型的源碼詳析

關(guān)于Redis網(wǎng)絡(luò)模型的源碼詳析

熱門標(biāo)簽:鐵路電話系統(tǒng) AI電銷 呼叫中心市場需求 百度競價(jià)排名 服務(wù)外包 Linux服務(wù)器 網(wǎng)站排名優(yōu)化 地方門戶網(wǎng)站

前言

Redis的網(wǎng)絡(luò)模型是基于I/O多路復(fù)用程序來實(shí)現(xiàn)的。源碼中包含四種多路復(fù)用函數(shù)庫epoll、select、evport、kqueue。在程序編譯時(shí)會根據(jù)系統(tǒng)自動選擇這四種庫其中之一。下面以epoll為例,來分析Redis的I/O模塊的源碼。

epoll系統(tǒng)調(diào)用方法

Redis網(wǎng)絡(luò)事件處理模塊的代碼都是圍繞epoll那三個(gè)系統(tǒng)方法來寫的。先把這三個(gè)方法弄清楚,后面就不難了。

epfd = epoll_create(1024);

創(chuàng)建epoll實(shí)例

參數(shù):表示該 epoll 實(shí)例最多可監(jiān)聽的 socket fd(文件描述符)數(shù)量。

返回: epoll 專用的文件描述符。

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)

管理epoll中的事件,對事件進(jìn)行注冊、修改和刪除。

參數(shù):
epfd:epoll實(shí)例的文件描述符;
op:取值三種:EPOLL_CTL_ADD 注冊、EPOLL_CTL_MOD 修 改、EPOLL_CTL_DEL 刪除;
fd:socket的文件描述符;
epoll_event *event:事件

event代表一個(gè)事件,類似于Java NIO中的channel“通道”。epoll_event 的結(jié)構(gòu)如下:

typedef union epoll_data {
void *ptr;
int fd; /* socket文件描述符 */
__uint32_t u32;
__uint64_t u64;
} epoll_data_t;

struct epoll_event {
__uint32_t events; /* Epoll events 就是各種待監(jiān)聽操作的操作碼求與的結(jié)果,例如EPOLLIN(fd可讀)、EPOLLOUT(fd可寫) */
epoll_data_t data; /* User data variable */
};

int epoll_wait(int epfd, struct epoll_event * events, intmaxevents, int timeout);

等待事件是否就緒,類似于Java NIO中 select 方法。如果事件就緒,將就緒的event存入events數(shù)組中。

參數(shù)
epfd:epoll實(shí)例的文件描述符;
events:已就緒的事件數(shù)組;
intmaxevents:每次能處理的事件數(shù);
timeout:阻塞時(shí)間,等待產(chǎn)生就緒事件的超時(shí)值。

源碼分析

事件

Redis事件系統(tǒng)中將事件分為兩種類型:

  • 文件事件;網(wǎng)絡(luò)套接字對應(yīng)的事件;
  • 時(shí)間事件:Redis中一些定時(shí)操作事件,例如 serverCron 函數(shù)。

下面從事件的注冊、觸發(fā)兩個(gè)流程對源碼進(jìn)行分析

綁定事件

建立 eventLoop

在 initServer方法(由 redis.c 的 main 函數(shù)調(diào)用) 中,在建立 RedisDb 對象的同時(shí),會初始化一個(gè)“eventLoop”對象,我稱之為事件處理器對象。結(jié)構(gòu)體的關(guān)鍵成員變量如下所示:

struct aeEventLoop{
aeFileEvent *events;//已注冊的文件事件數(shù)組
aeFiredEvent *fired;//已就緒的文件事件數(shù)組
aeTimeEvent *timeEventHead;//時(shí)間事件數(shù)組
...
}

初始化 eventLoop 在 ae.c 的“aeCreateEventLoop”方法中執(zhí)行。該方法中除了初始化 eventLoop 還調(diào)用如下方法初始化了一個(gè) epoll 實(shí)例。

/*
 * ae_epoll.c
 * 創(chuàng)建一個(gè)新的 epoll 實(shí)例,并將它賦值給 eventLoop
 */
static int aeApiCreate(aeEventLoop *eventLoop) {

  aeApiState *state = zmalloc(sizeof(aeApiState));

  if (!state) return -1;

  // 初始化事件槽空間
  state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
  if (!state->events) {
    zfree(state);
    return -1;
  }

  // 創(chuàng)建 epoll 實(shí)例
  state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
  if (state->epfd == -1) {
    zfree(state->events);
    zfree(state);
    return -1;
  }

  // 賦值給 eventLoop
  eventLoop->apidata = state;
  return 0;
}

也正是在此處調(diào)用了系統(tǒng)方法“epoll_create”。這里的state是一個(gè)aeApiState結(jié)構(gòu),如下所示:

/*
 * 事件狀態(tài)
 */
typedef struct aeApiState {

  // epoll 實(shí)例描述符
  int epfd;

  // 事件槽
  struct epoll_event *events;

} aeApiState;

這個(gè) state 由 eventLoop->apidata 來記錄。

綁定ip端口與句柄

通過 listenToPort 方法開啟TCP端口,每個(gè)IP端口會對應(yīng)一個(gè)文件描述符 ipfd(因?yàn)榉?wù)器可能會有多個(gè)ip地址)

// 打開 TCP 監(jiān)聽端口,用于等待客戶端的命令請求
if (server.port != 0 
  listenToPort(server.port,server.ipfd,server.ipfd_count) == REDIS_ERR)
  exit(1);

注意:*eventLoop 和 ipfd 分別被 server.el 和 server.ipfd[] 引用。server 是結(jié)構(gòu)體 RedisServer 的實(shí)例,是Redis的全局變量。

注冊事件

如下所示代碼,為每一個(gè)文件描述符綁定一個(gè)事件函數(shù)

// initServer方法:
for (j = 0; j  server.ipfd_count; j++) {
  if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
    acceptTcpHandler,NULL) == AE_ERR)
    {
      redisPanic(
        "Unrecoverable error creating server.ipfd file event.");
    }
}
// ae.c 中的 aeCreateFileEvent 方法
/*
 * 根據(jù) mask 參數(shù)的值,監(jiān)聽 fd 文件的狀態(tài),
 * 當(dāng) fd 可用時(shí),執(zhí)行 proc 函數(shù)
 */
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
    aeFileProc *proc, void *clientData)
{
  if (fd >= eventLoop->setsize) {
    errno = ERANGE;
    return AE_ERR;
  }

  if (fd >= eventLoop->setsize) return AE_ERR;

  // 取出文件事件結(jié)構(gòu)
  aeFileEvent *fe = eventLoop->events[fd];

  // 監(jiān)聽指定 fd 的指定事件
  if (aeApiAddEvent(eventLoop, fd, mask) == -1)
    return AE_ERR;

  // 設(shè)置文件事件類型,以及事件的處理器
  fe->mask |= mask;
  if (mask  AE_READABLE) fe->rfileProc = proc;
  if (mask  AE_WRITABLE) fe->wfileProc = proc;

  // 私有數(shù)據(jù)
  fe->clientData = clientData;

  // 如果有需要,更新事件處理器的最大 fd
  if (fd > eventLoop->maxfd)
    eventLoop->maxfd = fd;

  return AE_OK;
}

aeCreateFileEvent 函數(shù)中有一個(gè)方法調(diào)用:aeApiAddEvent,代碼如下

/*
 * ae_epoll.c
 * 關(guān)聯(lián)給定事件到 fd
 */
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
  aeApiState *state = eventLoop->apidata;
  struct epoll_event ee;

  /* If the fd was already monitored for some event, we need a MOD
   * operation. Otherwise we need an ADD operation. 
   *
   * 如果 fd 沒有關(guān)聯(lián)任何事件,那么這是一個(gè) ADD 操作。
   *
   * 如果已經(jīng)關(guān)聯(lián)了某個(gè)/某些事件,那么這是一個(gè) MOD 操作。
   */
  int op = eventLoop->events[fd].mask == AE_NONE ?
      EPOLL_CTL_ADD : EPOLL_CTL_MOD;

  // 注冊事件到 epoll
  ee.events = 0;
  mask |= eventLoop->events[fd].mask; /* Merge old events */
  if (mask  AE_READABLE) ee.events |= EPOLLIN;
  if (mask  AE_WRITABLE) ee.events |= EPOLLOUT;
  ee.data.u64 = 0; /* avoid valgrind warning */
  ee.data.fd = fd;

  if (epoll_ctl(state->epfd,op,fd,ee) == -1) return -1;

  return 0;
}

這里實(shí)際上就是調(diào)用系統(tǒng)方法“epoll_ctl”,將事件(文件描述符)注冊進(jìn) epoll 中。首先要封裝一個(gè) epoll_event 結(jié)構(gòu),即 ee ,通過“epoll_ctl”將其注冊進(jìn) epoll 中。

除此之外,aeCreateFileEvent 還完成了下面兩個(gè)重要操作:

  • 將事件函數(shù)“acceptTcpHandler”存入了eventLoop中,即由eventLoop->events[fd]->rfileProc 來引用(也可能是wfileProc,分別代表讀事件和寫事件);
  • 將當(dāng)操作碼添加進(jìn) eventLoop->events[fd]->mask 中(mask 類似于JavaNIO中的ops操作碼,代表事件類型)。

事件監(jiān)聽與執(zhí)行

redis.c 的main函數(shù)會調(diào)用 ae.c 中的 main 方法,如下所示:

/*
 * 事件處理器的主循環(huán)
 */
void aeMain(aeEventLoop *eventLoop) {

  eventLoop->stop = 0;

  while (!eventLoop->stop) {

    // 如果有需要在事件處理前執(zhí)行的函數(shù),那么運(yùn)行它
    if (eventLoop->beforesleep != NULL)
      eventLoop->beforesleep(eventLoop);

    // 開始處理事件
    aeProcessEvents(eventLoop, AE_ALL_EVENTS);
  }
}

上述代碼會調(diào)用 aeProcessEvents 方法用于處理事件,方法如下所示

/* Process every pending time event, then every pending file event
 * (that may be registered by time event callbacks just processed).
 *
 * 處理所有已到達(dá)的時(shí)間事件,以及所有已就緒的文件事件。
 * 函數(shù)的返回值為已處理事件的數(shù)量
 */
 int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
  int processed = 0, numevents;

  /* Nothing to do? return ASAP */
  if (!(flags  AE_TIME_EVENTS)  !(flags  AE_FILE_EVENTS)) return 0;

  if (eventLoop->maxfd != -1 ||
    ((flags  AE_TIME_EVENTS)  !(flags  AE_DONT_WAIT))) {
    int j;
    aeTimeEvent *shortest = NULL;
    struct timeval tv, *tvp;

    // 獲取最近的時(shí)間事件
    if (flags  AE_TIME_EVENTS  !(flags  AE_DONT_WAIT))
      shortest = aeSearchNearestTimer(eventLoop);
    if (shortest) {
      // 如果時(shí)間事件存在的話
      // 那么根據(jù)最近可執(zhí)行時(shí)間事件和現(xiàn)在時(shí)間的時(shí)間差來決定文件事件的阻塞時(shí)間
      long now_sec, now_ms;

      /* Calculate the time missing for the nearest
       * timer to fire. */
      // 計(jì)算距今最近的時(shí)間事件還要多久才能達(dá)到
      // 并將該時(shí)間距保存在 tv 結(jié)構(gòu)中
      aeGetTime(now_sec, now_ms);
      tvp = tv;
      tvp->tv_sec = shortest->when_sec - now_sec;
      if (shortest->when_ms  now_ms) {
        tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
        tvp->tv_sec --;
      } else {
        tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
      }

      // 時(shí)間差小于 0 ,說明事件已經(jīng)可以執(zhí)行了,將秒和毫秒設(shè)為 0 (不阻塞)
      if (tvp->tv_sec  0) tvp->tv_sec = 0;
      if (tvp->tv_usec  0) tvp->tv_usec = 0;
    } else {
      
      // 執(zhí)行到這一步,說明沒有時(shí)間事件
      // 那么根據(jù) AE_DONT_WAIT 是否設(shè)置來決定是否阻塞,以及阻塞的時(shí)間長度

      /* If we have to check for events but need to return
       * ASAP because of AE_DONT_WAIT we need to set the timeout
       * to zero */
      if (flags  AE_DONT_WAIT) {
        // 設(shè)置文件事件不阻塞
        tv.tv_sec = tv.tv_usec = 0;
        tvp = tv;
      } else {
        /* Otherwise we can block */
        // 文件事件可以阻塞直到有事件到達(dá)為止
        tvp = NULL; /* wait forever */
      }
    }

    // 處理文件事件,阻塞時(shí)間由 tvp 決定
    numevents = aeApiPoll(eventLoop, tvp);
    for (j = 0; j  numevents; j++) {
      // 從已就緒數(shù)組中獲取事件
      aeFileEvent *fe = eventLoop->events[eventLoop->fired[j].fd];

      int mask = eventLoop->fired[j].mask;
      int fd = eventLoop->fired[j].fd;
      int rfired = 0;

      /* note the fe->mask  mask  ... code: maybe an already processed
       * event removed an element that fired and we still didn't
       * processed, so we check if the event is still valid. */
      // 讀事件
      if (fe->mask  mask  AE_READABLE) {
        // rfired 確保讀/寫事件只能執(zhí)行其中一個(gè)
        rfired = 1;
        fe->rfileProc(eventLoop,fd,fe->clientData,mask);
      }
      // 寫事件
      if (fe->mask  mask  AE_WRITABLE) {
        if (!rfired || fe->wfileProc != fe->rfileProc)
          fe->wfileProc(eventLoop,fd,fe->clientData,mask);
      }

      processed++;
    }
  }

  /* Check time events */
  // 執(zhí)行時(shí)間事件
  if (flags  AE_TIME_EVENTS)
    processed += processTimeEvents(eventLoop);

  return processed; 
}

該函數(shù)中代碼大致分為三個(gè)主要步驟

  • 根據(jù)時(shí)間事件與當(dāng)前時(shí)間的關(guān)系,決定阻塞時(shí)間 tvp;
  • 調(diào)用aeApiPoll方法,將就緒事件都寫入eventLoop->fired[]中,返回就緒事件數(shù)目;
  • 遍歷eventLoop->fired[],遍歷每一個(gè)就緒事件,執(zhí)行之前綁定好的方法rfileProc 或者wfileProc。

ae_epoll.c 中的 aeApiPoll 方法如下所示:

/*
 * 獲取可執(zhí)行事件
 */
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
  aeApiState *state = eventLoop->apidata;
  int retval, numevents = 0;

  // 等待時(shí)間
  retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
      tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);

  // 有至少一個(gè)事件就緒?
  if (retval > 0) {
    int j;

    // 為已就緒事件設(shè)置相應(yīng)的模式
    // 并加入到 eventLoop 的 fired 數(shù)組中
    numevents = retval;
    for (j = 0; j  numevents; j++) {
      int mask = 0;
      struct epoll_event *e = state->events+j;

      if (e->events  EPOLLIN) mask |= AE_READABLE;
      if (e->events  EPOLLOUT) mask |= AE_WRITABLE;
      if (e->events  EPOLLERR) mask |= AE_WRITABLE;
      if (e->events  EPOLLHUP) mask |= AE_WRITABLE;

      eventLoop->fired[j].fd = e->data.fd;
      eventLoop->fired[j].mask = mask;
    }
  }
  
  // 返回已就緒事件個(gè)數(shù)
  return numevents;
}

執(zhí)行epoll_wait后,就緒的事件會被寫入 eventLoop->apidata->events 事件槽。后面的循環(huán)就是將事件槽中的事件寫入到 eventLoop->fired[] 中。具體描述:每一個(gè)事件都是一個(gè) epoll_event 結(jié)構(gòu),用e來指代,則e.data.fd代表文件描述符,e->events表示其操作碼,將操作碼轉(zhuǎn)化為mask,最后將fd 和 mask 都寫入eventLoop->fired[j]中。

之后,在外層的 aeProcessEvents 方法中會執(zhí)行函數(shù)指針 rfileProc 或者 wfileProc 指向的方法,例如前文提到已注冊的“acceptTcpHandler”。

總結(jié)

Redis的網(wǎng)絡(luò)模塊其實(shí)是一個(gè)簡易的Reactor模式。本文順著“服務(wù)端注冊事件——>接受客戶端連接——>監(jiān)聽事件是否就緒——>執(zhí)行事件”這樣的路線,來分析Redis源碼,描述了Redis接受客戶端connect的過程。實(shí)際上NIO的思想都基本類似。

到此這篇關(guān)于Redis網(wǎng)絡(luò)模型的源碼詳析的文章就介紹到這了,更多相關(guān)Redis網(wǎng)絡(luò)模型源碼內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

您可能感興趣的文章:
  • Redis源碼解析:集群手動故障轉(zhuǎn)移、從節(jié)點(diǎn)遷移詳解
  • Spring AOP實(shí)現(xiàn)Redis緩存數(shù)據(jù)庫查詢源碼
  • redis源碼分析教程之壓縮鏈表ziplist詳解
  • Redisson分布式鎖源碼解析
  • 從源碼解讀redis持久化
  • scrapy-redis源碼分析之發(fā)送POST請求詳解

標(biāo)簽:崇左 湘潭 蘭州 衡水 湖南 銅川 黃山 仙桃

巨人網(wǎng)絡(luò)通訊聲明:本文標(biāo)題《關(guān)于Redis網(wǎng)絡(luò)模型的源碼詳析》,本文關(guān)鍵詞  ;如發(fā)現(xiàn)本文內(nèi)容存在版權(quán)問題,煩請?zhí)峁┫嚓P(guān)信息告之我們,我們將及時(shí)溝通與處理。本站內(nèi)容系統(tǒng)采集于網(wǎng)絡(luò),涉及言論、版權(quán)與本站無關(guān)。
  • 相關(guān)文章
  • 收縮
    • 微信客服
    • 微信二維碼
    • 電話咨詢

    • 400-1100-266
    南溪县| 桐城市| 封开县| 罗定市| 松溪县| 韶关市| 讷河市| 丹寨县| 湖州市| 西乡县| 鄱阳县| 灵璧县| 沾化县| 东乡族自治县| 成都市| 富蕴县| 东乌珠穆沁旗| 揭东县| 延边| 甘孜| 芜湖县| 湘乡市| 九江市| 都昌县| 敖汉旗| 甘孜| 晋宁县| 桂平市| 永丰县| 榆社县| 曲周县| 望谟县| 邢台市| 渝中区| 汝阳县| 甘谷县| 烟台市| 永川市| 琼结县| 黎平县| 尚义县|