[JobQueue] Factored "root job" de-duplication code into base class.

* Moved the root job handling up to the base class and provided
  some default function implementations.
* Also bumped ROOTJOB_TTL from 14 to 28 days.

Change-Id: I70bc043bfc039c5d0b009e0b5d39fd2887f46093
This commit is contained in:
Aaron Schulz 2013-02-28 14:21:28 -08:00
parent fc00763f0d
commit dbba2ec623
3 changed files with 86 additions and 65 deletions

View file

@ -37,6 +37,8 @@ abstract class JobQueue {
const QoS_Atomic = 1; // integer; "all-or-nothing" job insertions
const ROOTJOB_TTL = 2419200; // integer; seconds to remember root jobs (28 days)
/**
* @param $params array
*/
@ -262,6 +264,15 @@ abstract class JobQueue {
wfProfileIn( __METHOD__ );
$job = $this->doPop();
wfProfileOut( __METHOD__ );
// Flag this job as an old duplicate based on its "root" job...
try {
if ( $job && $this->isRootJobOldDuplicate( $job ) ) {
wfIncrStats( 'job-pop-duplicate' );
$job = DuplicateJob::newFromJob( $job ); // convert to a no-op
}
} catch ( MWException $e ) {} // don't lose jobs over this
return $job;
}
@ -344,7 +355,76 @@ abstract class JobQueue {
* @return bool
*/
protected function doDeduplicateRootJob( Job $job ) {
return true;
global $wgMemc;
$params = $job->getParams();
if ( !isset( $params['rootJobSignature'] ) ) {
throw new MWException( "Cannot register root job; missing 'rootJobSignature'." );
} elseif ( !isset( $params['rootJobTimestamp'] ) ) {
throw new MWException( "Cannot register root job; missing 'rootJobTimestamp'." );
}
$key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
// Callers should call batchInsert() and then this function so that if the insert
// fails, the de-duplication registration will be aborted. Since the insert is
// deferred till "transaction idle", do the same here, so that the ordering is
// maintained. Having only the de-duplication registration succeed would cause
// jobs to become no-ops without any actual jobs that made them redundant.
$timestamp = $wgMemc->get( $key ); // current last timestamp of this job
if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
return true; // a newer version of this root job was enqueued
}
// Update the timestamp of the last root job started at the location...
return $wgMemc->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL );
}
/**
* Check if the "root" job of a given job has been superseded by a newer one
*
* @param $job Job
* @return bool
* @throws MWException
*/
final protected function isRootJobOldDuplicate( Job $job ) {
if ( $job->getType() !== $this->type ) {
throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
}
wfProfileIn( __METHOD__ );
$isDuplicate = $this->doIsRootJobOldDuplicate( $job );
wfProfileOut( __METHOD__ );
return $isDuplicate;
}
/**
* @see JobQueue::isRootJobOldDuplicate()
* @param Job $job
* @return bool
*/
protected function doIsRootJobOldDuplicate( Job $job ) {
global $wgMemc;
$params = $job->getParams();
if ( !isset( $params['rootJobSignature'] ) ) {
return false; // job has no de-deplication info
} elseif ( !isset( $params['rootJobTimestamp'] ) ) {
trigger_error( "Cannot check root job; missing 'rootJobTimestamp'." );
return false;
}
// Get the last time this root job was enqueued
$timestamp = $wgMemc->get( $this->getRootJobCacheKey( $params['rootJobSignature'] ) );
// Check if a new root job was started at the location after this one's...
return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
}
/**
* @param string $signature Hash identifier of the root job
* @return string
*/
protected function getRootJobCacheKey( $signature ) {
list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, 'rootjob', $signature );
}
/**

View file

@ -28,7 +28,6 @@
* @since 1.21
*/
class JobQueueDB extends JobQueue {
const ROOTJOB_TTL = 1209600; // integer; seconds to remember root jobs (14 days)
const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating
const CACHE_TTL_LONG = 300; // integer; seconds to cache info that is kept up to date
const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed
@ -252,11 +251,6 @@ class JobQueueDB extends JobQueue {
$job = Job::factory( $row->job_cmd, $title,
self::extractBlob( $row->job_params ), $row->job_id );
$job->id = $row->job_id; // XXX: work around broken subclasses
// Flag this job as an old duplicate based on its "root" job...
if ( $this->isRootJobOldDuplicate( $job ) ) {
wfIncrStats( 'job-pop-duplicate' );
$job = DuplicateJob::newFromJob( $job ); // convert to a no-op
}
break; // done
} while( true );
@ -533,30 +527,6 @@ class JobQueueDB extends JobQueue {
return true;
}
/**
* Check if the "root" job of a given job has been superseded by a newer one
*
* @param $job Job
* @return bool
*/
protected function isRootJobOldDuplicate( Job $job ) {
global $wgMemc;
$params = $job->getParams();
if ( !isset( $params['rootJobSignature'] ) ) {
return false; // job has no de-deplication info
} elseif ( !isset( $params['rootJobTimestamp'] ) ) {
trigger_error( "Cannot check root job; missing 'rootJobTimestamp'." );
return false;
}
// Get the last time this root job was enqueued
$timestamp = $wgMemc->get( $this->getRootJobCacheKey( $params['rootJobSignature'] ) );
// Check if a new root job was started at the location after this one's...
return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
}
/**
* @see JobQueue::doWaitForBackups()
* @return void
@ -671,15 +641,6 @@ class JobQueueDB extends JobQueue {
return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $property );
}
/**
* @param string $signature Hash identifier of the root job
* @return string
*/
private function getRootJobCacheKey( $signature ) {
list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, 'rootjob', $signature );
}
/**
* @param $params
* @return string

View file

@ -60,7 +60,6 @@ class JobQueueRedis extends JobQueue {
protected $server; // string; server address
const ROOTJOB_TTL = 1209600; // integer; seconds to remember root jobs (14 days)
const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed (7 days)
protected $key; // string; key to prefix the queue keys with (used for testing)
@ -274,14 +273,6 @@ LUA;
$this->throwRedisException( $this->server, $conn, $e );
}
// Flag this job as an old duplicate based on its "root" job...
try {
if ( $job && $this->isRootJobOldDuplicate( $job ) ) {
wfIncrStats( 'job-pop-duplicate' );
return DuplicateJob::newFromJob( $job ); // convert to a no-op
}
} catch ( MWException $e ) {} // don't lose jobs over this
return $job;
}
@ -407,7 +398,7 @@ LUA;
} elseif ( !isset( $params['rootJobTimestamp'] ) ) {
throw new MWException( "Cannot register root job; missing 'rootJobTimestamp'." );
}
$key = $this->getRootJobKey( $params['rootJobSignature'] );
$key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
$conn = $this->getConnection();
try {
@ -423,13 +414,11 @@ LUA;
}
/**
* Check if the "root" job of a given job has been superseded by a newer one
*
* @param $job Job
* @see JobQueue::doIsRootJobOldDuplicate()
* @param Job $job
* @return bool
* @throws MWException
*/
protected function isRootJobOldDuplicate( Job $job ) {
protected function doIsRootJobOldDuplicate( Job $job ) {
$params = $job->getParams();
if ( !isset( $params['rootJobSignature'] ) ) {
return false; // job has no de-deplication info
@ -441,7 +430,7 @@ LUA;
$conn = $this->getConnection();
try {
// Get the last time this root job was enqueued
$timestamp = $conn->get( $this->getRootJobKey( $params['rootJobSignature'] ) );
$timestamp = $conn->get( $this->getRootJobCacheKey( $params['rootJobSignature'] ) );
} catch ( RedisException $e ) {
$this->throwRedisException( $this->server, $conn, $e );
}
@ -674,15 +663,6 @@ LUA;
}
}
/**
* @param string $signature Hash identifier of the root job
* @return string
*/
private function getRootJobKey( $signature ) {
list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, 'rootjob', $signature );
}
/**
* @param $key string
* @return void