jobqueue: cleaned up JobQueue exception handling
* Added JobQueueError exceptions. * Periodic tasks that fail are logged and skipped. * JobQueueFederated properly fails over now. Change-Id: I9d9f0dae548a9dde693a7cd25c255a8bfbf37899
This commit is contained in:
parent
db51c53b8f
commit
450e9c3258
6 changed files with 265 additions and 166 deletions
|
|
@ -665,6 +665,8 @@ $wgAutoloadLocalClasses = array(
|
|||
'JobQueueAggregatorMemc' => 'includes/job/aggregator/JobQueueAggregatorMemc.php',
|
||||
'JobQueueAggregatorRedis' => 'includes/job/aggregator/JobQueueAggregatorRedis.php',
|
||||
'JobQueueDB' => 'includes/job/JobQueueDB.php',
|
||||
'JobQueueConnectionError' => 'includes/job/JobQueue.php',
|
||||
'JobQueueError' => 'includes/job/JobQueue.php',
|
||||
'JobQueueGroup' => 'includes/job/JobQueueGroup.php',
|
||||
'JobQueueFederated' => 'includes/job/JobQueueFederated.php',
|
||||
'JobQueueRedis' => 'includes/job/JobQueueRedis.php',
|
||||
|
|
|
|||
|
|
@ -168,7 +168,7 @@ abstract class JobQueue {
|
|||
* not distinguishable from the race condition between isEmpty() and pop().
|
||||
*
|
||||
* @return bool
|
||||
* @throws MWException
|
||||
* @throws JobQueueError
|
||||
*/
|
||||
final public function isEmpty() {
|
||||
wfProfileIn( __METHOD__ );
|
||||
|
|
@ -190,7 +190,7 @@ abstract class JobQueue {
|
|||
* If caching is used, this number might be out of date for a minute.
|
||||
*
|
||||
* @return integer
|
||||
* @throws MWException
|
||||
* @throws JobQueueError
|
||||
*/
|
||||
final public function getSize() {
|
||||
wfProfileIn( __METHOD__ );
|
||||
|
|
@ -212,7 +212,7 @@ abstract class JobQueue {
|
|||
* If caching is used, this number might be out of date for a minute.
|
||||
*
|
||||
* @return integer
|
||||
* @throws MWException
|
||||
* @throws JobQueueError
|
||||
*/
|
||||
final public function getAcquiredCount() {
|
||||
wfProfileIn( __METHOD__ );
|
||||
|
|
@ -234,7 +234,7 @@ abstract class JobQueue {
|
|||
* If caching is used, this number might be out of date for a minute.
|
||||
*
|
||||
* @return integer
|
||||
* @throws MWException
|
||||
* @throws JobQueueError
|
||||
* @since 1.22
|
||||
*/
|
||||
final public function getDelayedCount() {
|
||||
|
|
@ -259,7 +259,7 @@ abstract class JobQueue {
|
|||
* If caching is used, this number might be out of date for a minute.
|
||||
*
|
||||
* @return integer
|
||||
* @throws MWException
|
||||
* @throws JobQueueError
|
||||
*/
|
||||
final public function getAbandonedCount() {
|
||||
wfProfileIn( __METHOD__ );
|
||||
|
|
@ -284,7 +284,7 @@ abstract class JobQueue {
|
|||
* @param $jobs Job|Array
|
||||
* @param $flags integer Bitfield (supports JobQueue::QOS_ATOMIC)
|
||||
* @return bool Returns false on failure
|
||||
* @throws MWException
|
||||
* @throws JobQueueError
|
||||
*/
|
||||
final public function push( $jobs, $flags = 0 ) {
|
||||
return $this->batchPush( is_array( $jobs ) ? $jobs : array( $jobs ), $flags );
|
||||
|
|
@ -298,7 +298,7 @@ abstract class JobQueue {
|
|||
* @param array $jobs List of Jobs
|
||||
* @param $flags integer Bitfield (supports JobQueue::QOS_ATOMIC)
|
||||
* @return bool Returns false on failure
|
||||
* @throws MWException
|
||||
* @throws JobQueueError
|
||||
*/
|
||||
final public function batchPush( array $jobs, $flags = 0 ) {
|
||||
if ( !count( $jobs ) ) {
|
||||
|
|
@ -333,7 +333,7 @@ abstract class JobQueue {
|
|||
* Outside callers should use JobQueueGroup::pop() instead of this function.
|
||||
*
|
||||
* @return Job|bool Returns false if there are no jobs
|
||||
* @throws MWException
|
||||
* @throws JobQueueError
|
||||
*/
|
||||
final public function pop() {
|
||||
global $wgJobClasses;
|
||||
|
|
@ -374,7 +374,7 @@ abstract class JobQueue {
|
|||
*
|
||||
* @param $job Job
|
||||
* @return bool
|
||||
* @throws MWException
|
||||
* @throws JobQueueError
|
||||
*/
|
||||
final public function ack( Job $job ) {
|
||||
if ( $job->getType() !== $this->type ) {
|
||||
|
|
@ -421,7 +421,7 @@ abstract class JobQueue {
|
|||
*
|
||||
* @param $job Job
|
||||
* @return bool
|
||||
* @throws MWException
|
||||
* @throws JobQueueError
|
||||
*/
|
||||
final public function deduplicateRootJob( Job $job ) {
|
||||
if ( $job->getType() !== $this->type ) {
|
||||
|
|
@ -466,7 +466,7 @@ abstract class JobQueue {
|
|||
*
|
||||
* @param $job Job
|
||||
* @return bool
|
||||
* @throws MWException
|
||||
* @throws JobQueueError
|
||||
*/
|
||||
final protected function isRootJobOldDuplicate( Job $job ) {
|
||||
if ( $job->getType() !== $this->type ) {
|
||||
|
|
@ -511,7 +511,7 @@ abstract class JobQueue {
|
|||
* Deleted all unclaimed and delayed jobs from the queue
|
||||
*
|
||||
* @return bool Success
|
||||
* @throws MWException
|
||||
* @throws JobQueueError
|
||||
* @since 1.22
|
||||
*/
|
||||
final public function delete() {
|
||||
|
|
@ -535,7 +535,7 @@ abstract class JobQueue {
|
|||
* This does nothing for certain queue classes.
|
||||
*
|
||||
* @return void
|
||||
* @throws MWException
|
||||
* @throws JobQueueError
|
||||
*/
|
||||
final public function waitForBackups() {
|
||||
wfProfileIn( __METHOD__ );
|
||||
|
|
@ -600,7 +600,7 @@ abstract class JobQueue {
|
|||
* Note: results may be stale if the queue is concurrently modified.
|
||||
*
|
||||
* @return Iterator
|
||||
* @throws MWException
|
||||
* @throws JobQueueError
|
||||
*/
|
||||
abstract public function getAllQueuedJobs();
|
||||
|
||||
|
|
@ -609,7 +609,7 @@ abstract class JobQueue {
|
|||
* Note: results may be stale if the queue is concurrently modified.
|
||||
*
|
||||
* @return Iterator
|
||||
* @throws MWException
|
||||
* @throws JobQueueError
|
||||
* @since 1.22
|
||||
*/
|
||||
public function getAllDelayedJobs() {
|
||||
|
|
@ -640,3 +640,10 @@ abstract class JobQueue {
|
|||
throw new MWException( "Queue namespacing not supported for this queue type." );
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @ingroup JobQueue
|
||||
* @since 1.22
|
||||
*/
|
||||
class JobQueueError extends MWException {}
|
||||
class JobQueueConnectionError extends JobQueueError {}
|
||||
|
|
|
|||
|
|
@ -80,9 +80,13 @@ class JobQueueDB extends JobQueue {
|
|||
}
|
||||
|
||||
list( $dbr, $scope ) = $this->getSlaveDB();
|
||||
$found = $dbr->selectField( // unclaimed job
|
||||
'job', '1', array( 'job_cmd' => $this->type, 'job_token' => '' ), __METHOD__
|
||||
);
|
||||
try {
|
||||
$found = $dbr->selectField( // unclaimed job
|
||||
'job', '1', array( 'job_cmd' => $this->type, 'job_token' => '' ), __METHOD__
|
||||
);
|
||||
} catch ( DBError $e ) {
|
||||
$this->throwDBException( $e );
|
||||
}
|
||||
$this->cache->add( $key, $found ? 'false' : 'true', self::CACHE_TTL_LONG );
|
||||
|
||||
return !$found;
|
||||
|
|
@ -100,11 +104,15 @@ class JobQueueDB extends JobQueue {
|
|||
return $size;
|
||||
}
|
||||
|
||||
list( $dbr, $scope ) = $this->getSlaveDB();
|
||||
$size = (int)$dbr->selectField( 'job', 'COUNT(*)',
|
||||
array( 'job_cmd' => $this->type, 'job_token' => '' ),
|
||||
__METHOD__
|
||||
);
|
||||
try {
|
||||
list( $dbr, $scope ) = $this->getSlaveDB();
|
||||
$size = (int)$dbr->selectField( 'job', 'COUNT(*)',
|
||||
array( 'job_cmd' => $this->type, 'job_token' => '' ),
|
||||
__METHOD__
|
||||
);
|
||||
} catch ( DBError $e ) {
|
||||
$this->throwDBException( $e );
|
||||
}
|
||||
$this->cache->set( $key, $size, self::CACHE_TTL_SHORT );
|
||||
|
||||
return $size;
|
||||
|
|
@ -127,10 +135,14 @@ class JobQueueDB extends JobQueue {
|
|||
}
|
||||
|
||||
list( $dbr, $scope ) = $this->getSlaveDB();
|
||||
$count = (int)$dbr->selectField( 'job', 'COUNT(*)',
|
||||
array( 'job_cmd' => $this->type, "job_token != {$dbr->addQuotes( '' )}" ),
|
||||
__METHOD__
|
||||
);
|
||||
try {
|
||||
$count = (int)$dbr->selectField( 'job', 'COUNT(*)',
|
||||
array( 'job_cmd' => $this->type, "job_token != {$dbr->addQuotes( '' )}" ),
|
||||
__METHOD__
|
||||
);
|
||||
} catch ( DBError $e ) {
|
||||
$this->throwDBException( $e );
|
||||
}
|
||||
$this->cache->set( $key, $count, self::CACHE_TTL_SHORT );
|
||||
|
||||
return $count;
|
||||
|
|
@ -156,14 +168,18 @@ class JobQueueDB extends JobQueue {
|
|||
}
|
||||
|
||||
list( $dbr, $scope ) = $this->getSlaveDB();
|
||||
$count = (int)$dbr->selectField( 'job', 'COUNT(*)',
|
||||
array(
|
||||
'job_cmd' => $this->type,
|
||||
"job_token != {$dbr->addQuotes( '' )}",
|
||||
"job_attempts >= " . $dbr->addQuotes( $this->maxTries )
|
||||
),
|
||||
__METHOD__
|
||||
);
|
||||
try {
|
||||
$count = (int)$dbr->selectField( 'job', 'COUNT(*)',
|
||||
array(
|
||||
'job_cmd' => $this->type,
|
||||
"job_token != {$dbr->addQuotes( '' )}",
|
||||
"job_attempts >= " . $dbr->addQuotes( $this->maxTries )
|
||||
),
|
||||
__METHOD__
|
||||
);
|
||||
} catch ( DBError $e ) {
|
||||
$this->throwDBException( $e );
|
||||
}
|
||||
$wgMemc->set( $key, $count, self::CACHE_TTL_SHORT );
|
||||
|
||||
return $count;
|
||||
|
|
@ -269,43 +285,47 @@ class JobQueueDB extends JobQueue {
|
|||
}
|
||||
|
||||
list( $dbw, $scope ) = $this->getMasterDB();
|
||||
$dbw->commit( __METHOD__, 'flush' ); // flush existing transaction
|
||||
$autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting
|
||||
$dbw->clearFlag( DBO_TRX ); // make each query its own transaction
|
||||
$scopedReset = new ScopedCallback( function() use ( $dbw, $autoTrx ) {
|
||||
$dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting
|
||||
} );
|
||||
try {
|
||||
$dbw->commit( __METHOD__, 'flush' ); // flush existing transaction
|
||||
$autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting
|
||||
$dbw->clearFlag( DBO_TRX ); // make each query its own transaction
|
||||
$scopedReset = new ScopedCallback( function() use ( $dbw, $autoTrx ) {
|
||||
$dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting
|
||||
} );
|
||||
|
||||
$uuid = wfRandomString( 32 ); // pop attempt
|
||||
$job = false; // job popped off
|
||||
do { // retry when our row is invalid or deleted as a duplicate
|
||||
// Try to reserve a row in the DB...
|
||||
if ( in_array( $this->order, array( 'fifo', 'timestamp' ) ) ) {
|
||||
$row = $this->claimOldest( $uuid );
|
||||
} else { // random first
|
||||
$rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs
|
||||
$gte = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand
|
||||
$row = $this->claimRandom( $uuid, $rand, $gte );
|
||||
}
|
||||
// Check if we found a row to reserve...
|
||||
if ( !$row ) {
|
||||
$this->cache->set( $this->getCacheKey( 'empty' ), 'true', self::CACHE_TTL_LONG );
|
||||
break; // nothing to do
|
||||
}
|
||||
JobQueue::incrStats( 'job-pop', $this->type );
|
||||
// Get the job object from the row...
|
||||
$title = Title::makeTitleSafe( $row->job_namespace, $row->job_title );
|
||||
if ( !$title ) {
|
||||
$dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ );
|
||||
wfDebug( "Row has invalid title '{$row->job_title}'." );
|
||||
continue; // try again
|
||||
}
|
||||
$job = Job::factory( $row->job_cmd, $title,
|
||||
self::extractBlob( $row->job_params ), $row->job_id );
|
||||
$job->metadata['id'] = $row->job_id;
|
||||
$job->id = $row->job_id; // XXX: work around broken subclasses
|
||||
break; // done
|
||||
} while ( true );
|
||||
$uuid = wfRandomString( 32 ); // pop attempt
|
||||
$job = false; // job popped off
|
||||
do { // retry when our row is invalid or deleted as a duplicate
|
||||
// Try to reserve a row in the DB...
|
||||
if ( in_array( $this->order, array( 'fifo', 'timestamp' ) ) ) {
|
||||
$row = $this->claimOldest( $uuid );
|
||||
} else { // random first
|
||||
$rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs
|
||||
$gte = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand
|
||||
$row = $this->claimRandom( $uuid, $rand, $gte );
|
||||
}
|
||||
// Check if we found a row to reserve...
|
||||
if ( !$row ) {
|
||||
$this->cache->set( $this->getCacheKey( 'empty' ), 'true', self::CACHE_TTL_LONG );
|
||||
break; // nothing to do
|
||||
}
|
||||
JobQueue::incrStats( 'job-pop', $this->type );
|
||||
// Get the job object from the row...
|
||||
$title = Title::makeTitleSafe( $row->job_namespace, $row->job_title );
|
||||
if ( !$title ) {
|
||||
$dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ );
|
||||
wfDebug( "Row has invalid title '{$row->job_title}'." );
|
||||
continue; // try again
|
||||
}
|
||||
$job = Job::factory( $row->job_cmd, $title,
|
||||
self::extractBlob( $row->job_params ), $row->job_id );
|
||||
$job->metadata['id'] = $row->job_id;
|
||||
$job->id = $row->job_id; // XXX: work around broken subclasses
|
||||
break; // done
|
||||
} while ( true );
|
||||
} catch ( DBError $e ) {
|
||||
$this->throwDBException( $e );
|
||||
}
|
||||
|
||||
return $job;
|
||||
}
|
||||
|
|
@ -461,16 +481,20 @@ class JobQueueDB extends JobQueue {
|
|||
}
|
||||
|
||||
list( $dbw, $scope ) = $this->getMasterDB();
|
||||
$dbw->commit( __METHOD__, 'flush' ); // flush existing transaction
|
||||
$autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting
|
||||
$dbw->clearFlag( DBO_TRX ); // make each query its own transaction
|
||||
$scopedReset = new ScopedCallback( function() use ( $dbw, $autoTrx ) {
|
||||
$dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting
|
||||
} );
|
||||
try {
|
||||
$dbw->commit( __METHOD__, 'flush' ); // flush existing transaction
|
||||
$autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting
|
||||
$dbw->clearFlag( DBO_TRX ); // make each query its own transaction
|
||||
$scopedReset = new ScopedCallback( function() use ( $dbw, $autoTrx ) {
|
||||
$dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting
|
||||
} );
|
||||
|
||||
// Delete a row with a single DELETE without holding row locks over RTTs...
|
||||
$dbw->delete( 'job',
|
||||
array( 'job_cmd' => $this->type, 'job_id' => $job->metadata['id'] ), __METHOD__ );
|
||||
// Delete a row with a single DELETE without holding row locks over RTTs...
|
||||
$dbw->delete( 'job',
|
||||
array( 'job_cmd' => $this->type, 'job_id' => $job->metadata['id'] ), __METHOD__ );
|
||||
} catch ( DBError $e ) {
|
||||
$this->throwDBException( $e );
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
|
@ -516,7 +540,11 @@ class JobQueueDB extends JobQueue {
|
|||
protected function doDelete() {
|
||||
list( $dbw, $scope ) = $this->getMasterDB();
|
||||
|
||||
$dbw->delete( 'job', array( 'job_cmd' => $this->type ) );
|
||||
try {
|
||||
$dbw->delete( 'job', array( 'job_cmd' => $this->type ) );
|
||||
} catch ( DBError $e ) {
|
||||
$this->throwDBException( $e );
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
|
|
@ -555,20 +583,25 @@ class JobQueueDB extends JobQueue {
|
|||
*/
|
||||
public function getAllQueuedJobs() {
|
||||
list( $dbr, $scope ) = $this->getSlaveDB();
|
||||
return new MappedIterator(
|
||||
$dbr->select( 'job', '*', array( 'job_cmd' => $this->getType(), 'job_token' => '' ) ),
|
||||
function( $row ) use ( $scope ) {
|
||||
$job = Job::factory(
|
||||
$row->job_cmd,
|
||||
Title::makeTitle( $row->job_namespace, $row->job_title ),
|
||||
strlen( $row->job_params ) ? unserialize( $row->job_params ) : false,
|
||||
$row->job_id
|
||||
);
|
||||
$job->metadata['id'] = $row->job_id;
|
||||
$job->id = $row->job_id; // XXX: work around broken subclasses
|
||||
return $job;
|
||||
}
|
||||
);
|
||||
try {
|
||||
return new MappedIterator(
|
||||
$dbr->select( 'job', '*',
|
||||
array( 'job_cmd' => $this->getType(), 'job_token' => '' ) ),
|
||||
function( $row ) use ( $scope ) {
|
||||
$job = Job::factory(
|
||||
$row->job_cmd,
|
||||
Title::makeTitle( $row->job_namespace, $row->job_title ),
|
||||
strlen( $row->job_params ) ? unserialize( $row->job_params ) : false,
|
||||
$row->job_id
|
||||
);
|
||||
$job->metadata['id'] = $row->job_id;
|
||||
$job->id = $row->job_id; // XXX: work around broken subclasses
|
||||
return $job;
|
||||
}
|
||||
);
|
||||
} catch ( DBError $e ) {
|
||||
$this->throwDBException( $e );
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -578,75 +611,79 @@ class JobQueueDB extends JobQueue {
|
|||
*/
|
||||
public function recycleAndDeleteStaleJobs() {
|
||||
$now = time();
|
||||
list( $dbw, $scope ) = $this->getMasterDB();
|
||||
$count = 0; // affected rows
|
||||
list( $dbw, $scope ) = $this->getMasterDB();
|
||||
|
||||
if ( !$dbw->lock( "jobqueue-recycle-{$this->type}", __METHOD__, 1 ) ) {
|
||||
return $count; // already in progress
|
||||
}
|
||||
try {
|
||||
if ( !$dbw->lock( "jobqueue-recycle-{$this->type}", __METHOD__, 1 ) ) {
|
||||
return $count; // already in progress
|
||||
}
|
||||
|
||||
// Remove claims on jobs acquired for too long if enabled...
|
||||
if ( $this->claimTTL > 0 ) {
|
||||
$claimCutoff = $dbw->timestamp( $now - $this->claimTTL );
|
||||
// Get the IDs of jobs that have be claimed but not finished after too long.
|
||||
// These jobs can be recycled into the queue by expiring the claim. Selecting
|
||||
// the IDs first means that the UPDATE can be done by primary key (less deadlocks).
|
||||
$res = $dbw->select( 'job', 'job_id',
|
||||
array(
|
||||
'job_cmd' => $this->type,
|
||||
"job_token != {$dbw->addQuotes( '' )}", // was acquired
|
||||
"job_token_timestamp < {$dbw->addQuotes( $claimCutoff )}", // stale
|
||||
"job_attempts < {$dbw->addQuotes( $this->maxTries )}" ), // retries left
|
||||
__METHOD__
|
||||
// Remove claims on jobs acquired for too long if enabled...
|
||||
if ( $this->claimTTL > 0 ) {
|
||||
$claimCutoff = $dbw->timestamp( $now - $this->claimTTL );
|
||||
// Get the IDs of jobs that have be claimed but not finished after too long.
|
||||
// These jobs can be recycled into the queue by expiring the claim. Selecting
|
||||
// the IDs first means that the UPDATE can be done by primary key (less deadlocks).
|
||||
$res = $dbw->select( 'job', 'job_id',
|
||||
array(
|
||||
'job_cmd' => $this->type,
|
||||
"job_token != {$dbw->addQuotes( '' )}", // was acquired
|
||||
"job_token_timestamp < {$dbw->addQuotes( $claimCutoff )}", // stale
|
||||
"job_attempts < {$dbw->addQuotes( $this->maxTries )}" ), // retries left
|
||||
__METHOD__
|
||||
);
|
||||
$ids = array_map(
|
||||
function( $o ) {
|
||||
return $o->job_id;
|
||||
}, iterator_to_array( $res )
|
||||
);
|
||||
if ( count( $ids ) ) {
|
||||
// Reset job_token for these jobs so that other runners will pick them up.
|
||||
// Set the timestamp to the current time, as it is useful to now that the job
|
||||
// was already tried before (the timestamp becomes the "released" time).
|
||||
$dbw->update( 'job',
|
||||
array(
|
||||
'job_token' => '',
|
||||
'job_token_timestamp' => $dbw->timestamp( $now ) ), // time of release
|
||||
array(
|
||||
'job_id' => $ids ),
|
||||
__METHOD__
|
||||
);
|
||||
$count += $dbw->affectedRows();
|
||||
JobQueue::incrStats( 'job-recycle', $this->type, $dbw->affectedRows() );
|
||||
$this->cache->set( $this->getCacheKey( 'empty' ), 'false', self::CACHE_TTL_LONG );
|
||||
}
|
||||
}
|
||||
|
||||
// Just destroy any stale jobs...
|
||||
$pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE );
|
||||
$conds = array(
|
||||
'job_cmd' => $this->type,
|
||||
"job_token != {$dbw->addQuotes( '' )}", // was acquired
|
||||
"job_token_timestamp < {$dbw->addQuotes( $pruneCutoff )}" // stale
|
||||
);
|
||||
if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times...
|
||||
$conds[] = "job_attempts >= {$dbw->addQuotes( $this->maxTries )}";
|
||||
}
|
||||
// Get the IDs of jobs that are considered stale and should be removed. Selecting
|
||||
// the IDs first means that the UPDATE can be done by primary key (less deadlocks).
|
||||
$res = $dbw->select( 'job', 'job_id', $conds, __METHOD__ );
|
||||
$ids = array_map(
|
||||
function( $o ) {
|
||||
return $o->job_id;
|
||||
}, iterator_to_array( $res )
|
||||
);
|
||||
if ( count( $ids ) ) {
|
||||
// Reset job_token for these jobs so that other runners will pick them up.
|
||||
// Set the timestamp to the current time, as it is useful to now that the job
|
||||
// was already tried before (the timestamp becomes the "released" time).
|
||||
$dbw->update( 'job',
|
||||
array(
|
||||
'job_token' => '',
|
||||
'job_token_timestamp' => $dbw->timestamp( $now ) ), // time of release
|
||||
array(
|
||||
'job_id' => $ids ),
|
||||
__METHOD__
|
||||
);
|
||||
$dbw->delete( 'job', array( 'job_id' => $ids ), __METHOD__ );
|
||||
$count += $dbw->affectedRows();
|
||||
JobQueue::incrStats( 'job-recycle', $this->type, $dbw->affectedRows() );
|
||||
$this->cache->set( $this->getCacheKey( 'empty' ), 'false', self::CACHE_TTL_LONG );
|
||||
JobQueue::incrStats( 'job-abandon', $this->type, $dbw->affectedRows() );
|
||||
}
|
||||
}
|
||||
|
||||
// Just destroy any stale jobs...
|
||||
$pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE );
|
||||
$conds = array(
|
||||
'job_cmd' => $this->type,
|
||||
"job_token != {$dbw->addQuotes( '' )}", // was acquired
|
||||
"job_token_timestamp < {$dbw->addQuotes( $pruneCutoff )}" // stale
|
||||
);
|
||||
if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times...
|
||||
$conds[] = "job_attempts >= {$dbw->addQuotes( $this->maxTries )}";
|
||||
$dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ );
|
||||
} catch ( DBError $e ) {
|
||||
$this->throwDBException( $e );
|
||||
}
|
||||
// Get the IDs of jobs that are considered stale and should be removed. Selecting
|
||||
// the IDs first means that the UPDATE can be done by primary key (less deadlocks).
|
||||
$res = $dbw->select( 'job', 'job_id', $conds, __METHOD__ );
|
||||
$ids = array_map(
|
||||
function( $o ) {
|
||||
return $o->job_id;
|
||||
}, iterator_to_array( $res )
|
||||
);
|
||||
if ( count( $ids ) ) {
|
||||
$dbw->delete( 'job', array( 'job_id' => $ids ), __METHOD__ );
|
||||
$count += $dbw->affectedRows();
|
||||
JobQueue::incrStats( 'job-abandon', $this->type, $dbw->affectedRows() );
|
||||
}
|
||||
|
||||
$dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ );
|
||||
|
||||
return $count;
|
||||
}
|
||||
|
|
@ -655,14 +692,22 @@ class JobQueueDB extends JobQueue {
|
|||
* @return Array (DatabaseBase, ScopedCallback)
|
||||
*/
|
||||
protected function getSlaveDB() {
|
||||
return $this->getDB( DB_SLAVE );
|
||||
try {
|
||||
return $this->getDB( DB_SLAVE );
|
||||
} catch ( DBConnectionError $e ) {
|
||||
throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() );
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Array (DatabaseBase, ScopedCallback)
|
||||
*/
|
||||
protected function getMasterDB() {
|
||||
return $this->getDB( DB_MASTER );
|
||||
try {
|
||||
return $this->getDB( DB_MASTER );
|
||||
} catch ( DBConnectionError $e ) {
|
||||
throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() );
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -737,4 +782,12 @@ class JobQueueDB extends JobQueue {
|
|||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param DBError $e
|
||||
* @throws JobQueueError
|
||||
*/
|
||||
protected function throwDBException( DBError $e ) {
|
||||
throw new JobQueueError( get_class( $e ) . ": " . $e->getMessage() );
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -138,9 +138,13 @@ class JobQueueFederated extends JobQueue {
|
|||
}
|
||||
|
||||
foreach ( $this->partitionQueues as $queue ) {
|
||||
if ( !$queue->doIsEmpty() ) {
|
||||
$this->cache->add( $key, 'false', self::CACHE_TTL_LONG );
|
||||
return false;
|
||||
try {
|
||||
if ( !$queue->doIsEmpty() ) {
|
||||
$this->cache->add( $key, 'false', self::CACHE_TTL_LONG );
|
||||
return false;
|
||||
}
|
||||
} catch ( JobQueueError $e ) {
|
||||
wfDebugLog( 'exception', $e->getLogMessage() );
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -179,7 +183,11 @@ class JobQueueFederated extends JobQueue {
|
|||
|
||||
$count = 0;
|
||||
foreach ( $this->partitionQueues as $queue ) {
|
||||
$count += $queue->$method();
|
||||
try {
|
||||
$count += $queue->$method();
|
||||
} catch ( JobQueueError $e ) {
|
||||
wfDebugLog( 'exception', $e->getLogMessage() );
|
||||
}
|
||||
}
|
||||
|
||||
$this->cache->set( $key, $count, self::CACHE_TTL_SHORT );
|
||||
|
|
@ -244,7 +252,13 @@ class JobQueueFederated extends JobQueue {
|
|||
// Insert the de-duplicated jobs into the queues...
|
||||
foreach ( $uJobsByPartition as $partition => $jobBatch ) {
|
||||
$queue = $this->partitionQueues[$partition];
|
||||
if ( $queue->doBatchPush( $jobBatch, $flags ) ) {
|
||||
try {
|
||||
$ok = $queue->doBatchPush( $jobBatch, $flags );
|
||||
} catch ( JobQueueError $e ) {
|
||||
$ok = false;
|
||||
wfDebugLog( 'exception', $e->getLogMessage() );
|
||||
}
|
||||
if ( $ok ) {
|
||||
$key = $this->getCacheKey( 'empty' );
|
||||
$this->cache->set( $key, 'false', JobQueueDB::CACHE_TTL_LONG );
|
||||
} else {
|
||||
|
|
@ -259,7 +273,13 @@ class JobQueueFederated extends JobQueue {
|
|||
$jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted
|
||||
} else {
|
||||
$queue = $this->partitionQueues[$partition];
|
||||
if ( $queue->doBatchPush( $jobBatch, $flags ) ) {
|
||||
try {
|
||||
$ok = $queue->doBatchPush( $jobBatch, $flags );
|
||||
} catch ( JobQueueError $e ) {
|
||||
$ok = false;
|
||||
wfDebugLog( 'exception', $e->getLogMessage() );
|
||||
}
|
||||
if ( $ok ) {
|
||||
$key = $this->getCacheKey( 'empty' );
|
||||
$this->cache->set( $key, 'false', JobQueueDB::CACHE_TTL_LONG );
|
||||
} else {
|
||||
|
|
@ -288,7 +308,12 @@ class JobQueueFederated extends JobQueue {
|
|||
break; // all partitions at 0 weight
|
||||
}
|
||||
$queue = $this->partitionQueues[$partition];
|
||||
$job = $queue->pop();
|
||||
try {
|
||||
$job = $queue->pop();
|
||||
} catch ( JobQueueError $e ) {
|
||||
$job = false;
|
||||
wfDebugLog( 'exception', $e->getLogMessage() );
|
||||
}
|
||||
if ( $job ) {
|
||||
$job->metadata['QueuePartition'] = $partition;
|
||||
return $job;
|
||||
|
|
@ -336,13 +361,21 @@ class JobQueueFederated extends JobQueue {
|
|||
|
||||
protected function doDelete() {
|
||||
foreach ( $this->partitionQueues as $queue ) {
|
||||
$queue->doDelete();
|
||||
try {
|
||||
$queue->doDelete();
|
||||
} catch ( JobQueueError $e ) {
|
||||
wfDebugLog( 'exception', $e->getLogMessage() );
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected function doWaitForBackups() {
|
||||
foreach ( $this->partitionQueues as $queue ) {
|
||||
$queue->waitForBackups();
|
||||
try {
|
||||
$queue->waitForBackups();
|
||||
} catch ( JobQueueError $e ) {
|
||||
wfDebugLog( 'exception', $e->getLogMessage() );
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -310,9 +310,13 @@ class JobQueueGroup {
|
|||
} elseif ( !isset( $lastRuns[$type][$task] )
|
||||
|| $lastRuns[$type][$task] < ( time() - $definition['period'] ) )
|
||||
{
|
||||
if ( call_user_func( $definition['callback'] ) !== null ) {
|
||||
$tasksRun[$type][$task] = time();
|
||||
++$count;
|
||||
try {
|
||||
if ( call_user_func( $definition['callback'] ) !== null ) {
|
||||
$tasksRun[$type][$task] = time();
|
||||
++$count;
|
||||
}
|
||||
} catch ( JobQueueError $e ) {
|
||||
wfDebugLog( 'exception', $e->getLogMessage() );
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -786,7 +786,7 @@ LUA;
|
|||
protected function getConnection() {
|
||||
$conn = $this->redisPool->getConnection( $this->server );
|
||||
if ( !$conn ) {
|
||||
throw new MWException( "Unable to connect to redis server." );
|
||||
throw new JobQueueConnectionError( "Unable to connect to redis server." );
|
||||
}
|
||||
return $conn;
|
||||
}
|
||||
|
|
@ -799,7 +799,7 @@ LUA;
|
|||
*/
|
||||
protected function throwRedisException( $server, RedisConnRef $conn, $e ) {
|
||||
$this->redisPool->handleException( $server, $conn, $e );
|
||||
throw new MWException( "Redis server error: {$e->getMessage()}\n" );
|
||||
throw new JobQueueError( "Redis server error: {$e->getMessage()}\n" );
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
Loading…
Reference in a new issue