Skip to content
赞助商赞助商赞助商
虚位以待
虚位以待
虚位以待
虚位以待
虚位以待
虚位以待
虚位以待
虚位以待
虚位以待
虚位以待
虚位以待
虚位以待
虚位以待
虚位以待
虚位以待

队列

介绍

在构建您的 Web 应用程序时,您可能会遇到一些任务,例如解析和存储上传的 CSV 文件,这些任务在典型的 Web 请求期间执行时间过长。幸运的是,Laravel 允许您轻松创建可以在后台处理的队列作业。通过将耗时的任务移到队列中,您的应用程序可以以极快的速度响应 Web 请求,并为客户提供更好的用户体验。

Laravel 队列提供了一个统一的队列 API,支持多种不同的队列后端,例如 Amazon SQSRedis 或者甚至是关系数据库。

Laravel 的队列配置选项存储在应用程序的 config/queue.php 配置文件中。在此文件中,您将找到每个包含在框架中的队列驱动程序的连接配置,包括数据库、Amazon SQSRedisBeanstalkd 驱动程序,以及一个同步驱动程序,该驱动程序将立即执行作业(用于本地开发期间)。还包括一个 null 队列驱动程序,该驱动程序会丢弃队列中的作业。

NOTE

Laravel 现在提供了 Horizon,这是一个用于 Redis 驱动队列的美丽仪表板和配置系统。查看完整的 Horizon 文档 以获取更多信息。

连接与队列

在开始使用 Laravel 队列之前,了解“连接”和“队列”之间的区别非常重要。在您的 config/queue.php 配置文件中,有一个 connections 配置数组。此选项定义了与后端队列服务(如 Amazon SQS、Beanstalk 或 Redis)的连接。然而,任何给定的队列连接可能有多个“队列”,可以被视为不同的堆栈或队列作业的堆。

请注意,queue 配置文件中的每个连接配置示例都包含一个 queue 属性。这是将作业发送到给定连接时将被调度到的默认队列。换句话说,如果您在调度作业时没有明确定义应将其调度到哪个队列,则作业将被放置在连接配置的 queue 属性中定义的队列中:

php
use App\Jobs\ProcessPodcast;

// 这个作业被发送到默认连接的默认队列...
ProcessPodcast::dispatch();

// 这个作业被发送到默认连接的“emails”队列...
ProcessPodcast::dispatch()->onQueue('emails');

某些应用程序可能不需要将作业推送到多个队列,而是更喜欢拥有一个简单的队列。然而,将作业推送到多个队列对于希望优先处理或分段处理作业的应用程序特别有用,因为 Laravel 队列工作者允许您按优先级指定应处理哪些队列。例如,如果您将作业推送到 high 队列,您可以运行一个工作者以给予它们更高的处理优先级:

shell
php artisan queue:work --queue=high,default

驱动程序说明和先决条件

数据库

为了使用 database 队列驱动程序,您需要一个数据库表来保存作业。通常,这包含在 Laravel 的默认 0001_01_01_000002_create_jobs_table.php 数据库迁移 中;但是,如果您的应用程序不包含此迁移,您可以使用 make:queue-table Artisan 命令来创建它:

shell
php artisan make:queue-table

php artisan migrate

Redis

为了使用 redis 队列驱动程序,您应该在 config/database.php 配置文件中配置一个 Redis 数据库连接。

WARNING

redis 队列驱动程序不支持 serializercompression Redis 选项。

Redis 集群

如果您的 Redis 队列连接使用 Redis 集群,您的队列名称必须包含一个 key hash tag。这是为了确保给定队列的所有 Redis 键都放置在同一个哈希槽中:

php
'redis' => [
    'driver' => 'redis',
    'connection' => env('REDIS_QUEUE_CONNECTION', 'default'),
    'queue' => env('REDIS_QUEUE', '{default}'),
    'retry_after' => env('REDIS_QUEUE_RETRY_AFTER', 90),
    'block_for' => null,
    'after_commit' => false,
],

阻塞

使用 Redis 队列时,您可以使用 block_for 配置选项来指定驱动程序在进入工作者循环并重新轮询 Redis 数据库之前应等待作业可用的时间。

根据您的队列负载调整此值可能比持续轮询 Redis 数据库以获取新作业更有效。例如,您可以将值设置为 5,以指示驱动程序在等待作业可用时应阻塞五秒钟:

php
'redis' => [
    'driver' => 'redis',
    'connection' => env('REDIS_QUEUE_CONNECTION', 'default'),
    'queue' => env('REDIS_QUEUE', 'default'),
    'retry_after' => env('REDIS_QUEUE_RETRY_AFTER', 90),
    'block_for' => 5,
    'after_commit' => false,
],

WARNING

block_for 设置为 0 将导致队列工作者无限期阻塞,直到作业可用。这也将阻止信号(如 SIGTERM)在处理下一个作业之前被处理。

其他驱动程序先决条件

以下依赖项是列出的队列驱动程序所需的。这些依赖项可以通过 Composer 包管理器安装:

  • Amazon SQS: aws/aws-sdk-php ~3.0
  • Beanstalkd: pda/pheanstalk ~5.0
  • Redis: predis/predis ~2.0 或 phpredis PHP 扩展
  • MongoDB: mongodb/laravel-mongodb

创建作业

生成作业类

默认情况下,应用程序的所有可队列作业都存储在 app/Jobs 目录中。如果 app/Jobs 目录不存在,当您运行 make:job Artisan 命令时,它将被创建:

shell
php artisan make:job ProcessPodcast

生成的类将实现 Illuminate\Contracts\Queue\ShouldQueue 接口,指示 Laravel 该作业应被推送到队列中以异步运行。

NOTE

作业存根可以使用 存根发布 进行自定义。

类结构

作业类非常简单,通常只包含一个 handle 方法,该方法在作业被队列处理时被调用。首先,让我们看一个示例作业类。在此示例中,我们假设我们管理一个播客发布服务,并且需要在发布之前处理上传的播客文件:

php
<?php

namespace App\Jobs;

use App\Models\Podcast;
use App\Services\AudioProcessor;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;

class ProcessPodcast implements ShouldQueue
{
    use Queueable;

    /**
     * 创建一个新的作业实例。
     */
    public function __construct(
        public Podcast $podcast,
    ) {}

    /**
     * 执行作业。
     */
    public function handle(AudioProcessor $processor): void
    {
        // 处理上传的播客...
    }
}

在此示例中,请注意我们能够将 Eloquent 模型 直接传递到队列作业的构造函数中。由于作业使用的 Queueable trait,Eloquent 模型及其加载的关系将在作业处理时被优雅地序列化和反序列化。

如果您的队列作业在其构造函数中接受一个 Eloquent 模型,则只有模型的标识符会被序列化到队列中。当作业实际处理时,队列系统将自动从数据库中重新检索完整的模型实例及其加载的关系。这种模型序列化方法允许将更小的作业负载发送到您的队列驱动程序。

handle 方法依赖注入

handle 方法在作业被队列处理时被调用。请注意,我们能够在作业的 handle 方法上进行类型提示依赖项。Laravel 服务容器 会自动注入这些依赖项。

如果您希望完全控制容器如何将依赖项注入到 handle 方法中,您可以使用容器的 bindMethod 方法。bindMethod 方法接受一个回调,该回调接收作业和容器。在回调中,您可以随意调用 handle 方法。通常,您应该从 App\Providers\AppServiceProvider 服务提供者boot 方法中调用此方法:

php
use App\Jobs\ProcessPodcast;
use App\Services\AudioProcessor;
use Illuminate\Contracts\Foundation\Application;

$this->app->bindMethod([ProcessPodcast::class, 'handle'], function (ProcessPodcast $job, Application $app) {
    return $job->handle($app->make(AudioProcessor::class));
});

WARNING

二进制数据,例如原始图像内容,应在传递给队列作业之前通过 base64_encode 函数进行处理。否则,作业在放置到队列中时可能无法正确序列化为 JSON。

队列关系

由于所有加载的 Eloquent 模型关系在作业被队列时也会被序列化,因此序列化的作业字符串有时可能会变得相当大。此外,当作业被反序列化并且模型关系从数据库中重新检索时,它们将被完整检索。在作业排队过程中应用的任何先前关系约束在作业反序列化时将不再适用。因此,如果您希望处理给定关系的子集,您应该在队列作业中重新约束该关系。

或者,为了防止关系被序列化,您可以在设置属性值时调用模型上的 withoutRelations 方法。此方法将返回一个没有加载关系的模型实例:

php
/**
 * 创建一个新的作业实例。
 */
public function __construct(
    Podcast $podcast,
) {
    $this->podcast = $podcast->withoutRelations();
}

如果您使用 PHP 构造函数属性提升,并且希望指示 Eloquent 模型不应序列化其关系,您可以使用 WithoutRelations 属性:

php
use Illuminate\Queue\Attributes\WithoutRelations;

/**
 * 创建一个新的作业实例。
 */
public function __construct(
    #[WithoutRelations]
    public Podcast $podcast,
) {}

如果作业接收一个 Eloquent 模型的集合或数组而不是单个模型,则集合中的模型在作业被反序列化和执行时将不会恢复其关系。这是为了防止处理大量模型的作业使用过多资源。

唯一作业

WARNING

唯一作业需要支持 的缓存驱动程序。目前,memcachedredisdynamodbdatabasefilearray 缓存驱动程序支持原子锁。此外,唯一作业约束不适用于批处理中的作业。

有时,您可能希望确保在任何时候队列中只有一个特定作业的实例。您可以通过在作业类上实现 ShouldBeUnique 接口来实现。这一接口不需要您在类上定义任何其他方法:

php
<?php

use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUnique;

class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
    // ...
}

在上面的示例中,UpdateSearchIndex 作业是唯一的。因此,如果队列中已经有另一个作业实例并且尚未完成处理,则不会调度该作业。

在某些情况下,您可能希望定义一个特定的“键”来使作业唯一,或者您可能希望指定一个超时,超过该超时作业将不再保持唯一。为此,您可以在作业类上定义 uniqueIduniqueFor 属性或方法:

php
<?php

use App\Models\Product;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUnique;

class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
    /**
     * 产品实例。
     *
     * @var \App\Product
     */
    public $product;

    /**
     * 作业唯一锁将被释放的秒数。
     *
     * @var int
     */
    public $uniqueFor = 3600;

    /**
     * 获取作业的唯一 ID。
     */
    public function uniqueId(): string
    {
        return $this->product->id;
    }
}

在上面的示例中,UpdateSearchIndex 作业通过产品 ID 唯一。因此,具有相同产品 ID 的作业的新调度将被忽略,直到现有作业完成处理。此外,如果现有作业在一小时内未处理,唯一锁将被释放,并且可以将另一个具有相同唯一键的作业调度到队列中。

WARNING

如果您的应用程序从多个 Web 服务器或容器调度作业,您应该确保所有服务器都与同一个中央缓存服务器通信,以便 Laravel 可以准确确定作业是否唯一。

保持作业唯一直到处理开始

默认情况下,唯一作业在作业完成处理或所有重试尝试失败后被“解锁”。然而,在某些情况下,您可能希望在作业处理之前立即解锁作业。为此,您的作业应实现 ShouldBeUniqueUntilProcessing 合约,而不是 ShouldBeUnique 合约:

php
<?php

use App\Models\Product;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing;

class UpdateSearchIndex implements ShouldQueue, ShouldBeUniqueUntilProcessing
{
    // ...
}

唯一作业锁

在后台,当一个 ShouldBeUnique 作业被调度时,Laravel 尝试获取一个带有 uniqueId 键的 。如果未获取到锁,则不会调度作业。此锁在作业完成处理或所有重试尝试失败时被释放。默认情况下,Laravel 将使用默认缓存驱动程序来获取此锁。然而,如果您希望使用另一个驱动程序来获取锁,您可以定义一个 uniqueVia 方法,该方法返回应使用的缓存驱动程序:

php
use Illuminate\Contracts\Cache\Repository;
use Illuminate\Support\Facades\Cache;

class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
    // ...

    /**
     * 获取唯一作业锁的缓存驱动程序。
     */
    public function uniqueVia(): Repository
    {
        return Cache::driver('redis');
    }
}

NOTE

如果您只需要限制作业的并发处理,请使用 WithoutOverlapping 作业中间件。

加密作业

Laravel 允许您通过 加密 确保作业数据的隐私和完整性。要开始,只需将 ShouldBeEncrypted 接口添加到作业类中。一旦将此接口添加到类中,Laravel 将在将作业推送到队列之前自动加密您的作业:

php
<?php

use Illuminate\Contracts\Queue\ShouldBeEncrypted;
use Illuminate\Contracts\Queue\ShouldQueue;

class UpdateSearchIndex implements ShouldQueue, ShouldBeEncrypted
{
    // ...
}

作业中间件

作业中间件允许您在队列作业的执行过程中包装自定义逻辑,从而减少作业本身的样板代码。例如,考虑以下 handle 方法,它利用 Laravel 的 Redis 速率限制功能,每五秒钟只允许一个作业处理一次:

php
use Illuminate\Support\Facades\Redis;

/**
 * 执行作业。
 */
public function handle(): void
{
    Redis::throttle('key')->block(0)->allow(1)->every(5)->then(function () {
        info('获得锁...');

        // 处理作业...
    }, function () {
        // 无法获得锁...

        return $this->release(5);
    });
}

虽然此代码是有效的,但 handle 方法的实现变得嘈杂,因为它被 Redis 速率限制逻辑所干扰。此外,必须为我们希望速率限制的任何其他作业复制此速率限制逻辑。

我们可以定义一个处理速率限制的作业中间件,而不是在 handle 方法中进行速率限制。Laravel 没有作业中间件的默认位置,因此您可以将作业中间件放置在应用程序中的任何位置。在此示例中,我们将中间件放置在 app/Jobs/Middleware 目录中:

php
<?php

namespace App\Jobs\Middleware;

use Closure;
use Illuminate\Support\Facades\Redis;

class RateLimited
{
    /**
     * 处理队列作业。
     *
     * @param  \Closure(object): void  $next
     */
    public function handle(object $job, Closure $next): void
    {
        Redis::throttle('key')
            ->block(0)->allow(1)->every(5)
            ->then(function () use ($job, $next) {
                // 获得锁...

                $next($job);
            }, function () use ($job) {
                // 无法获得锁...

                $job->release(5);
            });
    }
}

如您所见,像 路由中间件 一样,作业中间件接收正在处理的作业和一个回调,该回调应被调用以继续处理作业。

你可以使用 make:job-middleware Artisan 命令生成一个新的任务中间件类。创建任务中间件后,可以通过从任务的 middleware 方法返回它们来将其附加到任务上。由 make:job Artisan 命令脚手架生成的任务上不存在此方法,因此你需要手动将其添加到任务类中:

php
use App\Jobs\Middleware\RateLimited;

/**
 * 获取作业应通过的中间件。
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [new RateLimited];
}

NOTE

任务中间件也可以分配给可队列的事件监听器可发送邮件通知

速率限制

虽然我们刚刚演示了如何编写自己的速率限制作业中间件,但 Laravel 实际上包含了一个速率限制中间件,您可以利用它来限制作业速率。像 路由速率限制器 一样,作业速率限制器是使用 RateLimiter facade 的 for 方法定义的。

例如,您可能希望允许用户每小时备份一次数据,而对高级客户不施加此类限制。为此,您可以在 AppServiceProviderboot 方法中定义一个 RateLimiter

php
use Illuminate\Cache\RateLimiting\Limit;
use Illuminate\Support\Facades\RateLimiter;

/**
 * 启动任何应用程序服务。
 */
public function boot(): void
{
    RateLimiter::for('backups', function (object $job) {
        return $job->user->vipCustomer()
            ? Limit::none()
            : Limit::perHour(1)->by($job->user->id);
    });
}

在上面的示例中,我们定义了一个每小时的速率限制;然而,您可以使用 perMinute 方法轻松定义基于分钟的速率限制。此外,您可以将任何值传递给速率限制的 by 方法;然而,此值最常用于按客户分段速率限制:

php
return Limit::perMinute(50)->by($job->user->id);

一旦您定义了速率限制,您可以使用 Illuminate\Queue\Middleware\RateLimited 中间件将速率限制器附加到您的作业上。每次作业超过速率限制时,此中间件将根据速率限制持续时间将作业释放回队列,并附加适当的延迟。

php
use Illuminate\Queue\Middleware\RateLimited;

/**
 * 获取作业应通过的中间件。
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [new RateLimited('backups')];
}

将速率限制的作业释放回队列仍将增加作业的总 attempts 数量。您可能希望相应地调整作业类上的 triesmaxExceptions 属性。或者,您可能希望使用 retryUntil 方法 来定义作业不再尝试的时间。

使用 releaseAfter 方法,你还可以指定释放的任务在再次尝试之前必须经过的秒数:

php
/**
 * 获取任务应经过的中间件。
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new RateLimited('backups'))->releaseAfter(60)];
}

如果您不希望在速率限制时重试作业,您可以使用 dontRelease 方法:

php
/**
 * 获取作业应通过的中间件。
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new RateLimited('backups'))->dontRelease()];
}

NOTE

如果您使用 Redis,您可以使用 Illuminate\Queue\Middleware\RateLimitedWithRedis 中间件,该中间件针对 Redis 进行了微调,比基本速率限制中间件更高效。

防止作业重叠

Laravel 包含一个 Illuminate\Queue\Middleware\WithoutOverlapping 中间件,允许您根据任意键防止作业重叠。当一个队列作业正在修改一个资源时,这可能会很有帮助,因为该资源应该只由一个作业同时修改。

例如,假设您有一个队列作业更新用户的信用评分,并且您希望防止同一用户 ID 的信用评分更新作业重叠。为此,您可以从作业的 middleware 方法返回 WithoutOverlapping 中间件:

php
use Illuminate\Queue\Middleware\WithoutOverlapping;

/**
 * 获取作业应通过的中间件。
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [new WithoutOverlapping($this->user->id)];
}

任何相同类型的重叠作业将被释放回队列。您还可以指定必须经过的秒数,然后才会再次尝试释放的作业:

php
/**
 * 获取作业应通过的中间件。
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new WithoutOverlapping($this->order->id))->releaseAfter(60)];
}

如果您希望立即删除任何重叠作业,以便它们不会被重试,您可以使用 dontRelease 方法:

php
/**
 * 获取作业应通过的中间件。
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new WithoutOverlapping($this->order->id))->dontRelease()];
}

WithoutOverlapping 中间件由 Laravel 的原子锁功能提供支持。有时,您的作业可能会意外失败或超时,以至于锁未被释放。因此,您可以使用 expireAfter 方法显式定义锁过期时间。例如,下面的示例将指示 Laravel 在作业开始处理后三分钟释放 WithoutOverlapping 锁:

php
/**
 * 获取作业应通过的中间件。
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new WithoutOverlapping($this->order->id))->expireAfter(180)];
}

WARNING

WithoutOverlapping 中间件需要支持 的缓存驱动程序。目前,memcachedredisdynamodbdatabasefilearray 缓存驱动程序支持原子锁。

在作业类之间共享锁键

默认情况下,WithoutOverlapping 中间件将仅防止相同类的作业重叠。因此,尽管两个不同的作业类可能使用相同的锁键,但它们不会被阻止重叠。然而,您可以使用 shared 方法指示 Laravel 在作业类之间应用键:

php
use Illuminate\Queue\Middleware\WithoutOverlapping;

class ProviderIsDown
{
    // ...

    public function middleware(): array
    {
        return [
            (new WithoutOverlapping("status:{$this->provider}"))->shared(),
        ];
    }
}

class ProviderIsUp
{
    // ...

    public function middleware(): array
    {
        return [
            (new WithoutOverlapping("status:{$this->provider}"))->shared(),
        ];
    }
}

异常节流

Laravel 包含一个 Illuminate\Queue\Middleware\ThrottlesExceptions 中间件,允许您对异常进行节流。一旦作业抛出给定数量的异常,所有进一步的作业执行尝试将被延迟,直到指定的时间间隔过去。此中间件对于与不稳定的第三方服务交互的作业特别有用。

例如,假设一个队列作业与第三方 API 交互,该 API 开始抛出异常。要对异常进行节流,您可以从作业的 middleware 方法返回 ThrottlesExceptions 中间件。通常,此中间件应与实现 基于时间的尝试 的作业配对:

php
use DateTime;
use Illuminate\Queue\Middleware\ThrottlesExceptions;

/**
 * 获取作业应通过的中间件。
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [new ThrottlesExceptions(10, 5 * 60)];
}

/**
 * 确定作业应超时的时间。
 */
public function retryUntil(): DateTime
{
    return now()->addMinutes(30);
}

中间件接受的第一个构造函数参数是作业可以抛出的异常数量,第二个构造函数参数是作业在被节流后应等待的秒数。在上面的代码示例中,如果作业抛出 10 个连续异常,我们将等待 5 分钟,然后再尝试作业,受 30 分钟时间限制的约束。

当作业抛出异常但异常阈值尚未达到时,作业通常会立即重试。然而,您可以在将中间件附加到作业时调用 backoff 方法来指定此类作业应延迟的分钟数:

php
use Illuminate\Queue\Middleware\ThrottlesExceptions;

/**
 * 获取作业应通过的中间件。
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new ThrottlesExceptions(10, 5 * 60))->backoff(5)];
}

在内部,此中间件使用 Laravel 的缓存系统来实现速率限制,并且作业的类名被用作缓存“键”。您可以在将中间件附加到作业时调用 by 方法来覆盖此键。如果您有多个作业与同一第三方服务交互,并且希望它们共享一个公共节流“桶”,这可能会很有用:

php
use Illuminate\Queue\Middleware\ThrottlesExceptions;

/**
 * 获取作业应通过的中间件。
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new ThrottlesExceptions(10, 10 * 60))->by('key')];
}

默认情况下,此中间件会限制每个异常。你可以通过在将中间件附加到任务时调用 when 方法来修改此行为。只有当提供给 when 方法的闭包返回 true 时,异常才会被限制:

php
use Illuminate\Http\Client\HttpClientException;
use Illuminate\Queue\Middleware\ThrottlesExceptions;

/**
 * 获取作业应通过的中间件。
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new ThrottlesExceptions(10, 10 * 60))->when(
        fn (Throwable $throwable) => $throwable instanceof HttpClientException
    )];
}

when 方法不同(该方法会将任务放回队列或抛出异常),deleteWhen 方法允许你在发生给定异常时完全删除任务:

php
use App\Exceptions\CustomerDeletedException;
use Illuminate\Queue\Middleware\ThrottlesExceptions;

/**
 * 获取任务应该通过的中间件。
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new ThrottlesExceptions(2, 10 * 60))->deleteWhen(CustomerDeletedException::class)];
}

如果您希望将节流的异常报告给应用程序的异常处理程序,您可以在将中间件附加到作业时调用 report 方法。可选地,您可以提供一个闭包给 report 方法,并且只有当给定的闭包返回 true 时,异常才会被报告:

php
use Illuminate\Http\Client\HttpClientException;
use Illuminate\Queue\Middleware\ThrottlesExceptions;

/**
 * 获取作业应通过的中间件。
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new ThrottlesExceptions(10, 10 * 60))->report(
        fn (Throwable $throwable) => $throwable instanceof HttpClientException
    )];
}

NOTE

如果您使用 Redis,您可以使用 Illuminate\Queue\Middleware\ThrottlesExceptionsWithRedis 中间件,该中间件针对 Redis 进行了微调,比基本异常节流中间件更高效。

跳过作业

Skip 中间件允许您指定作业应被跳过/删除,而无需修改作业的逻辑。如果给定条件评估为 trueSkip::when 方法将删除作业,而如果条件评估为 falseSkip::unless 方法将删除作业:

php
use Illuminate\Queue\Middleware\Skip;

/**
 * 获取作业应通过的中间件。
 */
public function middleware(): array
{
    return [
        Skip::when($someCondition),
    ];
}

您还可以将 Closure 传递给 whenunless 方法,以进行更复杂的条件评估:

php
use Illuminate\Queue\Middleware\Skip;

/**
 * 获取作业应通过的中间件。
 */
public function middleware(): array
{
    return [
        Skip::when(function (): bool {
            return $this->shouldSkip();
        }),
    ];
}

调度作业

一旦您编写了作业类,您可以使用作业本身的 dispatch 方法来调度它。传递给 dispatch 方法的参数将传递给作业的构造函数:

php
<?php

namespace App\Http\Controllers;

use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * 存储一个新的播客。
     */
    public function store(Request $request): RedirectResponse
    {
        $podcast = Podcast::create(/* ... */);

        // ...

        ProcessPodcast::dispatch($podcast);

        return redirect('/podcasts');
    }
}

如果您希望有条件地调度作业,您可以使用 dispatchIfdispatchUnless 方法:

php
ProcessPodcast::dispatchIf($accountActive, $podcast);

ProcessPodcast::dispatchUnless($accountSuspended, $podcast);

在新的 Laravel 应用程序中,sync 驱动程序是默认的队列驱动程序。此驱动程序在当前请求的前台同步执行作业,这在本地开发期间通常很方便。如果您希望实际开始将作业排队以进行后台处理,您可以在应用程序的 config/queue.php 配置文件中指定不同的队列驱动程序。

延迟调度

如果您希望指定作业不应立即可供队列工作者处理,您可以在调度作业时使用 delay 方法。例如,让我们指定作业在调度后 10 分钟内不可用:

php
<?php

namespace App\Http\Controllers;

use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * 存储一个新的播客。
     */
    public function store(Request $request): RedirectResponse
    {
        $podcast = Podcast::create(/* ... */);

        // ...

        ProcessPodcast::dispatch($podcast)
            ->delay(now()->addMinutes(10));

        return redirect('/podcasts');
    }
}

在某些情况下,作业可能已配置了默认延迟。如果您需要绕过此延迟并立即调度作业以进行处理,您可以使用 withoutDelay 方法:

php
ProcessPodcast::dispatch($podcast)->withoutDelay();

WARNING

Amazon SQS 队列服务的最大延迟时间为 15 分钟。

在响应发送到浏览器后调度

或者,dispatchAfterResponse 方法会延迟调度作业,直到 HTTP 响应发送到用户的浏览器(如果您的 Web 服务器使用 FastCGI)。这仍然允许用户开始使用应用程序,即使队列作业仍在执行。通常,这仅应用于大约需要一秒钟的作业,例如发送电子邮件。由于它们在当前 HTTP 请求中处理,因此以这种方式调度的作业不需要运行队列工作者即可处理:

php
use App\Jobs\SendNotification;

SendNotification::dispatchAfterResponse();

您还可以 dispatch 一个闭包,并将 afterResponse 方法链接到 dispatch 助手,以在 HTTP 响应发送到浏览器后执行闭包:

php
use App\Mail\WelcomeMessage;
use Illuminate\Support\Facades\Mail;

dispatch(function () {
    Mail::to('taylor@example.com')->send(new WelcomeMessage);
})->afterResponse();

同步调度

如果您希望立即(同步)调度作业,您可以使用 dispatchSync 方法。使用此方法时,作业不会被排队,而是立即在当前进程中执行:

php
<?php

namespace App\Http\Controllers;

use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * 存储一个新的播客。
     */
    public function store(Request $request): RedirectResponse
    {
        $podcast = Podcast::create(/* ... */);

        // 创建播客...

        ProcessPodcast::dispatchSync($podcast);

        return redirect('/podcasts');
    }
}

作业与数据库事务

虽然在数据库事务中调度作业是完全可以的,但您应该特别注意确保您的作业能够成功执行。在事务中调度作业时,可能会出现作业在父事务提交之前被工作者处理的情况。当这种情况发生时,您在数据库事务中对模型或数据库记录所做的任何更新可能尚未反映在数据库中。此外,在事务中创建的任何模型或数据库记录可能不存在于数据库中。

幸运的是,Laravel 提供了几种方法来解决此问题。首先,您可以在队列连接的配置数组中设置 after_commit 连接选项:

php
'redis' => [
    'driver' => 'redis',
    // ...
    'after_commit' => true,
],

after_commit 选项为 true 时,您可以在数据库事务中调度作业;然而,Laravel 将等待打开的父数据库事务提交后才实际调度作业。当然,如果当前没有打开的数据库事务,作业将立即调度。

如果由于事务期间发生的异常而回滚事务,则在该事务期间调度的作业将被丢弃。

NOTE

after_commit 配置选项设置为 true 还将导致任何排队的事件监听器、邮件、通知和广播事件在所有打开的数据库事务提交后被调度。

内联指定提交调度行为

如果您没有将 after_commit 队列连接配置选项设置为 true,您仍然可以指示特定作业在所有打开的数据库事务提交后调度。为此,您可以将 afterCommit 方法链接到您的调度操作:

php
use App\Jobs\ProcessPodcast;

ProcessPodcast::dispatch($podcast)->afterCommit();

同样,如果 after_commit 配置选项设置为 true,您可以指示特定作业立即调度,而无需等待任何打开的数据库事务提交:

php
ProcessPodcast::dispatch($podcast)->beforeCommit();

作业链

作业链允许您指定在主作业成功执行后应按顺序运行的队列作业列表。如果序列中的一个作业失败,则不会运行其余的作业。要执行队列作业链,您可以使用 Bus facade 提供的 chain 方法。Laravel 的命令总线是构建在队列作业调度之上的较低级别组件:

php
use App\Jobs\OptimizePodcast;
use App\Jobs\ProcessPodcast;
use App\Jobs\ReleasePodcast;
use Illuminate\Support\Facades\Bus;

Bus::chain([
    new ProcessPodcast,
    new OptimizePodcast,
    new ReleasePodcast,
])->dispatch();

除了链接作业类实例,您还可以链接闭包:

php
Bus::chain([
    new ProcessPodcast,
    new OptimizePodcast,
    function () {
        Podcast::update(/* ... */);
    },
])->dispatch();

WARNING

在作业中使用 $this->delete() 方法删除作业不会阻止链式作业被处理。只有当链中的作业失败时,链才会停止执行。

链接连接和队列

如果您想指定用于链式作业的连接和队列,可以使用 onConnectiononQueue 方法。这些方法指定了队列连接和队列名称,除非队列作业被明确分配到不同的连接/队列,否则将使用这些方法指定的连接和队列:

php
Bus::chain([
    new ProcessPodcast,
    new OptimizePodcast,
    new ReleasePodcast,
])->onConnection('redis')->onQueue('podcasts')->dispatch();

向链中添加作业

有时,您可能需要从链中的另一个作业中向现有作业链前置或追加作业。您可以使用 prependToChainappendToChain 方法来实现这一点:

php
/**
 * 执行作业。
 */
public function handle(): void
{
    // ...

    // 向当前链前置作业,立即在当前作业后运行...
    $this->prependToChain(new TranscribePodcast);

    // 向当前链追加作业,在链末尾运行...
    $this->appendToChain(new TranscribePodcast);
}

链失败

在链接作业时,您可以使用 catch 方法指定一个闭包,如果链中的某个作业失败,该闭包将被调用。给定的回调将接收导致作业失败的 Throwable 实例:

php
use Illuminate\Support\Facades\Bus;
use Throwable;

Bus::chain([
    new ProcessPodcast,
    new OptimizePodcast,
    new ReleasePodcast,
])->catch(function (Throwable $e) {
    // 链中的某个作业失败...
})->dispatch();

WARNING

由于链回调被序列化并在稍后由 Laravel 队列执行,因此您不应在链回调中使用 $this 变量。

自定义队列和连接

派发到特定队列

通过将作业推送到不同的队列,您可以“分类”您的队列作业,甚至可以优先分配多少个工作者到不同的队列。请记住,这不会将作业推送到您的队列配置文件中定义的不同队列“连接”,而只是推送到单个连接中的特定队列。要指定队列,请在派发作业时使用 onQueue 方法:

php
<?php

namespace App\Http\Controllers;

use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * 存储新播客。
     */
    public function store(Request $request): RedirectResponse
    {
        $podcast = Podcast::create(/* ... */);

        // 创建播客...

        ProcessPodcast::dispatch($podcast)->onQueue('processing');

        return redirect('/podcasts');
    }
}

或者,您可以通过在作业的构造函数中调用 onQueue 方法来指定作业的队列:

php
<?php

namespace App\Jobs;

use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;

class ProcessPodcast implements ShouldQueue
{
    use Queueable;

    /**
     * 创建新的作业实例。
     */
    public function __construct()
    {
        $this->onQueue('processing');
    }
}

派发到特定连接

如果您的应用程序与多个队列连接交互,您可以使用 onConnection 方法指定将作业推送到哪个连接:

php
<?php

namespace App\Http\Controllers;

use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * 存储新播客。
     */
    public function store(Request $request): RedirectResponse
    {
        $podcast = Podcast::create(/* ... */);

        // 创建播客...

        ProcessPodcast::dispatch($podcast)->onConnection('sqs');

        return redirect('/podcasts');
    }
}

您可以将 onConnectiononQueue 方法链在一起,以指定作业的连接和队列:

php
ProcessPodcast::dispatch($podcast)
    ->onConnection('sqs')
    ->onQueue('processing');

或者,您可以通过在作业的构造函数中调用 onConnection 方法来指定作业的连接:

php
<?php

namespace App\Jobs;

use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;

class ProcessPodcast implements ShouldQueue
{
    use Queueable;

    /**
     * 创建新的作业实例。
     */
    public function __construct()
    {
        $this->onConnection('sqs');
    }
}

指定最大作业尝试次数/超时值

最大尝试次数

如果您的某个队列作业遇到错误,您可能不希望它无限重试。因此,Laravel 提供了多种方法来指定作业可以尝试的次数或时间。

指定作业可以尝试的最大次数的一种方法是通过 Artisan 命令行上的 --tries 开关。这将适用于所有由工作者处理的作业,除非正在处理的作业指定了可以尝试的次数:

shell
php artisan queue:work --tries=3

如果作业超过其最大尝试次数,它将被视为“失败”作业。有关处理失败作业的更多信息,请参阅失败作业文档。如果 --tries=0 被提供给 queue:work 命令,作业将无限重试。

您可以通过在作业类本身上定义作业可以尝试的最大次数来采取更细粒度的方法。如果在作业上指定了最大尝试次数,它将优先于命令行上提供的 --tries 值:

php
<?php

namespace App\Jobs;

class ProcessPodcast implements ShouldQueue
{
    /**
     * 作业可以尝试的次数。
     *
     * @var int
     */
    public $tries = 5;
}

如果您需要对特定作业的最大尝试次数进行动态控制,可以在作业上定义一个 tries 方法:

php
/**
 * 确定作业可以尝试的次数。
 */
public function tries(): int
{
    return 5;
}

基于时间的尝试

作为定义作业在失败前可以尝试的次数的替代方法,您可以定义作业不再尝试的时间。这允许作业在给定时间范围内尝试任意次数。要定义作业不再尝试的时间,请在作业类中添加一个 retryUntil 方法。此方法应返回一个 DateTime 实例:

php
use DateTime;

/**
 * 确定作业应超时的时间。
 */
public function retryUntil(): DateTime
{
    return now()->addMinutes(10);
}

如果同时定义了 retryUntiltries,Laravel 会优先使用 retryUntil 方法。

NOTE

你也可以在队列事件监听器队列通知上定义 tries 属性或 retryUntil 方法。

最大异常

有时您可能希望指定作业可以尝试多次,但如果重试是由给定数量的未处理异常触发的,则应失败(而不是直接由 release 方法释放)。为此,您可以在作业类上定义一个 maxExceptions 属性:

php
<?php

namespace App\Jobs;

use Illuminate\Support\Facades\Redis;

class ProcessPodcast implements ShouldQueue
{
    /**
     * 作业可以尝试的次数。
     *
     * @var int
     */
    public $tries = 25;

    /**
     * 允许的最大未处理异常数。
     *
     * @var int
     */
    public $maxExceptions = 3;

    /**
     * 执行作业。
     */
    public function handle(): void
    {
        Redis::throttle('key')->allow(10)->every(60)->then(function () {
            // 获得锁,处理播客...
        }, function () {
            // 无法获得锁...
            return $this->release(10);
        });
    }
}

在此示例中,如果应用程序无法获得 Redis 锁,作业将被释放十秒钟,并将继续重试最多 25 次。然而,如果作业抛出三个未处理异常,作业将失败。

超时

通常,您大致知道您的队列作业需要多长时间。因此,Laravel 允许您指定“超时”值。默认情况下,超时值为 60 秒。如果作业处理时间超过超时值指定的秒数,处理作业的工作者将以错误退出。通常,工作者将由服务器上配置的进程管理器自动重启。

可以使用 Artisan 命令行上的 --timeout 开关指定作业可以运行的最大秒数:

shell
php artisan queue:work --timeout=30

如果作业因持续超时而超过其最大尝试次数,它将被标记为失败。

您还可以在作业类本身上定义作业应允许运行的最大秒数。如果在作业上指定了超时,它将优先于命令行上指定的任何超时:

php
<?php

namespace App\Jobs;

class ProcessPodcast implements ShouldQueue
{
    /**
     * 作业在超时前可以运行的秒数。
     *
     * @var int
     */
    public $timeout = 120;
}

有时,IO 阻塞进程(如套接字或传出 HTTP 连接)可能不尊重您指定的超时。因此,在使用这些功能时,您应始终尝试使用其 API 指定超时值。例如,在使用 Guzzle 时,您应始终指定连接和请求超时值。

WARNING

必须安装 pcntl PHP 扩展才能指定作业超时。此外,作业的“超时”值应始终小于其“重试后”值。否则,作业可能会在实际完成执行或超时之前被重新尝试。

超时失败

如果您希望指示作业在超时时应标记为失败,可以在作业类上定义 $failOnTimeout 属性:

php
/**
 * 指示作业在超时时是否应标记为失败。
 *
 * @var bool
 */
public $failOnTimeout = true;

错误处理

如果在处理作业时抛出异常,作业将自动释放回队列,以便可以再次尝试。作业将继续释放,直到它已被您的应用程序允许的最大次数尝试。最大尝试次数由 queue:work Artisan 命令上使用的 --tries 开关定义。或者,最大尝试次数可以在作业类本身上定义。有关运行队列工作者的更多信息可以在下面找到

手动释放作业

有时您可能希望手动将作业释放回队列,以便可以在稍后再次尝试。您可以通过调用 release 方法来实现这一点:

php
/**
 * 执行作业。
 */
public function handle(): void
{
    // ...

    $this->release();
}

默认情况下,release 方法将作业释放回队列以立即处理。但是,您可以指示队列在给定秒数过去之前不使作业可供处理,方法是将整数或日期实例传递给 release 方法:

php
$this->release(10);

$this->release(now()->addSeconds(10));

手动失败作业

有时您可能需要手动将作业标记为“失败”。为此,您可以调用 fail 方法:

php
/**
 * 执行作业。
 */
public function handle(): void
{
    // ...

    $this->fail();
}

如果您希望因捕获的异常而将作业标记为失败,可以将异常传递给 fail 方法。或者,为了方便起见,您可以传递一个字符串错误消息,该消息将为您转换为异常:

php
$this->fail($exception);

$this->fail('出了点问题。');

NOTE

有关失败作业的更多信息,请查看处理作业失败的文档

在特定异常时使任务失败

FailOnException 任务中间件 允许你在抛出特定异常时短路重试。这允许在临时异常(如外部 API 错误)时进行重试,但在持久异常(如用户权限被撤销)时永久失败任务:

php
<?php

namespace App\Jobs;

use App\Models\User;
use Illuminate\Auth\Access\AuthorizationException;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\Middleware\FailOnException;
use Illuminate\Support\Facades\Http;

class SyncChatHistory implements ShouldQueue
{
    use InteractsWithQueue;

    public $tries = 3;

    /**
     * 创建新的任务实例。
     */
    public function __construct(
        public User $user,
    ) {}

    /**
     * 执行任务。
     */
    public function handle(): void
    {
        $user->authorize('sync-chat-history');

        $response = Http::throw()->get(
            "https://chat.laravel.test/?user={$user->uuid}"
        );

        // ...
    }

    /**
     * 获取任务应该通过的中间件。
     */
    public function middleware(): array
    {
        return [
            new FailOnException([AuthorizationException::class])
        ];
    }
}

作业批处理

Laravel 的作业批处理功能允许您轻松执行一批作业,然后在作业批处理完成执行时执行某些操作。在开始之前,您应该创建一个数据库迁移来构建一个表,该表将包含有关作业批处理的元信息,例如其完成百分比。可以使用 make:queue-batches-table Artisan 命令生成此迁移:

shell
php artisan make:queue-batches-table

php artisan migrate

定义可批处理的作业

要定义可批处理的作业,您应该正常创建可队列的作业;但是,您应该将 Illuminate\Bus\Batchable trait 添加到作业类中。此 trait 提供了一个 batch 方法,可用于检索作业正在执行的当前批处理:

php
<?php

namespace App\Jobs;

use Illuminate\Bus\Batchable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;

class ImportCsv implements ShouldQueue
{
    use Batchable, Queueable;

    /**
     * 执行作业。
     */
    public function handle(): void
    {
        if ($this->batch()->cancelled()) {
            // 确定批处理是否已取消...

            return;
        }

        // 导入 CSV 文件的一部分...
    }
}

派发批处理

要派发一批作业,您应该使用 Bus facade 的 batch 方法。当然,批处理在与完成回调结合使用时最有用。因此,您可以使用 thencatchfinally 方法为批处理定义完成回调。这些回调中的每一个在调用时都会接收一个 Illuminate\Bus\Batch 实例。在此示例中,我们将假设我们正在排队一批作业,每个作业处理给定数量的 CSV 文件行:

php
use App\Jobs\ImportCsv;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;
use Throwable;

$batch = Bus::batch([
    new ImportCsv(1, 100),
    new ImportCsv(101, 200),
    new ImportCsv(201, 300),
    new ImportCsv(301, 400),
    new ImportCsv(401, 500),
])->before(function (Batch $batch) {
    // 批处理已创建,但尚未添加作业...
})->progress(function (Batch $batch) {
    // 单个作业已成功完成...
})->then(function (Batch $batch) {
    // 所有作业均已成功完成...
})->catch(function (Batch $batch, Throwable $e) {
    // 检测到第一个批处理作业失败...
})->finally(function (Batch $batch) {
    // 批处理已完成执行...
})->dispatch();

return $batch->id;

批处理的 ID,可以通过 $batch->id 属性访问,可用于查询 Laravel 命令总线以获取有关批处理的信息。

WARNING

由于批处理回调被序列化并在稍后由 Laravel 队列执行,因此您不应在回调中使用 $this 变量。此外,由于批处理作业被包装在数据库事务中,因此不应在作业中执行触发隐式提交的数据库语句。

命名批处理

某些工具(如 Laravel Horizon 和 Laravel Telescope)可能会为批处理提供更用户友好的调试信息,如果批处理被命名。要为批处理分配任意名称,可以在定义批处理时调用 name 方法:

php
$batch = Bus::batch([
    // ...
])->then(function (Batch $batch) {
    // 所有作业均已成功完成...
})->name('Import CSV')->dispatch();

批处理连接和队列

如果您想指定用于批处理作业的连接和队列,可以使用 onConnectiononQueue 方法。所有批处理作业必须在同一连接和队列中执行:

php
$batch = Bus::batch([
    // ...
])->then(function (Batch $batch) {
    // 所有作业均已成功完成...
})->onConnection('redis')->onQueue('imports')->dispatch();

链和批处理

您可以通过将链式作业放在数组中来定义批处理中的一组链式作业。例如,我们可以并行执行两个作业链,并在两个作业链完成处理时执行回调:

php
use App\Jobs\ReleasePodcast;
use App\Jobs\SendPodcastReleaseNotification;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;

Bus::batch([
    [
        new ReleasePodcast(1),
        new SendPodcastReleaseNotification(1),
    ],
    [
        new ReleasePodcast(2),
        new SendPodcastReleaseNotification(2),
    ],
])->then(function (Batch $batch) {
    // ...
})->dispatch();

相反,您可以通过在链中定义批处理来在中运行批处理作业。例如,您可以先运行一批作业以发布多个播客,然后再运行一批作业以发送发布通知:

php
use App\Jobs\FlushPodcastCache;
use App\Jobs\ReleasePodcast;
use App\Jobs\SendPodcastReleaseNotification;
use Illuminate\Support\Facades\Bus;

Bus::chain([
    new FlushPodcastCache,
    Bus::batch([
        new ReleasePodcast(1),
        new ReleasePodcast(2),
    ]),
    Bus::batch([
        new SendPodcastReleaseNotification(1),
        new SendPodcastReleaseNotification(2),
    ]),
])->dispatch();

向批处理中添加作业

有时,从批处理作业中向批处理中添加其他作业可能很有用。当您需要批处理数千个作业时,这种模式很有用,因为在 Web 请求期间调度这些作业可能需要太长时间。因此,您可能希望调度一批初始“加载器”作业,以便为批处理提供更多作业:

php
$batch = Bus::batch([
    new LoadImportBatch,
    new LoadImportBatch,
    new LoadImportBatch,
])->then(function (Batch $batch) {
    // 所有作业均已成功完成...
})->name('Import Contacts')->dispatch();

在此示例中,我们将使用 LoadImportBatch 作业为批处理提供其他作业。为此,我们可以使用作业的 batch 方法访问的批处理实例上的 add 方法:

php
use App\Jobs\ImportContacts;
use Illuminate\Support\Collection;

/**
 * 执行作业。
 */
public function handle(): void
{
    if ($this->batch()->cancelled()) {
        return;
    }

    $this->batch()->add(Collection::times(1000, function () {
        return new ImportContacts;
    }));
}

WARNING

您只能从属于同一批处理的作业中向批处理中添加作业。

检查批处理

提供给批处理完成回调的 Illuminate\Bus\Batch 实例具有多种属性和方法,可帮助您与给定的作业批处理进行交互和检查:

php
// 批处理的 UUID...
$batch->id;

// 批处理的名称(如果适用)...
$batch->name;

// 分配给批处理的作业数量...
$batch->totalJobs;

// 尚未被队列处理的作业数量...
$batch->pendingJobs;

// 失败的作业数量...
$batch->failedJobs;

// 到目前为止已处理的作业数量...
$batch->processedJobs();

// 批处理的完成百分比(0-100)...
$batch->progress();

// 指示批处理是否已完成执行...
$batch->finished();

// 取消批处理的执行...
$batch->cancel();

// 指示批处理是否已取消...
$batch->cancelled();

从路由返回批处理

所有 Illuminate\Bus\Batch 实例都是 JSON 可序列化的,这意味着您可以直接从应用程序的路由中返回它们,以检索包含有关批处理的信息的 JSON 负载,包括其完成进度。这使得在应用程序的 UI 中显示有关批处理完成进度的信息变得方便。

要通过其 ID 检索批处理,可以使用 Bus facade 的 findBatch 方法:

php
use Illuminate\Support\Facades\Bus;
use Illuminate\Support\Facades\Route;

Route::get('/batch/{batchId}', function (string $batchId) {
    return Bus::findBatch($batchId);
});

取消批处理

有时您可能需要取消给定批处理的执行。这可以通过调用 Illuminate\Bus\Batch 实例上的 cancel 方法来实现:

php
/**
 * 执行作业。
 */
public function handle(): void
{
    if ($this->user->exceedsImportLimit()) {
        return $this->batch()->cancel();
    }

    if ($this->batch()->cancelled()) {
        return;
    }
}

正如您在前面的示例中可能注意到的,批处理作业通常应在继续执行之前确定其对应的批处理是否已取消。然而,为了方便起见,您可以将 SkipIfBatchCancelled 中间件分配给作业。顾名思义,此中间件将指示 Laravel 如果其对应的批处理已取消,则不处理作业:

php
use Illuminate\Queue\Middleware\SkipIfBatchCancelled;

/**
 * 获取作业应通过的中间件。
 */
public function middleware(): array
{
    return [new SkipIfBatchCancelled];
}

批处理失败

当批处理作业失败时,将调用 catch 回调(如果已分配)。此回调仅在批处理中的第一个作业失败时调用。

允许失败

当批处理中的作业失败时,Laravel 将自动将批处理标记为“已取消”。如果您愿意,可以禁用此行为,以便作业失败不会自动将批处理标记为已取消。这可以通过在派发批处理时调用 allowFailures 方法来实现:

php
$batch = Bus::batch([
    // ...
])->then(function (Batch $batch) {
    // 所有作业均已成功完成...
})->allowFailures()->dispatch();

重试失败的批处理作业

为了方便起见,Laravel 提供了一个 queue:retry-batch Artisan 命令,允许您轻松重试给定批处理的所有失败作业。queue:retry-batch 命令接受应重试其失败作业的批处理的 UUID:

shell
php artisan queue:retry-batch 32dbc76c-4f82-4749-b610-a639fe0099b5

修剪批处理

如果不进行修剪,job_batches 表可能会非常快速地积累记录。为了解决这个问题,您应该安排 queue:prune-batches Artisan 命令每天运行:

php
use Illuminate\Support\Facades\Schedule;

Schedule::command('queue:prune-batches')->daily();

默认情况下,所有超过 24 小时的已完成批处理将被修剪。您可以在调用命令时使用 hours 选项来确定保留批处理数据的时间。例如,以下命令将删除所有在 48 小时前完成的批处理:

php
use Illuminate\Support\Facades\Schedule;

Schedule::command('queue:prune-batches --hours=48')->daily();

有时,您的 jobs_batches 表可能会积累从未成功完成的批处理的批处理记录,例如作业失败且该作业从未成功重试的批处理。您可以使用 unfinished 选项指示 queue:prune-batches 命令修剪这些未完成的批处理记录:

php
use Illuminate\Support\Facades\Schedule;

Schedule::command('queue:prune-batches --hours=48 --unfinished=72')->daily();

同样,您的 jobs_batches 表也可能会积累已取消批处理的批处理记录。您可以使用 cancelled 选项指示 queue:prune-batches 命令修剪这些已取消的批处理记录:

php
use Illuminate\Support\Facades\Schedule;

Schedule::command('queue:prune-batches --hours=48 --cancelled=72')->daily();

在 DynamoDB 中存储批处理

Laravel 还提供了在 DynamoDB 中存储批处理元信息的支持,而不是在关系数据库中。然而,您需要手动创建一个 DynamoDB 表来存储所有批处理记录。

通常,此表应命名为 job_batches,但您应根据应用程序的 queue 配置文件中的 queue.batching.table 配置值命名表。

DynamoDB 批处理表配置

job_batches 表应具有一个名为 application 的字符串主分区键和一个名为 id 的字符串主排序键。键的 application 部分将包含应用程序的名称,如应用程序的 app 配置文件中的 name 配置值所定义。由于应用程序名称是 DynamoDB 表键的一部分,您可以使用同一个表来存储多个 Laravel 应用程序的作业批处理。

此外,如果您希望利用自动批处理修剪,可以为表定义 ttl 属性。

DynamoDB 配置

接下来,安装 AWS SDK,以便您的 Laravel 应用程序可以与 Amazon DynamoDB 通信:

shell
composer require aws/aws-sdk-php

然后,将 queue.batching.driver 配置选项的值设置为 dynamodb。此外,您应在 batching 配置数组中定义 keysecretregion 配置选项。这些选项将用于 AWS 身份验证。当使用 dynamodb 驱动程序时,不需要 queue.batching.database 配置选项:

php
'batching' => [
    'driver' => env('QUEUE_BATCHING_DRIVER', 'dynamodb'),
    'key' => env('AWS_ACCESS_KEY_ID'),
    'secret' => env('AWS_SECRET_ACCESS_KEY'),
    'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
    'table' => 'job_batches',
],

在 DynamoDB 中修剪批处理

在利用 DynamoDB 存储作业批处理信息时,用于修剪存储在关系数据库中的批处理的典型修剪命令将不起作用。相反,您可以利用 DynamoDB 的本机 TTL 功能自动删除旧批处理的记录。

如果您为 DynamoDB 表定义了 ttl 属性,可以定义配置参数以指示 Laravel 如何修剪批处理记录。queue.batching.ttl_attribute 配置值定义了保存 TTL 的属性名称,而 queue.batching.ttl 配置值定义了批处理记录可以从 DynamoDB 表中删除的秒数,相对于记录最后一次更新的时间:

php
'batching' => [
    'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'),
    'key' => env('AWS_ACCESS_KEY_ID'),
    'secret' => env('AWS_SECRET_ACCESS_KEY'),
    'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
    'table' => 'job_batches',
    'ttl_attribute' => 'ttl',
    'ttl' => 60 * 60 * 24 * 7, // 7 天...
],

队列闭包

您可以将闭包派发到队列,而不是将作业类派发到队列。这对于需要在当前请求周期之外执行的快速简单任务非常有用。在将闭包派发到队列时,闭包的代码内容会被加密签名,以防止在传输过程中被修改:

php
$podcast = App\Podcast::find(1);

dispatch(function () use ($podcast) {
    $podcast->publish();
});

要为队列中的闭包指定名称,以便在队列监控面板中使用,并在 queue:work 命令中显示,可以使用 name 方法:

php
dispatch(function () {
    // ...
})->name('发布播客');

使用 catch 方法,您可以提供一个闭包,如果队列闭包在耗尽所有队列的配置重试尝试后未能成功完成,则应执行该闭包:

php
use Throwable;

dispatch(function () use ($podcast) {
    $podcast->publish();
})->catch(function (Throwable $e) {
    // 此作业已失败...
});

WARNING

由于 catch 回调被序列化并在稍后由 Laravel 队列执行,因此您不应在 catch 回调中使用 $this 变量。

运行队列工作者

queue:work 命令

Laravel 包含一个 Artisan 命令,该命令将启动队列工作者并在新作业推送到队列时处理它们。您可以使用 queue:work Artisan 命令运行工作者。请注意,一旦 queue:work 命令启动,它将继续运行,直到手动停止或关闭终端:

shell
php artisan queue:work

NOTE

要使 queue:work 进程永久在后台运行,您应该使用进程监视器(如 Supervisor)以确保队列工作者不会停止运行。

如果你想在命令的输出中包含已处理的任务 ID、连接名称和队列名称,可以在执行 queue:work 命令时包含 -v 标志:

shell
php artisan queue:work -v

请记住,队列工作者是长时间运行的进程,并将已启动的应用程序状态存储在内存中。因此,它们在启动后不会注意到代码库中的更改。因此,在部署过程中,请务必重启队列工作者。此外,请记住,应用程序创建或修改的任何静态状态在作业之间不会自动重置。

或者,您可以运行 queue:listen 命令。使用 queue:listen 命令时,当您想重新加载更新的代码或重置应用程序状态时,您无需手动重启工作者;然而,此命令的效率明显低于 queue:work 命令:

shell
php artisan queue:listen

运行多个队列工作者

要为队列分配多个工作者并并发处理作业,您只需启动多个 queue:work 进程。这可以通过在本地通过终端中的多个选项卡完成,或者在生产中使用进程管理器的配置设置完成。使用 Supervisor 时,您可以使用 numprocs 配置值。

指定连接和队列

您还可以指定工作者应使用哪个队列连接。传递给 work 命令的连接名称应对应于 config/queue.php 配置文件中定义的连接之一:

shell
php artisan queue:work redis

默认情况下,queue:work 命令仅处理给定连接上的默认队列的作业。然而,您可以通过仅处理给定连接的特定队列来进一步自定义队列工作者。例如,如果您的所有电子邮件都在 redis 队列连接上的 emails 队列中处理,您可以发出以下命令以启动仅处理该队列的工作者:

shell
php artisan queue:work redis --queue=emails

处理指定数量的作业

可以使用 --once 选项指示工作者仅处理队列中的单个作业:

shell
php artisan queue:work --once

可以使用 --max-jobs 选项指示工作者处理给定数量的作业,然后退出。此选项在与 Supervisor 结合使用时可能很有用,以便您的工作者在处理给定数量的作业后自动重启,释放它们可能积累的任何内存:

shell
php artisan queue:work --max-jobs=1000

处理所有排队的作业然后退出

可以使用 --stop-when-empty 选项指示工作者处理所有作业,然后优雅地退出。如果您希望在队列为空后关闭容器,此选项在 Docker 容器中处理 Laravel 队列时非常有用:

shell
php artisan queue:work --stop-when-empty

处理作业指定的秒数

可以使用 --max-time 选项指示工作者处理作业指定的秒数,然后退出。此选项在与 Supervisor 结合使用时可能很有用,以便您的工作者在处理作业指定的时间后自动重启,释放它们可能积累的任何内存:

shell
# 处理作业一小时然后退出...
php artisan queue:work --max-time=3600

工作者休眠时长

当队列中有作业可用时,工作者将继续处理作业而没有作业之间的延迟。然而,sleep 选项决定了如果没有作业可用,工作者将“休眠”多少秒。当然,在休眠期间,工作者将不会处理任何新作业:

shell
php artisan queue:work --sleep=3

维护模式和队列

当您的应用程序处于维护模式时,不会处理任何队列作业。作业将在应用程序退出维护模式后继续正常处理。

要强制队列工作者在启用维护模式时处理作业,可以使用 --force 选项:

shell
php artisan queue:work --force

资源考虑

守护进程队列工作者在处理每个作业之前不会“重启”框架。因此,您应在每个作业完成后释放任何重资源。例如,如果您使用 GD 库进行图像处理,完成图像处理后应使用 imagedestroy 释放内存。

队列优先级

有时您可能希望优先处理队列。例如,在 config/queue.php 配置文件中,您可以将 redis 连接的默认 queue 设置为 low。然而,有时您可能希望将作业推送到 high 优先级队列,如下所示:

php
dispatch((new Job)->onQueue('high'));

要启动一个工作者,以确保在继续处理 low 队列中的任何作业之前处理所有 high 队列作业,请将队列名称的逗号分隔列表传递给 work 命令:

shell
php artisan queue:work --queue=high,low

队列工作者和部署

由于队列工作者是长时间运行的进程,因此在不重启的情况下不会注意到代码的更改。因此,使用队列工作者部署应用程序的最简单方法是在部署过程中重启工作者。您可以通过发出 queue:restart 命令优雅地重启所有工作者:

shell
php artisan queue:restart

此命令将指示所有队列工作者在完成当前作业后优雅地退出,以便不会丢失现有作业。由于在执行 queue:restart 命令时队列工作者将退出,因此您应该运行一个进程管理器(如 Supervisor)以自动重启队列工作者。

NOTE

队列使用缓存存储重启信号,因此在使用此功能之前,您应验证应用程序已正确配置缓存驱动程序。

作业过期和超时

作业过期

config/queue.php 配置文件中,每个队列连接定义了一个 retry_after 选项。此选项指定队列连接在重试正在处理的作业之前应等待的秒数。例如,如果 retry_after 的值设置为 90,则如果作业已处理 90 秒而未被释放或删除,它将被释放回队列。通常,您应将 retry_after 值设置为作业合理完成处理所需的最大秒数。

WARNING

唯一不包含 retry_after 值的队列连接是 Amazon SQS。SQS 将根据 AWS 控制台中管理的默认可见性超时重试作业。

工作者超时

queue:work Artisan 命令公开了一个 --timeout 选项。默认情况下,--timeout 值为 60 秒。如果作业处理时间超过超时值指定的秒数,处理作业的工作者将以错误退出。通常,工作者将由服务器上配置的进程管理器自动重启:

shell
php artisan queue:work --timeout=60

retry_after 配置选项和 --timeout CLI 选项是不同的,但它们协同工作以确保作业不会丢失,并且作业仅成功处理一次。

WARNING

--timeout 值应始终比 retry_after 配置值短几秒钟。这将确保在作业重试之前始终终止处理冻结作业的工作者。如果您的 --timeout 选项长于 retry_after 配置值,您的作业可能会被处理两次。

Supervisor 配置

在生产环境中,您需要一种方法来保持 queue:work 进程运行。queue:work 进程可能会因多种原因停止运行,例如超出工作者超时或执行 queue:restart 命令。

因此,您需要配置一个进程监视器,以便在 queue:work 进程退出时检测并自动重启它们。此外,进程监视器可以让您指定要同时运行的 queue:work 进程的数量。Supervisor 是 Linux 环境中常用的进程监视器,我们将在以下文档中讨论如何配置它。

安装 Supervisor

Supervisor 是 Linux 操作系统的进程监视器,如果 queue:work 进程失败,将自动重启它们。要在 Ubuntu 上安装 Supervisor,可以使用以下命令:

shell
sudo apt-get install supervisor

NOTE

如果自己配置和管理 Supervisor 听起来很复杂,可以考虑使用 Laravel Cloud,它提供了一个完全托管的平台来运行 Laravel 队列工作者。

配置 Supervisor

Supervisor 配置文件通常存储在 /etc/supervisor/conf.d 目录中。在此目录中,您可以创建任意数量的配置文件,指示 Supervisor 如何监视您的进程。例如,让我们创建一个 laravel-worker.conf 文件,启动并监视 queue:work 进程:

ini
[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3 --max-time=3600
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
user=forge
numprocs=8
redirect_stderr=true
stdout_logfile=/home/forge/app.com/worker.log
stopwaitsecs=3600

在此示例中,numprocs 指令将指示 Supervisor 运行八个 queue:work 进程并监视所有进程,如果它们失败,将自动重启它们。您应更改配置的 command 指令以反映您所需的队列连接和工作者选项。

WARNING

您应确保 stopwaitsecs 的值大于最长运行作业消耗的秒数。否则,Supervisor 可能会在作业完成处理之前终止作业。

启动 Supervisor

创建配置文件后,您可以使用以下命令更新 Supervisor 配置并启动进程:

shell
sudo supervisorctl reread

sudo supervisorctl update

sudo supervisorctl start "laravel-worker:*"

有关 Supervisor 的更多信息,请查阅 Supervisor 文档

处理失败的任务

有时您的队列任务会失败。别担心,事情并不总是按计划进行!Laravel 提供了一种方便的方法来指定任务应尝试的最大次数。异步任务超过此尝试次数后,将插入到 failed_jobs 数据库表中。同步调度的任务失败时不会存储在此表中,其异常会立即由应用程序处理。

在新的 Laravel 应用程序中,通常已经存在用于创建 failed_jobs 表的迁移。但是,如果您的应用程序不包含此表的迁移,您可以使用 make:queue-failed-table 命令创建迁移:

shell
php artisan make:queue-failed-table

php artisan migrate

运行队列工作者进程时,您可以使用 queue:work 命令上的 --tries 开关指定任务应尝试的最大次数。如果您未为 --tries 选项指定值,任务将仅尝试一次或按任务类的 $tries 属性指定的次数尝试:

shell
php artisan queue:work redis --tries=3

使用 --backoff 选项,您可以指定 Laravel 在重试遇到异常的任务之前应等待多少秒。默认情况下,任务会立即释放回队列,以便可以再次尝试:

shell
php artisan queue:work redis --tries=3 --backoff=3

如果您希望根据每个任务配置 Laravel 在重试遇到异常的任务之前应等待多少秒,可以通过在任务类上定义 backoff 属性来实现:

php
/**
 * 在重试任务之前等待的秒数。
 *
 * @var int
 */
public $backoff = 3;

如果您需要更复杂的逻辑来确定任务的退避时间,可以在任务类上定义一个 backoff 方法:

php
/**
 * 计算在重试任务之前等待的秒数。
 */
public function backoff(): int
{
    return 3;
}

您可以通过从 backoff 方法返回一个退避值数组来轻松配置“指数”退避。在此示例中,重试延迟将为第一次重试 1 秒,第二次重试 5 秒,第三次重试 10 秒,如果还有更多尝试剩余,则每次后续重试 10 秒:

php
/**
 * 计算在重试任务之前等待的秒数。
 *
 * @return array<int, int>
 */
public function backoff(): array
{
    return [1, 5, 10];
}

处理失败任务后的清理

当某个任务失败时,您可能希望向用户发送警报或撤销任务部分完成的任何操作。为此,您可以在任务类上定义一个 failed 方法。导致任务失败的 Throwable 实例将传递给 failed 方法:

php
<?php

namespace App\Jobs;

use App\Models\Podcast;
use App\Services\AudioProcessor;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
use Throwable;

class ProcessPodcast implements ShouldQueue
{
    use Queueable;

    /**
     * 创建一个新的任务实例。
     */
    public function __construct(
        public Podcast $podcast,
    ) {}

    /**
     * 执行任务。
     */
    public function handle(AudioProcessor $processor): void
    {
        // 处理上传的播客...
    }

    /**
     * 处理任务失败。
     */
    public function failed(?Throwable $exception): void
    {
        // 发送用户通知失败等...
    }
}

WARNING

在调用 failed 方法之前,会实例化任务的新实例;因此,在 handle 方法中可能发生的任何类属性修改都将丢失。

重试失败的任务

要查看已插入到 failed_jobs 数据库表中的所有失败任务,您可以使用 queue:failed Artisan 命令:

shell
php artisan queue:failed

queue:failed 命令将列出任务 ID、连接、队列、失败时间和有关任务的其他信息。任务 ID 可用于重试失败的任务。例如,要重试 ID 为 ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece 的失败任务,请发出以下命令:

shell
php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece

如有必要,您可以将多个 ID 传递给命令:

shell
php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece 91401d2c-0784-4f43-824c-34f94a33c24d

您还可以重试特定队列的所有失败任务:

shell
php artisan queue:retry --queue=name

要重试所有失败的任务,请执行 queue:retry 命令并将 all 作为 ID 传递:

shell
php artisan queue:retry all

如果您想删除失败的任务,可以使用 queue:forget 命令:

shell
php artisan queue:forget 91401d2c-0784-4f43-824c-34f94a33c24d

NOTE

使用 Horizon 时,您应该使用 horizon:forget 命令删除失败的任务,而不是 queue:forget 命令。

要从 failed_jobs 表中删除所有失败的任务,您可以使用 queue:flush 命令:

shell
php artisan queue:flush

忽略缺失的模型

将 Eloquent 模型注入任务时,模型会在放入队列之前自动序列化,并在任务处理时从数据库中重新检索。但是,如果模型在任务等待工作者处理时被删除,您的任务可能会因 ModelNotFoundException 而失败。

为了方便起见,您可以通过将任务的 deleteWhenMissingModels 属性设置为 true 来选择自动删除缺失模型的任务。当此属性设置为 true 时,Laravel 将安静地丢弃任务而不引发异常:

php
/**
 * 如果模型不再存在,则删除任务。
 *
 * @var bool
 */
public $deleteWhenMissingModels = true;

修剪失败的任务

您可以通过调用 queue:prune-failed Artisan 命令来修剪应用程序的 failed_jobs 表中的记录:

shell
php artisan queue:prune-failed

默认情况下,所有超过 24 小时的失败任务记录将被修剪。如果您为命令提供 --hours 选项,则仅保留在最近 N 小时内插入的失败任务记录。例如,以下命令将删除所有超过 48 小时的失败任务记录:

shell
php artisan queue:prune-failed --hours=48

在 DynamoDB 中存储失败的任务

Laravel 还提供支持将失败的任务记录存储在 DynamoDB 中,而不是关系数据库表中。但是,您必须手动创建一个 DynamoDB 表来存储所有失败的任务记录。通常,此表应命名为 failed_jobs,但您应根据应用程序的 queue 配置文件中的 queue.failed.table 配置值命名表。

failed_jobs 表应具有一个名为 application 的字符串主分区键和一个名为 uuid 的字符串主排序键。键的 application 部分将包含应用程序的名称,如应用程序的 app 配置文件中的 name 配置值所定义。由于应用程序名称是 DynamoDB 表键的一部分,您可以使用同一个表来存储多个 Laravel 应用程序的失败任务。

此外,请确保安装 AWS SDK,以便您的 Laravel 应用程序可以与 Amazon DynamoDB 通信:

shell
composer require aws/aws-sdk-php

接下来,将 queue.failed.driver 配置选项的值设置为 dynamodb。此外,您应在失败任务配置数组中定义 keysecretregion 配置选项。这些选项将用于与 AWS 进行身份验证。使用 dynamodb 驱动程序时,不需要 queue.failed.database 配置选项:

php
'failed' => [
    'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'),
    'key' => env('AWS_ACCESS_KEY_ID'),
    'secret' => env('AWS_SECRET_ACCESS_KEY'),
    'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
    'table' => 'failed_jobs',
],

禁用失败任务存储

您可以通过将 queue.failed.driver 配置选项的值设置为 null 来指示 Laravel 丢弃失败的任务而不存储它们。通常,这可以通过 QUEUE_FAILED_DRIVER 环境变量来实现:

ini
QUEUE_FAILED_DRIVER=null

失败任务事件

如果您希望注册一个事件监听器,该监听器将在任务失败时调用,您可以使用 Queue facade 的 failing 方法。例如,我们可以从 Laravel 附带的 AppServiceProviderboot 方法中附加一个闭包到此事件:

php
<?php

namespace App\Providers;

use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobFailed;

class AppServiceProvider extends ServiceProvider
{
    /**
     * 注册任何应用程序服务。
     */
    public function register(): void
    {
        // ...
    }

    /**
     * 启动任何应用程序服务。
     */
    public function boot(): void
    {
        Queue::failing(function (JobFailed $event) {
            // $event->connectionName
            // $event->job
            // $event->exception
        });
    }
}

从队列中清除任务

NOTE

使用 Horizon 时,您应该使用 horizon:clear 命令从队列中清除任务,而不是 queue:clear 命令。

如果您希望从默认连接的默认队列中删除所有任务,可以使用 queue:clear Artisan 命令:

shell
php artisan queue:clear

您还可以提供 connection 参数和 queue 选项以从特定连接和队列中删除任务:

shell
php artisan queue:clear redis --queue=emails

WARNING

从队列中清除任务仅适用于 SQS、Redis 和数据库队列驱动程序。此外,SQS 消息删除过程最多需要 60 秒,因此在您清除队列后最多 60 秒内发送到 SQS 队列的任务也可能会被删除。

监控您的队列

如果您的队列突然涌入大量任务,可能会导致队列超负荷,从而导致任务完成的等待时间过长。如果您愿意,Laravel 可以在队列任务数量超过指定阈值时提醒您。

要开始,您应该安排 queue:monitor 命令每分钟运行一次。该命令接受您希望监控的队列名称以及您期望的任务数量阈值:

shell
php artisan queue:monitor redis:default,redis:deployments --max=100

仅安排此命令不足以触发通知以提醒您队列的超负荷状态。当命令遇到任务数量超过阈值的队列时,将调度一个 Illuminate\Queue\Events\QueueBusy 事件。您可以在应用程序的 AppServiceProvider 中监听此事件,以便向您或您的开发团队发送通知:

php
use App\Notifications\QueueHasLongWaitTime;
use Illuminate\Queue\Events\QueueBusy;
use Illuminate\Support\Facades\Event;
use Illuminate\Support\Facades\Notification;

/**
 * 启动任何应用程序服务。
 */
public function boot(): void
{
    Event::listen(function (QueueBusy $event) {
        Notification::route('mail', 'dev@example.com')
            ->notify(new QueueHasLongWaitTime(
                $event->connection,
                $event->queue,
                $event->size
            ));
    });
}

测试

在测试调度任务的代码时,您可能希望指示 Laravel 不实际执行任务本身,因为任务的代码可以直接测试并与调度它的代码分开测试。当然,要测试任务本身,您可以实例化一个任务实例并在测试中直接调用 handle 方法。

您可以使用 Queue facade 的 fake 方法来防止队列任务实际被推送到队列。在调用 Queue facade 的 fake 方法后,您可以断言应用程序尝试将任务推送到队列:

php
<?php

use App\Jobs\AnotherJob;
use App\Jobs\FinalJob;
use App\Jobs\ShipOrder;
use Illuminate\Support\Facades\Queue;

test('orders can be shipped', function () {
    Queue::fake();

    // 执行订单发货...

    // 断言没有任务被推送...
    Queue::assertNothingPushed();

    // 断言任务被推送到给定队列...
    Queue::assertPushedOn('queue-name', ShipOrder::class);

    // 断言任务被推送了两次...
    Queue::assertPushed(ShipOrder::class, 2);

    // 断言任务没有被推送...
    Queue::assertNotPushed(AnotherJob::class);

    // 断言闭包被推送到队列...
    Queue::assertClosurePushed();

    // 断言推送的任务总数...
    Queue::assertCount(3);
});
php
<?php

namespace Tests\Feature;

use App\Jobs\AnotherJob;
use App\Jobs\FinalJob;
use App\Jobs\ShipOrder;
use Illuminate\Support\Facades\Queue;
use Tests\TestCase;

class ExampleTest extends TestCase
{
    public function test_orders_can_be_shipped(): void
    {
        Queue::fake();

        // 执行订单发货...

        // 断言没有任务被推送...
        Queue::assertNothingPushed();

        // 断言任务被推送到给定队列...
        Queue::assertPushedOn('queue-name', ShipOrder::class);

        // 断言任务被推送了两次...
        Queue::assertPushed(ShipOrder::class, 2);

        // 断言任务没有被推送...
        Queue::assertNotPushed(AnotherJob::class);

        // 断言闭包被推送到队列...
        Queue::assertClosurePushed();

        // 断言推送的任务总数...
        Queue::assertCount(3);
    }
}

您可以将闭包传递给 assertPushedassertNotPushed 方法,以断言推送的任务通过给定的“真值测试”。如果至少有一个任务被推送并通过给定的真值测试,则断言将成功:

php
Queue::assertPushed(function (ShipOrder $job) use ($order) {
    return $job->order->id === $order->id;
});

模拟部分任务

如果您只需要模拟特定任务而允许其他任务正常执行,可以将应模拟的任务类名传递给 fake 方法:

php
test('orders can be shipped', function () {
    Queue::fake([
        ShipOrder::class,
    ]);

    // 执行订单发货...

    // 断言任务被推送了两次...
    Queue::assertPushed(ShipOrder::class, 2);
});
php
public function test_orders_can_be_shipped(): void
{
    Queue::fake([
        ShipOrder::class,
    ]);

    // 执行订单发货...

    // 断言任务被推送了两次...
    Queue::assertPushed(ShipOrder::class, 2);
}

您可以使用 except 方法模拟所有任务,除了指定的一组任务:

php
Queue::fake()->except([
    ShipOrder::class,
]);

测试任务链

要测试任务链,您需要利用 Bus facade 的模拟功能。Bus facade 的 assertChained 方法可用于断言任务链已被调度。assertChained 方法接受一个链式任务数组作为其第一个参数:

php
use App\Jobs\RecordShipment;
use App\Jobs\ShipOrder;
use App\Jobs\UpdateInventory;
use Illuminate\Support\Facades\Bus;

Bus::fake();

// ...

Bus::assertChained([
    ShipOrder::class,
    RecordShipment::class,
    UpdateInventory::class
]);

如上例所示,链式任务数组可以是任务类名的数组。但是,您也可以提供实际任务实例的数组。这样做时,Laravel 将确保任务实例与应用程序调度的链式任务具有相同的类和属性值:

php
Bus::assertChained([
    new ShipOrder,
    new RecordShipment,
    new UpdateInventory,
]);

您可以使用 assertDispatchedWithoutChain 方法断言任务被推送而没有任务链:

php
Bus::assertDispatchedWithoutChain(ShipOrder::class);

测试链修改

如果链式任务在现有链中添加或附加任务,您可以使用任务的 assertHasChain 方法断言任务具有预期的剩余任务链:

php
$job = new ProcessPodcast;

$job->handle();

$job->assertHasChain([
    new TranscribePodcast,
    new OptimizePodcast,
    new ReleasePodcast,
]);

assertDoesntHaveChain 方法可用于断言任务的剩余链为空:

php
$job->assertDoesntHaveChain();

测试链式批处理

如果您的任务链包含一批任务,您可以通过在链断言中插入 Bus::chainedBatch 定义来断言链式批处理符合您的期望:

php
use App\Jobs\ShipOrder;
use App\Jobs\UpdateInventory;
use Illuminate\Bus\PendingBatch;
use Illuminate\Support\Facades\Bus;

Bus::assertChained([
    new ShipOrder,
    Bus::chainedBatch(function (PendingBatch $batch) {
        return $batch->jobs->count() === 3;
    }),
    new UpdateInventory,
]);

测试任务批处理

Bus facade 的 assertBatched 方法可用于断言一批任务已被调度。传递给 assertBatched 方法的闭包接收一个 Illuminate\Bus\PendingBatch 实例,可用于检查批处理中的任务:

php
use Illuminate\Bus\PendingBatch;
use Illuminate\Support\Facades\Bus;

Bus::fake();

// ...

Bus::assertBatched(function (PendingBatch $batch) {
    return $batch->name == 'import-csv' &&
           $batch->jobs->count() === 10;
});

您可以使用 assertBatchCount 方法断言调度了给定数量的批处理:

php
Bus::assertBatchCount(3);

您可以使用 assertNothingBatched 断言没有批处理被调度:

php
Bus::assertNothingBatched();

测试任务/批处理交互

此外,您可能偶尔需要测试单个任务与其底层批处理的交互。例如,您可能需要测试任务是否取消了其批处理的进一步处理。为此,您需要通过 withFakeBatch 方法为任务分配一个模拟批处理。withFakeBatch 方法返回一个包含任务实例和模拟批处理的元组:

php
[$job, $batch] = (new ShipOrder)->withFakeBatch();

$job->handle();

$this->assertTrue($batch->cancelled());
$this->assertEmpty($batch->added);

测试任务/队列交互

有时,您可能需要测试队列任务将自己重新释放到队列。或者,您可能需要测试任务是否删除了自己。您可以通过实例化任务并调用 withFakeQueueInteractions 方法来测试这些队列交互。

一旦任务的队列交互被模拟,您可以调用任务的 handle 方法。在调用任务后,可以使用 assertReleasedassertDeletedassertNotDeletedassertFailedassertFailedWithassertNotFailed 方法对任务的队列交互进行断言:

php
use App\Exceptions\CorruptedAudioException;
use App\Jobs\ProcessPodcast;

$job = (new ProcessPodcast)->withFakeQueueInteractions();

$job->handle();

$job->assertReleased(delay: 30);
$job->assertDeleted();
$job->assertNotDeleted();
$job->assertFailed();
$job->assertFailedWith(CorruptedAudioException::class);
$job->assertNotFailed();

任务事件

使用 Queue facade 上的 beforeafter 方法,您可以指定在处理队列任务之前或之后执行的回调。这些回调是执行额外日志记录或为仪表板增加统计数据的绝佳机会。通常,您应该从服务提供者boot 方法中调用这些方法。例如,我们可以使用 Laravel 附带的 AppServiceProvider

php
<?php

namespace App\Providers;

use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;

class AppServiceProvider extends ServiceProvider
{
    /**
     * 注册任何应用程序服务。
     */
    public function register(): void
    {
        // ...
    }

    /**
     * 启动任何应用程序服务。
     */
    public function boot(): void
    {
        Queue::before(function (JobProcessing $event) {
            // $event->connectionName
            // $event->job
            // $event->job->payload()
        });

        Queue::after(function (JobProcessed $event) {
            // $event->connectionName
            // $event->job
            // $event->job->payload()
        });
    }
}

使用 Queue facade 上的 looping 方法,您可以指定在工作者尝试从队列获取任务之前执行的回调。例如,您可以注册一个闭包以回滚任何由先前失败的任务留下的未完成事务:

php
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Queue;

Queue::looping(function () {
    while (DB::transactionLevel() > 0) {
        DB::rollBack();
    }
});