workerman的redis queue队列消费实例
本文环境 CentOS8.0,PHP8.1,Nginx1.8,Workerman 4.0 不懂的可以评论或联系我邮箱:owen@owenzhang.com 著作权归OwenZhang所有。商业转载请联系OwenZhang获得授权,非商业转载请注明出处。
消息模式介绍
一般来说,消息队列有两种模式:
生产者消费者模式(Queue);
消息生产者生产消息发送到queue中,然后消息消费者从queue中取出并且消费消息。
消息被消费以后,queue中不再有存储,所以消息消费者不可能消费到已经被消费的消息。Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。
一种是发布者订阅者模式(Topic);
消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。
和点对点方式不同,发布到topic的消息会被所有订阅者消费。
利用redis这两种场景的消息队列都能实现。
Queue 模式介绍
生产者生产消息放到队列中,多个消费者同时监听队列,谁先抢到消息谁就会从队列中取走消息,即对于每个消息最多只能被一个消费者拥有。
具体的方法就是创建一个任务队列,生产者主动lpush消息,而消费者去rpop数据。但是这样存在一个问题,就是消费者需要主动去请求数据,周期性的请求会造成资源的浪费。如果可以实现一旦有新消息加入队列就通知消费者就好了,这时借助brpop命令就可以实现这样的需求。brpop和rpop命令相似,唯一区别就是当列表中没有元素时,brpop命令会一直阻塞住连接,直到有新元素加入。
config/process.php
// 队列消费
'redis_consumer' => [
'handler' => Webman\RedisQueue\Process\Consumer::class,
'count' => cpu_count() * 3, // 可以设置多进程
'constructor' => [
// 消费者类目录
'consumer_dir' => app_path() . '/queue/redis',
],
],
config/redis_queue.php
[
'host' => 'redis://127.0.0.1:6379',
'options' => [
'auth' => env('REDIS_PASSWORD',''), // 密码,可选参数
'db' => 3, // 数据库
'max_attempts' => 5, // 消费失败后,重试次数
'retry_seconds' => 5, // 重试间隔,单位秒
]
],
];
config/redis.php
* @copyright walkor
* @link http://www.workerman.net/
* @license http://www.opensource.org/licenses/mit-license.php MIT License
*/
return [
'default' => [
'host' => env('REDIS_HOST', '127.0.0.1'),
'password' => env('REDIS_PASSWORD', ''),
'port' => env('REDIS_PORT', '6379'),
'database' => 0,
],
];
app/queue/redis/ConverterSend.php
队列自动消费类
$i,
* ];
* $a = RedisQueueService::instance()->_send($contact);
*
* @throws Exception
*/
public function consume($data)
{
Log::info($data['title']);
}
}
app/service/RedisQueueService.php
队列生产服务类
queue_name = $name;
return $this;
}
/**
* @return string
*/
public function getName(): string
{
return $this->queue_name;
}
/**
* 投递消息
*
* @param array $data
* @param int $delay
* @return bool
*/
public function _send(array $data = [], int $delay = 0): bool
{
$queue = $this->queue_name;
Client::send($queue, $data, $delay); // 投递延迟消息,消息会在 $delay 秒后处理
return true;
}
}
队列生产实例
$contact = [
'title' => 1,
];
RedisQueueService::instance()->_send($contact);
效果消费能力速度
简单消费写入日志
Log::info($data[‘title’]);
服务器配置: 2核2G
队列数量 消费时间
- 1W=2S
- 10W=21S
- 100W=1m33s
效果还是不错的 嘻嘻
Buy me a cup of coffee 🙂
觉得对你有帮助,就给我打赏吧,谢谢!