消息隊(duì)列的概念、原理、實(shí)現(xiàn)方式
概念
- 隊(duì)列結(jié)構(gòu)的一個(gè)中間件
- 不需要立即消費(fèi)消息
- 由消費(fèi)者或者訂閱者進(jìn)行按順序消費(fèi)
基本的流程圖如下所示
- 流程
應(yīng)用場景
- 冗余
- 解耦
- 流量削峰
- 異步通信
實(shí)現(xiàn)方式
- mysql:可靠、速度慢
- redis:速度快,對于大消息包處理較慢
- 消息系統(tǒng):可靠、專業(yè)性強(qiáng)
消息的觸發(fā)機(jī)制
- 死循環(huán)的方式,故障時(shí)無法及時(shí)恢復(fù)
- 定時(shí)任務(wù):壓力均分、但是處理量有上限
- 守護(hù)進(jìn)程的方式
解耦 (訂單和配送系統(tǒng))
-
架構(gòu)設(shè)計(jì)1 采用定時(shí)任務(wù)的方式
php入門到就業(yè)線上直播課:進(jìn)入學(xué)習(xí)
Apipost = Postman + Swagger + Mock + Jmeter 超好用的API調(diào)試工具:點(diǎn)擊使用 -
使用配送處理系統(tǒng)進(jìn)行處理時(shí),將當(dāng)前數(shù)據(jù)庫里需要處理的訂單狀態(tài)更新為2,待處理完成后將狀態(tài)設(shè)為1
-
可以每次指定更新多少條數(shù)據(jù)
流量削鋒 (redis實(shí)現(xiàn)秒殺)
-
使用隊(duì)列的數(shù)據(jù)結(jié)構(gòu)
- lpush/rpush 將數(shù)據(jù)放入列表中
- lpop/rpop 將數(shù)據(jù)移除列表并獲取到移除的值
- ltrim 保留指定區(qū)間內(nèi)的元素
- llen 獲取列表長度
- lset 通過索引設(shè)置列表的值
- lindex 通過索引獲取列表中的值
- lrange 獲取指定范圍的元素
-
圖示如下
-
代碼流程如下
-
秒殺程序?qū)⒄埱髮懭雛edis(uid,time)
-
檢查redis列表存放的長度,超過10個(gè)直接舍棄
-
通過死循環(huán)讀取redis數(shù)據(jù),并存入數(shù)據(jù)庫
// Spike.php 秒殺程序if(Redis::llen('lottery') < 10){ // 成功 Redis::lpush('lottery', $uid.'%'.microtime());}else{ // 失敗}
登錄后復(fù)制// Warehousing.php 入庫程序while(true){ $user = Redis::rpop('lottery'); if (!$user || $user == 'nil') { sleep(2); continue; } $user_arr = explode($user, '%'); $insert_user = [ 'uid' => $user_arr[0], 'time' => $user_arr[1] ]; $res = DB::table('lottery_queue')->insert($insert_user); if (!$res) { Redis::lpush('lottery', $user); }}
登錄后復(fù)制
-
-
上述代碼中假如并發(fā)過大的話會(huì)存在超賣的情況,此時(shí)可以使用文件鎖或者redis分布式鎖進(jìn)行控制,先將商品放入redis list中 使用rpop進(jìn)行取出,如果取不到則說明已經(jīng)賣完
-
具體的思路及偽代碼如下
// 先將商品放入redis中 $goods_id = 2; $sql = select id,num from goods where id = $goods_id; $res = DB::select($sql); if (!empty($res)) { // 也可以指定多少件 Redis::del('lottery_goods' . $goods_id); for($i=0;$i<$res['num'];$i++){ Redis::lpush('lottery_goods . $goods_id', $i); } LOG::info('商品存入隊(duì)列成功,數(shù)量:' . Redis::llen('lottery_goods . $goods_id')); } else { LOG::info($goods_id . '加入失敗'); }
登錄后復(fù)制// 開始秒殺 $count = Redis::rpop('lottery_goods' . $goods_id); if (!$count) { // 商品已搶完 ... } // 用戶搶購隊(duì)列 $user_list = 'user_goods_id_' . $goods_id; $user_status = Redis::sismember($user_list, $user_id); if ($user_status) { // 已搶過 ... } // 將搶到的放到列表中 Redis::sadd($user_list, $uid); $msg = '用戶:' . $uid . '順序' . $count; Log::info($msg); // 生成訂單等 ... // 減庫存 $sql = update goods set num = num -1 where id = $goods_id and num > 0; // 防止超賣 DB::update($sql) // 搶購成功
登錄后復(fù)制
rabbitmq
-
架構(gòu)及原理
其中P代表生產(chǎn)者,X為交換機(jī)(channal),C代表消費(fèi)者 -
簡單使用
// Send.php require_once __DIR__.'/vendor/autoload.php'; use PhpAmqpLibConnectionAMQPStreamConnection; use PhpAmqpLibMessageAMQPMessage; $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); // 創(chuàng)建通道 $channel = $connection->channel(); // 聲明一個(gè)隊(duì)列 $channel->queue_declare('user_email', false, false, false, false); // 制作消息 $msg = new AMQPMessage('send email'); // 將消息推送到隊(duì)列 $channel->basic_publish($msg, '', 'user_email'); echo '[x] send email'; $channel->close(); $connection->close();
登錄后復(fù)制// Receive.php require_once __DIR__.'/vendor/autoload.php'; use PhpAmqpLibConnectionAMQPStreamConnection; use PhpAmqpLibMessageAMQPMessage; $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); //創(chuàng)建通道 $channel = $connection->channel(); $channel->queue_declare('user_email', false, false, false, false); // 當(dāng)收到消息時(shí)的回調(diào)函數(shù) $callback = function($msg){ //發(fā)送郵件 echo 'Received '.$msg->body.'n'; }; $channel->basic_consume('user_email', '', false, true, false, false, $callback); // 保持監(jiān)聽狀態(tài) while($channel->is_open()){ $channel->wait(); }
登錄后復(fù)制