jobqueue: Batch jobs that will end up in the default queue

JobQueueGroup::get( $type ) returns a new object for each job type even
if it's the same class (default class). Also it builds different queues
and push them separelty a couple lines lower limiting the ability to
batch push them in the default job queue object.

Given that JobQueue cares about job types being the same as the class
type (which is important for JobQueueDB implementation), adding a config
option called typeAgnostic to ignore type mismatch in JobQueue
implementations that don't care about the type (like JobQueueEventBus)

Bug: T292048
Change-Id: I151f067ca94a985c816446b545921c387b083911
This commit is contained in:
Amir Sarabadani 2021-09-29 23:03:50 +02:00
parent 2067f04d6c
commit 419c14b013
3 changed files with 46 additions and 9 deletions

View file

@ -53,6 +53,9 @@ abstract class JobQueue {
/** @var WANObjectCache */
protected $wanCache;
/** @var bool */
protected $typeAgnostic;
protected const QOS_ATOMIC = 1; // integer; "all-or-nothing" job insertions
protected const ROOTJOB_TTL = 2419200; // integer; seconds to remember root jobs (28 days)
@ -61,7 +64,7 @@ abstract class JobQueue {
* @stable to call
*
* @param array $params
* - type : A job type
* - type : A job type, 'default' if typeAgnostic is set
* - domain : A DB domain ID
* - idGenerator : A GlobalIdGenerator instance.
* - wanCache : An instance of WANObjectCache to use for caching [default: none]
@ -70,6 +73,7 @@ abstract class JobQueue {
* - maxTries : Total times a job can be tried, assuming claims expire [default: 3]
* - order : Queue order, one of ("fifo", "timestamp", "random") [default: variable]
* - readOnlyReason : Mark the queue as read-only with this reason [default: false]
* - typeAgnostic : If the jobqueue should operate agnostic to the job types
* @throws JobQueueError
*
*/
@ -90,6 +94,13 @@ abstract class JobQueue {
$this->stats = $params['stats'] ?? new NullStatsdDataFactory();
$this->wanCache = $params['wanCache'] ?? WANObjectCache::newEmpty();
$this->idGenerator = $params['idGenerator'];
if ( ( $params['typeAgnostic'] ?? false ) && !$this->supportsTypeAgnostic() ) {
throw new JobQueueError( __CLASS__ . " does not support type agnostic queues." );
}
$this->typeAgnostic = ( $params['typeAgnostic'] ?? false );
if ( $this->typeAgnostic ) {
$this->type = 'default';
}
}
/**
@ -393,7 +404,7 @@ abstract class JobQueue {
// Flag this job as an old duplicate based on its "root" job...
try {
if ( $job && $this->isRootJobOldDuplicate( $job ) ) {
$this->incrStats( 'dupe_pops', $this->type );
$this->incrStats( 'dupe_pops', $job->getType() );
$job = DuplicateJob::newFromJob( $job ); // convert to a no-op
}
} catch ( Exception $e ) {
@ -483,7 +494,7 @@ abstract class JobQueue {
throw new JobQueueError( "Cannot register root job; missing parameters." );
}
$key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
$key = $this->getRootJobCacheKey( $params['rootJobSignature'], $job->getType() );
// Callers should call JobQueueGroup::push() before this method so that if the
// insert fails, the de-duplication registration will be aborted. Having only the
// de-duplication registration succeed would cause jobs to become no-ops without
@ -522,7 +533,7 @@ abstract class JobQueue {
return false; // job has no de-deplication info
}
$key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
$key = $this->getRootJobCacheKey( $params['rootJobSignature'], $job->getType() );
// Get the last time this root job was enqueued
$timestamp = $this->wanCache->get( $key );
if ( $timestamp === false || $params['rootJobTimestamp'] > $timestamp ) {
@ -536,13 +547,14 @@ abstract class JobQueue {
/**
* @param string $signature Hash identifier of the root job
* @param string $type job type
* @return string
*/
protected function getRootJobCacheKey( $signature ) {
protected function getRootJobCacheKey( $signature, $type ) {
return $this->wanCache->makeGlobalKey(
'jobqueue',
$this->domain,
$this->type,
$type,
'rootjob',
$signature
);
@ -739,6 +751,9 @@ abstract class JobQueue {
* @throws JobQueueError
*/
private function assertMatchingJobType( IJobSpecification $job ) {
if ( $this->typeAgnostic ) {
return;
}
if ( $job->getType() !== $this->type ) {
throw new JobQueueError( "Got '{$job->getType()}' job; expected '{$this->type}'." );
}
@ -756,4 +771,14 @@ abstract class JobQueue {
$this->stats->updateCount( "jobqueue.{$key}.all", $delta );
$this->stats->updateCount( "jobqueue.{$key}.{$type}", $delta );
}
/**
* Subclasses should set this to true if they support type agnostic queues
*
* @return bool
* @since 1.38
*/
protected function supportsTypeAgnostic(): bool {
return false;
}
}

View file

@ -175,7 +175,19 @@ class JobQueueGroup {
$jobsByType = []; // (job type => list of jobs)
foreach ( $jobs as $job ) {
$jobsByType[$job->getType()][] = $job;
$type = $job->getType();
if ( isset( $this->jobTypeConfiguration[$type] ) ) {
$jobsByType[$type][] = $job;
} else {
if (
isset( $this->jobTypeConfiguration['default']['typeAgnostic'] ) &&
$this->jobTypeConfiguration['default']['typeAgnostic']
) {
$jobsByType['default'][] = $job;
} else {
$jobsByType[$type][] = $job;
}
}
}
foreach ( $jobsByType as $type => $jobs ) {

View file

@ -447,7 +447,7 @@ LUA;
}
$params = $job->getRootJobParams();
$key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
$key = $this->getRootJobCacheKey( $params['rootJobSignature'], $job->getType() );
$conn = $this->getConnection();
try {
@ -478,7 +478,7 @@ LUA;
$conn = $this->getConnection();
try {
// Get the last time this root job was enqueued
$timestamp = $conn->get( $this->getRootJobCacheKey( $params['rootJobSignature'] ) );
$timestamp = $conn->get( $this->getRootJobCacheKey( $params['rootJobSignature'], $job->getType() ) );
} catch ( RedisException $e ) {
throw $this->handleErrorAndMakeException( $conn, $e );
}