Laravel基于redis队列的解析

编程

为什么使用队列

使用队列的目的一般是:

  1. 异步执行
  2. 出错重试

解释一下:

异步执行: 部分代码执行很耗时, 为了提高响应速度及避免占用过多连接资源, 可以将这部分代码放到队列中异步执行.

Eg. 网站新用户注册后, 需要发送欢迎的邮件, 涉及到网络IO无法控制耗时的这一类就很适合放到队列中来执行.

出错重试: 为了保证一些任务的正常执行, 可以将任务放到队列中执行, 若执行出错则可以延迟一段时间后重试, 直到任务处理成功或出错超过N次后取消执行.

Eg. 用户需要绑定手机号, 此时发送短信的接口是依赖第三方, 一个是不确定耗时, 一个是不确定调用的成功, 为了保证调用成功, 必然需要在出错后重试

Laravel 中的队列

以下分析默认使用的队列及其配置如下

  • 默认队列引擎: redis

通过在

 

redis-cli

 中使用

 

monitor

 命令查看具体执行的命令语句

  • 默认队列名: default

分发任务

此处以分发 异步通知(class XxxNotification implement ShouldQueue)为例.

在Laravel中发起异步通知时, Laravel 会往redis中的任务队列添加一条新任务

redis 执行语句

redis> RPUSH queues:default

{

"displayName": "App\Listeners\RebateEventListener",

"job": "Illuminate\Queue\CallQueuedHandler@call",

"maxTries": null,

"timeout": null,

"timeoutAt": null,

"data": {

"commandName": "Illuminate\Events\CallQueuedListener",

"command": "O:36:"Illuminate\Events\CallQueuedListener":7:{s:5:"class";s:33:"App\Listeners\RebateEventListener";s:6:"method";s:15:"onRebateCreated";s:4:"data";a:1:{i:0;O:29:"App\Events\RebateCreatedEvent":4:{s:11:"u0000*u0000tbkOrder";O:45:"Illuminate\Contracts\Database\ModelIdentifier":3:{s:5:"class";s:19:"App\Models\TbkOrder";s:2:"id";i:416;s:10:"connection";s:5:"mysql";}s:15:"u0000*u0000notifyAdmins";b:1;s:13:"u0000*u0000manualBind";b:0;s:6:"socket";N;}}s:5:"tries";N;s:9:"timeoutAt";N;s:7:"timeout";N;s:6:"u0000*u0000job";N;}"

},

"id": "iTqpbeDqqFb3VoED2WP3pgmDbLAUQcMB",

"attempts": 0

}

上面的redis语句是将任务信息(json格式) rpush 到 redis 队列 queues:default 的尾部.

任务队列 Worker

Laravel 处理任务队列的进程开启方式: php artisan queue:work, 为了更好的观察, 这里使用 --once 选项来指定队列中的单一任务进行处理, 具体的更多参数请自行参考文档

php artisan queue:work --once --delay=1 --tries=3

上述执行语句参数含义:

  1. --once 仅执行一次任务, 默认是常驻进程一直执行
  2. --tries=3 任务出错最多重试3次, 默认是无限制重试
  3. --delay=1 任务出错后, 每次延迟1秒后再次执行, 默认是延迟0秒

当 Worker 启动时, 它依次执行如下步骤:

此处仍以默认队列

 

default

 为例讲解, 且

只讲解redis的相关操作

  1.  queues:default:delayed 有序集合中获取可以处理的 "延迟任务", 并 rpush  queue:default队列的尾部
    具体的执行语句:

redis> eval "Lua脚本" 2 queues:default:delayed queues:default 当前时间戳

Lua 脚本内容如下:

-- Get all of the jobs with an expired "score"...localval = redis.call("zrangebyscore", KEYS[1],"-inf", ARGV[1])-- If we have values in the array, we will remove them from the first queue-- and add them onto thedestination queue in chunks of 100, which moves-- all of the appropriate jobs onto the destination queue very safely.if(next(val) ~=nil)thenredis.call("zremrangebyrank", KEYS[1],0, #val -1)fori =1, #val,100doredis.call("rpush", KEYS[2],unpack(val, i,math.min(i+99, #val)))endendreturnval

 queue:default:reserved有序集合中获取已过期的 "reserved 任务", 并 rpush  queue:default队列的尾部

具体的执行语句:

redis> eval "Lua脚本" 2 queues:default:reserved queues:default 当前时间戳

使用的Lua脚本同步骤 1

 queue:default 队列中获取(lpop)一个任务, 增加其 attempts 次数, 并将该任务保存到 queu:default:reserved 有序集合中, 该任务的 score 值为 当前时间 + 90(任务执行超时时间)

具体的执行语句:

redis> eval “Lua脚本” 2 queues:default queues:default:reserved 任务超时时间戳

Lua脚本

- Pop the first job off of the queue... local job = redis.call("lpop", KEYS[1]) local reserved = false if(job ~= false) then -- Increment the attempt count and place job on the reserved queue... reserved = cjson.decode(job) reserved["attempts"] = reserved["attempts"] + 1 reserved = cjson.encode(reserved) redis.call("zadd", KEYS[2], ARGV[1], reserved) end return {job, reserved}

  1. 这里的 90 是根据配置而定: config("queue.connections.redis.retry_after")
    若预计任务耗时过久, 则应增加该数值, 防止任务还在执行时就被重置
  2. 在成功执行上面获取的任务后, 就将该任务从 queues:default:reserved 队列中移除掉
    具体执行语句: ZREM queues:default:reserved "具体任务"
  3. 如果执行任务失败, 此时分为2种情况:

任务失败次数未达到指定的重试次数阀值
将该任务从 queues:default:reserved 中移除, 并将该任务添加到 queue:default:delayed 有序集合中, score 为该任务下一次执行的时间戳
执行语句:

redis> EVAL "Lua脚本" 2 queues:default:delayed queues:default:reserved "失败的任务" 任务延迟执行的时间戳


Lua脚本

-- Remove the job from the current queue... redis.call("zrem", KEYS[2], ARGV[1]) -- Add the job onto the "delayed" queue... redis.call("zadd", KEYS[1], ARGV[2], ARGV[1]) return true

如果任务失败次数超过指定的重试阀值
将该任务从 queue:default:reserved 中移除
执行语句:

redis> ZREM queue:default:reserved

注意, 上述使用 Lua 脚本的目的在于操作的原子性, Redis 是单进程单线程模式, 以Lua脚本形式执行命令时可以确保执行脚本的原子性, 而不会有并发问题.

以上内容希望帮助到大家,很多PHPer在进阶的时候总会遇到一些问题和瓶颈,业务代码写多了没有方向感,不知道该从那里入手去提升,对此我整理了一些资料,包括但不限于:分布式架构、高可扩展、高性能、高并发、服务器性能调优、TP6,laravel,YII2,Redis,Swoole、Swoft、Kafka、Mysql优化、shell脚本、Docker、微服务、Nginx等多个知识点高级进阶干货需要的可以免费分享给大家,需要戳这里  PHP进阶架构师>>>视频、面试文档免费获取  

以上是 Laravel基于redis队列的解析 的全部内容, 来源链接: utcz.com/z/518826.html

回到顶部