PHP 使用 Redis 实现消息队列

2020-03-26 09:24:07 +08:00
 KevinRed

原文排版更整洁哦=>PHP 使用 Redis 实现消息队列

原文排版更整洁哦=>PHP 使用 Redis 实现消息队列

原文排版更整洁哦=>PHP 使用 Redis 实现消息队列

在服务器性能不佳的情况下,高并发访问使用消息队列存储请求,并按一定速率处理是比较好的解决方案

关于 php 多线程和 redis 使用前面文章有介绍,传送门:

PHP 实现多线程

Redis 安装使用 & PHP Redis 插件安装

PHP 使用 Redis 实现分布式锁

设计方案如下:

1.设置标识 flag ( nil 未执行,1 执行中),表示消费线程是否正在运行( flag 的访问需要加锁);并给 flag 设置一个失效时间,避免消费线程意外中断,flag 一直为 1

2.接口接收到请求时,将请求参数存到 redis 的 list 中。请求存到 redis 中后判断 flag,决定是否启动消费线程

3.消费线程从 redis 中取出参数处理,若 list 不为空,重置 flag 失效时间,并置为 1 ;若 list 为空,删除 flag (这里可以加一个循环判断,阻塞一段时间)

本文以一个批量发送邮件的例子说明

有一个发送邮件的接口,在并发访问时,邮件服务器扛不住请求,导致邮件发送失败

接下来上代码

/**
 * 发送邮件接口
 *
 */
    public function addMail(){

        $param = I();

        $redis = new \Redis();
        $redis->connect('127.0.0.1', 6379);
       

        //尝试获取锁
        $lockKey = 'lock';
        $lockExpire = 60;

        $status = true;
        while ($status) {
        
            $lockValue = time() + $lockExpire;
            $lock = $redis->setnx($lockKey, $lockValue);

            if (!empty($lock) || ($redis->get($lockKey) < time() && $redis->getSet($lockKey, $lockValue) < time())) {
                $redis->expire($lockKey, $lockExpire);

                //已获取到锁
                
                $flag = $redis->get('flag');
                
                //请求存入
                $redis->lPush("mail", $param);

                
                //判断是否要启动消费线程
                if (empty($flag)) {

                    //设置 flag
                    $redis->set('flag', 1);
                    
                    //设置 flag 失效时间防止线程死掉
                    $redis->expire($key, 600);
                    
                    //请求消费线程
                    $this->sendHttpRequest("http://127.0.0.1/Home/Index/redis", "POST");
                }

                if ($redis->ttl($lockKey))
                    $redis->del($lockKey);
                $status = FALSE;
            } else {
                sleep(2);
            }
        }
    }



    /**
     * 消费线程
     *
     */
    public function redis(){

        set_time_limit(0);
        ignore_user_abort(true);//设置与客户机断开是否会终止执行

       
        $redis = new \Redis();
        $redis->connect('127.0.0.1', 6379);
        
        $time = 1;//记录阻塞时间
        
        //开始消费
        while (1) {
        
            //取出请求
            $param = $redis->rPop("mail");
            
            if (empty($param)) {
            
                //阻塞一段时间
                if ($time <= 10) {
                    $time++;
                    sleep(10);
                } else {

                    //结束消费,尝试获取锁
                    $lockKey = 'lock';
                    $lockExpire = 60;

                    $status = true;
                    while ($status) {
                        $lockValue = time() + $lockExpire;
                        $lock = $redis->setnx($lockKey, $lockValue);

                        if (!empty($lock) || ($redis->get($lockKey) < time() && $redis->getSet($lockKey, $lockValue) < time())) {
                            $redis->expire($lockKey, $lockExpire);
 
                            
                            //删除 flag
                            $redis->del('flag');

                            if ($redis->ttl($lockKey))
                                $redis->del($lockKey);
                            $status = FALSE;
                        } else {
                            sleep(2);
                        }
                    }


                    break;
                }
            } else {


                //发送邮件
                $result = $this->sendEmail();
                
                $time = 1;//重置阻塞时间
                
                //设置 flag 状态,尝试获取锁
                $lockKey = 'lock';
                $lockExpire = 60;

                $status = true;
                while ($status) {
                    $lockValue = time() + $lockExpire;
                    $lock = $redis->setnx($lockKey, $lockValue);

                    if (!empty($lock) || ($redis->get($lockKey) < time() && $redis->getSet($lockKey, $lockValue) < time())) {
                        $redis->expire($lockKey, $lockExpire);

                        //设置运行状态
                        $redis->set($key, 1);
                        
                        //设置失效时间防止线程死掉
                        $redis->expire($key, 600);

                        if ($redis->ttl($lockKey))
                            $redis->del($lockKey);
                        $status = FALSE;
                    } else {
                        sleep(2);
                    }
                }
            }
        }

    }


     /**
     * 发送邮件
     *
     */    
    function sendEmail(){
        set_time_limit(540);//设置超时时间

        if (Send()) {
            return 1;
        } else {
            return 0;
        }
    }


2808 次点击
所在节点    程序员
4 条回复
win7pro
2020-03-26 10:22:42 +08:00
swoole
KevinRed
2020-03-26 10:33:53 +08:00
@win7pro 这确实是好东西,移植过去太费劲就手动撸了一个
z5864703
2020-03-26 10:54:42 +08:00
有几个改进的地方
1.redis 操作可以使用 lua 脚本来做几个命令组合运行,同时还可以保证原子性,比如 ttl+del 那块操作
2.消费者可以用 cli 单独进程来跑,循环读 list 就好了,而不用通过一次 post 请求来触发。
3.防止进程死掉可以用信号来实现
KevinRed
2020-03-26 13:27:26 +08:00
@z5864703 谢谢大佬指教

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/656252

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX