转载 thinkphp 5.1 可参考 https://blog.csdn.net/weixin_43864139/article/details/105146397?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522162203372516780261974942%2522%252C%2522scm%2522%253A%252220140713.130102334..%2522%257D&request_id=162203372516780261974942&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~all~baidu_landing_v2~default-1-105146397.first_rank_v2_pc_rank_v29&utm_term=thinkphp5.1+%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97&spm=1018.2226.3001.4187

  https://www.jianshu.com/p/f5e33215c13c

  参考资料

  官方文档 https://github.com/coolseven/notes/tree/master/thinkphp-queue

  是ThinkPHP官方提供的一个消息队列服务,是专门支持队列服务的扩展包。消息队列适用于大并发或返回结果时间比较长且需要批量操作的第三方接口,可用于短信发送、邮件发送、APP推送。消息队列可进行发布、获取、执行、删除、重发、失败处理、延迟执行、超时控制等操作。

  支持消息队列的基本特性

  消息的发布、获取、执行、删除、重发、失败处理、延迟执行、超时控制等

  队列的多队列、内存限制、启动、停止、守护等

  消息队列可降级位同步执行

  首先查看ThinkPHP框架版本,然后进入Packagist官网搜索,并根据ThinkPHP版本选择对应版本。

  地址:https://packagist.org/packages/topthink/think-queue

  本文采用的ThinkPHP的版本为,查询选择的版本为。

  可直接使用Composer为当前项目安装消息队列插件

  也可以项目根目录下文件添加配置项

  添加完成后使用更新中配置项的版本。

  安装完成后,会在项目配置目录下生成配置文件。

  内置了Redis、Database、Topthink、Sync四种驱动

  Redis驱动

  如果组件使用Redis驱动,那么需要提前安装Redis服务以及PHP的Redis扩展。

  安装Redis服务

  本机采用的是Windows系统,安装Redis服务首先需要获取安装包,可在GitHub官网搜索Redis下载解压安装。

  Redis 下载地址:https://github.com/microsoftarchive/redis

  关于安装配置的细节这里过度赘述,详情可参见《Redis安装配置》。

  安装Redis可视化管理工具

  Redis Desktop Manager 下载地址:https://github.com/uglide/RedisDesktopManager/releases

  PHP安装Redis扩展

  php-redis扩展下载地址:https://pecl.php.net/package/redis

  修改配置文件

  配置文件中的任务过期时间需要重点关注,这里的任务实际上指代的就是消息。由于采用Redis驱动,消息队列中的消息会保存到Redis的List数据结构中。

  参数用于指定任务的过期时间,单位为秒。那么什么是过期任务呢?过期任务是任务的状态为执行中,任务的开始时刻 + 过期时间 > 当前时刻。

  为时表示不会检查过期的任务telegram的官网最新在哪呢,执行超时的任务会一直留在消息队列中,需要开发者另行处理(删除或重发),因此性能相对较高。

  不为则表示会在每次获取下一个任务之前检查并重发过期(执行超时)的任务。

  消息与队列的保存方式

  img

  Redis中消息队列名称

  在Redis中每一个队列都有三个与之对应,以队列为例,在Redis中保存的方式如下:

  键名为,类型为列表,表示待执行的任务列表

  键名为,类型为有序集合,表示延迟执行和定时执行的任务集合。

  键名为,类型为有序集合,表示执行中的任务集合。

  注意使用冒号分隔符,只是用来表示相关键名的关联性,本身是没有特殊含义的,这是一种常见组织的方式。

  在有序集合中每个元素代表要给任务,该元素的为队列的入队时间戳,任务的为JSON格式,保存了任务的执行情况和业务数据。

  Redis驱动下为了实现任务的延迟执行和过期重发,任务将在这三个键中来回转移。

  Database驱动

  驱动是采用数据库做消息队列缓存,相比较Redis而言是不推荐。

  使用数据库驱动需要创建存放消息的数据表

  消息和队列的保存方式

  在Database驱动中,每个任务对应到表的一行,字段telegram 下载地方是多少用来区分不同的队列,字段保存了消息的执行者和业务数据,字段采用JSON格式的字符串来保存消息。
telegram中文网站
  img

  目录结构

  消息队列中的角色

  类名的角色为命令行,负责解析命令行参数,控制队列的启动和重启。

  类名的角色为驱动,负责队列的创建以及消息的入队出队等操作。

  类型的角色为任务,负责将消息转化为一个任务对象,供消费者使用。

  生产者负责消息的创建与发布

  消费者负责任务的接收与执行

  img

  类关系

  执行流程

  命令行开始监听队列

  执行进程获取新消息

  消息队列返回一个可用的实例 3.1 生产者推送新消息到消息队列 3.2 消息队列返回是否推送成功给生产者

  执行进程调用的方法

  消息解析的,实例化一个消费者,并调用消费者实例的方法。

  消费者读取消息内容,处理业务逻辑,删除或重发该消息 或 。

  消息从Database或Redis中删除消息或重发消息

  消息返回消息处理结果给执行进程

  执行进程在终端输出响应或结束运行

  消息的创建与推送

  消息的消费与删除

  任务发布

  任务处理

  注意:这里会将消息与任务视为同一概念

  消息创建与推送

  在业务控制器中创建一个新消息并推送到指定的队列中

  首先创建消息需要引入类

  创建消息时需指定当前消息所归属消息队列的名称

  如果是Redis驱动对应的就是数据列表的名称

  img

  Redis中消息队列名称

  如果是Database驱动对应的就是表中字段中的内容

  创建消息时需要指定当前消息将会由哪个类来负责处理(消费者),当轮到该消息时,系统将生成一个该类的实例,并调用其方法。

  这里是采用手动指定消息处理类的方式,更合理的做法是事先定义好消息名称与消费者类名的映射关系,然后根据某个可以获取该映射关系的类来推送消息,这样生产者只需要知道消息的名称,而无需指定具体哪个消费者来处理。

  创建消息时需要指定当前任务所需的业务数据,注意数据不能是资源类型,业务数据最终将转化为形式的字符串。

  最后将创建的消息推送到消息队列并等待对应的消费者去执行

  使用方法将消息推送到消息队列,其返回值根据驱动不同而不同,如果是Redis驱动则成功返回随机字符串失败返回,如果是Database驱动则成功返回失败返回。

  例如:在游戏结束后,大厅服务器会发送游戏战绩数据给HTTP接口,接口获取数据后对其进行加工处理最终得到入库所需的数据,期间还会涉及到向第三方接口推送数据等等。如果采用同步处理的方式,大厅服务器只有等到所有的处理完毕后才能得到得到结构,由于大厅服务器会根据接口返回的数据判断当前战绩是否写入成功,若接口返回数据时间过长,此时服务端将一直处于等待状态,连接不会被断开,这种情况对于使用越来越频繁的接口来说,几乎是一种噩梦。

  完整代码

  消息的消费与删除

  创建消费者类,用于处理队列中的任务。

  创建消费者类,并编写方法。

  发布任务

  访问接口查看推送是否成功

  处理任务

  切换到当前终端到项目根目录

  实际使用过程中应安装这样的通用进程管理工具,它会监控的进程,一旦失败会帮助重启,详情可参见 《Supervisor》 。

  简单来总结下使用流程

  安装Supervisor并编写应用程序配置脚本,脚本主要用来运行命令。

  运行Supervisor服务,它会读取主进程和应用程序配置。

  运行自己编写的消息队列并根据日志查看是否正常运行

  Work模式 queue:work

  用于启动一个工作进程来处理消息队列

  参数说明

  是否循环执行,如果不加该参数则该命令处理完下一个消息就退出。

  要处理的队列的名称

  如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0。

  系统处于维护状态时,是否仍然处理任务,并未找到相关说明。

  该进程允许使用的内存上限,以M为单位。

  如果队列中无任务则sleep多少秒后重新检查(work+daemon模式)或退出(listen或非daemon模式)

  如果任务已经超过尝试次数上限,则触发“任务尝试数超限”事件,默认为0。

  Daemon模式的执行流程

  img

  Daemon模式

  命令行参数

  是否一直执行

  处理的队列的名称

  重发失败任务时延迟多少秒才执行

  系统处于维护状态时是否仍然处理任务

  该进程允许使用的内存上限,以为单位。

  入股队列中无任务则多少秒后重新检查

  任务重发多少次之后进入失败处理逻辑

  如何从缓冲中得到上次重启的时间?

  从缓存得到上次重启的事件

  如何判断是否退出循环呢?

  内存超限: >= 参数

  重启时间有更新:外部通过命令更新了重启时间的缓存

  Listen模式 queue:listen

  用于启动一个进程,然后由进程通过来周期性地创建一次性的进程来消费消息队列,并且限制该进程的执行事件,同时通过管道来监听进程的输出。

  监听队列的名称

  如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0。

  该进程允许使用的内存上限,以为单位。

  如果队列中无任务,则多长时间后重新检查。

  如果任务已经超过重发次数上限,则进入失败处理逻辑,默认为0。

  工作进程允许执行的最长时间,以秒telegram的官网的最新下载地址是多少为单位。

  Work模式和Listen模式的异同点

  两者都可以用于处理消息队列中的任务,区别在于:

  执行原理不同

  Work模式是单进程的处理模式,按照是否设置参数又可以分为单次执行和循环执行两种模式。单次执行不添加参数,该模式下Work进程在从处理完下一个消息后直接结束当前进程。当队列为空时会一段时间然后退出。循环执行添加了参数,该模式下Work进程会循环地处理队列中的消息直到内存超出参数配置才结束进程。当队列为空时会在每次循环中一段时间。

  Listen命令是“双进程+管道”的处理模式,Listen命令所在的进程会循环地创建单次执行模式的Work进程,每次创建的Work进程只消费一个消息就会结束,然后Listen进程再创建一个新的Work进程。Listen进程会定时检查当前的Work进程执行时间是否超过了参数的值,如果已经超过则Listen进程会杀掉所有Work进程,然后抛出异常。Listen进程会通过管道来监听当前的Work进程的输出,当Work进程有输出时Listen进程会将输出写入到。Listen进程会定时通过函数来监控当前的Work进程是否仍再运行,Work进程消费完一个任务之后,Work进程就结束了,其状态会变成,此时Listen进程就会重新创建一个新的Work进程并对其计时,新的Work进程开始消费下一个任务。

  结束时机不同

  Listen命令中Listen进程和Work进程会在以下情况下结束:Listen进程会定时检查当前的Work进程的执行时间是否超过了参数的值,如果已经超时此时Listen进程会杀掉当前的Work进程,然后抛出一个异常并结束Listen进程。Listen进程会定时检查自身使用的内存是否超过了参数的值,如果已经超过此时Listen进程会直接掉,Work进程也会自动结束。

  性能不同

  Work命令是在脚本内部做循环,框架脚本在命名执行的初期就已经加载完毕。而Listen模式则是处理完一个任务之后新开一个Work进程,此时会重新加载框架脚本。因此Work模式的性能会比Listen模式高。注意当代码有更新时Work模式下需要手动去执行命令重启队列来使改动生效,而Listen模式会自动生效无需其它操作。

  超时控制能力

  Work模式本质上既不能控制进程自身的运行时间,也无法限制执行中的任务的执行时间。举例来说,假如在某次上线后消费者的方法中添加一段死循环。

  这个循环将永远不能停止,直到任务所在的进程超过内存限制或者由管理员手动结束。这个过程不会由任何的警告。更严重的是如果配置了,那么这个死循环的任务可能会污染到同样处理队列的其它Work进程,最后好几个Work进程将被卡死在这段死循环中。

  Work模式下的超时控制能力实际应理解为多个Work进程配合下的过期任务重发能力。

  Listen命令可以限制Listen进程创建的Work进程的最大执行时间,Listen命令可以通过参数限制Work进程允许运行的最长时间,超过该时间限制后Work进程会被强制杀死,Listen进程本身也会抛出异常并结束。

  与之间的区别

  在配置文件中设置,在Listen命令的命令行参数中设置。和是两个不同层次上的概念:是指任务的过期时间,这个时间是全局的影响到所有的Work进程,不管是独立的Work命令还是Listen模式下创建的Work进程。针对的对象是任务。是指Work进程的超时时间,这个时间只针对当前执行的Listen命令有效,针对的对象是Work进程。

  使用场景不同

  Work命令的适用场景是任务数量较多、性能要求较高、任务的执行时间较短、消费者类中不存在死循环///等容易导致bug的逻辑。

  Listen命令的适用场景是任务数量较少、任务的执行时间较长(如生成大型的Excel报表等)、任务的执行时间需要有严格限制。

  消息队列处理一个任务的具体流程

  img

  消息队列处理一个任务的具体流程

  重发超时的任务

  超时任务是指任务处于执行中,当前时间 – 任务开始执行的时刻 > 时间
telegram的官网的最新下载的网址是多少
  重发是指将任务的状态还原为未执行,并将任务的已执行次数加1。

  获取下一个有效任务

  有效任务是指未执行、最早可执行的时间 <= 当前时间、按时间先后排序(先进先出)

  任务次数超限逻辑

  任务的已尝试次数大于命令行中的参数,命令行中的参数大于0。

  触发次数超限事件 内置的次数超限事件标签,是否定义了行为,未定义则不处理直接返回,已定义则对次数超限的任务进行处理。

  消费当前的任务

  从对象的属性中解析出消费者类,创建消费者类的实例,执行消费者类的实例的方法。

  需要在中手动删除任务,参数表示当前任务对象,参数表示当前的任务数据即创建队列时传入的参数。

  消息队列的开始、停止、重启

  开始一个消息队列

  停止所有的消息队列

  重启所有的消息队列

  多模块多任务的处理

  多模块

  单模块项目推荐时间作为任务类的命名空间,多任务项目可使用作为任务类的命名空间,也可以放在任意可以自动加载到的地方。

  多任务

  如果一个任务类中有多个小任务的话,在发布任务的时候,需要使用任务的“类名@方法名”的形式,例如,注意命令行中的参数不执行的解析。

  消息的延迟执行与定时任务

  延迟执行是相对于即使执行的,是用来限制某个任务的最早可执行时刻。在到达该时刻之前该任务会被跳过,可以利用该功能实现定时任务。

  延迟执行的使用方式

  在生产者业务代码中

  在消费者类中

  在命令行中

  如果消费者类的方法抛出了异常且任务未被删除时,将自动重发该任务。重发时会设置下次执行前延迟多少秒,默认为0。

  消息重发时机有三种情况:

  在消费者类中手动重发

  Work进程自动重发需要同时满足以下两个条件:消费者类的方法抛出异常、任务未被删除

  当配置了不为时,Work进程内部每次查询可用任务之前会自动重发已过期的任务。

  注意:在Database模式下,2.7.1和2.7.2中的重发逻辑是先删除原来的任务,然后插入一个新的任务。2.7.3中的重发机制是直接更新原任务。在Redis模式下,三种重发都是先删除再插入。不管是那种重发方式,重发之后任务的已尝试次数会在原来的基础上加1。

  此外,消费者类中需要注意,如果方法中抛出异常,将出现两种情况:

  如果不需要自动重发的话,请在抛出异常之前将任务删除,否则会被框架自动重发。

  如果需要自动重发的话,请直接抛出异常,不要在方法中手动使用,这样将导致任务被重发两次,产生两个一样的新任务。

  Redis驱动下的任务重发细节

  在Redis驱动下为了实现任务的延迟执行和过期重发,任务将在这三个中来回转移。

  在Database模式下消息处理的消息流程中,如果配置的不是那么的进程每次在获取下一个可执行任务之前,会先尝试重发所有过期的任务。而在Redis驱动下这个步骤则做了更多的事情,详情如下:

  从的中查询出有哪些任务在当前时刻已经可以开始执行,然后将这些任务转移到的的尾部。

  从的中查询出有哪些任务在当前时刻已经过期,然后将这些任务转移到的的尾部。

  尝试从的的头部取出一个任务,如果取出成功,则将这个任务转移到的的头部,同时将这个任务实例化成任务对象,交给消费者去执行。

  Redis队列中的过期任务重发步骤,执行前:

  img

  Redis队列中的过期任务重发步骤,执行前

  Redis队列中的过期任务重发步骤,执行后:telegram 的官网的最新的下载地址在哪里

  img

  Redis队列中的过期任务重发步骤,执行后telegram的的官网下载网站怎么找

  当同时满足以下条件时将触发任务失败回调:

  命令行的参数的值大于0

  任务的已尝试次数大于命令行的参数

  开发者添加了事件标签及其对应的回调代码

  消费者类中定义了方法用于接收任务失败的通知

  注意,标签需要在安装之后手动去文件中添加。

  -任务完成后使用删除任务

  在消费者类的方法中使用检查任务已执行次数,对于次数异常的做相应的处理。

  在消费者类的方法在中根据业务数据来判断该任务是否已经执行过,以避免该任务被重复执行。

  编写失败回调事件将事件中失败的任务及时通知给开发人员

  队列的稳定性和拓展性

  稳定性:不管是模式还是模式,建议使用或自定义的脚本去定时检查进程是否正常。

  拓展性:当某个队列的消费者不足时在给这个队列添加进程即可

  最好配置本地的Redis,使用远程Redis曾出现无法解释的原因。

  停止原正在运行的

  重新加载服务

  未完待续…

  **

telegram的官网网站在哪呢  19人点赞**

  **

  **mq

  **

  更多精彩内容,就在简书APP

  

  "小礼物走一走,来简书关注我"

  共1人赞赏

  

  JunChow520阅读提升上限,实践突破下限。

  总资产913 (约49.33元)共写了129.3W字获得2,182个赞共957个粉丝

  全部评论2只看作者按时间倒序按时间正序

  img
telegram的官方 下载在哪呢
  For_All_I_Care

  3楼 02.25 15:25

  怎么composer成功的?我在tp6安装报错 Your requirements could not be resolved to an installable set of packages. Problem 1 – topthink/think-queue[v3.0.0, …, v3.0.5] require symfony/process ^4.2 -> found symfony/process[v4.2.0, …, v4.4.19] but it conflicts with your root composer.json require (^5.2). – Root composer.json requires topthink/think-queue ^3.0 -> satisfiable by topthink/think-queue[v3.0.0, …, v3.0.5]. Use the option –with-all-dependencies (-W) to allow upgrades, downgrades and removals for packages currently locked to specific versions. Installation failed, reverting https://blog.csdn.net/qq_54337367/article/details/composer.json and https://blog.csdn.net/qq_54337367/article/details/composer.lock to their original content.

  ** 赞** 回复

  img

  hainuo

  2楼 2020.03.15 10:58

  不错这是一篇非常详细的使用指引了,很棒 不知道对于失败任务是如何处理? 我使用mysql作为队列的原因就是因为这个事情 失败的任务,可以重新执行一次 (也用过其他的方式比如 钉钉通知 然后发现失败的数据 查找是一个问题 )

  ** 赞** 回复

  被以下专题收入,发现更多相似内容telegram的的官网的最新下载地方在哪里

  imgphp

  推荐阅读更多精彩内容**

  TP5学习笔记九 队列

  依赖 redis-server redis php扩展 参考此教程 安装 composer 最好通过compose…

  imghfm0922阅读 1,951评论 0赞 1

  thinkphp-queue 浅析

  传统的程序执行流程一般是 即时|同步|串行的,在某些场景下,会存在并发低,吞吐量低,响应时间长等问题。在大型系统中…

  img红尘一落君莫笑阅读 10,331评论 2赞 5

  img

  Thinkphp 3.2.3 redis缓存

  转载00天火00文章保存一下先。。 说明 好像是tp3.2的bug,自带的redis驱动不是那么好用。。。找了方法…

  imggeeooooz阅读 1,895评论 0赞 2

  img

  redis常用知识点

  1.1 资料 ,最好的入门小册子,可以先于一切文档之前看,免费。 作者Antirez的博客,Antirez维护的R…

  imgJefferyLcm阅读 16,272评论 1赞 52

  2017-10-26

  今天早上我还在睡梦中,突然手机响了。睁开睡眼,发现是同事来的,一分钟接完电话,看向旁边的儿子。发现原本熟睡在我旁边…

  img蝴蝶的颜色_aff4阅读 41评论 0赞 0

  img

  imgJunChow520关注总资产913 (约49.33元)Go protobuf阅读 807vite postcss阅读 22typescript type阅读 24推荐阅读laravel中队列的使用阅读 51每日一学:你知道如何在 RabbitMQ 中实现 Work queues工作队列模式吗?阅读 83Kafka 客户端生产者使用详解(Java)阅读 181Dubbo——时间轮(Time Wheel)算法应用阅读 391史上最便捷搭建 Zookeeper 的方法!阅读 162

  **

  **评论2

  **赞19

  **

seo