[JobQueue] Improved job recycle rate for small queues.

* Make the recycling a bit more periodic rather than based on
  how often pop() gets called essentially. This works better if
  a queue does not have jobs inserted very often.

Change-Id: I64fbc8afbb1cf096717ba4bfc6fe7b7715abdb72
This commit is contained in:
Aaron Schulz 2013-01-11 16:13:29 -08:00
parent d1f8c02d06
commit 0ca0e1296c
5 changed files with 132 additions and 9 deletions

View file

@ -324,4 +324,32 @@ abstract class JobQueue {
* @return void
*/
protected function doWaitForBackups() {}
/**
* Return a map of task names to task definition maps.
* A "task" is a fast periodic queue maintenance action.
* Mutually exclusive tasks must implement their own locking in the callback.
*
* Each task value is an associative array with:
* - name : the name of the task
* - callback : a PHP callable that performs the task
* - period : the period in seconds corresponding to the task frequency
*
* @return Array
*/
final public function getPeriodicTasks() {
$tasks = $this->doGetPeriodicTasks();
foreach ( $tasks as $name => &$def ) {
$def['name'] = $name;
}
return $tasks;
}
/**
* @see JobQueue::getPeriodicTasks()
* @return Array
*/
protected function doGetPeriodicTasks() {
return array();
}
}

View file

@ -28,6 +28,7 @@
* @since 1.21
*/
class JobQueueDB extends JobQueue {
const ROOTJOB_TTL = 1209600; // integer; seconds to remember root jobs (14 days)
const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating
const CACHE_TTL_LONG = 300; // integer; seconds to cache info that is kept up to date
const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed
@ -215,10 +216,6 @@ class JobQueueDB extends JobQueue {
$uuid = wfRandomString( 32 ); // pop attempt
$job = false; // job popped off
// Occasionally recycle jobs back into the queue that have been claimed too long
if ( mt_rand( 0, 99 ) == 0 ) {
$this->recycleStaleJobs();
}
do { // retry when our row is invalid or deleted as a duplicate
// Try to reserve a row in the DB...
if ( in_array( $this->order, array( 'fifo', 'timestamp' ) ) ) {
@ -401,10 +398,10 @@ class JobQueueDB extends JobQueue {
*
* @return integer Number of jobs recycled/deleted
*/
protected function recycleStaleJobs() {
public function recycleAndDeleteStaleJobs() {
global $wgMemc;
$now = time();
$now = time();
list( $dbw, $scope ) = $this->getMasterDB();
$count = 0; // affected rows
@ -519,7 +516,7 @@ class JobQueueDB extends JobQueue {
}
// Update the timestamp of the last root job started at the location...
return $wgMemc->set( $key, $params['rootJobTimestamp'], 14*86400 ); // 2 weeks
return $wgMemc->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL );
} );
return true;
@ -557,6 +554,18 @@ class JobQueueDB extends JobQueue {
wfWaitForSlaves();
}
/**
* @return Array
*/
protected function doGetPeriodicTasks() {
return array(
'recycleAndDeleteStaleJobs' => array(
'callback' => array( $this, 'recycleAndDeleteStaleJobs' ),
'period' => ceil( $this->claimTTL / 2 )
)
);
}
/**
* @return Array (DatabaseBase, ScopedCallback)
*/

View file

@ -228,4 +228,58 @@ class JobQueueGroup {
}
return $types;
}
/**
* Execute any due periodic queue maintenance tasks for all queues.
*
* A task is "due" if the time ellapsed since the last run is greater than
* the defined run period. Concurrent calls to this function will cause tasks
* to be attempted twice, so they may need their own methods of mutual exclusion.
*
* @return integer Number of tasks run
*/
public function executeReadyPeriodicTasks() {
global $wgMemc;
list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
$key = wfForeignMemcKey( $db, $prefix, 'jobqueuegroup', 'taskruns', 'v1' );
$lastRuns = $wgMemc->get( $key ); // (queue => task => UNIX timestamp)
$count = 0;
$tasksRun = array(); // (queue => task => UNIX timestamp)
foreach ( $this->getQueueTypes() as $type ) {
$queue = $this->get( $type );
foreach ( $queue->getPeriodicTasks() as $task => $definition ) {
if ( $definition['period'] <= 0 ) {
continue; // disabled
} elseif ( !isset( $lastRuns[$type][$task] )
|| $lastRuns[$type][$task] < ( time() - $definition['period'] ) )
{
if ( call_user_func( $definition['callback'] ) !== null ) {
$tasksRun[$type][$task] = time();
++$count;
}
}
}
}
$wgMemc->merge( $key, function( $cache, $key, $lastRuns ) use ( $tasksRun ) {
if ( is_array( $lastRuns ) ) {
foreach ( $tasksRun as $type => $tasks ) {
foreach ( $tasks as $task => $timestamp ) {
if ( !isset( $lastRuns[$type][$task] )
|| $timestamp > $lastRuns[$type][$task] )
{
$lastRuns[$type][$task] = $timestamp;
}
}
}
} else {
$lastRuns = $tasksRun;
}
return $lastRuns;
} );
return $count;
}
}

View file

@ -48,6 +48,9 @@ class nextJobDB extends Maintenance {
$types = JobQueueGroup::singleton()->getDefaultQueueTypes();
}
// Handle any required periodic queue maintenance
$this->executeReadyPeriodicTasks();
$memcKey = 'jobqueue:dbs:v3';
$pendingDbInfo = $wgMemc->get( $memcKey );
@ -78,6 +81,7 @@ class nextJobDB extends Maintenance {
return; // no DBs with jobs or cache is both empty and locked
}
$type = $this->getOption( 'type', false );
$pendingDBs = $pendingDbInfo['pendingDBs']; // convenience
do {
$again = false;
@ -156,14 +160,36 @@ class nextJobDB extends Maintenance {
$pendingDBs = array(); // (job type => (db list))
foreach ( $wgLocalDatabases as $db ) {
$types = JobQueueGroup::singleton( $db )->getQueuesWithJobs();
foreach ( $types as $type ) {
foreach ( JobQueueGroup::singleton( $db )->getQueuesWithJobs() as $type ) {
$pendingDBs[$type][] = $db;
}
}
return $pendingDBs;
}
/**
* Do all ready periodic jobs for all databases every 5 minutes (and .1% of the time)
* @return integer
*/
private function executeReadyPeriodicTasks() {
global $wgLocalDatabases, $wgMemc;
$count = 0;
$memcKey = 'jobqueue:periodic:lasttime';
$timestamp = (int)$wgMemc->get( $memcKey ); // UNIX timestamp or 0
if ( ( time() - $timestamp ) > 300 || mt_rand( 0, 999 ) == 0 ) { // 5 minutes
if ( $wgMemc->add( "$memcKey:rebuild", 1, 1800 ) ) { // lock
foreach ( $wgLocalDatabases as $db ) {
$count += JobQueueGroup::singleton( $db )->executeReadyPeriodicTasks();
}
$wgMemc->set( $memcKey, time() );
$wgMemc->delete( "$memcKey:rebuild" ); // unlock
}
}
return $count;
}
}
$maintClass = "nextJobDb";

View file

@ -76,6 +76,12 @@ class RunJobs extends Maintenance {
$n = 0;
$group = JobQueueGroup::singleton();
// Handle any required periodic queue maintenance
$count = $group->executeReadyPeriodicTasks();
if ( $count > 0 ) {
$this->runJobsLog( "Executed $count periodic queue task(s)." );
}
do {
$job = ( $type === false )
? $group->pop( JobQueueGroup::TYPE_DEFAULT, JobQueueGroup::USE_CACHE )