[JobQueue] Cleanups for JobQueueRedis.

* Cleaned up 'server' option to not fragment the pool.
  Also made it actually match the documentation.
* Made it use doGetPeriodicTasks() for job recycling.
* Made it so that other job queue classes can be tested.
* Renamed "redisConf" => "redisConfig".
* Tweaked comments about the "random" order option.

Change-Id: I7823d90010e6bc9d581435c3be92830c5ba68480
This commit is contained in:
Aaron Schulz 2013-02-08 10:52:54 -08:00 committed by Gerrit Code Review
parent 04e0d75f86
commit 9cc7e6a39a
5 changed files with 91 additions and 51 deletions

View file

@ -62,6 +62,8 @@ abstract class JobQueue {
* by timestamp, allowing for some jobs to be popped off out of order. * by timestamp, allowing for some jobs to be popped off out of order.
* If "random" is used, pop() will pick jobs in random order. This might be * If "random" is used, pop() will pick jobs in random order. This might be
* useful for improving concurrency depending on the queue storage medium. * useful for improving concurrency depending on the queue storage medium.
* Note that "random" really means "don't care", so it may actually be FIFO
* or only weakly random (e.g. pop() takes one of the first X jobs randomly).
* - claimTTL : If supported, the queue will recycle jobs that have been popped * - claimTTL : If supported, the queue will recycle jobs that have been popped
* but not acknowledged as completed after this many seconds. Recycling * but not acknowledged as completed after this many seconds. Recycling
* of jobs simple means re-inserting them into the queue. Jobs can be * of jobs simple means re-inserting them into the queue. Jobs can be
@ -371,4 +373,15 @@ abstract class JobQueue {
* @return void * @return void
*/ */
protected function doFlushCaches() {} protected function doFlushCaches() {}
/**
* Namespace the queue with a key to isolate it for testing
*
* @param $key string
* @return void
* @throws MWException
*/
public function setTestingPrefix( $key ) {
throw new MWException( "Queue namespacing not support for this queue type." );
}
} }

View file

@ -36,18 +36,20 @@ class JobQueueRedis extends JobQueue {
const ROOTJOB_TTL = 1209600; // integer; seconds to remember root jobs (14 days) const ROOTJOB_TTL = 1209600; // integer; seconds to remember root jobs (14 days)
const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed (7 days) const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed (7 days)
protected $key; // string; key to prefix the queue keys with (used for testing)
/** /**
* @params include: * @params include:
* - redisConf : An array of parameters to RedisConnectionPool::__construct(). * - redisConfig : An array of parameters to RedisConnectionPool::__construct().
* - server : A hostname/port combination or the absolute path of a UNIX socket. * - redisServer : A hostname/port combination or the absolute path of a UNIX socket.
* If a hostname is specified but no port, the standard port number * If a hostname is specified but no port, the standard port number
* 6379 will be used. Required. * 6379 will be used. Required.
* @param array $params * @param array $params
*/ */
public function __construct( array $params ) { public function __construct( array $params ) {
parent::__construct( $params ); parent::__construct( $params );
$this->server = $params['redisConf']['server']; $this->server = $params['redisServer'];
$this->redisPool = RedisConnectionPool::singleton( $params['redisConf'] ); $this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] );
} }
/** /**
@ -56,10 +58,6 @@ class JobQueueRedis extends JobQueue {
* @throws MWException * @throws MWException
*/ */
protected function doIsEmpty() { protected function doIsEmpty() {
if ( mt_rand( 0, 99 ) == 0 ) {
$this->doInternalMaintenance();
}
$conn = $this->getConnection(); $conn = $this->getConnection();
try { try {
return ( $conn->lSize( $this->getQueueKey( 'l-unclaimed' ) ) == 0 ); return ( $conn->lSize( $this->getQueueKey( 'l-unclaimed' ) ) == 0 );
@ -74,10 +72,6 @@ class JobQueueRedis extends JobQueue {
* @throws MWException * @throws MWException
*/ */
protected function doGetSize() { protected function doGetSize() {
if ( mt_rand( 0, 99 ) == 0 ) {
$this->doInternalMaintenance();
}
$conn = $this->getConnection(); $conn = $this->getConnection();
try { try {
return $conn->lSize( $this->getQueueKey( 'l-unclaimed' ) ); return $conn->lSize( $this->getQueueKey( 'l-unclaimed' ) );
@ -92,17 +86,12 @@ class JobQueueRedis extends JobQueue {
* @throws MWException * @throws MWException
*/ */
protected function doGetAcquiredCount() { protected function doGetAcquiredCount() {
if ( mt_rand( 0, 99 ) == 0 ) { if ( $this->claimTTL <= 0 ) {
$this->doInternalMaintenance(); return 0; // no acknowledgements
} }
$conn = $this->getConnection(); $conn = $this->getConnection();
try { try {
if ( $this->claimTTL > 0 ) { return $conn->lSize( $this->getQueueKey( 'l-claimed' ) );
return $conn->lSize( $this->getQueueKey( 'l-claimed' ) );
} else {
return 0;
}
} catch ( RedisException $e ) { } catch ( RedisException $e ) {
$this->throwRedisException( $this->server, $conn, $e ); $this->throwRedisException( $this->server, $conn, $e );
} }
@ -190,8 +179,8 @@ class JobQueueRedis extends JobQueue {
protected function doPop() { protected function doPop() {
$job = false; $job = false;
if ( mt_rand( 0, 99 ) == 0 ) { if ( $this->claimTTL <= 0 && mt_rand( 0, 99 ) == 0 ) {
$this->doInternalMaintenance(); $this->cleanupClaimedJobs(); // prune jobs and IDs from the "garbage" list
} }
$conn = $this->getConnection(); $conn = $this->getConnection();
@ -340,25 +329,16 @@ class JobQueueRedis extends JobQueue {
return ( $timestamp && $timestamp > $params['rootJobTimestamp'] ); return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
} }
/**
* Do any job recycling or queue cleanup as needed
*
* @return void
* @return integer Number of jobs recycled/deleted
* @throws MWException
*/
protected function doInternalMaintenance() {
return ( $this->claimTTL > 0 ) ?
$this->recycleAndDeleteStaleJobs() : $this->cleanupClaimedJobs();
}
/** /**
* Recycle or destroy any jobs that have been claimed for too long * Recycle or destroy any jobs that have been claimed for too long
* *
* @return integer Number of jobs recycled/deleted * @return integer Number of jobs recycled/deleted
* @throws MWException * @throws MWException
*/ */
protected function recycleAndDeleteStaleJobs() { public function recycleAndDeleteStaleJobs() {
if ( $this->claimTTL <= 0 ) { // sanity
throw new MWException( "Cannot recycle jobs since acknowledgements are disabled." );
}
$count = 0; $count = 0;
// For each job item that can be retried, we need to add it back to the // For each job item that can be retried, we need to add it back to the
// main queue and remove it from the list of currenty claimed job items. // main queue and remove it from the list of currenty claimed job items.
@ -488,6 +468,22 @@ class JobQueueRedis extends JobQueue {
return $count; return $count;
} }
/**
* @return Array
*/
protected function doGetPeriodicTasks() {
if ( $this->claimTTL > 0 ) {
return array(
'recycleAndDeleteStaleJobs' => array(
'callback' => array( $this, 'recycleAndDeleteStaleJobs' ),
'period' => ceil( $this->claimTTL / 2 )
)
);
} else {
return array();
}
}
/** /**
* @param $job Job * @param $job Job
* @return array * @return array
@ -560,7 +556,11 @@ class JobQueueRedis extends JobQueue {
*/ */
private function getQueueKey( $prop ) { private function getQueueKey( $prop ) {
list( $db, $prefix ) = wfSplitWikiID( $this->wiki ); list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $prop ); if ( strlen( $this->key ) ) { // namespaced queue (for testing)
return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $this->key, $prop );
} else {
return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $prop );
}
} }
/** /**
@ -606,4 +606,12 @@ class JobQueueRedis extends JobQueue {
} }
return $res; return $res;
} }
/**
* @param $key string
* @return void
*/
public function setTestingPrefix( $key ) {
$this->key = $key;
}
} }

View file

@ -7,6 +7,7 @@ class MediaWikiPHPUnitCommand extends PHPUnit_TextUI_Command {
'file=' => false, 'file=' => false,
'use-filebackend=' => false, 'use-filebackend=' => false,
'use-bagostuff=' => false, 'use-bagostuff=' => false,
'use-jobqueue=' => false,
'keep-uploads' => false, 'keep-uploads' => false,
'use-normal-tables' => false, 'use-normal-tables' => false,
'reuse-db' => false, 'reuse-db' => false,

View file

@ -6,6 +6,8 @@
* @group Database * @group Database
*/ */
class JobQueueTest extends MediaWikiTestCase { class JobQueueTest extends MediaWikiTestCase {
protected $key;
protected $queueRand, $queueRandTTL, $queueFifo, $queueFifoTTL;
protected $old = array(); protected $old = array();
function __construct( $name = null, array $data = array(), $dataName = '' ) { function __construct( $name = null, array $data = array(), $dataName = '' ) {
@ -15,18 +17,34 @@ class JobQueueTest extends MediaWikiTestCase {
} }
protected function setUp() { protected function setUp() {
global $wgMemc; global $wgMemc, $wgJobTypeConf;
parent::setUp(); parent::setUp();
$this->old['wgMemc'] = $wgMemc; $this->old['wgMemc'] = $wgMemc;
$wgMemc = new HashBagOStuff(); $wgMemc = new HashBagOStuff();
$this->queueRand = JobQueue::factory( array( 'class' => 'JobQueueDB', if ( $this->getCliArg( 'use-jobqueue=' ) ) {
'wiki' => wfWikiID(), 'type' => 'null', 'order' => 'random' ) ); $name = $this->getCliArg( 'use-jobqueue=' );
$this->queueRandTTL = JobQueue::factory( array( 'class' => 'JobQueueDB', if ( !isset( $wgJobTypeConf[$name] ) ) {
'wiki' => wfWikiID(), 'type' => 'null', 'order' => 'random', 'claimTTL' => 10 ) ); throw new MWException( "No \$wgJobTypeConf entry for '$name'." );
$this->queueFifo = JobQueue::factory( array( 'class' => 'JobQueueDB', }
'wiki' => wfWikiID(), 'type' => 'null', 'order' => 'fifo' ) ); $baseConfig = $wgJobTypeConf[$name];
$this->queueFifoTTL = JobQueue::factory( array( 'class' => 'JobQueueDB', } else {
'wiki' => wfWikiID(), 'type' => 'null', 'order' => 'fifo', 'claimTTL' => 10 ) ); $baseConfig = array( 'class' => 'JobQueueDB' );
}
$baseConfig['type'] = 'null';
$baseConfig['wiki'] = wfWikiID();
$this->queueRand = JobQueue::factory(
array( 'order' => 'random', 'claimTTL' => 0 ) + $baseConfig );
$this->queueRandTTL = JobQueue::factory(
array( 'order' => 'random', 'claimTTL' => 10 ) + $baseConfig );
$this->queueFifo = JobQueue::factory(
array( 'order' => 'fifo', 'claimTTL' => 0 ) + $baseConfig );
$this->queueFifoTTL = JobQueue::factory(
array( 'order' => 'fifo', 'claimTTL' => 10 ) + $baseConfig );
if ( $baseConfig['class'] !== 'JobQueueDB' ) { // DB namespace with prefix or temp tables
foreach ( array( 'queueRand', 'queueRandTTL', 'queueFifo', 'queueFifoTTL' ) as $q ) {
$this->$q->setTestingPrefix( 'unittests-' . wfRandomString( 32 ) );
}
}
} }
protected function tearDown() { protected function tearDown() {
@ -239,7 +257,7 @@ class JobQueueTest extends MediaWikiTestCase {
$queue->ack( $job ); $queue->ack( $job );
} }
$this->assertFalse( $queue->isEmpty(), "Queue is not empty ($desc)" ); $this->assertFalse( $queue->pop(), "Queue is not empty ($desc)" );
$queue->flushCaches(); $queue->flushCaches();
$this->assertEquals( 0, $queue->getSize(), "Queue is empty ($desc)" ); $this->assertEquals( 0, $queue->getSize(), "Queue is empty ($desc)" );