佳木斯湛栽影视文化发展公司

主頁 > 知識庫 > swoole_process實(shí)現(xiàn)進(jìn)程池的方法示例

swoole_process實(shí)現(xiàn)進(jìn)程池的方法示例

熱門標(biāo)簽:科大訊飛語音識別系統(tǒng) 電子圍欄 團(tuán)購網(wǎng)站 阿里云 服務(wù)器配置 Linux服務(wù)器 銀行業(yè)務(wù) Mysql連接數(shù)設(shè)置

swoole —— 重新定義PHP

swoole 的進(jìn)程之間有兩種通信方式,一種是消息隊(duì)列(queue),另一種是管道(pipe),對swoole_process 的研究在swoole中顯得尤為重要。

預(yù)備知識

IO多路復(fù)用

swoole 中的io多路復(fù)用表現(xiàn)為底層的 epoll進(jìn)程模型,在C語言中表現(xiàn)為 epoll 函數(shù)。

  • epoll 模型下會持續(xù)監(jiān)聽自己名下的素有socket 描述符 fd
  • 當(dāng)觸發(fā)了 socket 監(jiān)聽的事件時(shí),epoll 函數(shù)才會響應(yīng),并返回所有監(jiān)聽該時(shí)間的 socket 集合
  • epoll 的本質(zhì)是阻塞IO,它的優(yōu)點(diǎn)在于能同事處理大量socket連接

Event loop 事件循環(huán)

swoole 對 epoll 實(shí)現(xiàn)了一個(gè)Reactor線程模型封裝,設(shè)置了read事件和write事件的監(jiān)聽回調(diào)函數(shù)。(詳見swoole_event_add)

  • Event loop 是一個(gè)Reactor線程,其中運(yùn)行了一個(gè)epoll實(shí)例。
  • 通過swoole_event_add將socket描述符的一個(gè)事件添加到epoll監(jiān)聽中,事件發(fā)生時(shí)將執(zhí)行回調(diào)函數(shù)
  • 不可用于fpm環(huán)境下,因?yàn)閒pm在任務(wù)結(jié)束時(shí)可能會關(guān)掉進(jìn)程。

swoole_process

  • 基于C語言封裝的進(jìn)程管理模塊,方便php來調(diào)用
  • 內(nèi)置管道、消息隊(duì)列接口,方便實(shí)現(xiàn)進(jìn)程間通信

我們在php-fpm.conf配置文件中發(fā)現(xiàn),php-fpm中有兩種進(jìn)程池管理設(shè)置。

  • 靜態(tài)模式 即初始化固定的進(jìn)程數(shù),當(dāng)來了一個(gè)請求時(shí),從中選取一個(gè)進(jìn)程來處理。
  • 動態(tài)模式 指定最小、最大進(jìn)程數(shù),當(dāng)請求量過大,進(jìn)程數(shù)不超過最大限制時(shí),新增線程去處理請求

接下來用swoole代碼來實(shí)現(xiàn),這里只是為理解swoole_process、進(jìn)程間通信、定時(shí)器等使用,實(shí)際情況使用封裝好的swoole_server來實(shí)現(xiàn)task任務(wù)隊(duì)列池會更方便。

假如有個(gè)定時(shí)投遞的任務(wù)隊(duì)列:

?php

/**
 * 動態(tài)進(jìn)程池,類似fpm
 * 動態(tài)新建進(jìn)程
 * 有初始進(jìn)程數(shù),最小進(jìn)程數(shù),進(jìn)程不夠處理時(shí)候新建進(jìn)程,不超過最大進(jìn)程數(shù)
 */

// 一個(gè)進(jìn)程定時(shí)投遞任務(wù)

/**
 * 1. tick
 * 2. process及其管道通訊
 * 3. event loop 事件循環(huán)
 */
class processPool
{
  private $pool;

  /**
   * @var swoole_process[] 記錄所有worker的process對象
   */
  private $workers = [];

  /**
   * @var array 記錄worker工作狀態(tài)
   */
  private $used_workers = [];

  /**
   * @var int 最小進(jìn)程數(shù)
   */
  private $min_woker_num = 5;

  /**
   * @var int 初始進(jìn)程數(shù)
   */
  private $start_worker_num = 10;

  /**
   * @var int 最大進(jìn)程數(shù)
   */
  private $max_woker_num = 20;

  /**
   * 進(jìn)程閑置銷毀秒數(shù)
   * @var int
   */
  private $idle_seconds = 5;

  /**
   * @var int 當(dāng)前進(jìn)程數(shù)
   */
  private $curr_num;

  /**
   * 閑置進(jìn)程時(shí)間戳
   * @var array
   */
  private $active_time = [];

  public function __construct()
  {
    $this->pool = new swoole_process(function () {
      // 循環(huán)建立worker進(jìn)程
      for ($i = 0; $i  $this->start_worker_num; $i++) {
        $this->createWorker();
      }
      echo '初始化進(jìn)程數(shù):' . $this->curr_num . PHP_EOL;
      // 每秒定時(shí)往閑置的worker的管道中投遞任務(wù)
      swoole_timer_tick(1000, function ($timer_id) {
        static $count = 0;
        $count++;
        $need_create = true;
        foreach ($this->used_workers as $pid => $used) {
          if ($used == 0) {
            $need_create = false;
            $this->workers[$pid]->write($count . ' job');
            // 標(biāo)記使用中
            $this->used_workers[$pid] = 1;
            $this->active_time[$pid] = time();
            break;
          }
        }
        foreach ($this->used_workers as $pid => $used)
          // 如果所有worker隊(duì)列都沒有閑置的,則新建一個(gè)worker來處理
          if ($need_create  $this->curr_num  $this->max_woker_num) {
            $new_pid = $this->createWorker();
            $this->workers[$new_pid]->write($count . ' job');
            $this->used_workers[$new_pid] = 1;
            $this->active_time[$new_pid] = time();
          }

        // 閑置超過一段時(shí)間則銷毀進(jìn)程
        foreach ($this->active_time as $pid => $timestamp) {
          if ((time() - $timestamp) > $this->idle_seconds  $this->curr_num > $this->min_woker_num) {
            // 銷毀該進(jìn)程
            if (isset($this->workers[$pid])  $this->workers[$pid] instanceof swoole_process) {
              $this->workers[$pid]->write('exit');
              unset($this->workers[$pid]);
              $this->curr_num = count($this->workers);
              unset($this->used_workers[$pid]);
              unset($this->active_time[$pid]);
              echo "{$pid} destroyed\n";
              break;
            }
          }
        }

        echo "任務(wù){(diào)$count}/{$this->curr_num}\n";

        if ($count == 20) {
          foreach ($this->workers as $pid => $worker) {
            $worker->write('exit');
          }
          // 關(guān)閉定時(shí)器
          swoole_timer_clear($timer_id);
          // 退出進(jìn)程池
          $this->pool->exit(0);
          exit();
        }
      });

    });

    $master_pid = $this->pool->start();
    echo "Master $master_pid start\n";

    while ($ret = swoole_process::wait()) {
      $pid = $ret['pid'];
      echo "process {$pid} existed\n";
    }
  }

  /**
   * 創(chuàng)建一個(gè)新進(jìn)程
   * @return int 新進(jìn)程的pid
   */
  public function createWorker()
  {
    $worker_process = new swoole_process(function (swoole_process $worker) {
      // 給子進(jìn)程管道綁定事件
      swoole_event_add($worker->pipe, function ($pipe) use ($worker) {
        $data = trim($worker->read());
        if ($data == 'exit') {
          $worker->exit(0);
          exit();
        }
        echo "{$worker->pid} 正在處理 {$data}\n";
        sleep(5);
        // 返回結(jié)果,表示空閑
        $worker->write("complete");
      });
    });

    $worker_pid = $worker_process->start();

    // 給父進(jìn)程管道綁定事件
    swoole_event_add($worker_process->pipe, function ($pipe) use ($worker_process) {
      $data = trim($worker_process->read());
      if ($data == 'complete') {
        // 標(biāo)記為空閑
//        echo "{$worker_process->pid} 空閑了\n";
        $this->used_workers[$worker_process->pid] = 0;
      }
    });

    // 保存process對象
    $this->workers[$worker_pid] = $worker_process;
    // 標(biāo)記為空閑
    $this->used_workers[$worker_pid] = 0;
    $this->active_time[$worker_pid] = time();
    $this->curr_num = count($this->workers);
    return $worker_pid;
  }

}

new processPool();

以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。

您可能感興趣的文章:
  • 詳解PHP swoole process的使用方法

標(biāo)簽:江蘇 棗莊 萍鄉(xiāng) 廣元 衡水 蚌埠 衢州 大理

巨人網(wǎng)絡(luò)通訊聲明:本文標(biāo)題《swoole_process實(shí)現(xiàn)進(jìn)程池的方法示例》,本文關(guān)鍵詞  ;如發(fā)現(xiàn)本文內(nèi)容存在版權(quán)問題,煩請?zhí)峁┫嚓P(guān)信息告之我們,我們將及時(shí)溝通與處理。本站內(nèi)容系統(tǒng)采集于網(wǎng)絡(luò),涉及言論、版權(quán)與本站無關(guān)。
  • 相關(guān)文章
  • 收縮
    • 微信客服
    • 微信二維碼
    • 電話咨詢

    • 400-1100-266
    西藏| 尼玛县| 南雄市| 延吉市| 栾城县| 滨州市| 彰武县| 迭部县| 阿拉善左旗| 襄垣县| 永川市| 正镶白旗| 从化市| 巴林右旗| 克拉玛依市| 云龙县| 博客| 武强县| 金湖县| 石门县| 平果县| 西昌市| 田东县| 安多县| 石景山区| 呼伦贝尔市| 定远县| 吴桥县| 兰考县| 克什克腾旗| 永清县| 梧州市| 济南市| 台北市| 利津县| 堆龙德庆县| 洪江市| 富蕴县| 奈曼旗| 衢州市| 周宁县|