workerman的redis queue队列消费实例

本文环境 CentOS8.0,PHP8.1,Nginx1.8,Workerman 4.0 不懂的可以评论或联系我邮箱:owen@owenzhang.com 著作权归OwenZhang所有。商业转载请联系OwenZhang获得授权,非商业转载请注明出处。

workerman的redis queue队列消费实例插图

消息模式介绍

一般来说,消息队列有两种模式:

生产者消费者模式(Queue); workerman的redis queue队列消费实例插图1

消息生产者生产消息发送到queue中,然后消息消费者从queue中取出并且消费消息。

消息被消费以后,queue中不再有存储,所以消息消费者不可能消费到已经被消费的消息。Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。

一种是发布者订阅者模式(Topic);

workerman的redis queue队列消费实例插图2

消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。

和点对点方式不同,发布到topic的消息会被所有订阅者消费。

利用redis这两种场景的消息队列都能实现。

Queue 模式介绍

生产者生产消息放到队列中,多个消费者同时监听队列,谁先抢到消息谁就会从队列中取走消息,即对于每个消息最多只能被一个消费者拥有。

具体的方法就是创建一个任务队列,生产者主动lpush消息,而消费者去rpop数据。但是这样存在一个问题,就是消费者需要主动去请求数据,周期性的请求会造成资源的浪费。如果可以实现一旦有新消息加入队列就通知消费者就好了,这时借助brpop命令就可以实现这样的需求。brpop和rpop命令相似,唯一区别就是当列表中没有元素时,brpop命令会一直阻塞住连接,直到有新元素加入。

config/process.php

    // 队列消费
    'redis_consumer' => [
        'handler'     => Webman\RedisQueue\Process\Consumer::class,
        'count'       => cpu_count() * 3, // 可以设置多进程
        'constructor' => [
            // 消费者类目录
            'consumer_dir' => app_path() . '/queue/redis',
        ],
    ],

config/redis_queue.php

 [
        'host'    => 'redis://127.0.0.1:6379',
        'options' => [
            'auth'          => env('REDIS_PASSWORD',''),            // 密码,可选参数
            'db'            => 3,                       // 数据库
            'max_attempts'  => 5,                       // 消费失败后,重试次数
            'retry_seconds' => 5,                       // 重试间隔,单位秒
        ]
    ],
];

config/redis.php


 * @copyright walkor
 * @link      http://www.workerman.net/
 * @license   http://www.opensource.org/licenses/mit-license.php MIT License
 */

return [
    'default' => [
        'host'     => env('REDIS_HOST', '127.0.0.1'),
        'password' => env('REDIS_PASSWORD', ''),
        'port'     => env('REDIS_PORT', '6379'),
        'database' => 0,
    ],
];

app/queue/redis/ConverterSend.php

队列自动消费类

 $i,
     * ];
     * $a = RedisQueueService::instance()->_send($contact);
     *
     * @throws Exception
     */
    public function consume($data)
    {
        Log::info($data['title']);
    }
}

app/service/RedisQueueService.php

队列生产服务类

queue_name = $name;
        return $this;
    }

    /**
     * @return string
     */
    public function getName(): string
    {
        return $this->queue_name;
    }

    /**
     * 投递消息
     *
     * @param array $data
     * @param int   $delay
     * @return bool
     */
    public function _send(array $data = [], int $delay = 0): bool
    {
        $queue = $this->queue_name;
        Client::send($queue, $data, $delay);      // 投递延迟消息,消息会在 $delay 秒后处理
        return true;
    }
}

队列生产实例

$contact = [
            'title' => 1,
        ];
RedisQueueService::instance()->_send($contact);

效果消费能力速度

简单消费写入日志

Log::info($data[‘title’]);

服务器配置: 2核2G

队列数量 消费时间

  • 1W=2S
  • 10W=21S
  • 100W=1m33s

效果还是不错的 嘻嘻

Buy me a cup of coffee 🙂

觉得对你有帮助,就给我打赏吧,谢谢!

微信赞赏码链接,点击跳转:

workerman的redis queue队列消费实例插图3