async-helper
簡介
PHP 的異步進程助手,借助于 AMQP 實現(xiàn)異步執(zhí)行 PHP 的方法,將一些很耗時、追求高可用、需要重試機制的操作放到異步進程中去執(zhí)行,將你的 HTTP 服務(wù)從繁重的業(yè)務(wù)邏輯中解脫出來。以一個較低的成本將傳統(tǒng) PHP 業(yè)務(wù)邏輯轉(zhuǎn)換成非阻塞、高可用、可擴展的異步模式。
依賴
- php 5.6+
- ext-bcmath
- ext-amqp 1.9.1+
- ext-memcached 3.0.3+
安裝
通過 composer 安裝
composer require l669/async-helper
或直接下載項目源碼
wget https://github.com/l669306630/async-helper/archive/master.zip
使用范例
業(yè)務(wù)邏輯:這里定義了很多等待被調(diào)用的類和方法,在你的項目中這可能是數(shù)據(jù)模型、或是一個發(fā)送郵件的類。
<?php class SendMailHelper { /** * @param array $mail * @throws Exception */ public static function request($mail) { // 在這里發(fā)送郵件,或是通過調(diào)用第三方提供的服務(wù)發(fā)送郵件 // 發(fā)送失敗的時候你拋出了異常,希望被進程捕獲,并按設(shè)定的規(guī)則進行重試 } }
生產(chǎn)者:通常是 HTTP 服務(wù),傳統(tǒng)的 PHP 項目或是一個命令行程序,接收到某個請求或指令后進行一系列的操作。
<?php use l669AsyncHelper; class UserController { public function register() { // 假設(shè)這是一個用戶注冊的請求,用戶提交了姓名、郵箱、驗證碼 // 第一步、校驗用戶信息 // 第二步、實例化異步助手,這時候會連接 AMQP $async_helper = new AsyncHelper([ 'host' => '127.0.0.1', 'port' => '5672', 'user' => 'root', 'pass' => '123456', 'vhost' => '/' ]); // 第三步、保存用戶信息到數(shù)據(jù)庫 $mail = [ 'from' => 'service@yourdomain.com', 'to' => 'username@163.com', 'subject' => '恭喜你注冊成功', 'body' => '請點擊郵件中的鏈接完成驗證....' ]; // 第四步、通過異步助手發(fā)送郵件 $async_helper->run('\SendMailHelper', 'request', [$mail]); // 這是同步的模式去發(fā)送郵件,如果郵件服務(wù)響應(yīng)遲緩或異常,就會直接影響該請求的響應(yīng)時間,甚至丟失這封重要郵件 // SendMailHelper::request($mail); } }
消費者:PHP 的異步進程,監(jiān)聽消息隊列,執(zhí)行你指定的方法。并且該消費者進程是可擴展的高可用的服務(wù),這一切都得益于 AMQP,這是系統(tǒng)解耦、布局微服務(wù)的最佳方案。
consume.php
<?php require_once('vendor/autoload.php'); require_once('SendMailHelper.php'); use l669AsyncHelper; use l669CacheHelper; $cache_helper = new CacheHelper('127.0.0.1', 11211); while(true){ try{ $async_helper = new AsyncHelper([ 'host' => '127.0.0.1', 'port' => '5672', 'user' => 'root', 'pass' => '123456', 'vhost' => '/', 'cacheHelper' => $cache_helper ]); $async_helper->consume(); }catch(Exception $e){ // 可以在這里記錄一些日志 sleep(2); } }
# 在命令行下啟動消費者進程,推薦使用 supervisor 來管理進程 php consume.php
支持事務(wù):需要一次提交執(zhí)行多個異步方法,事務(wù)可以確保完成性。
// 接著上面的示例來說,這里省略了一些重復(fù)的代碼,下同 $async_helper->beginTransaction(); try{ $async_helper->run('\SendMailHelper', 'request', [$mail1]); $async_helper->run('\SendMailHelper', 'request', [$mail2]); $async_helper->run('\SendMailHelper', 'request', [$mail3]); $async_helper->commit(); }catch(Exception $e){ $async_helper->rollback(); }
阻塞式重試:當(dāng)異步進程執(zhí)行一個方法,方法內(nèi)部拋出異常時進行重試,一些必須遵循執(zhí)行順序的業(yè)務(wù)就要采用阻塞式的重試,通過指定重試最大阻塞時長來控制。
use l669CacheHelper; use l669AsyncHelper; $async_helper = new AsyncHelper([ 'host' => '127.0.0.1', 'port' => '5672', 'user' => 'root', 'pass' => '123456', 'vhost' => '/', 'cacheHelper' => new CacheHelper('127.0.0.1', 11211), 'retryMode' => AsyncHelper::RETRY_MODE_REJECT, // 阻塞式重試 'maxDuration' => 600 // 最長重試 10 分鐘 ]); $send_mail_helper = new SendMailHelper(); $mail = new stdClass(); $mail->from = 'service@yourdomain.com'; $mail->to = 'username@163.com'; $mail->subject = '恭喜你注冊成功'; $mail->body = '請點擊郵件中的鏈接完成驗證....'; $async_helper->run($send_mail_helper, 'request', [$mail]); // 如果方法中需要拋出異常來結(jié)束程序,又不希望被異步進程重試,可以拋出以下幾種錯誤碼,進程捕獲到這些異常后會放棄重試: // l669AsyncException::PARAMS_ERROR // l669AsyncException::METHOD_DOES_NOT_EXIST // l669AsyncException::KNOWN_ERROR
非阻塞式重試:當(dāng)異步執(zhí)行的方法內(nèi)部拋出異常,async-helper 會將該方法重新放進隊列的尾部,先執(zhí)行新進入隊列的方法,回頭再重試剛才執(zhí)行失敗的方法,通過指定最大重試次數(shù)來控制。【推薦:PHP視頻教程】
use l669CacheHelper; use l669AsyncHelper; $async_helper = new AsyncHelper([ 'host' => '127.0.0.1', 'port' => '5672', 'user' => 'root', 'pass' => '123456', 'vhost' => 'new', 'cacheHelper' => new CacheHelper('127.0.0.1', 11211), 'queueName' => 'emails.vip', // 給付費的大爺走 VIP 隊列 'retryMode' => AsyncHelper::RETRY_MODE_TTL, // 非阻塞式重試 'maxRetries' => 10 // 最多重試 10 次 ]); $mail = new stdClass(); $mail->from = 'service@yourdomain.com'; $mail->to = 'username@163.com'; $mail->subject = '恭喜你注冊成功'; $mail->body = '請點擊郵件中的鏈接完成驗證....'; $async_helper->run('\SendMailHelper', 'request', [$mail]);
應(yīng)用和解惑
- 我們采用的是開源的 RabbitMQ 來為我們提供的 AMQP 服務(wù)。
- 你的項目部署在擁有很多服務(wù)器節(jié)點的集群上,每個節(jié)點的程序都需要寫日志文件,現(xiàn)在的問題就是要收集所有節(jié)點上面的日志到一個地方,方便我們及時發(fā)現(xiàn)問題或是做一些統(tǒng)計。所有節(jié)點都可以使用 async-helper 異步調(diào)用一個寫日志的方法,而執(zhí)行這個寫日志的方法的進程只需要在一臺機器上啟動就可以了,這樣所有節(jié)點的日志就都實時掌握在手里了。
- 做過微信公眾號開發(fā)的都知道,騰訊微信可以將用戶的消息推送到我們的服務(wù)器,如果我們在 5s 內(nèi)未及時響應(yīng),騰訊微信會重試 3 次,其實這就是消息隊列的應(yīng)用,使用 async-helper 可以輕松的做和這一樣的事情。
- 得益于 RabbitMQ,你可以輕松的橫向擴展你的消費者進程的能力,因為 RabbitMQ 天生就支持集群部署,你可以輕松的啟動多個消費者進程,或是將消費者進程分布到多臺機器上。
- 如果 RabbitMQ 服務(wù)不可用怎么辦呢?部署 RabbitMQ 高可用服務(wù)是容易的,對外提供單一 IP,這個 IP 是個負(fù)載均衡,背后是 RabbitMQ 集群,負(fù)載均衡承擔(dān)對后端集群節(jié)點的健康檢查。
- async-helper 能否承受高并發(fā)請求?async-helper 生產(chǎn)者使用的是短連接,也就說在你的 HTTP 還沒有響應(yīng)瀏覽器的時候 async-helper 就已經(jīng)結(jié)束了工作,你連接 RabbitMQ 的時間是百分之百小于 HTTP 請求的時間的,換言之,只要 RabbitMQ 承受并發(fā)的能力超過你的 HTTP 服務(wù)的承受并發(fā)的能力,RabbitMQ 就永遠不會崩,通過橫向擴展 RabbitMQ 很容易做到的。
和傳統(tǒng) PHP 相比
- 對任何 PHP 方法通過反射進行異步執(zhí)行;
- 高可用,執(zhí)行方法進入消息隊列,可持久化,即使服務(wù)器宕機,執(zhí)行任務(wù)也不丟失;
- 高可用,對異??梢赃M行不限次數(shù)和時間的重試,重試次數(shù)和時間可配置;
- 支持對多個異步方法包含在事務(wù)中執(zhí)行,支持回滾事務(wù);
- 方法的參數(shù)類型支持除資源類型(resource)和回調(diào)函數(shù)(callable)外的任意類型的參數(shù);
- 得益于 AMQP,異步方法可以承受高并發(fā)、高負(fù)載,支持集群部署、橫向擴展;
- 低延時,實測延時時間 0.016 ~ 0.021s;
- 適用于:日常數(shù)據(jù)庫操作、日志收集、金融交易、消息推送、發(fā)送郵件和短信、數(shù)據(jù)導(dǎo)入導(dǎo)出、計算大量數(shù)據(jù)生成報表;