[JobQueue] Cleanups for JobQueueRedis.
* Cleaned up 'server' option to not fragment the pool. Also made it actually match the documentation. * Made it use doGetPeriodicTasks() for job recycling. * Made it so that other job queue classes can be tested. * Renamed "redisConf" => "redisConfig". * Tweaked comments about the "random" order option. Change-Id: I7823d90010e6bc9d581435c3be92830c5ba68480
This commit is contained in:
parent
04e0d75f86
commit
9cc7e6a39a
5 changed files with 91 additions and 51 deletions
|
|
@ -62,6 +62,8 @@ abstract class JobQueue {
|
||||||
* by timestamp, allowing for some jobs to be popped off out of order.
|
* by timestamp, allowing for some jobs to be popped off out of order.
|
||||||
* If "random" is used, pop() will pick jobs in random order. This might be
|
* If "random" is used, pop() will pick jobs in random order. This might be
|
||||||
* useful for improving concurrency depending on the queue storage medium.
|
* useful for improving concurrency depending on the queue storage medium.
|
||||||
|
* Note that "random" really means "don't care", so it may actually be FIFO
|
||||||
|
* or only weakly random (e.g. pop() takes one of the first X jobs randomly).
|
||||||
* - claimTTL : If supported, the queue will recycle jobs that have been popped
|
* - claimTTL : If supported, the queue will recycle jobs that have been popped
|
||||||
* but not acknowledged as completed after this many seconds. Recycling
|
* but not acknowledged as completed after this many seconds. Recycling
|
||||||
* of jobs simple means re-inserting them into the queue. Jobs can be
|
* of jobs simple means re-inserting them into the queue. Jobs can be
|
||||||
|
|
@ -371,4 +373,15 @@ abstract class JobQueue {
|
||||||
* @return void
|
* @return void
|
||||||
*/
|
*/
|
||||||
protected function doFlushCaches() {}
|
protected function doFlushCaches() {}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Namespace the queue with a key to isolate it for testing
|
||||||
|
*
|
||||||
|
* @param $key string
|
||||||
|
* @return void
|
||||||
|
* @throws MWException
|
||||||
|
*/
|
||||||
|
public function setTestingPrefix( $key ) {
|
||||||
|
throw new MWException( "Queue namespacing not support for this queue type." );
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -36,18 +36,20 @@ class JobQueueRedis extends JobQueue {
|
||||||
const ROOTJOB_TTL = 1209600; // integer; seconds to remember root jobs (14 days)
|
const ROOTJOB_TTL = 1209600; // integer; seconds to remember root jobs (14 days)
|
||||||
const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed (7 days)
|
const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed (7 days)
|
||||||
|
|
||||||
|
protected $key; // string; key to prefix the queue keys with (used for testing)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @params include:
|
* @params include:
|
||||||
* - redisConf : An array of parameters to RedisConnectionPool::__construct().
|
* - redisConfig : An array of parameters to RedisConnectionPool::__construct().
|
||||||
* - server : A hostname/port combination or the absolute path of a UNIX socket.
|
* - redisServer : A hostname/port combination or the absolute path of a UNIX socket.
|
||||||
* If a hostname is specified but no port, the standard port number
|
* If a hostname is specified but no port, the standard port number
|
||||||
* 6379 will be used. Required.
|
* 6379 will be used. Required.
|
||||||
* @param array $params
|
* @param array $params
|
||||||
*/
|
*/
|
||||||
public function __construct( array $params ) {
|
public function __construct( array $params ) {
|
||||||
parent::__construct( $params );
|
parent::__construct( $params );
|
||||||
$this->server = $params['redisConf']['server'];
|
$this->server = $params['redisServer'];
|
||||||
$this->redisPool = RedisConnectionPool::singleton( $params['redisConf'] );
|
$this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] );
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -56,10 +58,6 @@ class JobQueueRedis extends JobQueue {
|
||||||
* @throws MWException
|
* @throws MWException
|
||||||
*/
|
*/
|
||||||
protected function doIsEmpty() {
|
protected function doIsEmpty() {
|
||||||
if ( mt_rand( 0, 99 ) == 0 ) {
|
|
||||||
$this->doInternalMaintenance();
|
|
||||||
}
|
|
||||||
|
|
||||||
$conn = $this->getConnection();
|
$conn = $this->getConnection();
|
||||||
try {
|
try {
|
||||||
return ( $conn->lSize( $this->getQueueKey( 'l-unclaimed' ) ) == 0 );
|
return ( $conn->lSize( $this->getQueueKey( 'l-unclaimed' ) ) == 0 );
|
||||||
|
|
@ -74,10 +72,6 @@ class JobQueueRedis extends JobQueue {
|
||||||
* @throws MWException
|
* @throws MWException
|
||||||
*/
|
*/
|
||||||
protected function doGetSize() {
|
protected function doGetSize() {
|
||||||
if ( mt_rand( 0, 99 ) == 0 ) {
|
|
||||||
$this->doInternalMaintenance();
|
|
||||||
}
|
|
||||||
|
|
||||||
$conn = $this->getConnection();
|
$conn = $this->getConnection();
|
||||||
try {
|
try {
|
||||||
return $conn->lSize( $this->getQueueKey( 'l-unclaimed' ) );
|
return $conn->lSize( $this->getQueueKey( 'l-unclaimed' ) );
|
||||||
|
|
@ -92,17 +86,12 @@ class JobQueueRedis extends JobQueue {
|
||||||
* @throws MWException
|
* @throws MWException
|
||||||
*/
|
*/
|
||||||
protected function doGetAcquiredCount() {
|
protected function doGetAcquiredCount() {
|
||||||
if ( mt_rand( 0, 99 ) == 0 ) {
|
if ( $this->claimTTL <= 0 ) {
|
||||||
$this->doInternalMaintenance();
|
return 0; // no acknowledgements
|
||||||
}
|
}
|
||||||
|
|
||||||
$conn = $this->getConnection();
|
$conn = $this->getConnection();
|
||||||
try {
|
try {
|
||||||
if ( $this->claimTTL > 0 ) {
|
return $conn->lSize( $this->getQueueKey( 'l-claimed' ) );
|
||||||
return $conn->lSize( $this->getQueueKey( 'l-claimed' ) );
|
|
||||||
} else {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
} catch ( RedisException $e ) {
|
} catch ( RedisException $e ) {
|
||||||
$this->throwRedisException( $this->server, $conn, $e );
|
$this->throwRedisException( $this->server, $conn, $e );
|
||||||
}
|
}
|
||||||
|
|
@ -190,8 +179,8 @@ class JobQueueRedis extends JobQueue {
|
||||||
protected function doPop() {
|
protected function doPop() {
|
||||||
$job = false;
|
$job = false;
|
||||||
|
|
||||||
if ( mt_rand( 0, 99 ) == 0 ) {
|
if ( $this->claimTTL <= 0 && mt_rand( 0, 99 ) == 0 ) {
|
||||||
$this->doInternalMaintenance();
|
$this->cleanupClaimedJobs(); // prune jobs and IDs from the "garbage" list
|
||||||
}
|
}
|
||||||
|
|
||||||
$conn = $this->getConnection();
|
$conn = $this->getConnection();
|
||||||
|
|
@ -340,25 +329,16 @@ class JobQueueRedis extends JobQueue {
|
||||||
return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
|
return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Do any job recycling or queue cleanup as needed
|
|
||||||
*
|
|
||||||
* @return void
|
|
||||||
* @return integer Number of jobs recycled/deleted
|
|
||||||
* @throws MWException
|
|
||||||
*/
|
|
||||||
protected function doInternalMaintenance() {
|
|
||||||
return ( $this->claimTTL > 0 ) ?
|
|
||||||
$this->recycleAndDeleteStaleJobs() : $this->cleanupClaimedJobs();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Recycle or destroy any jobs that have been claimed for too long
|
* Recycle or destroy any jobs that have been claimed for too long
|
||||||
*
|
*
|
||||||
* @return integer Number of jobs recycled/deleted
|
* @return integer Number of jobs recycled/deleted
|
||||||
* @throws MWException
|
* @throws MWException
|
||||||
*/
|
*/
|
||||||
protected function recycleAndDeleteStaleJobs() {
|
public function recycleAndDeleteStaleJobs() {
|
||||||
|
if ( $this->claimTTL <= 0 ) { // sanity
|
||||||
|
throw new MWException( "Cannot recycle jobs since acknowledgements are disabled." );
|
||||||
|
}
|
||||||
$count = 0;
|
$count = 0;
|
||||||
// For each job item that can be retried, we need to add it back to the
|
// For each job item that can be retried, we need to add it back to the
|
||||||
// main queue and remove it from the list of currenty claimed job items.
|
// main queue and remove it from the list of currenty claimed job items.
|
||||||
|
|
@ -488,6 +468,22 @@ class JobQueueRedis extends JobQueue {
|
||||||
return $count;
|
return $count;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return Array
|
||||||
|
*/
|
||||||
|
protected function doGetPeriodicTasks() {
|
||||||
|
if ( $this->claimTTL > 0 ) {
|
||||||
|
return array(
|
||||||
|
'recycleAndDeleteStaleJobs' => array(
|
||||||
|
'callback' => array( $this, 'recycleAndDeleteStaleJobs' ),
|
||||||
|
'period' => ceil( $this->claimTTL / 2 )
|
||||||
|
)
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
return array();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param $job Job
|
* @param $job Job
|
||||||
* @return array
|
* @return array
|
||||||
|
|
@ -560,7 +556,11 @@ class JobQueueRedis extends JobQueue {
|
||||||
*/
|
*/
|
||||||
private function getQueueKey( $prop ) {
|
private function getQueueKey( $prop ) {
|
||||||
list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
|
list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
|
||||||
return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $prop );
|
if ( strlen( $this->key ) ) { // namespaced queue (for testing)
|
||||||
|
return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $this->key, $prop );
|
||||||
|
} else {
|
||||||
|
return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $prop );
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -606,4 +606,12 @@ class JobQueueRedis extends JobQueue {
|
||||||
}
|
}
|
||||||
return $res;
|
return $res;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param $key string
|
||||||
|
* @return void
|
||||||
|
*/
|
||||||
|
public function setTestingPrefix( $key ) {
|
||||||
|
$this->key = $key;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -7,6 +7,7 @@ class MediaWikiPHPUnitCommand extends PHPUnit_TextUI_Command {
|
||||||
'file=' => false,
|
'file=' => false,
|
||||||
'use-filebackend=' => false,
|
'use-filebackend=' => false,
|
||||||
'use-bagostuff=' => false,
|
'use-bagostuff=' => false,
|
||||||
|
'use-jobqueue=' => false,
|
||||||
'keep-uploads' => false,
|
'keep-uploads' => false,
|
||||||
'use-normal-tables' => false,
|
'use-normal-tables' => false,
|
||||||
'reuse-db' => false,
|
'reuse-db' => false,
|
||||||
|
|
|
||||||
|
|
@ -6,6 +6,8 @@
|
||||||
* @group Database
|
* @group Database
|
||||||
*/
|
*/
|
||||||
class JobQueueTest extends MediaWikiTestCase {
|
class JobQueueTest extends MediaWikiTestCase {
|
||||||
|
protected $key;
|
||||||
|
protected $queueRand, $queueRandTTL, $queueFifo, $queueFifoTTL;
|
||||||
protected $old = array();
|
protected $old = array();
|
||||||
|
|
||||||
function __construct( $name = null, array $data = array(), $dataName = '' ) {
|
function __construct( $name = null, array $data = array(), $dataName = '' ) {
|
||||||
|
|
@ -15,18 +17,34 @@ class JobQueueTest extends MediaWikiTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
protected function setUp() {
|
protected function setUp() {
|
||||||
global $wgMemc;
|
global $wgMemc, $wgJobTypeConf;
|
||||||
parent::setUp();
|
parent::setUp();
|
||||||
$this->old['wgMemc'] = $wgMemc;
|
$this->old['wgMemc'] = $wgMemc;
|
||||||
$wgMemc = new HashBagOStuff();
|
$wgMemc = new HashBagOStuff();
|
||||||
$this->queueRand = JobQueue::factory( array( 'class' => 'JobQueueDB',
|
if ( $this->getCliArg( 'use-jobqueue=' ) ) {
|
||||||
'wiki' => wfWikiID(), 'type' => 'null', 'order' => 'random' ) );
|
$name = $this->getCliArg( 'use-jobqueue=' );
|
||||||
$this->queueRandTTL = JobQueue::factory( array( 'class' => 'JobQueueDB',
|
if ( !isset( $wgJobTypeConf[$name] ) ) {
|
||||||
'wiki' => wfWikiID(), 'type' => 'null', 'order' => 'random', 'claimTTL' => 10 ) );
|
throw new MWException( "No \$wgJobTypeConf entry for '$name'." );
|
||||||
$this->queueFifo = JobQueue::factory( array( 'class' => 'JobQueueDB',
|
}
|
||||||
'wiki' => wfWikiID(), 'type' => 'null', 'order' => 'fifo' ) );
|
$baseConfig = $wgJobTypeConf[$name];
|
||||||
$this->queueFifoTTL = JobQueue::factory( array( 'class' => 'JobQueueDB',
|
} else {
|
||||||
'wiki' => wfWikiID(), 'type' => 'null', 'order' => 'fifo', 'claimTTL' => 10 ) );
|
$baseConfig = array( 'class' => 'JobQueueDB' );
|
||||||
|
}
|
||||||
|
$baseConfig['type'] = 'null';
|
||||||
|
$baseConfig['wiki'] = wfWikiID();
|
||||||
|
$this->queueRand = JobQueue::factory(
|
||||||
|
array( 'order' => 'random', 'claimTTL' => 0 ) + $baseConfig );
|
||||||
|
$this->queueRandTTL = JobQueue::factory(
|
||||||
|
array( 'order' => 'random', 'claimTTL' => 10 ) + $baseConfig );
|
||||||
|
$this->queueFifo = JobQueue::factory(
|
||||||
|
array( 'order' => 'fifo', 'claimTTL' => 0 ) + $baseConfig );
|
||||||
|
$this->queueFifoTTL = JobQueue::factory(
|
||||||
|
array( 'order' => 'fifo', 'claimTTL' => 10 ) + $baseConfig );
|
||||||
|
if ( $baseConfig['class'] !== 'JobQueueDB' ) { // DB namespace with prefix or temp tables
|
||||||
|
foreach ( array( 'queueRand', 'queueRandTTL', 'queueFifo', 'queueFifoTTL' ) as $q ) {
|
||||||
|
$this->$q->setTestingPrefix( 'unittests-' . wfRandomString( 32 ) );
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected function tearDown() {
|
protected function tearDown() {
|
||||||
|
|
@ -239,7 +257,7 @@ class JobQueueTest extends MediaWikiTestCase {
|
||||||
$queue->ack( $job );
|
$queue->ack( $job );
|
||||||
}
|
}
|
||||||
|
|
||||||
$this->assertFalse( $queue->isEmpty(), "Queue is not empty ($desc)" );
|
$this->assertFalse( $queue->pop(), "Queue is not empty ($desc)" );
|
||||||
|
|
||||||
$queue->flushCaches();
|
$queue->flushCaches();
|
||||||
$this->assertEquals( 0, $queue->getSize(), "Queue is empty ($desc)" );
|
$this->assertEquals( 0, $queue->getSize(), "Queue is empty ($desc)" );
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue