队列
简介
在构建 Web 应用程序时,可能会遇到一些任务(例如解析和存储上传的 CSV 文件)在典型的 Web 请求中执行耗时过长。幸运的是,Laravel 允许您轻松创建可在后台处理的队列任务。通过将耗时任务移至队列,您的应用程序可以以极快的速度响应 Web 请求,并为客户提供更好的用户体验。
Laravel 队列为各种不同的队列后端(如 Amazon SQS、Redis 甚至关系数据库)提供了统一的队列 API。
Laravel 的队列配置选项存储在应用程序的 config/queue.php 配置文件中。在此文件中,您将找到框架包含的每个队列驱动的连接配置,包括数据库、Amazon SQS、Redis 和 Beanstalkd 驱动,以及一个将立即执行任务的同步驱动(用于开发或测试期间)。还包含一个 null 队列驱动,它会丢弃队列任务。
NOTE
Laravel Horizon 是一个适用于 Redis 驱动队列的精美仪表盘和配置系统。查看完整的 Horizon 文档 了解更多信息。
连接 vs. 队列
在开始使用 Laravel 队列之前,理解“连接”与“队列”之间的区别很重要。在您的 config/queue.php 配置文件中,有一个 connections 配置数组。此选项定义了与后端队列服务(如 Amazon SQS、Beanstalk 或 Redis)的连接。然而,任何一个给定的队列连接可能有多个“队列”,可以将其视为不同堆栈或队列任务的集合。
请注意,queue 配置文件中的每个连接配置示例都包含一个 queue 属性。这是将任务发送到给定连接时的默认队列。换句话说,如果您分发任务时没有明确指定应该分发的队列,任务将被放置在连接配置的 queue 属性定义的队列中:
use App\Jobs\ProcessPodcast;
// 此任务被发送到默认连接的默认队列...
ProcessPodcast::dispatch();
// 此任务被发送到默认连接的 "emails" 队列...
ProcessPodcast::dispatch()->onQueue('emails');某些应用程序可能永远不需要将任务推送到多个队列,而更倾向于使用一个简单的队列。然而,将任务推送到多个队列对于希望按优先级或分段处理任务的应用程序特别有用,因为 Laravel 队列工作器允许您指定按优先级处理哪些队列。例如,如果您将任务推送到 high 队列,您可以运行一个赋予它们更高处理优先级的工作器:
php artisan queue:work --queue=high,default驱动注意事项与前提条件
数据库
要使用 database 队列驱动,您需要一个数据库表来存放任务。通常,这包含在 Laravel 默认的 0001_01_01_000002_create_jobs_table.php 数据库迁移 中;但是,如果您的应用程序不包含此迁移,您可以使用 make:queue-table Artisan 命令创建它:
php artisan make:queue-table
php artisan migrateRedis
要使用 redis 队列驱动,您应该在 config/database.php 配置文件中配置一个 Redis 数据库连接。
WARNING
redis 队列驱动不支持 serializer 和 compression Redis 选项。
Redis 集群
如果您的 Redis 队列连接使用 Redis 集群,您的队列名称必须包含一个 键哈希标签。这是为了确保给定队列的所有 Redis 键都放置在同一个哈希槽中:
'redis' => [
'driver' => 'redis',
'connection' => env('REDIS_QUEUE_CONNECTION', 'default'),
'queue' => env('REDIS_QUEUE', '{default}'),
'retry_after' => env('REDIS_QUEUE_RETRY_AFTER', 90),
'block_for' => null,
'after_commit' => false,
],阻塞
使用 Redis 队列时,您可以使用 block_for 配置选项指定驱动在工作器循环迭代并重新轮询 Redis 数据库之前,应等待任务变为可用的时间。
根据队列负载调整此值可能比持续轮询 Redis 数据库以获取新任务更高效。例如,您可以将值设置为 5,指示驱动在等待任务可用时应阻塞五秒钟:
'redis' => [
'driver' => 'redis',
'connection' => env('REDIS_QUEUE_CONNECTION', 'default'),
'queue' => env('REDIS_QUEUE', 'default'),
'retry_after' => env('REDIS_QUEUE_RETRY_AFTER', 90),
'block_for' => 5,
'after_commit' => false,
];WARNING
将 block_for 设置为 0 将导致队列工作器无限期阻塞,直到有任务可用。这也会阻止处理诸如 SIGTERM 之类的信号,直到下一个任务被处理。
其他驱动前提条件
列出的队列驱动需要以下依赖项。这些依赖项可通过 Composer 包管理器安装:
- Amazon SQS:
aws/aws-sdk-php ~3.0 - Beanstalkd:
pda/pheanstalk ~5.0 - Redis:
predis/predis ~2.0或 phpredis PHP 扩展 - MongoDB:
mongodb/laravel-mongodb
创建任务
生成任务类
默认情况下,应用程序的所有可队列化任务都存储在 app/Jobs 目录中。如果 app/Jobs 目录不存在,当您运行 make:job Artisan 命令时将会创建它:
php artisan make:job ProcessPodcast生成的类将实现 Illuminate\Contracts\Queue\ShouldQueue 接口,向 Laravel 表明该任务应推送到队列以异步运行。
NOTE
可以使用 桩文件发布 自定义任务桩文件。
类结构
任务类非常简单,通常只包含一个 handle 方法,该方法在队列处理任务时被调用。让我们来看一个示例任务类。在这个例子中,我们假设管理一个播客发布服务,需要在发布之前处理上传的播客文件:
<?php
namespace App\Jobs;
use App\Models\Podcast;
use App\Services\AudioProcessor;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
class ProcessPodcast implements ShouldQueue
{
use Queueable;
/**
* 创建一个新的任务实例。
*/
public function __construct(
public Podcast $podcast,
) {}
/**
* 执行任务。
*/
public function handle(AudioProcessor $processor): void
{
// 处理上传的播客...
}
}在这个示例中,注意我们能够直接将一个 Eloquent 模型 传递给队列任务的构造函数。由于任务使用了 Queueable trait,Eloquent 模型及其加载的关系在任务处理时会被优雅地序列化和反序列化。
如果您的队列任务在其构造函数中接受一个 Eloquent 模型,只有模型的标识符会被序列化到队列中。当任务实际被处理时,队列系统将自动从数据库中重新检索完整的模型实例及其加载的关系。这种模型序列化方法使得发送到队列驱动的任务载荷小得多。
handle 方法依赖注入
handle 方法在队列处理任务时被调用。注意我们可以在任务的 handle 方法中类型提示依赖项。Laravel 服务容器 会自动注入这些依赖。
如果您希望完全控制容器如何将依赖注入到 handle 方法中,可以使用容器的 bindMethod 方法。bindMethod 方法接受一个回调,该回调接收任务和容器。在回调中,您可以自由地以任何方式调用 handle 方法。通常,您应该在 App\Providers\AppServiceProvider 服务提供者 的 boot 方法中调用此方法:
use App\Jobs\ProcessPodcast;
use App\Services\AudioProcessor;
use Illuminate\Contracts\Foundation\Application;
$this->app->bindMethod([ProcessPodcast::class, 'handle'], function (ProcessPodcast $job, Application $app) {
return $job->handle($app->make(AudioProcessor::class));
});WARNING
二进制数据(例如原始图像内容)在传递给队列任务之前,应通过 base64_encode 函数进行处理。否则,任务在放入队列时可能无法正确序列化为 JSON。
队列化关系
由于所有加载的 Eloquent 模型关系在任务排队时也会被序列化,序列化的任务字符串有时可能会变得相当大。此外,当任务被反序列化并从数据库重新检索模型关系时,这些关系将被完整检索。在任务排队过程中模型序列化之前应用的任何先前关系约束,在任务反序列化时将不再应用。因此,如果您希望处理某个关系的子集,应该在队列任务中重新约束该关系。
或者,为了防止关系被序列化,您可以在设置属性值时在模型上调用 withoutRelations 方法。此方法将返回一个不包含已加载关系的模型实例:
/**
* 创建一个新的任务实例。
*/
public function __construct(
Podcast $podcast,
) {
$this->podcast = $podcast->withoutRelations();
}如果你只需要移除特定关联而保留其他关联,可以使用 withoutRelation 方法:
$this->podcast = $podcast->withoutRelation('comments');如果您正在使用 PHP 构造函数属性提升,并且希望表明一个 Eloquent 模型不应序列化其关系,您可以使用 WithoutRelations 属性:
use Illuminate\Queue\Attributes\WithoutRelations;
/**
* 创建一个新的任务实例。
*/
public function __construct(
#[WithoutRelations]
public Podcast $podcast,
) {}为了方便,如果您希望序列化所有模型而不带关系,可以将 WithoutRelations 属性应用于整个类,而不是为每个模型应用该属性:
<?php
namespace App\Jobs;
use App\Models\DistributionPlatform;
use App\Models\Podcast;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
use Illuminate\Queue\Attributes\WithoutRelations;
#[WithoutRelations]
class ProcessPodcast implements ShouldQueue
{
use Queueable;
/**
* 创建一个新的任务实例。
*/
public function __construct(
public Podcast $podcast,
public DistributionPlatform $platform,
) {}
}如果任务接收的是一个 Eloquent 模型的集合或数组,而不是单个模型,那么当任务被反序列化并执行时,该集合中的模型的关系将不会被恢复。这是为了防止处理大量模型的任务消耗过多资源。
唯一任务
WARNING
唯一任务需要一个支持 锁 的缓存驱动。目前,memcached、redis、dynamodb、database、file 和 array 缓存驱动支持原子锁。
WARNING
唯一任务的约束不适用于批次内的任务。
有时,您可能希望确保在任何时间点队列中只有一个特定任务的实例。您可以通过在任务类上实现 ShouldBeUnique 接口来实现。该接口不需要您在类上定义任何额外的方法:
<?php
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUnique;
class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
// ...
}在上面的示例中,UpdateSearchIndex 任务是唯一的。因此,如果该任务的另一个实例已经在队列中且尚未处理完成,则该任务不会被分发。
在某些情况下,您可能希望定义一个特定的“键”使任务唯一,或者希望指定一个超时时间,超过该时间任务不再保持唯一。要实现这一点,您可以使用 UniqueFor 属性并在任务类上定义一个 uniqueId 方法:
<?php
namespace App\Jobs;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUnique;
use Illuminate\Queue\Attributes\UniqueFor;
#[UniqueFor(3600)]
class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
/**
* 产品实例。
*
* @var \App\Models\Product
*/
public $product;
/**
* 获取任务的唯一 ID。
*/
public function uniqueId(): string
{
return $this->product->id;
}
}在上面的示例中,UpdateSearchIndex 任务按产品 ID 唯一。因此,任何具有相同产品 ID 的新任务分发都将被忽略,直到现有任务完成处理。此外,如果现有任务在一小时内未处理,唯一锁将被释放,另一个具有相同唯一键的任务可以被分发到队列。
WARNING
如果您的应用程序从多个 Web 服务器或容器分发任务,您应确保所有服务器与同一个中央缓存服务器通信,以便 Laravel 能准确判断任务是否唯一。
保持任务唯一直到处理开始
默认情况下,唯一任务在任务完成处理或所有重试尝试失败后“解锁”。但是,可能存在您希望任务在处理之前立即解锁的情况。要实现这一点,您的任务应实现 ShouldBeUniqueUntilProcessing 契约而不是 ShouldBeUnique 契约:
<?php
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing;
class UpdateSearchIndex implements ShouldQueue, ShouldBeUniqueUntilProcessing
{
// ...
}唯一任务锁
在幕后,当分发一个 ShouldBeUnique 任务时,Laravel 会尝试获取一个具有 uniqueId 键的 锁。如果该锁已被持有,则任务不会被分发。当任务完成处理或所有重试尝试失败时,该锁将被释放。默认情况下,Laravel 将使用默认缓存驱动获取此锁。但是,如果您希望使用另一个驱动来获取锁,可以定义一个 uniqueVia 方法,返回应使用的缓存驱动:
use Illuminate\Contracts\Cache\Repository;
use Illuminate\Support\Facades\Cache;
class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
// ...
/**
* 获取用于唯一任务锁的缓存驱动。
*/
public function uniqueVia(): Repository
{
return Cache::driver('redis');
}
}NOTE
如果您只需要限制任务的并发处理,请改用 WithoutOverlapping 任务中间件。
加密任务
Laravel 允许您通过 加密 确保任务数据的隐私和完整性。首先,只需将 ShouldBeEncrypted 接口添加到任务类。一旦将该接口添加到类中,Laravel 将在将任务推送到队列之前自动加密您的任务:
<?php
use Illuminate\Contracts\Queue\ShouldBeEncrypted;
use Illuminate\Contracts\Queue\ShouldQueue;
class UpdateSearchIndex implements ShouldQueue, ShouldBeEncrypted
{
// ...
}任务中间件
任务中间件允许您围绕队列任务的执行封装自定义逻辑,减少任务本身的样板代码。例如,考虑以下 handle 方法,它利用 Laravel 的 Redis 频率限制功能,只允许每五秒处理一个任务:
use Illuminate\Support\Facades\Redis;
/**
* 执行任务。
*/
public function handle(): void
{
Redis::throttle('key')->block(0)->allow(1)->every(5)->then(function () {
info('获得锁...');
// 处理任务...
}, function () {
// 无法获得锁...
return $this->release(5);
});
}虽然这段代码有效,但 handle 方法的实现变得嘈杂,因为它混杂了 Redis 频率限制逻辑。此外,对于任何其他我们想要进行频率限制的任务,都必须重复此频率限制逻辑。与其在 handle 方法中进行频率限制,我们可以定义一个处理频率限制的任务中间件:
<?php
namespace App\Jobs\Middleware;
use Closure;
use Illuminate\Support\Facades\Redis;
class RateLimited
{
/**
* 处理队列任务。
*
* @param \Closure(object): void $next
*/
public function handle(object $job, Closure $next): void
{
Redis::throttle('key')
->block(0)->allow(1)->every(5)
->then(function () use ($job, $next) {
// 获得锁...
$next($job);
}, function () use ($job) {
// 无法获得锁...
$job->release(5);
});
}
}如您所见,类似于 路由中间件,任务中间件接收正在处理的任务和一个应被调用的回调,以继续处理任务。
您可以使用 make:job-middleware Artisan 命令生成一个新的任务中间件类。创建任务中间件后,可以通过从任务的 middleware 方法返回它们来将其附加到任务。make:job Artisan 命令创建的任务上不存在此方法,因此您需要手动将其添加到任务类:
use App\Jobs\Middleware\RateLimited;
/**
* 获取任务应通过的中间件。
*
* @return array<int, object>
*/
public function middleware(): array
{
return [new RateLimited];
}NOTE
任务中间件也可以分配给 可队列化的事件监听器、邮件 和 通知。
频率限制
尽管我们刚刚演示了如何编写自己的频率限制任务中间件,但 Laravel 实际上包含了一个您可用于对任务进行频率限制的中间件。像 路由频率限制器 一样,任务频率限制器使用 RateLimiter 门面的 for 方法定义。
例如,您可能希望允许用户每小时备份一次数据,而对高级客户不施加此类限制。要实现这一点,您可以在 AppServiceProvider 的 boot 方法中定义一个 RateLimiter:
use Illuminate\Cache\RateLimiting\Limit;
use Illuminate\Support\Facades\RateLimiter;
/**
* 引导任何应用服务。
*/
public function boot(): void
{
RateLimiter::for('backups', function (object $job) {
return $job->user->vipCustomer()
? Limit::none()
: Limit::perHour(1)->by($job->user->id);
});
}在上面的示例中,我们定义了一个小时频率限制;但是,您可以使用 perMinute 方法轻松定义基于分钟的频率限制。此外,您可以向频率限制的 by 方法传递任何您想要的值;但是,此值最常用于按客户细分频率限制:
return Limit::perMinute(50)->by($job->user->id);一旦您定义了频率限制,您就可以使用 Illuminate\Queue\Middleware\RateLimited 中间件将频率限制器附加到您的任务。每次任务超过频率限制时,此中间件将根据频率限制持续时间将任务释放回队列并带适当的延迟:
use Illuminate\Queue\Middleware\RateLimited;
/**
* 获取任务应通过的中间件。
*
* @return array<int, object>
*/
public function middleware(): array
{
return [new RateLimited('backups')];
}将频率受限的任务释放回队列仍会增加任务的总 尝试次数。您可能希望相应地调整任务类上的 Tries 和 MaxExceptions 属性。或者,您可能希望使用 retryUntil 方法 定义任务不应再被尝试的时间。
使用 releaseAfter 方法,您还可以指定释放的任务再次尝试之前必须经过的秒数:
/**
* 获取任务应通过的中间件。
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new RateLimited('backups'))->releaseAfter(60)];
}如果您不希望任务在频率受限时被重试,可以使用 dontRelease 方法:
/**
* 获取任务应通过的中间件。
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new RateLimited('backups'))->dontRelease()];
}使用 Redis 进行频率限制
如果您使用 Redis,可以使用 Illuminate\Queue\Middleware\RateLimitedWithRedis 中间件,该中间件针对 Redis 进行了优化,比基本的频率限制中间件更高效:
use Illuminate\Queue\Middleware\RateLimitedWithRedis;
public function middleware(): array
{
return [new RateLimitedWithRedis('backups')];
}connection 方法可用于指定中间件应使用的 Redis 连接:
return [(new RateLimitedWithRedis('backups'))->connection('limiter')];防止任务重叠
Laravel 包含一个 Illuminate\Queue\Middleware\WithoutOverlapping 中间件,允许您基于任意键防止任务重叠。当队列任务正在修改一个一次只能由一个任务修改的资源时,这很有帮助。
例如,假设您有一个更新用户信用评分的队列任务,并且希望防止同一用户 ID 的信用评分更新任务重叠。要实现这一点,您可以从任务的 middleware 方法返回 WithoutOverlapping 中间件:
use Illuminate\Queue\Middleware\WithoutOverlapping;
/**
* 获取任务应通过的中间件。
*
* @return array<int, object>
*/
public function middleware(): array
{
return [new WithoutOverlapping($this->user->id)];
}将重叠的任务释放回队列仍会增加任务的总尝试次数。您可能希望相应地调整任务类上的 Tries 和 MaxExceptions 属性。例如,将 Tries 保留为默认的 1 将阻止任何重叠任务稍后重试。
任何相同类型的重叠任务都将被释放回队列。您还可以指定释放的任务再次尝试之前必须经过的秒数:
/**
* 获取任务应通过的中间件。
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new WithoutOverlapping($this->order->id))->releaseAfter(60)];
}如果您希望立即删除任何重叠任务,以便它们不会被重试,可以使用 dontRelease 方法:
/**
* 获取任务应通过的中间件。
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new WithoutOverlapping($this->order->id))->dontRelease()];
}WithoutOverlapping 中间件由 Laravel 的原子锁功能提供支持。有时,您的任务可能会意外失败或超时,导致锁未被释放。因此,您可以使用 expireAfter 方法显式定义锁过期时间。例如,下面的示例将指示 Laravel 在任务开始处理三分钟后释放 WithoutOverlapping 锁:
/**
* 获取任务应通过的中间件。
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new WithoutOverlapping($this->order->id))->expireAfter(180)];
}WARNING
WithoutOverlapping 中间件需要一个支持 锁 的缓存驱动。目前,memcached、redis、dynamodb、database、file 和 array 缓存驱动支持原子锁。
跨任务类共享锁键
默认情况下,WithoutOverlapping 中间件仅阻止相同类的重叠任务。因此,尽管两个不同的任务类可能使用相同的锁键,它们不会被阻止重叠。但是,您可以使用 shared 方法指示 Laravel 跨任务类应用该键:
use Illuminate\Queue\Middleware\WithoutOverlapping;
class ProviderIsDown
{
// ...
public function middleware(): array
{
return [
(new WithoutOverlapping("status:{$this->provider}"))->shared(),
];
}
}
class ProviderIsUp
{
// ...
public function middleware(): array
{
return [
(new WithoutOverlapping("status:{$this->provider}"))->shared(),
];
}
}异常节流
Laravel 包含一个 Illuminate\Queue\Middleware\ThrottlesExceptions 中间件,允许您对异常进行节流。一旦任务抛出一定数量的异常,所有进一步执行任务的尝试都将延迟,直到指定的时间间隔过去。此中间件对于与不稳定的第三方服务交互的任务特别有用。
例如,假设一个队列任务与一个开始抛出异常的第三方 API 交互。要对异常进行节流,您可以从任务的 middleware 方法返回 ThrottlesExceptions 中间件。通常,此中间件应与实现了 基于时间的尝试 的任务配对使用:
use DateTime;
use Illuminate\Queue\Middleware\ThrottlesExceptions;
/**
* 获取任务应通过的中间件。
*
* @return array<int, object>
*/
public function middleware(): array
{
return [new ThrottlesExceptions(10, 5 * 60)];
}
/**
* 确定任务应超时的时间。
*/
public function retryUntil(): DateTime
{
return now()->plus(minutes: 30);
}中间件接受的第一个构造函数参数是任务在被节流之前可以抛出的异常数量,第二个构造函数参数是在任务被节流后再次尝试之前应经过的秒数。在上面的代码示例中,如果任务连续抛出 10 个异常,我们将在再次尝试该任务之前等待 5 分钟,并受 30 分钟时间限制的约束。
当任务抛出异常但异常阈值尚未达到时,任务通常会立即重试。但是,您可以通过在将中间件附加到任务时调用 backoff 方法来指定此类任务应延迟的分钟数:
use Illuminate\Queue\Middleware\ThrottlesExceptions;
/**
* 获取任务应通过的中间件。
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new ThrottlesExceptions(10, 5 * 60))->backoff(5)];
}在内部,此中间件使用 Laravel 的缓存系统来实现频率限制,并且任务类名被用作缓存“键”。您可以通过在将中间件附加到任务时调用 by 方法来覆盖此键。如果您有多个任务与同一个第三方服务交互,并且希望它们共享一个公共的节流“桶”,以确保它们遵守一个共享限制,这可能很有用:
use Illuminate\Queue\Middleware\ThrottlesExceptions;
/**
* 获取任务应通过的中间件。
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new ThrottlesExceptions(10, 10 * 60))->by('key')];
}默认情况下,此中间件会对每个异常进行节流。您可以通过在将中间件附加到任务时调用 when 方法来修改此行为。只有当提供给 when 方法的闭包返回 true 时,该异常才会被节流:
use Illuminate\Http\Client\HttpClientException;
use Illuminate\Queue\Middleware\ThrottlesExceptions;
/**
* 获取任务应通过的中间件。
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new ThrottlesExceptions(10, 10 * 60))->when(
fn (Throwable $throwable) => $throwable instanceof HttpClientException
)];
}与 when 方法(将任务释放回队列或抛出异常)不同,deleteWhen 方法允许您在发生给定异常时完全删除任务:
use App\Exceptions\CustomerDeletedException;
use Illuminate\Queue\Middleware\ThrottlesExceptions;
/**
* 获取任务应通过的中间件。
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new ThrottlesExceptions(2, 10 * 60))->deleteWhen(CustomerDeletedException::class)];
}如果您希望将节流的异常报告给应用程序的异常处理程序,可以通过在将中间件附加到任务时调用 report 方法来实现。或者,您可以为 report 方法提供一个闭包,并且仅当给定闭包返回 true 时才报告异常:
use Illuminate\Http\Client\HttpClientException;
use Illuminate\Queue\Middleware\ThrottlesExceptions;
/**
* 获取任务应通过的中间件。
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new ThrottlesExceptions(10, 10 * 60))->report(
fn (Throwable $throwable) => $throwable instanceof HttpClientException
)];
}使用 Redis 进行异常节流
如果您使用 Redis,可以使用 Illuminate\Queue\Middleware\ThrottlesExceptionsWithRedis 中间件,该中间件针对 Redis 进行了优化,比基本的异常节流中间件更高效:
use Illuminate\Queue\Middleware\ThrottlesExceptionsWithRedis;
public function middleware(): array
{
return [new ThrottlesExceptionsWithRedis(10, 10 * 60)];
}connection 方法可用于指定中间件应使用的 Redis 连接:
return [(new ThrottlesExceptionsWithRedis(10, 10 * 60))->connection('limiter')];跳过任务
Skip 中间件允许您指定应跳过/删除任务,而无需修改任务的逻辑。Skip::when 方法将在给定条件评估为 true 时删除任务,而 Skip::unless 方法将在条件评估为 false 时删除任务:
use Illuminate\Queue\Middleware\Skip;
/**
* 获取任务应通过的中间件。
*/
public function middleware(): array
{
return [
Skip::when($condition),
];
}您还可以将 Closure 传递给 when 和 unless 方法,以进行更复杂的条件评估:
use Illuminate\Queue\Middleware\Skip;
/**
* 获取任务应通过的中间件。
*/
public function middleware(): array
{
return [
Skip::when(function (): bool {
return $this->shouldSkip();
}),
];
}分发任务
一旦您编写了任务类,您可以使用任务本身的 dispatch 方法分发它。传递给 dispatch 方法的参数将传递给任务的构造函数:
<?php
namespace App\Http\Controllers;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* 存储一个新播客。
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);
// ...
ProcessPodcast::dispatch($podcast);
return redirect('/podcasts');
}
}如果您希望有条件地分发任务,可以使用 dispatchIf 和 dispatchUnless 方法:
ProcessPodcast::dispatchIf($accountActive, $podcast);
ProcessPodcast::dispatchUnless($accountSuspended, $podcast);在新的 Laravel 应用程序中,database 连接被定义为默认队列。您可以通过更改应用程序 .env 文件中的 QUEUE_CONNECTION 环境变量来指定不同的默认队列连接。
延迟分发
如果您希望指定一个任务不应立即可供队列工作器处理,可以在分发任务时使用 delay 方法。例如,让我们指定一个任务在分发后 10 分钟内不应可用于处理:
<?php
namespace App\Http\Controllers;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* 存储一个新播客。
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);
// ...
ProcessPodcast::dispatch($podcast)
->delay(now()->plus(minutes: 10));
return redirect('/podcasts');
}
}在某些情况下,任务可能配置了默认延迟。如果您需要绕过此延迟并立即处理任务,可以使用 withoutDelay 方法:
ProcessPodcast::dispatch($podcast)->withoutDelay();WARNING
Amazon SQS 队列服务的最大延迟时间为 15 分钟。
同步分发
如果您希望立即(同步地)分发一个任务,可以使用 dispatchSync 方法。使用此方法时,任务不会被排队,并将在当前进程中立即执行:
<?php
namespace App\Http\Controllers;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* 存储一个新播客。
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);
// 创建播客...
ProcessPodcast::dispatchSync($podcast);
return redirect('/podcasts');
}
}延迟分发
使用延迟同步分发,您可以分发一个任务以在当前进程中处理,但在 HTTP 响应发送给用户之后。这允许您同步处理“队列”任务,而不会减慢用户的应用程序体验。要延迟同步任务的执行,请将任务分发到 deferred 连接:
RecordDelivery::dispatch($order)->onConnection('deferred');deferred 连接也作为默认的 故障转移队列。
类似地,background 连接在 HTTP 响应发送给用户之后处理任务;但是,该任务在一个单独生成的 PHP 进程中处理,允许 PHP-FPM / 应用程序工作器可用于处理另一个传入的 HTTP 请求:
RecordDelivery::dispatch($order)->onConnection('background');任务与数据库事务
虽然在数据库事务中分发任务是完全可行的,但您应特别注意确保您的任务能够成功执行。在事务中分发任务时,有可能在父事务提交之前,任务就被工作器处理了。当发生这种情况时,您在数据库事务期间对模型或数据库记录所做的任何更新可能尚未反映在数据库中。此外,在事务中创建的任何模型或数据库记录可能在数据库中不存在。
幸运的是,Laravel 提供了几种解决此问题的方法。首先,您可以在队列连接的配置数组中设置 after_commit 连接选项:
'redis' => [
'driver' => 'redis',
// ...
'after_commit' => true,
],当 after_commit 选项为 true 时,您可以在数据库事务中分发任务;但是,Laravel 将等待所有打开的父数据库事务提交后才实际分发任务。当然,如果当前没有打开的数据事务,任务将立即分发。
如果由于事务期间发生的异常而回滚事务,则在该事务期间分发的任务将被丢弃。
NOTE
将 after_commit 配置选项设置为 true 也会导致所有队列化的事件监听器、邮件、通知和广播事件在所有打开的数据库事务提交后才被分发。
内联指定提交分发行为
如果您未将 after_commit 队列连接配置选项设置为 true,您仍然可以指示某个特定任务应在所有打开的数据库事务提交后才被分发。为此,您可以将 afterCommit 方法链式调用到您的分发操作上:
use App\Jobs\ProcessPodcast;
ProcessPodcast::dispatch($podcast)->afterCommit();同样,如果 after_commit 配置选项设置为 true,您可以指示某个特定任务应立即分发,而无需等待任何打开的数据库事务提交:
ProcessPodcast::dispatch($podcast)->beforeCommit();任务链
任务链允许您指定一个队列任务列表,这些任务应在主任务成功执行后按顺序运行。如果序列中的一个任务失败,则其余任务将不会运行。要执行队列任务链,您可以使用 Bus 门面提供的 chain 方法。Laravel 的命令总线是一个较低级别的组件,队列任务分发构建在其之上:
use App\Jobs\OptimizePodcast;
use App\Jobs\ProcessPodcast;
use App\Jobs\ReleasePodcast;
use Illuminate\Support\Facades\Bus;
Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
new ReleasePodcast,
])->dispatch();除了链式调用任务类实例外,您还可以链式调用闭包:
Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
function () {
Podcast::update(/* ... */);
},
])->dispatch();WARNING
在任务中使用 $this->delete() 方法删除任务不会阻止链式任务的执行。只有当链中的某个任务失败时,链才会停止执行。
链的连接和队列
如果您希望指定链式任务应使用的连接和队列,可以使用 onConnection 和 onQueue 方法。除非队列任务被明确分配了不同的连接/队列,否则这些方法将指定应使用的队列连接和队列名称:
Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
new ReleasePodcast,
])->onConnection('redis')->onQueue('podcasts')->dispatch();向链中添加任务
有时,您可能需要在现有任务链中从该链的另一个任务内部前置或追加一个任务。您可以使用 prependToChain 和 appendToChain 方法来实现:
/**
* 执行任务。
*/
public function handle(): void
{
// ...
// 前置到当前链,在当前任务之后立即运行任务...
$this->prependToChain(new TranscribePodcast);
// 追加到当前链,在链的末尾运行任务...
$this->appendToChain(new TranscribePodcast);
}链失败
当链式任务时,您可以使用 catch 方法指定一个闭包,该闭包应在链中的某个任务失败时被调用。给定的回调将接收导致任务失败的 Throwable 实例:
use Illuminate\Support\Facades\Bus;
use Throwable;
Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
new ReleasePodcast,
])->catch(function (Throwable $e) {
// 链中的某个任务失败了...
})->dispatch();WARNING
由于链回调被序列化并在稍后由 Laravel 队列执行,您不应在链回调中使用 $this 变量。
自定义队列与连接
分发到特定队列
通过将任务推送到不同的队列,您可以对队列任务进行“分类”,甚至可以优先分配多个工作器到不同的队列。请记住,这不会将任务推送到队列配置文件中定义的不同队列“连接”,而只是推送到单个连接内的特定队列。要指定队列,请在分发任务时使用 onQueue 方法:
<?php
namespace App\Http\Controllers;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* 存储一个新播客。
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);
// 创建播客...
ProcessPodcast::dispatch($podcast)->onQueue('processing');
return redirect('/podcasts');
}
}或者,您可以在任务的构造函数中调用 onQueue 方法来指定任务的队列:
<?php
namespace App\Jobs;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
class ProcessPodcast implements ShouldQueue
{
use Queueable;
/**
* 创建一个新的任务实例。
*/
public function __construct()
{
$this->onQueue('processing');
}
}分发到特定连接
如果您的应用程序与多个队列连接交互,您可以使用 onConnection 方法指定将任务推送到哪个连接:
<?php
namespace App\Http\Controllers;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* 存储一个新播客。
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);
// 创建播客...
ProcessPodcast::dispatch($podcast)->onConnection('sqs');
return redirect('/podcasts');
}
}您可以链式调用 onConnection 和 onQueue 方法来指定任务的连接和队列:
ProcessPodcast::dispatch($podcast)
->onConnection('sqs')
->onQueue('processing');或者,您可以在任务的构造函数中调用 onConnection 方法来指定任务的连接:
<?php
namespace App\Jobs;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
class ProcessPodcast implements ShouldQueue
{
use Queueable;
/**
* 创建一个新的任务实例。
*/
public function __construct()
{
$this->onConnection('sqs');
}
}队列路由
您可以使用 Queue 门面的 route 方法为特定任务类定义默认的连接和队列。当您希望确保某些任务始终使用特定队列,而无需在任务上指定连接或队列时,这很有用。
除了路由特定任务类,您还可以将接口、trait 或父类传递给 route 方法。当您这样做时,任何实现该接口、使用该 trait 或扩展该父类的任务将自动使用配置的连接和队列。
通常,您应在服务提供者的 boot 方法中调用 route 方法:
use App\Concerns\RequiresVideo;
use App\Jobs\ProcessPodcast;
use App\Jobs\ProcessVideo;
use Illuminate\Support\Facades\Queue;
/**
* 引导任何应用服务。
*/
public function boot(): void
{
Queue::route(ProcessPodcast::class, connection: 'redis', queue: 'podcasts');
Queue::route(RequiresVideo::class, queue: 'video');
}当指定了连接而没有队列时,任务将被发送到默认队列:
Queue::route(ProcessPodcast::class, connection: 'redis');您还可以通过向 route 方法传递数组来一次路由多个任务类:
Queue::route([
ProcessPodcast::class => ['podcasts', 'redis'], // 队列和连接
ProcessVideo::class => 'videos', // 仅队列(使用默认连接)
]);NOTE
队列路由仍然可以被任务按每个任务覆盖。
指定最大尝试次数/超时值
最大尝试次数
任务尝试是 Laravel 队列系统的核心概念,并为许多高级功能提供支持。虽然它们一开始可能看起来令人困惑,但在修改默认配置之前,理解它们的工作原理很重要。
当一个任务被分发时,它被推送到队列上。然后一个工作器获取它并尝试执行它。这是一次任务尝试。
然而,一次尝试并不一定意味着任务的 handle 方法被执行了。尝试也可以通过多种方式被“消耗”:
- 任务在执行过程中遇到未处理的异常。
- 任务使用
$this->release()手动释放回队列。 - 中间件如
WithoutOverlapping或RateLimited未能获得锁并释放任务。 - 任务超时。
- 任务的
handle方法运行并完成,没有抛出异常。
您可能不希望无限期地尝试一个任务。因此,Laravel 提供了多种方式来指定任务可以被尝试的次数或时间。
NOTE
默认情况下,Laravel 只会尝试一次任务。如果您的任务使用像 WithoutOverlapping 或 RateLimited 这样的中间件,或者您正在手动释放任务,您可能需要通过 tries 选项增加允许的尝试次数。
指定任务可被尝试的最大次数的一种方法是通过 Artisan 命令行上的 --tries 开关。这将应用于由工作器处理的所有任务,除非被处理的任务指定了可被尝试的次数:
php artisan queue:work --tries=3如果任务超过其最大尝试次数,它将被视为“失败”任务。有关处理失败任务的更多信息,请查阅 失败任务文档。如果向 queue:work 命令提供 --tries=0,该任务将被无限期重试。
您可以通过在任务类上使用 Tries 属性定义任务可被尝试的最大次数,从而采取更细粒度的方法。如果在任务上指定了最大尝试次数,它将优先于命令行上提供的 --tries 值:
<?php
namespace App\Jobs;
use Illuminate\Queue\Attributes\Tries;
#[Tries(5)]
class ProcessPodcast implements ShouldQueue
{
// ...
}如果您需要对特定任务的最大尝试次数进行动态控制,可以在任务上定义一个 tries 方法:
/**
* 确定任务可被尝试的次数。
*/
public function tries(): int
{
return 5;
}基于时间的尝试
作为定义任务失败前可尝试次数的替代方法,您可以定义一个任务不应再被尝试的时间。这允许在给定时间范围内尝试任意次数的任务。要定义任务不应再被尝试的时间,请在您的任务类中添加一个 retryUntil 方法。此方法应返回一个 DateTime 实例:
use DateTime;
/**
* 确定任务应超时的时间。
*/
public function retryUntil(): DateTime
{
return now()->plus(minutes: 10);
}如果同时定义了 retryUntil 和 tries,Laravel 会优先考虑 retryUntil 方法。
最大异常次数
有时您可能希望指定一个任务可以被尝试多次,但如果重试是由给定数量的未处理异常触发的(而不是由 release 方法直接释放),则应该失败。要实现这一点,您可以在任务类上使用 Tries 和 MaxExceptions 属性:
<?php
namespace App\Jobs;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
use Illuminate\Queue\Attributes\MaxExceptions;
use Illuminate\Queue\Attributes\Tries;
use Illuminate\Support\Facades\Redis;
#[Tries(25)]
#[MaxExceptions(3)]
class ProcessPodcast implements ShouldQueue
{
use Queueable;
/**
* 执行任务。
*/
public function handle(): void
{
Redis::throttle('key')->allow(10)->every(60)->then(function () {
// 获得锁,处理播客...
}, function () {
// 无法获得锁...
return $this->release(10);
});
}
}在此示例中,如果应用程序无法获得 Redis 锁,任务将释放十秒,并将继续重试最多 25 次。但是,如果任务抛出三个未处理的异常,任务将失败。
超时
通常,您大致知道预期队列任务需要多长时间。因此,Laravel 允许您指定一个“超时”值。默认情况下,超时值为 60 秒。如果任务处理时间超过超时值指定的秒数,处理该任务的工作器将退出并报错。通常,工作器将由 在服务器上配置的进程管理器 自动重启。
任务可以运行的最大秒数可以使用 Artisan 命令行上的 --timeout 开关指定:
php artisan queue:work --timeout=30如果任务因持续超时而超过其最大尝试次数,它将被标记为失败。
您还可以使用任务类上的 Timeout 属性定义任务允许运行的最大秒数。如果在任务上指定了超时,它将优先于命令行上指定的任何超时:
<?php
namespace App\Jobs;
use Illuminate\Queue\Attributes\Timeout;
#[Timeout(120)]
class ProcessPodcast implements ShouldQueue
{
// ...
}有时,IO 阻塞进程(如套接字或传出 HTTP 连接)可能不遵守您指定的超时。因此,在使用这些功能时,您应始终尝试使用它们的 API 来指定超时。例如,使用 Guzzle 时,您应始终指定连接和请求超时值。
超时失败
如果您希望指示一个任务在超时时应被标记为 失败,可以在任务类上使用 FailOnTimeout 属性:
<?php
namespace App\Jobs;
use Illuminate\Queue\Attributes\FailOnTimeout;
#[FailOnTimeout]
class ProcessPodcast implements ShouldQueue
{
// ...
}NOTE
默认情况下,当任务超时时,它会消耗一次尝试并释放回队列(如果允许重试)。但是,如果您将任务配置为在超时时失败,它将不会被重试,无论为尝试设置的值如何。
SQS FIFO 与公平队列
Laravel 支持 Amazon SQS FIFO(先进先出) 队列,允许您按照发送的确切顺序处理任务,同时通过消息去重确保恰好一次处理。
FIFO 队列需要一个消息组 ID 来确定哪些任务可以并行处理。具有相同组 ID 的任务按顺序处理,而具有不同组 ID 的消息可以并发处理。
Laravel 提供了一个流畅的 onGroup 方法,用于在分发任务时指定消息组 ID:
ProcessOrder::dispatch($order)
->onGroup("customer-{$order->customer_id}");SQS FIFO 队列支持消息去重以确保恰好一次处理。在您的任务类中实现一个 deduplicationId 方法,以提供自定义去重 ID:
<?php
namespace App\Jobs;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
class ProcessSubscriptionRenewal implements ShouldQueue
{
use Queueable;
// ...
/**
* 获取任务的去重 ID。
*/
public function deduplicationId(): string
{
return "renewal-{$this->subscription->id}";
}
}FIFO 监听器、邮件和通知
当使用 FIFO 队列时,您还需要在监听器、邮件和通知上定义消息组。或者,您可以将这些对象的队列化实例分发到非 FIFO 队列。
要为 队列化事件监听器 定义消息组,请在监听器上定义一个 messageGroup 方法。您还可以选择定义一个 deduplicationId 方法:
<?php
namespace App\Listeners;
class SendShipmentNotification
{
// ...
/**
* 获取任务的消息组。
*/
public function messageGroup(): string
{
return 'shipments';
}
/**
* 获取任务的去重 ID。
*/
public function deduplicationId(): string
{
return "shipment-notification-{$this->shipment->id}";
}
}当发送将在 FIFO 队列上排队的 邮件消息 时,您应在发送通知时调用 onGroup 方法,并可选择调用 withDeduplicator 方法:
use App\Mail\InvoicePaid;
use Illuminate\Support\Facades\Mail;
$invoicePaid = (new InvoicePaid($invoice))
->onGroup('invoices')
->withDeduplicator(fn () => 'invoices-'.$invoice->id);
Mail::to($request->user())->send($invoicePaid);当发送将在 FIFO 队列上排队的 通知 时,您应在发送通知时调用 onGroup 方法,并可选择调用 withDeduplicator 方法:
use App\Notifications\InvoicePaid;
$invoicePaid = (new InvoicePaid($invoice))
->onGroup('invoices')
->withDeduplicator(fn () => 'invoices-'.$invoice->id);
$user->notify($invoicePaid);队列故障转移
failover 队列驱动在将任务推送到队列时提供自动故障转移功能。如果 failover 配置的主队列连接因任何原因失败,Laravel 将自动尝试将任务推送到列表中的下一个配置连接。这对于在队列可靠性至关重要的生产环境中确保高可用性特别有用。
要配置故障转移队列连接,请指定 failover 驱动,并提供一个要按顺序尝试的连接名称数组。默认情况下,Laravel 在应用程序的 config/queue.php 配置文件中包含一个示例故障转移配置:
'failover' => [
'driver' => 'failover',
'connections' => [
'redis',
'database',
'sync',
],
],一旦您配置了使用 failover 驱动的连接,您需要将故障转移连接设置为应用程序 .env 文件中的默认队列连接,以使用故障转移功能:
QUEUE_CONNECTION=failover接下来,为故障转移连接列表中的每个连接启动至少一个工作器:
php artisan queue:work redis
php artisan queue:work databaseNOTE
您不需要为使用 sync、background 或 deferred 队列驱动的连接运行工作器,因为这些驱动在当前 PHP 进程中处理任务。
当队列连接操作失败并激活故障转移时,Laravel 将调度 Illuminate\Queue\Events\QueueFailedOver 事件,允许您报告或记录队列连接已失败。
NOTE
如果您使用 Laravel Horizon,请记住 Horizon 仅管理 Redis 队列。如果您的故障转移列表包含 database,您应在 Horizon 旁边运行常规的 php artisan queue:work database 进程。
错误处理
如果在处理任务时抛出异常,该任务将自动释放回队列,以便可以再次尝试。该任务将继续被释放,直到达到应用程序允许的最大尝试次数。最大尝试次数由 queue:work Artisan 命令上使用的 --tries 开关定义。或者,最大尝试次数可以在任务类本身上定义。有关运行队列工作器的更多信息 可以在下面找到。
手动释放任务
有时您可能希望手动将任务释放回队列,以便稍后再次尝试。您可以通过调用 release 方法来实现:
/**
* 执行任务。
*/
public function handle(): void
{
// ...
$this->release();
}默认情况下,release 方法会将任务释放回队列以立即处理。但是,您可以通过向 release 方法传递一个整数或日期实例来指示队列在经过一定秒数后才使任务可用于处理:
$this->release(10);
$this->release(now()->plus(seconds: 10));手动标记任务失败
有时您可能需要手动将任务标记为“失败”。为此,您可以调用 fail 方法:
/**
* 执行任务。
*/
public function handle(): void
{
// ...
$this->fail();
}如果您希望因捕获的异常而将任务标记为失败,可以将该异常传递给 fail 方法。或者,为了方便,您可以传递一个字符串错误消息,该消息将为您转换为异常:
$this->fail($exception);
$this->fail('出错了。');NOTE
有关失败任务的更多信息,请查看 处理任务失败文档。
在特定异常上使任务失败
FailOnException 任务中间件 允许您在抛出特定异常时短路重试。这允许对瞬时异常(如外部 API 错误)进行重试,但在持久性异常(如用户权限被撤销)上永久失败任务:
<?php
namespace App\Jobs;
use App\Models\User;
use Illuminate\Auth\Access\AuthorizationException;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
use Illuminate\Queue\Attributes\Tries;
use Illuminate\Queue\Middleware\FailOnException;
use Illuminate\Support\Facades\Http;
#[Tries(3)]
class SyncChatHistory implements ShouldQueue
{
use Queueable;
/**
* 创建一个新的任务实例。
*/
public function __construct(
public User $user,
) {}
/**
* 执行任务。
*/
public function handle(): void
{
$this->user->authorize('sync-chat-history');
$response = Http::throw()->get(
"https://chat.laravel.test/?user={$this->user->uuid}"
);
// ...
}
/**
* 获取任务应通过的中间件。
*/
public function middleware(): array
{
return [
new FailOnException([AuthorizationException::class])
];
}
}任务批处理
Laravel 的任务批处理功能允许您轻松地并行执行一组任务,然后在任务批次执行完成后执行某些操作。
在开始之前,您应该创建一个数据库迁移,以构建一个包含任务批次元信息(例如它们的完成百分比)的数据表。可以使用 make:queue-batches-table Artisan 命令生成此迁移:
php artisan make:queue-batches-table
php artisan migrate定义可批处理任务
要定义一个可批处理任务,您应该像往常一样 创建一个队列任务;但是,您应该将 Illuminate\Bus\Batchable trait 添加到任务类。此 trait 提供了一个 batch 方法,可用于检索任务正在执行的当前批次:
<?php
namespace App\Jobs;
use Illuminate\Bus\Batchable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
class ImportCsv implements ShouldQueue
{
use Batchable, Queueable;
/**
* 执行任务。
*/
public function handle(): void
{
if ($this->batch()->cancelled()) {
// 确定批次是否已被取消...
return;
}
// 导入 CSV 文件的一部分...
}
}分发批次
要分发一批任务,您应该使用 Bus 门面的 batch 方法。当然,批处理在与完成回调结合时主要是有用的。因此,您可以使用 then、catch 和 finally 方法来为批次定义完成回调。当调用这些回调时,每个回调将接收一个 Illuminate\Bus\Batch 实例。
当运行多个队列工作器时,批次中的任务将并行处理。因此,任务完成的顺序可能与它们添加到批次的顺序不同。有关如何按顺序运行一系列任务的信息,请查阅我们的 任务链和批次 文档。
在此示例中,我们将假设我们正在排队一批任务,每个任务处理 CSV 文件中的给定行数:
use App\Jobs\ImportCsv;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;
use Throwable;
$batch = Bus::batch([
new ImportCsv(1, 100),
new ImportCsv(101, 200),
new ImportCsv(201, 300),
new ImportCsv(301, 400),
new ImportCsv(401, 500),
])->before(function (Batch $batch) {
// 批次已创建但尚未添加任何任务...
})->progress(function (Batch $batch) {
// 单个任务已成功完成...
})->then(function (Batch $batch) {
// 所有任务成功完成...
})->catch(function (Batch $batch, Throwable $e) {
// 检测到批次任务失败...
})->finally(function (Batch $batch) {
// 批次已完成执行...
})->dispatch();
return $batch->id;批次的 ID,可通过 $batch->id 属性访问,可用于在批次分发后 查询 Laravel 命令总线 以获取有关批次的信息。
WARNING
由于批次回调被序列化并在稍后由 Laravel 队列执行,您不应在回调中使用 $this 变量。此外,由于批处理任务被包装在数据库事务中,不应在任务中执行触发隐式提交的数据库语句。
命名批次
某些工具(如 Laravel Horizon 和 Laravel Telescope)如果批次被命名,可能会为批次提供更友好的调试信息。要为批次分配任意名称,您可以在定义批次时调用 name 方法:
$batch = Bus::batch([
// ...
])->then(function (Batch $batch) {
// 所有任务成功完成...
})->name('Import CSV')->dispatch();批次的连接和队列
如果您希望指定批处理任务应使用的连接和队列,可以使用 onConnection 和 onQueue 方法。所有批处理任务必须在相同的连接和队列中执行:
$batch = Bus::batch([
// ...
])->then(function (Batch $batch) {
// 所有任务成功完成...
})->onConnection('redis')->onQueue('imports')->dispatch();链与批次
您可以通过将链式任务放在一个数组中,在批次内定义一组 链式任务。例如,我们可以并行执行两个任务链,并在两个任务链都完成处理时执行回调:
use App\Jobs\ReleasePodcast;
use App\Jobs\SendPodcastReleaseNotification;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;
Bus::batch([
[
new ReleasePodcast(1),
new SendPodcastReleaseNotification(1),
],
[
new ReleasePodcast(2),
new SendPodcastReleaseNotification(2),
],
])->then(function (Batch $batch) {
// 所有任务成功完成...
})->dispatch();相反,您可以通过在链中定义批次,在 链 中运行任务批次。例如,您可以先运行一批任务来发布多个播客,然后运行一批任务来发送发布通知:
use App\Jobs\FlushPodcastCache;
use App\Jobs\ReleasePodcast;
use App\Jobs\SendPodcastReleaseNotification;
use Illuminate\Support\Facades\Bus;
Bus::chain([
new FlushPodcastCache,
Bus::batch([
new ReleasePodcast(1),
new ReleasePodcast(2),
]),
Bus::batch([
new SendPodcastReleaseNotification(1),
new SendPodcastReleaseNotification(2),
]),
])->dispatch();向批次添加任务
有时,从批处理任务内部向批次添加额外任务可能很有用。当您需要批处理数千个可能在 Web 请求期间分发耗时过长的任务时,此模式很有用。因此,您可能希望分批发一个初始的“加载器”任务批次,该批次用更多任务填充批次:
$batch = Bus::batch([
new LoadImportBatch,
new LoadImportBatch,
new LoadImportBatch,
])->then(function (Batch $batch) {
// 所有任务成功完成...
})->name('Import Contacts')->dispatch();在此示例中,我们将使用 LoadImportBatch 任务用额外的任务填充批次。为此,我们可以在通过任务的 batch 方法访问的批次实例上使用 add 方法:
use App\Jobs\ImportContacts;
use Illuminate\Support\Collection;
/**
* 执行任务。
*/
public function handle(): void
{
if ($this->batch()->cancelled()) {
return;
}
$this->batch()->add(Collection::times(1000, function () {
return new ImportContacts;
}));
}WARNING
您只能从属于同一批次的任务内向批次添加任务。
检查批次
提供给批次完成回调的 Illuminate\Bus\Batch 实例具有各种属性和方法,可帮助您与给定的任务批次交互和检查:
// 批次的 UUID...
$batch->id;
// 批次的名称(如果适用)...
$batch->name;
// 分配给批次的任务数量...
$batch->totalJobs;
// 尚未由队列处理的任务数量...
$batch->pendingJobs;
// 失败的任务数量...
$batch->failedJobs;
// 迄今为止已处理的任务数量...
$batch->processedJobs();
// 批次的完成百分比(0-100)...
$batch->progress();
// 指示批次是否已完成执行...
$batch->finished();
// 取消批次的执行...
$batch->cancel();
// 指示批次是否已被取消...
$batch->cancelled();从路由返回批次
所有 Illuminate\Bus\Batch 实例都是 JSON 可序列化的,这意味着您可以直接从应用程序的路由之一返回它们,以检索包含批次信息(包括其完成进度)的 JSON 载荷。这使得在应用程序的 UI 中显示有关批次完成进度的信息变得很方便。
要按 ID 检索批次,您可以使用 Bus 门面的 findBatch 方法:
use Illuminate\Support\Facades\Bus;
use Illuminate\Support\Facades\Route;
Route::get('/batch/{batchId}', function (string $batchId) {
return Bus::findBatch($batchId);
});取消批次
有时您可能需要取消给定批次的执行。这可以通过在 Illuminate\Bus\Batch 实例上调用 cancel 方法来实现:
/**
* 执行任务。
*/
public function handle(): void
{
if ($this->user->exceedsImportLimit()) {
$this->batch()->cancel();
return;
}
if ($this->batch()->cancelled()) {
return;
}
}正如您在前面的示例中可能注意到的,批处理任务通常应在继续执行之前确定其对应的批次是否已被取消。但是,为了方便,您可以改为将 SkipIfBatchCancelled 中间件 分配给任务。顾名思义,如果任务的对应批次已被取消,此中间件将指示 Laravel 不处理该任务:
use Illuminate\Queue\Middleware\SkipIfBatchCancelled;
/**
* 获取任务应通过的中间件。
*/
public function middleware(): array
{
return [new SkipIfBatchCancelled];
}批次失败
当批处理任务失败时,catch 回调(如果已分配)将被调用。此回调仅针对批次中第一个失败的任务调用。
允许失败
当批次中的任务失败时,Laravel 将自动将批次标记为“已取消”。如果您愿意,可以禁用此行为,以便任务失败不会自动将批次标记为已取消。这可以通过在分发批次时调用 allowFailures 方法来实现:
$batch = Bus::batch([
// ...
])->then(function (Batch $batch) {
// 所有任务成功完成...
})->allowFailures()->dispatch();您可以选择向 allowFailures 方法提供一个闭包,该闭包将在每个任务失败时执行:
$batch = Bus::batch([
// ...
])->allowFailures(function (Batch $batch, $exception) {
// 处理单个任务失败...
})->dispatch();重试失败的批处理任务
为方便起见,Laravel 提供了一个 queue:retry-batch Artisan 命令,允许您轻松重试给定批次的所有失败任务。此命令接受应重试失败任务的批次的 UUID:
php artisan queue:retry-batch 32dbc76c-4f82-4749-b610-a639fe0099b5修剪批次
如果不进行修剪,job_batches 表可能会非常快速地累积记录。为了缓解这种情况,您应 安排 queue:prune-batches Artisan 命令每天运行:
use Illuminate\Support\Facades\Schedule;
Schedule::command('queue:prune-batches')->daily();默认情况下,所有超过 24 小时的已完成批次将被修剪。您可以在调用命令时使用 hours 选项来确定保留批次数据的时间。例如,以下命令将删除所有超过 48 小时前完成的批次:
use Illuminate\Support\Facades\Schedule;
Schedule::command('queue:prune-batches --hours=48')->daily();有时,您的 job_batches 表可能会累积从未成功完成的批次的批次记录,例如任务失败且该任务从未成功重试的批次。您可以使用 unfinished 选项指示 queue:prune-batches 命令修剪这些未完成的批次记录:
use Illuminate\Support\Facades\Schedule;
Schedule::command('queue:prune-batches --hours=48 --unfinished=72')->daily();同样,您的 job_batches 表也可能累积已取消批次的批次记录。您可以使用 cancelled 选项指示 queue:prune-batches 命令修剪这些已取消的批次记录:
use Illuminate\Support\Facades\Schedule;
Schedule::command('queue:prune-batches --hours=48 --cancelled=72')->daily();在 DynamoDB 中存储批次
Laravel 还支持将批次元信息存储在 DynamoDB 中,而不是关系数据库中。但是,您需要手动创建一个 DynamoDB 表来存储所有批次记录。
通常,此表应命名为 job_batches,但您应根据应用程序 queue 配置文件中 queue.batching.table 配置值的值来命名该表。
DynamoDB 批次表配置
job_batches 表应有一个名为 application 的字符串主分区键和一个名为 id 的字符串主排序键。键的 application 部分将包含您的应用程序名称,该名称由应用程序 app 配置文件中的 name 配置值定义。由于应用程序名称是 DynamoDB 表键的一部分,您可以使用同一个表为多个 Laravel 应用程序存储任务批次。
此外,如果您希望利用 自动批次修剪,可以为表定义 ttl 属性。
DynamoDB 配置
接下来,安装 AWS SDK,以便您的 Laravel 应用程序可以与 Amazon DynamoDB 通信:
composer require aws/aws-sdk-php然后,将 queue.batching.driver 配置选项的值设置为 dynamodb。此外,您应在 batching 配置数组中定义 key、secret 和 region 配置选项。这些选项将用于与 AWS 进行身份验证。当使用 dynamodb 驱动时,queue.batching.database 配置选项是不必要的:
'batching' => [
'driver' => env('QUEUE_BATCHING_DRIVER', 'dynamodb'),
'key' => env('AWS_ACCESS_KEY_ID'),
'secret' => env('AWS_SECRET_ACCESS_KEY'),
'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
'table' => 'job_batches',
],在 DynamoDB 中修剪批次
当使用 DynamoDB 存储作业批次信息时,用于修剪存储在关系数据库中的批次的典型修剪命令将不起作用。相反,您可以利用 DynamoDB 的原生 TTL 功能 自动删除旧批次的记录。
如果您使用 ttl 属性定义了 DynamoDB 表,则可以定义配置参数来指示 Laravel 如何修剪批次记录。queue.batching.ttl_attribute 配置值定义持有 TTL 的属性的名称,而 queue.batching.ttl 配置值定义在批次记录可以从 DynamoDB 表中删除之前,相对于记录上次更新的时间,应经过的秒数:
'batching' => [
'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'),
'key' => env('AWS_ACCESS_KEY_ID'),
'secret' => env('AWS_SECRET_ACCESS_KEY'),
'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
'table' => 'job_batches',
'ttl_attribute' => 'ttl',
'ttl' => 60 * 60 * 24 * 7, // 7 天...
],队列化闭包
除了将任务类分发到队列,您还可以分发一个闭包。这对于需要在当前请求周期之外执行的快速、简单任务非常有用。当将闭包分发到队列时,闭包的代码内容会被加密签名,以便在传输过程中不会被修改:
use App\Models\Podcast;
$podcast = Podcast::find(1);
dispatch(function () use ($podcast) {
$podcast->publish();
});要为队列化闭包分配一个名称(可由队列报告仪表板使用,并由 queue:work 命令显示),您可以使用 name 方法:
dispatch(function () {
// ...
})->name('Publish Podcast');使用 catch 方法,您可以提供一个闭包,该闭包应在队列化闭包在耗尽所有队列的 配置重试尝试 后未能成功完成时执行:
use Throwable;
dispatch(function () use ($podcast) {
$podcast->publish();
})->catch(function (Throwable $e) {
// 此任务已失败...
});WARNING
由于 catch 回调被序列化并在稍后由 Laravel 队列执行,您不应在 catch 回调中使用 $this 变量。
运行队列工作器
queue:work 命令
Laravel 包含一个 Artisan 命令,该命令将启动一个队列工作器,并在新任务推送到队列时处理它们。您可以使用 queue:work Artisan 命令运行工作器。请注意,一旦 queue:work 命令启动,它将一直运行,直到手动停止或关闭终端:
php artisan queue:workNOTE
要让 queue:work 进程永久在后台运行,您应该使用进程监视器(如 Supervisor)来确保队列工作器不会停止运行。
如果您希望在处理的任务 ID、连接名称和队列名称包含在命令的输出中,可以在调用 queue:work 命令时包含 -v 标志:
php artisan queue:work -v请记住,队列工作器是长时间运行的进程,并在内存中存储已启动的应用程序状态。因此,它们启动后不会注意到代码库的更改。因此,在部署过程中,请务必 重启您的队列工作器。此外,请记住,您的应用程序创建或修改的任何静态状态都不会在任务之间自动重置。
或者,您可以运行 queue:listen 命令。当使用 queue:listen 命令时,当您想要重新加载更新的代码或重置应用程序状态时,不必手动重启工作器;但是,此命令的效率远低于 queue:work 命令:
php artisan queue:listen运行多个队列工作器
要为队列分配多个工作器并并发处理任务,您只需启动多个 queue:work 进程。这可以在本地通过终端中的多个选项卡完成,或在生产环境中使用进程管理器的配置设置来完成。当使用 Supervisor 时,您可以使用 numprocs 配置值。
指定连接和队列
您还可以指定工作器应使用的队列连接。传递给 work 命令的连接名称应对应于您在 config/queue.php 配置文件中定义的连接之一:
php artisan queue:work redis默认情况下,queue:work 命令仅处理给定连接上的默认队列。但是,您可以通过仅处理给定连接的特定队列来进一步自定义队列工作器。例如,如果您所有的电子邮件都在 redis 队列连接的 emails 队列中处理,您可以发出以下命令来启动仅处理该队列的工作器:
php artisan queue:work redis --queue=emails处理指定数量的任务
--once 选项可用于指示工作器仅从队列中处理单个任务:
php artisan queue:work --once--max-jobs 选项可用于指示工作器处理给定数量的任务然后退出。此选项在与 Supervisor 结合使用时可能很有用,以便您的工人在处理一定数量的任务后自动重启,释放它们可能积累的任何内存:
php artisan queue:work --max-jobs=1000处理所有排队任务然后退出
--stop-when-empty 选项可用于指示工作器处理所有任务,然后正常退出。如果您希望在队列为空后关闭容器,在 Docker 容器内处理 Laravel 队列时,此选项可能很有用:
php artisan queue:work --stop-when-empty处理任务给定秒数
--max-time 选项可用于指示工作器处理任务给定的秒数然后退出。此选项在与 Supervisor 结合使用时可能很有用,以便您的工人在处理一定时间的任务后自动重启,释放它们可能积累的任何内存:
# 处理任务一小时然后退出...
php artisan queue:work --max-time=3600工作器休眠持续时间
当队列中有可用任务时,工作器将继续处理任务,任务之间没有延迟。但是,sleep 选项决定了如果没有可用任务,工作器将“休眠”多少秒。当然,在休眠期间,工作器不会处理任何新任务:
php artisan queue:work --sleep=3维护模式与队列
当您的应用程序处于 维护模式 时,将不会处理任何队列任务。一旦应用程序退出维护模式,任务将继续正常处理。
要强制队列工作器即使在启用维护模式的情况下也处理任务,您可以使用 --force 选项:
php artisan queue:work --force资源考虑
守护进程队列工作器在处理每个任务之前不会“重启”框架。因此,您应在每个任务完成后释放任何重型资源。例如,如果您使用 GD 库 进行图像处理,应在处理完图像后使用 imagedestroy 释放内存。
队列优先级
有时您可能希望优先处理队列的方式。例如,在您的 config/queue.php 配置文件中,您可以将 redis 连接的默认 queue 设置为 low。但是,偶尔您可能希望将任务推送到 high 优先级队列,如下所示:
dispatch((new Job)->onQueue('high'));要启动一个工作器,验证所有 high 队列任务在处理 low 队列上的任何任务之前被处理,请将逗号分隔的队列名称列表传递给 work 命令:
php artisan queue:work --queue=high,low队列工作器与部署
由于队列工作器是长时间运行的进程,如果不重启,它们不会注意到代码的更改。因此,使用队列工作器部署应用程序的最简单方法是在部署过程中重启工作器。您可以通过发出 queue:restart 命令来优雅地重启所有工作器:
php artisan queue:restart此命令将指示所有队列工作器在处理完当前任务后正常退出,以便不会丢失现有任务。由于在执行 queue:restart 命令时队列工作器将退出,您应该运行一个进程管理器(如 Supervisor)来自动重启队列工作器。
NOTE
队列使用 缓存 来存储重启信号,因此在使用此功能之前,您应该验证是否为应用程序正确配置了缓存驱动。
任务过期与超时
任务过期
在您的 config/queue.php 配置文件中,每个队列连接定义了一个 retry_after 选项。此选项指定队列连接在重试正在处理的任务之前应等待的秒数。例如,如果 retry_after 的值设置为 90,则如果任务已处理 90 秒而未释放或删除,它将被释放回队列。通常,您应将 retry_after 值设置为您的任务合理完成处理所需的最大秒数。
WARNING
唯一不包含 retry_after 值的队列连接是 Amazon SQS。SQS 将根据在 AWS 控制台中管理的 默认可见性超时 重试任务。
工作器超时
queue:work Artisan 命令公开了一个 --timeout 选项。默认情况下,--timeout 值为 60 秒。如果任务处理时间超过超时值指定的秒数,处理该任务的工作器将退出并报错。通常,工作器将由 在服务器上配置的进程管理器 自动重启:
php artisan queue:work --timeout=60retry_after 配置选项和 --timeout CLI 选项是不同的,但协同工作以确保任务不会丢失,并且任务只成功处理一次。
WARNING
--timeout 值应始终至少比您的 retry_after 配置值短几秒。这将确保处理冻结任务的工作器总是在任务重试之前被终止。如果您的 --timeout 选项长于您的 retry_after 配置值,您的任务可能会被处理两次。
暂停与恢复队列工作器
有时您可能需要暂时阻止队列工作器处理新任务,而不完全停止工作器。例如,您可能希望在系统维护期间暂停任务处理。Laravel 提供了 queue:pause 和 queue:continue Artisan 命令来暂停和恢复队列工作器。
要暂停特定队列,请提供队列连接名称和队列名称:
php artisan queue:pause database:default在此示例中,database 是队列连接名称,default 是队列名称。一旦队列被暂停,任何从该队列处理任务的工作器将继续完成当前任务,但在队列恢复之前不会接收任何新任务。
要恢复处理已暂停队列上的任务,请使用 queue:continue 命令:
php artisan queue:continue database:default恢复队列后,工作器将立即开始从该队列处理新任务。请注意,暂停队列并不会停止工作器进程本身——它只是阻止工作器处理指定队列的新任务。
工作器重启和暂停信号
默认情况下,队列工作器在每个任务迭代中轮询缓存驱动以获取重启和暂停信号。虽然这种轮询对于响应 queue:restart 和 queue:pause 命令至关重要,但它确实引入了少量性能开销。
如果您需要优化性能并且不需要这些中断功能,可以通过在 Queue 门面上调用 withoutInterruptionPolling 方法来全局禁用此轮询。这通常应在您的 AppServiceProvider 的 boot 方法中完成:
use Illuminate\Support\Facades\Queue;
/**
* 引导任何应用服务。
*/
public function boot(): void
{
Queue::withoutInterruptionPolling();
}或者,您可以通过在 Illuminate\Queue\Worker 类上设置静态属性 $restartable 或 $pausable 来单独禁用重启或暂停轮询:
use Illuminate\Queue\Worker;
/**
* 引导任何应用服务。
*/
public function boot(): void
{
Worker::$restartable = false;
Worker::$pausable = false;
}WARNING
当中断轮询被禁用时,工作器将不会响应 queue:restart 或 queue:pause 命令(取决于禁用了哪些功能)。
Supervisor 配置
在生产环境中,您需要一种方法来保持您的 queue:work 进程运行。queue:work 进程可能因各种原因停止运行,例如工作器超时或执行 queue:restart 命令。
因此,您需要配置一个进程监视器,它可以检测您的 queue:work 进程何时退出并自动重启它们。此外,进程监视器可以允许您指定要同时运行多少个 queue:work 进程。Supervisor 是 Linux 环境中常用的进程监视器,我们将在以下文档中讨论如何配置它。
安装 Supervisor
Supervisor 是 Linux 操作系统的进程监视器,如果您的 queue:work 进程失败,它将自动重启它们。要在 Ubuntu 上安装 Supervisor,您可以使用以下命令:
sudo apt-get install supervisorNOTE
如果自己配置和管理 Supervisor 听起来很繁琐,可以考虑使用 Laravel Cloud,它提供了一个完全托管的平台来运行 Laravel 队列工作器。
配置 Supervisor
Supervisor 配置文件通常存储在 /etc/supervisor/conf.d 目录中。在此目录中,您可以创建任意数量的配置文件,指示 supervisor 如何监视您的进程。例如,让我们创建一个 laravel-worker.conf 文件,启动并监视 queue:work 进程:
[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /home/forge/app.com/artisan queue:work --sleep=3 --tries=3 --max-time=3600
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
user=forge
numprocs=8
redirect_stderr=true
stdout_logfile=/home/forge/app.com/worker.log
stopwaitsecs=3600在此示例中,numprocs 指令将指示 Supervisor 运行八个 queue:work 进程并监视所有进程,如果它们失败则自动重启。您应该更改配置的 command 指令以反映您所需的队列连接和工作器选项。
WARNING
您应确保 stopwaitsecs 的值大于最长运行任务消耗的秒数。否则,Supervisor 可能会在处理完成之前终止任务。
启动 Supervisor
一旦配置文件创建完成,您可以使用以下命令更新 Supervisor 配置并启动进程:
sudo supervisorctl reread
sudo supervisorctl update
sudo supervisorctl start "laravel-worker:*"有关 Supervisor 的更多信息,请查阅 Supervisor 文档。
处理失败任务
有时您的队列任务会失败。别担心,事情并不总是按计划进行!Laravel 包含一种方便的方法来 指定任务应被尝试的最大次数。在异步任务超过此尝试次数后,它将被插入 failed_jobs 数据库表中。失败的 同步分发任务 不会存储在此表中,其异常会立即由应用程序处理。
创建 failed_jobs 表的迁移通常已存在于新的 Laravel 应用程序中。但是,如果您的应用程序不包含此表的迁移,您可以使用 make:queue-failed-table 命令创建迁移:
php artisan make:queue-failed-table
php artisan migrate当运行 队列工作器 进程时,您可以使用 queue:work 命令上的 --tries 开关指定任务应被尝试的最大次数。如果您未为 --tries 选项指定值,任务将仅尝试一次,或按照任务类的 Tries 属性指定的次数尝试:
php artisan queue:work redis --tries=3使用 --backoff 选项,您可以指定 Laravel 在重试遇到异常的任务之前应等待多少秒。默认情况下,任务会立即释放回队列,以便可以再次尝试:
php artisan queue:work redis --tries=3 --backoff=3如果您希望在每个任务的基础上配置 Laravel 在重试遇到异常的任务之前应等待的秒数,可以在您的任务类上使用 Backoff 属性:
<?php
namespace App\Jobs;
use Illuminate\Queue\Attributes\Backoff;
#[Backoff(3)]
class ProcessPodcast implements ShouldQueue
{
// ...
}如果您需要更复杂的逻辑来确定任务的回退时间,可以在任务类上定义一个 backoff 方法:
/**
* 计算在重试任务之前等待的秒数。
*/
public function backoff(): int
{
return 3;
}您可以通过定义回退值数组来轻松配置“指数”回退。在此示例中,第一次重试的延迟将为 1 秒,第二次重试为 5 秒,第三次重试为 10 秒,并且如果还有更多剩余尝试,则随后的每次重试都为 10 秒:
<?php
namespace App\Jobs;
use Illuminate\Queue\Attributes\Backoff;
#[Backoff([1, 5, 10])]
class ProcessPodcast implements ShouldQueue
{
// ...
}清理失败任务
当特定任务失败时,您可能希望向用户发送警报或还原任务部分完成的任何操作。要实现这一点,您可以在任务类上定义一个 failed 方法。导致任务失败的 Throwable 实例将被传递给 failed 方法:
<?php
namespace App\Jobs;
use App\Models\Podcast;
use App\Services\AudioProcessor;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
use Throwable;
class ProcessPodcast implements ShouldQueue
{
use Queueable;
/**
* 创建一个新的任务实例。
*/
public function __construct(
public Podcast $podcast,
) {}
/**
* 执行任务。
*/
public function handle(AudioProcessor $processor): void
{
// 处理上传的播客...
}
/**
* 处理任务失败。
*/
public function failed(?Throwable $exception): void
{
// 向用户发送失败通知等...
}
}WARNING
在调用 failed 方法之前会实例化一个新的任务实例;因此,在 handle 方法中可能发生的任何类属性修改都将丢失。
失败的任务不一定是遇到未处理异常的任务。当任务耗尽所有允许的尝试次数时,也可能被视为失败。这些尝试可以通过多种方式消耗:
- 任务超时。
- 任务在执行过程中遇到未处理的异常。
- 任务被手动或通过中间件释放回队列。
如果最后一次尝试因任务执行期间抛出的异常而失败,该异常将被传递给任务的 failed 方法。但是,如果任务因达到允许的最大尝试次数而失败,则 $exception 将是 Illuminate\Queue\MaxAttemptsExceededException 的一个实例。类似地,如果任务因超过配置的超时而失败,则 $exception 将是 Illuminate\Queue\TimeoutExceededException 的一个实例。
重试失败任务
要查看已插入 failed_jobs 数据库表中的所有失败任务,您可以使用 queue:failed Artisan 命令:
php artisan queue:failedqueue:failed 命令将列出任务的 ID、连接、队列、失败时间以及其他信息。任务 ID 可用于重试失败的任务。例如,要重试 ID 为 ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece 的失败任务,请发出以下命令:
php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece如有必要,您可以将多个 ID 传递给该命令:
php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece 91401d2c-0784-4f43-824c-34f94a33c24d您还可以重试特定队列的所有失败任务:
php artisan queue:retry --queue=name要重试所有失败的任务,请执行 queue:retry 命令并将 all 作为 ID 传递:
php artisan queue:retry all如果您想删除一个失败的任务,可以使用 queue:forget 命令:
php artisan queue:forget 91401d2c-0784-4f43-824c-34f94a33c24dNOTE
当使用 Horizon 时,您应使用 horizon:forget 命令而不是 queue:forget 命令来删除失败的任务。
要从 failed_jobs 表中删除所有失败的任务,您可以使用 queue:flush 命令:
php artisan queue:flushqueue:flush 命令会从您的队列中移除所有失败的任务记录,无论失败任务有多旧。您可以使用 --hours 选项仅删除特定小时数之前或更早失败的任务:
php artisan queue:flush --hours=48忽略缺失模型
当将 Eloquent 模型注入任务时,模型会在放入队列之前自动序列化,并在处理任务时从数据库中重新检索。但是,如果模型在任务等待工作器处理期间被删除,您的任务可能会因 ModelNotFoundException 而失败。
为了方便起见,您可以选择使用任务类上的 DeleteWhenMissingModels 属性自动删除包含缺失模型的任务。当存在此属性时,Laravel 将静默丢弃该任务而不引发异常:
<?php
namespace App\Jobs;
use Illuminate\Queue\Attributes\DeleteWhenMissingModels;
#[DeleteWhenMissingModels]
class ProcessPodcast implements ShouldQueue
{
// ...
}修剪失败任务
您可以通过调用 queue:prune-failed Artisan 命令来修剪应用程序 failed_jobs 表中的记录:
php artisan queue:prune-failed默认情况下,所有超过 24 小时的失败任务记录将被修剪。如果您向命令提供 --hours 选项,则只会保留过去 N 小时内插入的失败任务记录。例如,以下命令将删除所有超过 48 小时前插入的失败任务记录:
php artisan queue:prune-failed --hours=48在 DynamoDB 中存储失败任务
Laravel 还支持将失败任务记录存储在 DynamoDB 中,而不是关系数据库表中。但是,您必须手动创建一个 DynamoDB 表来存储所有失败任务记录。通常,此表应命名为 failed_jobs,但您应根据应用程序 queue 配置文件中 queue.failed.table 配置值的值来命名该表。
failed_jobs 表应有一个名为 application 的字符串主分区键和一个名为 uuid 的字符串主排序键。键的 application 部分将包含您的应用程序名称,该名称由应用程序 app 配置文件中的 name 配置值定义。由于应用程序名称是 DynamoDB 表键的一部分,您可以使用同一个表为多个 Laravel 应用程序存储失败任务。
此外,确保安装 AWS SDK,以便您的 Laravel 应用程序可以与 Amazon DynamoDB 通信:
composer require aws/aws-sdk-php接下来,将 queue.failed.driver 配置选项的值设置为 dynamodb。此外,您应在失败任务配置数组中定义 key、secret 和 region 配置选项。这些选项将用于与 AWS 进行身份验证。当使用 dynamodb 驱动时,queue.failed.database 配置选项是不必要的:
'failed' => [
'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'),
'key' => env('AWS_ACCESS_KEY_ID'),
'secret' => env('AWS_SECRET_ACCESS_KEY'),
'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
'table' => 'failed_jobs',
],禁用失败任务存储
您可以通过将 queue.failed.driver 配置选项的值设置为 null 来指示 Laravel 丢弃失败任务而不存储它们。通常,这可以通过 QUEUE_FAILED_DRIVER 环境变量实现:
QUEUE_FAILED_DRIVER=null失败任务事件
如果您想注册一个在任务失败时将调用的事件监听器,可以使用 Queue 门面的 failing 方法。例如,我们可以从 Laravel 附带的 AppServiceProvider 的 boot 方法中将一个闭包附加到此事件:
<?php
namespace App\Providers;
use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobFailed;
class AppServiceProvider extends ServiceProvider
{
/**
* 注册任何应用服务。
*/
public function register(): void
{
// ...
}
/**
* 引导任何应用服务。
*/
public function boot(): void
{
Queue::failing(function (JobFailed $event) {
// $event->connectionName
// $event->job
// $event->exception
});
}
}从队列中清除任务
NOTE
当使用 Horizon 时,您应使用 horizon:clear 命令而不是 queue:clear 命令来清除队列中的任务。
如果您想从默认连接的默认队列中删除所有任务,可以使用 queue:clear Artisan 命令:
php artisan queue:clear您还可以提供 connection 参数和 queue 选项,以从特定连接和队列中删除任务:
php artisan queue:clear redis --queue=emailsWARNING
从队列中清除任务仅适用于 SQS、Redis 和数据库队列驱动。此外,SQS 消息删除过程最多需要 60 秒,因此在您清除队列后 60 秒内发送到 SQS 队列的任务也可能被删除。
监控队列
如果您的队列突然收到大量任务涌入,可能会不堪重负,导致任务完成等待时间过长。如果您愿意,Laravel 可以在队列任务计数超过指定阈值时提醒您。
首先,您应安排 queue:monitor 命令 每分钟运行一次。该命令接受您希望监视的队列名称以及您期望的任务计数阈值:
php artisan queue:monitor redis:default,redis:deployments --max=100仅安排此命令本身并不足以触发通知提醒您队列过载状态。当命令遇到任务计数超过阈值的队列时,将调度一个 Illuminate\Queue\Events\QueueBusy 事件。您可以在应用程序的 AppServiceProvider 中监听此事件,以便向您或您的开发团队发送通知:
use App\Notifications\QueueHasLongWaitTime;
use Illuminate\Queue\Events\QueueBusy;
use Illuminate\Support\Facades\Event;
use Illuminate\Support\Facades\Notification;
/**
* 引导任何应用服务。
*/
public function boot(): void
{
Event::listen(function (QueueBusy $event) {
Notification::route('mail', 'dev@example.com')
->notify(new QueueHasLongWaitTime(
$event->connectionName,
$event->queue,
$event->size
));
});
}测试
在测试分发任务的代码时,您可能希望指示 Laravel 不实际执行任务本身,因为任务的代码可以直接测试,并与分发它的代码分开测试。当然,要测试任务本身,您可以实例化一个任务实例,并在测试中直接调用 handle 方法。
您可以使用 Queue 门面的 fake 方法来防止队列任务实际被推送到队列。调用 Queue 门面的 fake 方法后,您可以断言应用程序尝试将任务推送到队列:
<?php
use App\Jobs\AnotherJob;
use App\Jobs\ShipOrder;
use Illuminate\Support\Facades\Queue;
test('订单可以发货', function () {
Queue::fake();
// 执行订单发货...
// 断言没有任务被推送...
Queue::assertNothingPushed();
// 断言任务被推送到给定队列...
Queue::assertPushedOn('queue-name', ShipOrder::class);
// 断言任务被推送
Queue::assertPushed(ShipOrder::class);
// 断言任务被推送了两次...
Queue::assertPushedTimes(ShipOrder::class, 2);
// 断言任务未被推送...
Queue::assertNotPushed(AnotherJob::class);
// 断言闭包被推送到队列...
Queue::assertClosurePushed();
// 断言闭包未被推送...
Queue::assertClosureNotPushed();
// 断言被推送的任务总数...
Queue::assertCount(3);
});<?php
namespace Tests\Feature;
use App\Jobs\AnotherJob;
use App\Jobs\ShipOrder;
use Illuminate\Support\Facades\Queue;
use Tests\TestCase;
class ExampleTest extends TestCase
{
public function test_orders_can_be_shipped(): void
{
Queue::fake();
// 执行订单发货...
// 断言没有任务被推送...
Queue::assertNothingPushed();
// 断言任务被推送到给定队列...
Queue::assertPushedOn('queue-name', ShipOrder::class);
// 断言任务被推送
Queue::assertPushed(ShipOrder::class);
// 断言任务被推送了两次...
Queue::assertPushedTimes(ShipOrder::class, 2);
// 断言任务未被推送...
Queue::assertNotPushed(AnotherJob::class);
// 断言闭包被推送到队列...
Queue::assertClosurePushed();
// 断言闭包未被推送...
Queue::assertClosureNotPushed();
// 断言被推送的任务总数...
Queue::assertCount(3);
}
}您可以向 assertPushed、assertNotPushed、assertClosurePushed 或 assertClosureNotPushed 方法传递一个闭包,以断言推送的任务通过了给定的“真值测试”。如果至少有一个推送的任务通过了给定的真值测试,则断言将成功:
use Illuminate\Queue\CallQueuedClosure;
Queue::assertPushed(function (ShipOrder $job) use ($order) {
return $job->order->id === $order->id;
});
Queue::assertClosurePushed(function (CallQueuedClosure $job) {
return $job->name === 'validate-order';
});伪造部分任务
如果您只需要伪造特定任务,而允许其他任务正常执行,您可以将应伪造的任务类名称传递给 fake 方法:
test('订单可以发货', function () {
Queue::fake([
ShipOrder::class,
]);
// 执行订单发货...
// 断言任务被推送了两次...
Queue::assertPushedTimes(ShipOrder::class, 2);
});public function test_orders_can_be_shipped(): void
{
Queue::fake([
ShipOrder::class,
]);
// 执行订单发货...
// 断言任务被推送了两次...
Queue::assertPushedTimes(ShipOrder::class, 2);
}您可以使用 except 方法伪造所有任务,但指定的一组任务除外:
Queue::fake()->except([
ShipOrder::class,
]);测试任务链
要测试任务链,您需要使用 Bus 门面的伪造功能。Bus 门面的 assertChained 方法可用于断言一个 任务链 已被分发。assertChained 方法接受一个链式任务数组作为其第一个参数:
use App\Jobs\RecordShipment;
use App\Jobs\ShipOrder;
use App\Jobs\UpdateInventory;
use Illuminate\Support\Facades\Bus;
Bus::fake();
// ...
Bus::assertChained([
ShipOrder::class,
RecordShipment::class,
UpdateInventory::class
]);如您在上面的示例中所见,链式任务数组可以是任务类名的数组。但是,您也可以提供实际任务实例的数组。这样做时,Laravel 将确保任务实例属于相同的类,并且具有与应用程序分发的链式任务相同的属性值:
Bus::assertChained([
new ShipOrder,
new RecordShipment,
new UpdateInventory,
]);您可以使用 assertDispatchedWithoutChain 方法断言任务被推送时没有任务链:
Bus::assertDispatchedWithoutChain(ShipOrder::class);测试链修改
如果链式任务 将任务前置或追加到现有链,您可以使用任务的 assertHasChain 方法来断言该任务具有预期的剩余任务链:
$job = new ProcessPodcast;
$job->handle();
$job->assertHasChain([
new TranscribePodcast,
new OptimizePodcast,
new ReleasePodcast,
]);assertDoesntHaveChain 方法可用于断言任务的剩余链为空:
$job->assertDoesntHaveChain();测试链式批次
如果您的任务链 包含一批任务,您可以通过在链断言中插入一个 Bus::chainedBatch 定义来断言链式批次符合您的期望:
use App\Jobs\ShipOrder;
use App\Jobs\UpdateInventory;
use Illuminate\Bus\PendingBatch;
use Illuminate\Support\Facades\Bus;
Bus::assertChained([
new ShipOrder,
Bus::chainedBatch(function (PendingBatch $batch) {
return $batch->jobs->count() === 3;
}),
new UpdateInventory,
]);测试任务批次
Bus 门面的 assertBatched 方法可用于断言一个 任务批次 已被分发。传递给 assertBatched 方法的闭包接收一个 Illuminate\Bus\PendingBatch 实例,可用于检查批次中的任务:
use Illuminate\Bus\PendingBatch;
use Illuminate\Support\Facades\Bus;
Bus::fake();
// ...
Bus::assertBatched(function (PendingBatch $batch) {
return $batch->name == 'Import CSV' &&
$batch->jobs->count() === 10;
});hasJobs 方法可用于待处理批次,以验证批次包含预期的任务。该方法接受任务实例、类名或闭包的数组:
Bus::assertBatched(function (PendingBatch $batch) {
return $batch->hasJobs([
new ProcessCsvRow(row: 1),
new ProcessCsvRow(row: 2),
new ProcessCsvRow(row: 3),
]);
});当使用闭包时,闭包将接收任务实例。预期的任务类型将从闭包的类型提示中推断:
Bus::assertBatched(function (PendingBatch $batch) {
return $batch->hasJobs([
fn (ProcessCsvRow $job) => $job->row === 1,
fn (ProcessCsvRow $job) => $job->row === 2,
fn (ProcessCsvRow $job) => $job->row === 3,
]);
});您可以使用 assertBatchCount 方法来断言已分发给定数量的批次:
Bus::assertBatchCount(3);您可以使用 assertNothingBatched 来断言没有分发任何批次:
Bus::assertNothingBatched();测试任务/批次交互
此外,您有时可能需要测试单个任务与其底层批次的交互。例如,您可能需要测试任务是否取消了其批次的进一步处理。要实现这一点,您需要通过 withFakeBatch 方法为任务分配一个伪造的批次。withFakeBatch 方法返回一个包含任务实例和伪造批次的元组:
[$job, $batch] = (new ShipOrder)->withFakeBatch();
$job->handle();
$this->assertTrue($batch->cancelled());
$this->assertEmpty($batch->added);测试任务/队列交互
有时,您可能需要测试队列任务是否 将自己释放回队列。或者,您可能需要测试任务是否删除了自身。您可以通过实例化任务并调用 withFakeQueueInteractions 方法来测试这些队列交互。
一旦任务的队列交互被伪造,您就可以在任务上调用 handle 方法。调用任务后,可以使用各种断言方法来验证任务的队列交互:
use App\Exceptions\CorruptedAudioException;
use App\Jobs\ProcessPodcast;
$job = (new ProcessPodcast)->withFakeQueueInteractions();
$job->handle();
$job->assertReleased(delay: 30);
$job->assertDeleted();
$job->assertNotDeleted();
$job->assertFailed();
$job->assertFailedWith(CorruptedAudioException::class);
$job->assertNotFailed();任务事件
使用 Queue 门面 上的 before 和 after 方法,您可以指定在处理队列任务之前或之后执行的回调。这些回调是执行额外日志记录或为仪表板增加统计信息的好机会。通常,您应在 服务提供者 的 boot 方法中调用这些方法。例如,我们可以使用 Laravel 附带的 AppServiceProvider:
<?php
namespace App\Providers;
use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;
class AppServiceProvider extends ServiceProvider
{
/**
* 注册任何应用服务。
*/
public function register(): void
{
// ...
}
/**
* 引导任何应用服务。
*/
public function boot(): void
{
Queue::before(function (JobProcessing $event) {
// $event->connectionName
// $event->job
// $event->job->payload()
});
Queue::after(function (JobProcessed $event) {
// $event->connectionName
// $event->job
// $event->job->payload()
});
}
}使用 Queue 门面 上的 looping 方法,您可以指定在工作器尝试从队列中获取任务之前执行的回调。例如,您可以注册一个闭包来回滚先前失败的任务留下的任何未提交事务:
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Queue;
Queue::looping(function () {
while (DB::transactionLevel() > 0) {
DB::rollBack();
}
});